From fab2a2a93b0929f8e7e2cfec93f1bb0ef03c0b9f Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Wed, 13 Nov 2024 17:26:55 +0200 Subject: [PATCH 1/2] [Produce-Snapshot] Update put snapshot key format --- .../src/partition/snapshots/repository.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 7df32b7b9..9f7b3ae44 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -89,7 +89,7 @@ impl SnapshotRepository { object_store, destination, prefix, - staging_path: base_dir.clone(), + staging_path: base_dir.clone().join("snapshot-staging"), }) } @@ -115,20 +115,16 @@ impl SnapshotRepository { // reverse order. We inject an explicit sort key into the snapshot prefix to make sure that // the latest snapshot is always first. let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64()); - let key = format!( - "{partition_id}/{sk}/{snapshot_id}_{lsn}.tar", - sk = inverted_sort_key, - ); - // The snapshot data / metadata key format is: [/]//_.tar + // The snapshot data / metadata key format is: [/]/__.tar let snapshot_key = match self.prefix.as_str() { "" | "/" => format!( - "{partition_id}/{sk}/{snapshot_id}_{lsn}.tar", + "{partition_id}/{sk}_{lsn}_{snapshot_id}.tar", sk = inverted_sort_key, lsn = metadata.min_applied_lsn, ), prefix => format!( - "{trimmed_prefix}/{partition_id}/{sk}/{snapshot_id}_{lsn}.tar", + "{trimmed_prefix}/{partition_id}/{sk}_{lsn}_{snapshot_id}.tar", trimmed_prefix = prefix.trim_start_matches('/').trim_end_matches('/'), sk = inverted_sort_key, ), @@ -161,6 +157,10 @@ impl SnapshotRepository { let upload = self .object_store .put(&object_store::path::Path::from(snapshot_key), payload) + .put( + &object_store::path::Path::from(snapshot_key.clone()), + payload, + ) .await .context("Failed to put snapshot in repository")?; @@ -168,7 
+168,7 @@ impl SnapshotRepository { %snapshot_id, etag = upload.e_tag.unwrap_or_default(), "Successfully published snapshot to repository as: {}", - key, + snapshot_key, ); Ok(()) } From c747970b1e3370e2abe541082db0922aba1509a1 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Wed, 13 Nov 2024 18:16:17 +0200 Subject: [PATCH 2/2] Introduce SnapshotRepository find_latest and wire up partition restore --- crates/core/src/task_center.rs | 17 ++++ crates/worker/Cargo.toml | 2 +- crates/worker/src/partition/mod.rs | 13 ++- .../src/partition/snapshots/repository.rs | 80 ++++++++++++++++++- .../worker/src/partition_processor_manager.rs | 68 +++++++++++++--- 5 files changed, 159 insertions(+), 21 deletions(-) diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 83837edba..63039b016 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -651,6 +651,23 @@ impl TaskCenter { .spawn_blocking(move || tc.block_on(name, partition_id, future)) } + // Spawn a function on a blocking thread. + pub fn spawn_blocking_fn_unmanaged<F, O>( + &self, + name: &'static str, + partition_id: Option<PartitionId>, + f: F, + ) -> tokio::task::JoinHandle<O> + where + F: FnOnce() -> O + Send + 'static, + O: Send + 'static, + { + let tc = self.clone(); + self.inner + .default_runtime_handle + .spawn_blocking(move || tc.run_in_scope_sync(name, partition_id, f)) + } + /// Cancelling the child will not cancel the parent. Note that parent task will not /// wait for children tasks. The parent task is allowed to finish before children.
#[track_caller] diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 203b5fe9e..1cdd24d62 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -72,7 +72,7 @@ tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } -tokio-util = { workspace = true } +tokio-util = { workspace = true, features = ["io-util"] } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } url = { workspace = true } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 139411cff..1b3126fd2 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -78,7 +78,7 @@ mod cleaner; pub mod invoker_storage_reader; mod leadership; pub mod shuffle; -mod snapshots; +pub mod snapshots; mod state_machine; pub mod types; @@ -148,6 +148,7 @@ where bifrost: Bifrost, mut partition_store: PartitionStore, configuration: Live, + snapshot_repository: SnapshotRepository, ) -> Result, StorageError> { let PartitionProcessorBuilder { partition_id, @@ -197,13 +198,9 @@ where last_seen_leader_epoch, ); - let config = configuration.pinned(); - let snapshot_producer = SnapshotProducer::create( - partition_store.clone(), - configuration, - SnapshotRepository::create(config.common.base_dir(), &config.worker.snapshots).await?, - ) - .await?; + let snapshot_producer = + SnapshotProducer::create(partition_store.clone(), configuration, snapshot_repository) + .await?; Ok(PartitionProcessor { task_center, diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 9f7b3ae44..0c6353d65 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -19,11 +19,12 @@ use aws_credential_types::provider::ProvideCredentials; use object_store::aws::AmazonS3Builder; use object_store::{ObjectStore, PutPayload}; use 
tempfile::NamedTempFile; -use tracing::{debug, trace_span, warn}; +use tokio_util::io::{StreamReader, SyncIoBridge}; +use tracing::{debug, trace, trace_span, warn}; use url::Url; use restate_core::task_center; -use restate_partition_store::snapshots::PartitionSnapshotMetadata; +use restate_partition_store::snapshots::{LocalPartitionSnapshot, PartitionSnapshotMetadata}; use restate_types::config::SnapshotsOptions; use restate_types::identifiers::PartitionId; @@ -156,7 +157,6 @@ impl SnapshotRepository { let upload = self .object_store - .put(&object_store::path::Path::from(snapshot_key), payload) .put( &object_store::path::Path::from(snapshot_key.clone()), payload, @@ -172,6 +172,80 @@ impl SnapshotRepository { ); Ok(()) } + + pub(crate) async fn find_latest( + &self, + partition_id: PartitionId, + ) -> anyhow::Result> { + let list_prefix = match self.prefix.as_str() { + "" | "/" => format!("{}/", partition_id), + prefix => format!("{}/{}/", prefix, partition_id), + }; + let list_prefix = object_store::path::Path::from(list_prefix.as_str()); + + let list = self + .object_store + .list_with_delimiter(Some(&list_prefix)) + .await?; + + let latest = list.objects.first(); + + let Some(snapshot_entry) = latest else { + debug!(%partition_id, "No snapshots found in the snapshots repository"); + return Ok(None); + }; + + let snapshot_object = self + .object_store + .get(&snapshot_entry.location) + .await + .context("Failed to get snapshot from repository")?; + + // construct the bridge in a Tokio context, before moving to blocking pool + let snapshot_reader = SyncIoBridge::new(StreamReader::new(snapshot_object.into_stream())); + + let snapshot_name = snapshot_entry.location.filename().expect("has a name"); + let snapshot_base_path = &self.staging_path.join(snapshot_name); + tokio::fs::create_dir_all(snapshot_base_path).await?; + + let snapshot_dir = snapshot_base_path.clone(); + trace!(%partition_id, "Unpacking snapshot {} to: {:?}", snapshot_entry.location, snapshot_dir); 
+ task_center() + .spawn_blocking_fn_unmanaged("unpack-snapshot", Some(partition_id), move || { + let mut tarball = tar::Archive::new(snapshot_reader); + for file in tarball.entries()? { + let mut file = file?; + trace!("Unpacking snapshot file: {:?}", file.header().path()?); + file.unpack_in(&snapshot_dir)?; + } + Ok::<(), anyhow::Error>(()) + }) + .await??; + + let metadata = tokio::fs::read(snapshot_base_path.join("metadata.json")).await?; + let mut metadata: PartitionSnapshotMetadata = serde_json::from_slice(metadata.as_slice())?; + + // Patch the file paths in the snapshot metadata to point to the correct staging directory on the local node. + let snapshot_base_path = snapshot_base_path + .to_path_buf() + .into_os_string() + .into_string() + .map_err(|path| anyhow::anyhow!("Invalid string: {:?}", path))? + .trim_end_matches('/') + .to_owned(); + metadata + .files + .iter_mut() + .for_each(|f| f.directory = snapshot_base_path.clone()); + trace!(%partition_id, "Restoring from snapshot metadata: {:?}", metadata); + + Ok(Some(LocalPartitionSnapshot { + base_dir: self.staging_path.clone(), + min_applied_lsn: metadata.min_applied_lsn, + db_comparator_name: metadata.db_comparator_name, + files: metadata.files, + })) + } } #[derive(Debug)] diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index f0f9b95ef..cd89883cc 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -76,6 +76,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; use crate::partition::invoker_storage_reader::InvokerStorageReader; +use crate::partition::snapshots::SnapshotRepository; use crate::partition::PartitionProcessorControlCommand; use crate::PartitionProcessorBuilder; @@ -1124,6 +1125,9 @@ impl 
SpawnPartitionProcessorTask { invoker.handle(), ); + let snapshot_repository = + SnapshotRepository::create(config.common.base_dir(), &config.worker.snapshots).await?; + let invoker_name = Box::leak(Box::new(format!("invoker-{}", partition_id))); let invoker_config = configuration.clone().map(|c| &c.worker.invoker); @@ -1135,14 +1139,54 @@ impl SpawnPartitionProcessorTask { { let options = options.clone(); let key_range = key_range.clone(); - let partition_store = partition_store_manager - .open_partition_store( - partition_id, - key_range, - OpenMode::CreateIfMissing, - &options.storage.rocksdb, - ) - .await?; + + let partition_store = if !partition_store_manager + .has_partition(pp_builder.partition_id) + .await + { + info!( + partition_id = %partition_id, + "Looking for store snapshot to bootstrap partition", + ); + let snapshot = snapshot_repository.find_latest(partition_id).await?; + if let Some(snapshot) = snapshot { + info!( + partition_id = %partition_id, + "Found snapshot to bootstrap partition, restoring it", + ); + partition_store_manager + .restore_partition_store_snapshot( + partition_id, + key_range.clone(), + snapshot, + &options.storage.rocksdb, + ) + .await? + } else { + info!( + partition_id = %partition_id, + "No snapshot found to bootstrap partition, creating new store", + ); + partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::CreateIfMissing, + &options.storage.rocksdb, + ) + .await? + } + } else { + partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::OpenExisting, + &options.storage.rocksdb, + ) + .await? + }; + move || async move { tc.spawn_child( TaskKind::SystemService, @@ -1152,7 +1196,13 @@ impl SpawnPartitionProcessorTask { )?; let err = pp_builder - .build::(tc, bifrost, partition_store, configuration) + .build::( + tc, + bifrost, + partition_store, + configuration, + snapshot_repository, + ) .await? .run() .await