Skip to content

Commit

Permalink
[r] Timestamp write and read of data frame and array objects (#2926)
Browse files Browse the repository at this point in the history
* Timestamp range support for dataframe

* Extended support

* (First half) new timestamp test for SOMADataFrame

* Second half of new timestamp passing all but three reopen tests

* [r] Expose `tiledb_timestamp` parameter to `$reopen()` (#2866)

* [python] pin `pandas-stubs>=2` during pre-commit `mypy` hook (#2854)

* pin `pandas-stubs>=2` during pre-commit `mypy` hook

also simplify some `if TYPE_CHECKING` blocks

fixes #2839

* add r-ci.yml exclusions

* `actions{cache,checkout,setup-python}` upgrades (#2856)

* CI: use `pip` directly (instead of `python -m pip`)

* CI: checkout@v4, setup-python@v5 cache@v4

* disable setup-python cache for lint/pre-commit

See actions/setup-python#919; setup-python cache likely doesn't help much here, as we only `pip install pre-commit`, and cache `~/.cache/pre-commit` separately using `actions/cache`.

* bump to codecov-action@v4

rm coverage/uploader pin added in #2827, per codecov/uploader#1673 (comment)

* `s/MacOS/macOS/

* fix typo from #2854

* [python] Offer better guidance on attribute names with `.` (#2864)

* [python] Move `_update_column` into pybind11 (#2862)

* [r] Expose `tiledb_timestamp` parameter to `$reopen()`
Allow `$reopen()` to reopen at a particular timestamp; by default, the
timestamp is set to `NULL` to reopen at the curren time. This is needed
for compatibility between libtiledbsoma's timestamp handling and
resume-mode

[SC-52694](https://app.shortcut.com/tiledb-inc/story/52694/allow-reopen-to-take-a-timestamp-for-reopening)

---------

Co-authored-by: Ryan Williams <[email protected]>
Co-authored-by: John Kerl <[email protected]>
Co-authored-by: nguyenv <[email protected]>

* [r] [NO BACKPORT] Expose timestamps publicly and plumb through for resume-mode (#2871)

Expose timestamps publicly through a new active binding; replace calls to `private$tiledb_timestamp` with `self$tiledb_timestamp`

Also plumb timestamps through for `write_soma()` in resume-mode

* Timestamp range support for dataframe

* (First half) new timestamp test for SOMADataFrame

* Second half of new timestamp passing all but three reopen tests

* Test can now use factor

* Read and write DataFrame and {Dense,Sparse}Array under timestamps

* Quieter warnings

* Adapt one timestamped test

* Pause one test predicate for collections

* clang-format

as obsessing over one whitespace char before a comment is added value

* Micro-fix following code review

More changes to follow in wider changeset

* Update changelog
Bump develop version

---------

Co-authored-by: Paul Hoffman <[email protected]>
Co-authored-by: Ryan Williams <[email protected]>
Co-authored-by: John Kerl <[email protected]>
Co-authored-by: nguyenv <[email protected]>
Co-authored-by: Paul Hoffman <[email protected]>
  • Loading branch information
6 people authored Aug 26, 2024
1 parent 771d671 commit 87b74ae
Show file tree
Hide file tree
Showing 39 changed files with 407 additions and 167 deletions.
2 changes: 1 addition & 1 deletion apis/r/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Description: Interface for working with 'TileDB'-based Stack of Matrices,
like those commonly used for single cell data analysis. It is documented at
<https://github.com/single-cell-data>; a formal specification available is at
<https://github.com/single-cell-data/SOMA/blob/main/abstract_specification.md>.
Version: 1.13.99.2
Version: 1.13.99.3
Authors@R: c(
person(given = "Aaron", family = "Wolen",
role = c("cre", "aut"), email = "[email protected]",
Expand Down
2 changes: 2 additions & 0 deletions apis/r/NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Changes

* Make use of timestamp ranges in libtiledbsoma

# tiledbsoma 1.13.0

## Changes
Expand Down
5 changes: 5 additions & 0 deletions apis/r/R/Factory.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ SOMADataFrameCreate <- function(
schema,
index_column_names = index_column_names,
platform_config = platform_config,
timestamps = rep(tiledb_timestamp, 2),
internal_use_only = "allowed_use"
)
}
Expand All @@ -75,6 +76,7 @@ SOMADataFrameOpen <- function(
tiledbsoma_ctx = NULL,
tiledb_timestamp = NULL
) {
spdl::debug("[SOMADataFrameOpen] uri {} ts ({},{})", uri, tiledb_timestamp[1], tiledb_timestamp[2])
sdf <- SOMADataFrame$new(
uri,
platform_config,
Expand Down Expand Up @@ -125,6 +127,7 @@ SOMASparseNDArrayCreate <- function(
type,
shape,
platform_config = platform_config,
timestamps = rep(tiledb_timestamp, 2),
internal_use_only = "allowed_use"
)
}
Expand Down Expand Up @@ -173,6 +176,7 @@ SOMADenseNDArrayCreate <- function(
tiledbsoma_ctx = NULL,
tiledb_timestamp = NULL
) {
spdl::debug("[SOMADenseNDArrayCreate] tstamp ({},{})", tiledb_timestamp[1], tiledb_timestamp[2])
dnda <- SOMADenseNDArray$new(
uri,
platform_config,
Expand All @@ -184,6 +188,7 @@ SOMADenseNDArrayCreate <- function(
type,
shape,
platform_config = platform_config,
timestamps = rep(tiledb_timestamp, 2),
internal_use_only = "allowed_use"
)
return(dnda)
Expand Down
30 changes: 16 additions & 14 deletions apis/r/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ createSOMAContext <- function(config = NULL) {
.Call(`_tiledbsoma_createSOMAContext`, config)
}

createSchemaFromArrow <- function(uri, nasp, nadimap, nadimsp, sparse, datatype, pclst, ctxxp) {
invisible(.Call(`_tiledbsoma_createSchemaFromArrow`, uri, nasp, nadimap, nadimsp, sparse, datatype, pclst, ctxxp))
createSchemaFromArrow <- function(uri, nasp, nadimap, nadimsp, sparse, datatype, pclst, ctxxp, tsvec = NULL) {
invisible(.Call(`_tiledbsoma_createSchemaFromArrow`, uri, nasp, nadimap, nadimsp, sparse, datatype, pclst, ctxxp, tsvec))
}

writeArrayFromArrow <- function(uri, naap, nasp, arraytype = "", config = NULL) {
invisible(.Call(`_tiledbsoma_writeArrayFromArrow`, uri, naap, nasp, arraytype, config))
writeArrayFromArrow <- function(uri, naap, nasp, arraytype = "", config = NULL, tsvec = NULL) {
invisible(.Call(`_tiledbsoma_writeArrayFromArrow`, uri, naap, nasp, arraytype, config, tsvec))
}

#' Get nnumber of metadata items
Expand Down Expand Up @@ -66,12 +66,14 @@ delete_metadata <- function(uri, key, is_array, ctxxp) {
#'
#' @param uri The array URI
#' @param key The array metadata key
#' @param value The metadata value
#' @
#' @param valuesxp The metadata value
#' @param type The datatype
#' @param is_array A boolean to indicate array or group
#' @param ctxxp An external pointer to the SOMAContext wrapper
#' @param tsvec An optional two-element datetime vector
#' @export
set_metadata <- function(uri, key, valuesxp, type, is_array, ctxxp) {
invisible(.Call(`_tiledbsoma_set_metadata`, uri, key, valuesxp, type, is_array, ctxxp))
set_metadata <- function(uri, key, valuesxp, type, is_array, ctxxp, tsvec = NULL) {
invisible(.Call(`_tiledbsoma_set_metadata`, uri, key, valuesxp, type, is_array, ctxxp, tsvec))
}

reindex_create <- function() {
Expand All @@ -87,8 +89,8 @@ reindex_lookup <- function(idx, kvec) {
}

#' @noRd
soma_array_reader_impl <- function(uri, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", loglevel = "auto", config = NULL) {
.Call(`_tiledbsoma_soma_array_reader`, uri, colnames, qc, dim_points, dim_ranges, batch_size, result_order, loglevel, config)
soma_array_reader_impl <- function(uri, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", loglevel = "auto", config = NULL, timestamprange = NULL) {
.Call(`_tiledbsoma_soma_array_reader`, uri, colnames, qc, dim_points, dim_ranges, batch_size, result_order, loglevel, config, timestamprange)
}

#' Set the logging level for the R package and underlying C++ library
Expand Down Expand Up @@ -148,8 +150,8 @@ shape <- function(uri, config = NULL) {
#' @param result_order Optional argument for query result order, defaults to \sQuote{auto}
#' @param loglevel Character value with the desired logging level, defaults to \sQuote{auto}
#' which lets prior setting prevail, any other value is set as new logging level.
#' @param timestamp_end Optional POSIXct (i.e. Datetime) type for end of interval for which
#' data is considered.
#' @param timestamprange Optional POSIXct (i.e. Datetime) vector with start and end of
#' interval for which data is considered.
#' @param sr An external pointer to a TileDB SOMAArray object
#'
#' @return \code{sr_setup} returns an external pointer to a SOMAArray. \code{sr_complete}
Expand All @@ -169,8 +171,8 @@ shape <- function(uri, config = NULL) {
#' summary(rl)
#' }
#' @noRd
sr_setup <- function(uri, config, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", timestamp_end = NULL, loglevel = "auto") {
.Call(`_tiledbsoma_sr_setup`, uri, config, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamp_end, loglevel)
sr_setup <- function(uri, config, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", timestamprange = NULL, loglevel = "auto") {
.Call(`_tiledbsoma_sr_setup`, uri, config, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel)
}

sr_complete <- function(sr) {
Expand Down
7 changes: 4 additions & 3 deletions apis/r/R/SOMAArrayBase.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ SOMAArrayBase <- R6::R6Class(
private$soma_type_cache <- self$get_metadata(SOMA_OBJECT_TYPE_METADATA_KEY)
},

write_object_type_metadata = function() {
private$check_open_for_write()
write_object_type_metadata = function(timestamps=NULL) {
#private$check_open_for_write()

meta <- list()
meta[[SOMA_OBJECT_TYPE_METADATA_KEY]] <- self$class()
meta[[SOMA_ENCODING_VERSION_METADATA_KEY]] <- SOMA_ENCODING_VERSION
self$set_metadata(meta)
spdl::debug("[SOMAArrayBase::write_object_metadata] calling set metadata")
self$set_metadata(meta, c(self$tiledb_timestamp,self$tiledb_timestamp))
}
)
)
22 changes: 14 additions & 8 deletions apis/r/R/SOMADataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ SOMADataFrame <- R6::R6Class(
#' index columns. All named columns must exist in the schema, and at least
#' one index column name is required.
#' @template param-platform-config
#' @param timestamps Optional timestamp start and end range
#' @param internal_use_only Character value to signal this is a 'permitted' call,
#' as `create()` is considered internal and should not be called directly.
create = function(schema, index_column_names = c("soma_joinid"),
platform_config = NULL, internal_use_only = NULL) {
platform_config = NULL, timestamps = NULL, internal_use_only = NULL) {
if (is.null(internal_use_only) || internal_use_only != "allowed_use") {
stop(paste("Use of the create() method is for internal use only. Consider using a",
"factory method as e.g. 'SOMADataFrameCreate()'."), call. = FALSE)
Expand Down Expand Up @@ -53,12 +54,15 @@ SOMADataFrame <- R6::R6Class(
nasp <- nanoarrow::nanoarrow_allocate_schema()
schema$export_to_c(nasp)

spdl::debug("[SOMADataFrame$create] about to create schema from arrow")
ctxptr <- super$tiledbsoma_ctx$context()
createSchemaFromArrow(uri = self$uri, nasp, dnaap, dnasp, TRUE, "SOMADataFrame",
tiledb_create_options$to_list(FALSE), soma_context())
tiledb_create_options$to_list(FALSE), soma_context(), timestamps)

spdl::debug("[SOMADataFrame$create] about to call write_object_type_metadata")
private$write_object_type_metadata(timestamps)

self$open("WRITE", internal_use_only = "allowed_use")
private$write_object_type_metadata()
self
},

Expand All @@ -67,8 +71,10 @@ SOMADataFrame <- R6::R6Class(
#' @param values An [`arrow::Table`] or [`arrow::RecordBatch`]
#' containing all columns, including any index columns. The
#' schema for `values` must match the schema for the `SOMADataFrame`.
#' @param tsrange An optional two-element Datetime vector for the
#' start and end of the timestamp range
#'
write = function(values) {
write = function(values, tsrange = NULL) {
private$check_open_for_write()

# Prevent downcasting of int64 to int32 when materializing a column
Expand Down Expand Up @@ -97,8 +103,7 @@ SOMADataFrame <- R6::R6Class(

df <- as.data.frame(values)[schema_names]
arr <- self$object

writeArrayFromArrow(self$uri, naap, nasp, "SOMADataFrame")
writeArrayFromArrow(self$uri, naap, nasp, "SOMADataFrame", NULL, tsrange)

invisible(self)
},
Expand Down Expand Up @@ -152,14 +157,15 @@ SOMADataFrame <- R6::R6Class(
args = list(expr = str2lang(value_filter), ta = arr))
value_filter <- parsed@ptr
}

spdl::debug("[SOMADataFrame$read] calling sr_setup for {} at ({},{})", self$uri,
private$tiledb_timestamp[1], private$tiledb_timestamp[2])
cfg <- as.character(tiledb::config(self$tiledbsoma_ctx$context()))
rl <- sr_setup(uri = self$uri,
config = cfg,
colnames = column_names,
qc = value_filter,
dim_points = coords,
timestamp_end = private$tiledb_timestamp,
timestamprange = self$tiledb_timestamp, # NULL or two-elem vector
loglevel = log_level)
private$ctx_ptr <- rl$ctx
TableReadIter$new(rl$sr)
Expand Down
12 changes: 10 additions & 2 deletions apis/r/R/SOMADenseNDArray.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ SOMADenseNDArray <- R6::R6Class(
#' read. List elements can be named when specifying a subset of dimensions.
#' @template param-result-order
#' @param log_level Optional logging level with default value of `"warn"`.
#' @param timestamprange Optional POSIXct (i.e. Datetime) vector with start and end of
#' interval for which data is considered.
#' @return An [`arrow::Table`].
read_arrow_table = function(
coords = NULL,
Expand All @@ -52,10 +54,15 @@ SOMADenseNDArray <- R6::R6Class(
coords <- private$.convert_coords(coords)

cfg <- as.character(tiledb::config(self$tiledbsoma_ctx$context()))
spdl::debug("[SOMADenseNDArray$read_arrow_table] timestamp ({},{})",
self$tiledb_timestamp[1], self$tiledb_timestamp[2])

rl <- soma_array_reader(uri = uri,
dim_points = coords,
result_order = result_order,
loglevel = log_level, # NULL dealt with by soma_array_reader()
timestamprange = if (is.null(self$tiledb_timestamp)) self$tiledb_timestamp
else c(0, self$tiledb_timestamp[2]),
loglevel = log_level,
config = cfg)

soma_array_to_arrow_table(rl)
Expand Down Expand Up @@ -137,7 +144,8 @@ SOMADenseNDArray <- R6::R6Class(
nasp <- nanoarrow::nanoarrow_allocate_schema()
arrow::as_record_batch(tbl)$export_to_c(naap, nasp)
#arr[] <- values
writeArrayFromArrow(self$uri, naap, nasp, "SOMADenseNDArray")
writeArrayFromArrow(self$uri, naap, nasp, "SOMADenseNDArray", NULL,
c(self$tiledb_timestamp[1], self$tiledb_timestamp[1]))
spdl::debug("[SOMADenseNDArray::write] written")

# tiledb-r always closes the array after a write operation so we need to
Expand Down
8 changes: 5 additions & 3 deletions apis/r/R/SOMANDArrayBase.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ SOMANDArrayBase <- R6::R6Class(
#' element in the array.
#' @param shape a vector of integers defining the shape of the array.
#' @template param-platform-config
#' @param timestamps Optional timestamp start and end range
#' @param internal_use_only Character value to signal this is a 'permitted'
#' call, as `create()` is considered internal and should not be called
#' directly.
create = function(type, shape, platform_config = NULL, internal_use_only = NULL) {
create = function(type, shape, platform_config = NULL,
timestamps = NULL, internal_use_only = NULL) {
if (is.null(internal_use_only) || internal_use_only != "allowed_use") {
stop(paste("Use of the create() method is for internal use only. Consider using a",
"factory method as e.g. 'SOMASparseNDArrayCreate()'."), call. = FALSE)
Expand Down Expand Up @@ -59,10 +61,10 @@ SOMANDArrayBase <- R6::R6Class(
createSchemaFromArrow(uri = self$uri, nasp, dnaap, dnasp,
private$.is_sparse,
if (private$.is_sparse) "SOMASparseNDArray" else "SOMADenseNDArray",
tiledb_create_options$to_list(FALSE), soma_context())
tiledb_create_options$to_list(FALSE), soma_context(), timestamps)
#private$write_object_type_metadata(timestamps) ## FIXME: temp. commented out -- can this be removed overall?

self$open("WRITE", internal_use_only = "allowed_use")
private$write_object_type_metadata()
self
},

Expand Down
24 changes: 16 additions & 8 deletions apis/r/R/SOMASparseNDArray.R
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ SOMASparseNDArray <- R6::R6Class(
config = cfg,
dim_points = coords,
result_order = result_order,
timestamp_end = private$tiledb_timestamp,
timestamprange = if (is.null(self$tiledb_timestamp)) self$tiledb_timestamp
else c(0, self$tiledb_timestamp[2]),
loglevel = log_level)
private$ctx_ptr <- rl$ctx
SOMASparseNDArrayRead$new(rl$sr, self, coords)
Expand Down Expand Up @@ -174,7 +175,10 @@ SOMASparseNDArray <- R6::R6Class(
names(bbox_flat)[index:(index + 1L)] <- paste0(names(bbox)[i], c('_lower', '_upper'))
index <- index + 2L
}
self$set_metadata(bbox_flat)
self$set_metadata(bbox_flat, c(self$tiledb_timestamp[1],self$tiledb_timestamp[1]))
spdl::debug("[SOMASparseNDArray$write] Calling .write_coo_df ({},{})",
self$tiledb_timestamp[1],self$tiledb_timestamp[2])

private$.write_coo_dataframe(coo)

invisible(self)
Expand Down Expand Up @@ -220,10 +224,11 @@ SOMASparseNDArray <- R6::R6Class(

stopifnot(is.data.frame(values))
# private$log_array_ingestion()
arr <- self$object
if (!is.null(private$tiledb_timestamp)) {
arr@timestamp <- private$tiledb_timestamp
}
#arr <- self$object
#if (!is.null(self$tiledb_timestamp)) {
# # arr@timestamp <- self$tiledb_timestamp
# arr@timestamp_end <- self$tiledb_timestamp
#}
nms <- colnames(values)

## the 'soma_data' data type may not have been cached, and if so we need to fetch it
Expand All @@ -239,11 +244,14 @@ SOMASparseNDArray <- R6::R6Class(
arrow::field(nms[3], private$.type))

tbl <- arrow::arrow_table(values, schema = arrsch)
spdl::debug("[SOMASparseNDArray::write] array created, writing to {}", self$uri)
spdl::debug("[SOMASparseNDArray$write] array created, writing to {} at ({},{})", self$uri,
self$tiledb_timestamp[1],self$tiledb_timestamp[2])
naap <- nanoarrow::nanoarrow_allocate_array()
nasp <- nanoarrow::nanoarrow_allocate_schema()
arrow::as_record_batch(tbl)$export_to_c(naap, nasp)
writeArrayFromArrow(self$uri, naap, nasp, "SOMASparseNDArray")
writeArrayFromArrow(self$uri, naap, nasp, "SOMASparseNDArray", NULL,
if (is.null(self$tiledb_timestamp[1])) NULL
else c(self$tiledb_timestamp[1], self$tiledb_timestamp[1]))
},

# Internal marking of one or zero based matrices for iterated reads
Expand Down
3 changes: 1 addition & 2 deletions apis/r/R/SOMASparseNDArrayRead.R
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ SOMASparseNDArrayRead <- R6::R6Class(
sparse_matrix = function(zero_based=FALSE) {
#TODO implement zero_based argument, currently doesn't do anything

shape <- self$shape
shape <- self$shape
# if (any(private$shape > .Machine$integer.max)) {
if (any(shape > .Machine$integer.max)) {
warning(
Expand All @@ -147,7 +147,6 @@ SOMASparseNDArrayRead <- R6::R6Class(
# private$shape <- pmin(private$shape, .Machine$integer.max)
shape <- pmin(shape, .Machine$integer.max)
}

SparseReadIter$new(self$sr, shape, zero_based = zero_based)
},

Expand Down
Loading

0 comments on commit 87b74ae

Please sign in to comment.