Replies: 7 comments

---

Cool idea! As of 52657ea, you can aggregate with DuckDB in a downstream target. Sketch:

```r
library(targets)
library(tarchetypes)

scenarios <- tidyr::expand_grid(
  tibble::tibble(
    n1 = c(20, 30, 50),
    n2 = c(40, 30, 50)
  ),
  delta = c(0.2, 0.4, 0.6)
)

generate_data <- function(n1, n2, delta) {
  tibble::tibble(
    id = seq_len(n1 + n2),
    group = rep(seq_len(2), c(n1, n2)),
    y = rnorm(n1 + n2, mean = dplyr::if_else(group == 1, 0, delta))
  )
}

analyze_data <- function(data, design_parameter1, design_parameter2) {
  t.test(y ~ group, data = data) |>
    broom::tidy()
}

aggregate_upstream_branches <- function() {
  # Get the names of all upstream branches
  # while making sure not to load detritus leftover in _targets/objects/:
  upstream <- targets::tar_definition()$subpipeline$targets
  classes <- purrr::map_chr(as.list(upstream), ~ class(.x)[1L])
  branches <- names(upstream)[classes != "tar_pattern"]
  # Robustly get the destination file:
  objects <- file.path(targets::tar_path_store(), "objects")
  destination <- file.path(objects, targets::tar_name())
  # Build the query. This query can be quite long since it does not
  # use a grep pattern, and I hope this does not cut down on efficiency,
  # but it is important because we want to make sure we select
  # only the correct branches.
  base <- "COPY (SELECT * FROM read_parquet([%s])) TO '%s' (FORMAT 'parquet')"
  paths <- sprintf("'%s'", file.path(objects, branches))
  query <- sprintf(base, paste(paths, collapse = ","), destination)
  # Run the query and make sure the connection closes on exit
  # (even in case of errors).
  connection <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
  on.exit(DBI::dbDisconnect(connection))
  DBI::dbExecute(connection, query)
}

results <- tar_map_rep(
  name = results,
  command = generate_data(n1, n2, delta) |>
    analyze_data(),
  values = scenarios,
  names = everything(),
  batches = 10,
  reps = 10,
  format = tar_format_nanoparquet(),
  memory = "transient",
  garbage_collection = 100,
  combine = FALSE
)

list(
  results,
  tar_target_raw( # Allow the `deps` argument.
    name = "combined",
    command = quote(aggregate_upstream_branches()),
    # List all dependencies manually.
    deps = c(names(results$static_branches), "aggregate_upstream_branches"),
    # The tidyverse team really likes nanoparquet,
    # although it does not support lists.
    format = tar_format_nanoparquet(),
    # Let aggregate_upstream_branches() retrieve the upstream dependency data:
    retrieval = "none",
    # Let aggregate_upstream_branches() store the output data:
    storage = "none"
  )
)
```

This is all a bit low-level, so target factories for aggregation will be powerful. Target factories will help people grab efficient aggregation tools off the shelf instead of reinventing the wheel with advanced knowledge of {targets} internals. Note that I supplied all 900 branch names literally in the query instead of using a regex. This may make the query less efficient (hopefully not by much), but it is important because detritus from previous runs of the pipeline can be left over in `_targets/objects/`.
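
If downstream targets only need summaries, the combined file can also be queried lazily with dplyr verbs through DuckDB's dbplyr backend. A minimal sketch, not from the original comment, assuming the `combined` target from the pipeline above:

```r
# Sketch: query the combined parquet file lazily through DuckDB so the
# full data never has to be loaded into R memory at once. The path and
# the summary columns are assumptions based on the pipeline above.
library(dplyr)
connection <- DBI::dbConnect(duckdb::duckdb())
aggregates <- tbl(
  connection,
  sql("SELECT * FROM read_parquet('_targets/objects/combined')")
) |>
  group_by(n1, n2, delta) |>
  summarize(mean_p_value = mean(p.value)) |>
  collect()
DBI::dbDisconnect(connection)
```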

---

Hey Will, thanks for the pointers - a welcome reason for me to dig deeper into {targets} :) One issue with storing the individual batches in a db right away would be concurrent writing from many processes, which is not really what {duckdb} seems to be optimized for. Maybe it requires a different db; maybe that's the even better approach. I will explore a bit more how your approach scales. A couple of hundred to low four-digit scenarios should be covered for it to be practical; beyond that, one probably needs a database as storage.

---

I would like to see how far we can take this approach.

---

Edit: due to recent changes in development, the sketch above may be out of date. Ultimately, I think it will be nicer on many levels to aggregate at the level of each scenario instead of across scenarios.

---

Converting to a discussion.

---

The next version of {tarchetypes} aggregates the reps within each scenario rather than across scenarios (note the per-scenario `_combine` targets in the dependency graph below):

```r
# _targets.R
library(tarchetypes)

assess_hyperparameters <- function(sigma1, sigma2) {
  posterior_samples <- stats::rnorm(1000, 0, sigma1 + sigma2)
  tibble::tibble(
    posterior_median = median(posterior_samples),
    posterior_quantile_0.025 = quantile(posterior_samples, 0.025),
    posterior_quantile_0.975 = quantile(posterior_samples, 0.975)
  )
}

hyperparameters <- tibble::tibble(
  scenario = c("tight", "medium", "diffuse"),
  sigma1 = c(10, 50, 50),
  sigma2 = c(10, 5, 10)
)

list(
  tar_map_rep(
    name = sensitivity_analysis,
    command = assess_hyperparameters(sigma1, sigma2),
    values = hyperparameters,
    names = tidyselect::any_of("scenario"),
    batches = 2,
    reps = 3
  )
)
```

```mermaid
graph LR
style Legend fill:#FFFFFF00,stroke:#000000;
style Graph fill:#FFFFFF00,stroke:#000000;
subgraph Legend
direction LR
xf1522833a4d242c5([""Up to date""]):::uptodate --- xd03d7c7dd2ddda2b([""Stem""]):::none
xd03d7c7dd2ddda2b([""Stem""]):::none --- x6f7e04ea3427f824[""Pattern""]:::none
x6f7e04ea3427f824[""Pattern""]:::none --- xbecb13963f49e50b{{""Object""}}:::none
xbecb13963f49e50b{{""Object""}}:::none --- xeb2d7cac8a1ce544>""Function""]:::none
end
subgraph Graph
direction LR
x116cbcb96f639238["sensitivity_analysis_medium"]:::uptodate --> x05d492c93c627756(["sensitivity_analysis_medium_combine"]):::uptodate
xfb618e5f0bf9f491>"assess_hyperparameters"]:::uptodate --> x0bcddef2bb7ac2fd["sensitivity_analysis_tight"]:::uptodate
x4cfe997ecf486280(["sensitivity_analysis_batch"]):::uptodate --> x0bcddef2bb7ac2fd["sensitivity_analysis_tight"]:::uptodate
xfb618e5f0bf9f491>"assess_hyperparameters"]:::uptodate --> x116cbcb96f639238["sensitivity_analysis_medium"]:::uptodate
x4cfe997ecf486280(["sensitivity_analysis_batch"]):::uptodate --> x116cbcb96f639238["sensitivity_analysis_medium"]:::uptodate
x97d6cb6d826356c7["sensitivity_analysis_diffuse"]:::uptodate --> xeac3a51d65d82961(["sensitivity_analysis_diffuse_combine"]):::uptodate
x0bcddef2bb7ac2fd["sensitivity_analysis_tight"]:::uptodate --> xc33686cd7ec32d0b(["sensitivity_analysis_tight_combine"]):::uptodate
xfb618e5f0bf9f491>"assess_hyperparameters"]:::uptodate --> x97d6cb6d826356c7["sensitivity_analysis_diffuse"]:::uptodate
x4cfe997ecf486280(["sensitivity_analysis_batch"]):::uptodate --> x97d6cb6d826356c7["sensitivity_analysis_diffuse"]:::uptodate
xeac3a51d65d82961(["sensitivity_analysis_diffuse_combine"]):::uptodate --> x6e393b7cbdce2e26(["sensitivity_analysis"]):::uptodate
x05d492c93c627756(["sensitivity_analysis_medium_combine"]):::uptodate --> x6e393b7cbdce2e26(["sensitivity_analysis"]):::uptodate
xc33686cd7ec32d0b(["sensitivity_analysis_tight_combine"]):::uptodate --> x6e393b7cbdce2e26(["sensitivity_analysis"]):::uptodate
xce1fb2071ac2c431{{"hyperparameters"}}:::uptodate --> xce1fb2071ac2c431{{"hyperparameters"}}:::uptodate
end
classDef uptodate stroke:#000000,color:#ffffff,fill:#354823;
classDef none stroke:#000000,color:#000000,fill:#94a4ac;
linkStyle 0 stroke-width:0px;
linkStyle 1 stroke-width:0px;
linkStyle 2 stroke-width:0px;
linkStyle 3 stroke-width:0px;
linkStyle 16 stroke-width:0px;
```
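
To run the example and read the per-scenario aggregates, a short usage sketch (target names follow the graph above):

```r
# Sketch: run the pipeline, then read the combined sensitivity analysis.
targets::tar_make()
targets::tar_read(sensitivity_analysis)
```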

---

**Proposal**
`tar_map_rep()` is very handy for large-scale statistical simulations. However, with 100s of scenarios and 100k+ reps per scenario, the resulting data can become quite big, and combining the individual batches becomes slow. An alternative would be to use {duckdb} to combine the individual targets (using parquet?); this can be much faster and can automatically buffer to hard disk if memory is running out.
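
A minimal sketch of that idea, not from the original post; the glob pattern and output path are assumptions:

```r
# Sketch: let DuckDB stream many per-branch parquet files into a single
# combined file without materializing them all in R memory.
connection <- DBI::dbConnect(duckdb::duckdb())
DBI::dbExecute(
  connection,
  "COPY (SELECT * FROM read_parquet('_targets/objects/results_*'))
   TO 'combined.parquet' (FORMAT 'parquet')"
)
DBI::dbDisconnect(connection)
```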
Removing `combine = FALSE` results in about 20 seconds for aggregation and 14 GB of memory usage. With {duckdb}, only 3 GB of memory are used and the aggregation is almost instantaneous. (To be fair, the results are not written to disk, but using {duckdb} instead of {duckplyr} this should not be an issue, and there is no need to load the full data into memory at any point.)
Memory usage is very low, ~350 MB.
You could even use {duckdb} for downstream processing and aggregation, although {dplyr} still seems to have an edge here (2 vCPUs).

I would argue that the main advantage of using {duckdb} is the more robust combination of results (it doesn't crash, and it is faster). One could even consider using a native .duckdb file as the storage format instead of going via parquet.
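
A hedged sketch of that last idea using `targets::tar_format()`; this custom format is an illustration, not an existing {targets} or {tarchetypes} feature, and the table name is arbitrary:

```r
# Sketch: a custom storage format that saves a data frame as a native
# .duckdb database file. tar_format() expects self-contained read/write
# functions with exactly these argument names.
format_duckdb <- targets::tar_format(
  write = function(object, path) {
    connection <- DBI::dbConnect(duckdb::duckdb(), dbdir = path)
    on.exit(DBI::dbDisconnect(connection, shutdown = TRUE))
    DBI::dbWriteTable(connection, "data", object, overwrite = TRUE)
  },
  read = function(path) {
    connection <- DBI::dbConnect(duckdb::duckdb(), dbdir = path, read_only = TRUE)
    on.exit(DBI::dbDisconnect(connection, shutdown = TRUE))
    DBI::dbReadTable(connection, "data")
  }
)
# Usage, e.g.: tar_target(simulations, run_reps(), format = format_duckdb)
```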