From 17e437ff47d42a4458e79c1791008b7d8b458738 Mon Sep 17 00:00:00 2001
From: Pavel Tcholakov
Date: Fri, 8 Nov 2024 15:08:18 +0200
Subject: [PATCH] Introduce config option to automatically create partition
 snapshots

A new configurable snapshot interval is added, expressed as a minimum
number of records after which a snapshot of the partition store will be
automatically taken.
---
 crates/storage-api/src/fsm_table/mod.rs       | 16 +++++
 crates/types/protobuf/restate/cluster.proto   |  1 +
 crates/types/src/cluster/cluster_state.rs     |  2 +
 crates/types/src/config/common.rs             |  9 +--
 crates/types/src/config/worker.rs             | 26 ++++++-
 crates/worker/src/partition/mod.rs            | 13 ++++
 .../worker/src/partition/snapshot_producer.rs | 14 +++-
 .../src/partition_processor_manager/mod.rs    | 67 +++++++++++++++++--
 .../restatectl/src/commands/partition/list.rs | 22 ++++--
 9 files changed, 149 insertions(+), 21 deletions(-)

diff --git a/crates/storage-api/src/fsm_table/mod.rs b/crates/storage-api/src/fsm_table/mod.rs
index 550f31fd3..ae75ea0d7 100644
--- a/crates/storage-api/src/fsm_table/mod.rs
+++ b/crates/storage-api/src/fsm_table/mod.rs
@@ -25,6 +25,7 @@ mod fsm_variable {
     pub(crate) const OUTBOX_SEQ_NUMBER: u64 = 1;
 
     pub(crate) const APPLIED_LSN: u64 = 2;
+    pub(crate) const ARCHIVED_LSN: u64 = 3;
 }
 
 pub trait ReadOnlyFsmTable {
@@ -49,6 +50,14 @@ pub trait ReadOnlyFsmTable {
             .map(|seq_number| seq_number.map(|seq_number| Lsn::from(u64::from(seq_number))))
         })
     }
+
+    fn get_archived_lsn(&mut self) -> impl Future<Output = Result<Option<Lsn>>> + Send + '_ {
+        self.get::<SequenceNumber>(fsm_variable::ARCHIVED_LSN)
+            .map(|result| {
+                result
+                    .map(|seq_number| seq_number.map(|seq_number| Lsn::from(u64::from(seq_number))))
+            })
+    }
 }
 
 pub trait FsmTable: ReadOnlyFsmTable {
@@ -67,6 +76,13 @@ pub trait FsmTable: ReadOnlyFsmTable {
         )
     }
 
+    fn put_archived_lsn(&mut self, lsn: Lsn) -> impl Future<Output = ()> + Send {
+        self.put(
+            fsm_variable::ARCHIVED_LSN,
+            SequenceNumber::from(u64::from(lsn)),
+        )
+    }
+
     fn put_inbox_seq_number(
         &mut self,
         seq_number: MessageIndex,
diff --git a/crates/types/protobuf/restate/cluster.proto b/crates/types/protobuf/restate/cluster.proto
index 52f571271..8a342c9f0 100644
--- a/crates/types/protobuf/restate/cluster.proto
+++ b/crates/types/protobuf/restate/cluster.proto
@@ -65,6 +65,7 @@ message PartitionProcessorStatus {
   uint64 num_skipped_records = 8;
   ReplayStatus replay_status = 9;
   optional restate.common.Lsn last_persisted_log_lsn = 10;
+  optional restate.common.Lsn last_archived_log_lsn = 12;
   // Set if replay_status is CATCHING_UP
   optional restate.common.Lsn target_tail_lsn = 11;
 }
diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/cluster/cluster_state.rs
index 284e16475..cf81c8c74 100644
--- a/crates/types/src/cluster/cluster_state.rs
+++ b/crates/types/src/cluster/cluster_state.rs
@@ -128,6 +128,7 @@ pub struct PartitionProcessorStatus {
     pub num_skipped_records: u64,
     pub replay_status: ReplayStatus,
     pub last_persisted_log_lsn: Option<Lsn>,
+    pub last_archived_log_lsn: Option<Lsn>,
     // Set if replay_status is CatchingUp
     pub target_tail_lsn: Option<Lsn>,
 }
@@ -145,6 +146,7 @@ impl Default for PartitionProcessorStatus {
             num_skipped_records: 0,
             replay_status: ReplayStatus::Starting,
             last_persisted_log_lsn: None,
+            last_archived_log_lsn: None,
             target_tail_lsn: None,
         }
     }
diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs
index 221fb4746..edf9144cb 100644
--- a/crates/types/src/config/common.rs
+++ b/crates/types/src/config/common.rs
@@ -233,7 +233,7 @@ static HOSTNAME: Lazy<String> = Lazy::new(|| {
 });
 
 impl CommonOptions {
-    pub fn shutdown_grace_period(&self) -> std::time::Duration {
+    pub fn shutdown_grace_period(&self) -> Duration {
         self.shutdown_timeout.into()
     }
     // todo: It's imperative that the node doesn't change its name after start. Move this to a
@@ -353,7 +353,7 @@ impl Default for CommonOptions {
             histogram_inactivity_timeout: None,
             disable_prometheus: false,
             service_client: Default::default(),
-            shutdown_timeout: std::time::Duration::from_secs(60).into(),
+            shutdown_timeout: Duration::from_secs(60).into(),
             tracing: TracingOptions::default(),
             log_filter: "warn,restate=info".to_string(),
             log_format: Default::default(),
@@ -365,11 +365,11 @@ impl Default for CommonOptions {
             rocksdb_total_memory_size: NonZeroUsize::new(6_000_000_000).unwrap(), // 6GB
             rocksdb_bg_threads: None,
             rocksdb_high_priority_bg_threads: NonZeroU32::new(2).unwrap(),
-            rocksdb_write_stall_threshold: std::time::Duration::from_secs(3).into(),
+            rocksdb_write_stall_threshold: Duration::from_secs(3).into(),
             rocksdb_enable_stall_on_memory_limit: false,
             rocksdb_perf_level: PerfStatsLevel::EnableCount,
             rocksdb: Default::default(),
-            metadata_update_interval: std::time::Duration::from_secs(3).into(),
+            metadata_update_interval: Duration::from_secs(3).into(),
             network_error_retry_policy: RetryPolicy::exponential(
                 Duration::from_millis(10),
                 2.0,
@@ -598,6 +598,7 @@ impl Default for TracingOptions {
         }
     }
 }
+
 #[cfg(test)]
 mod tests {
     use crate::nodes_config::Role;
diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs
index c9205cf48..b1c816582 100644
--- a/crates/types/src/config/worker.rs
+++ b/crates/types/src/config/worker.rs
@@ -10,7 +10,7 @@
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
-use std::num::{NonZeroU16, NonZeroUsize};
+use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
 use std::path::PathBuf;
 use std::time::Duration;
 use tracing::warn;
@@ -61,7 +61,10 @@
     /// value is, the higher the throughput and latency are.
     max_command_batch_size: NonZeroUsize,
 
-    #[serde(flatten)]
+    /// # Snapshots
+    ///
+    /// Snapshots provide a mechanism for safely trimming the log and efficient bootstrapping of new
+    /// worker nodes.
     pub snapshots: SnapshotsOptions,
 }
@@ -355,7 +358,24 @@ impl Default for StorageOptions {
 #[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
 #[serde(rename_all = "kebab-case")]
 #[builder(default)]
-pub struct SnapshotsOptions {}
+pub struct SnapshotsOptions {
+    /// # Automatic snapshot creation frequency
+    ///
+    /// The minimum number of log records applied before a new snapshot is automatically created.
+    ///
+    /// As snapshots are created asynchronously, the actual number of new records that will trigger
+    /// a snapshot will vary. The counter for the subsequent snapshot begins from the LSN at which
+    /// the previous snapshot export was initiated. Only leader Partition Processors will take
+    /// snapshots for a given partition.
+    ///
+    /// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
+    ///
+    /// Default: `None` - automatic snapshots are disabled by default
+    pub snapshot_interval_num_records: Option<NonZeroU64>,
+}
+
+// [worker.snapshots]
+// snapshot-interval-num-records = 100_000
 
 impl SnapshotsOptions {
     pub fn snapshots_base_dir(&self) -> PathBuf {
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 289d56f67..b733dbdc4 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -212,6 +212,7 @@ where
             status_watch_tx,
             status,
             inflight_create_snapshot_task: None,
+            last_snapshot_lsn_watch: watch::channel(None),
         })
     }
 
@@ -255,6 +256,7 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
     max_command_batch_size: usize,
     partition_store: PartitionStore,
     inflight_create_snapshot_task: Option<TaskHandle<anyhow::Result<()>>>,
+    last_snapshot_lsn_watch: (watch::Sender<Option<Lsn>>, watch::Receiver<Option<Lsn>>),
 }
 
 impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
 where
@@ -291,6 +293,9 @@ where
         let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);
         self.status.last_applied_log_lsn = Some(last_applied_lsn);
+        self.last_snapshot_lsn_watch
+            .0
+            .send(partition_store.get_archived_lsn().await?)?; // propagate errors and let the PPM handle error retries
 
         let current_tail = self
@@ -399,6 +404,7 @@
                     self.on_rpc(rpc, &mut partition_store).await;
                 }
                 _ = status_update_timer.tick() => {
+                    self.status.last_archived_log_lsn = *self.last_snapshot_lsn_watch.1.borrow();
                     self.status_watch_tx.send_modify(|old| {
                         old.clone_from(&self.status);
                         old.updated_at = MillisSinceEpoch::now();
@@ -524,6 +530,8 @@
         let snapshot_base_path = config.worker.snapshots.snapshots_dir(self.partition_id);
         let partition_store = self.partition_store.clone();
         let snapshot_span = tracing::info_span!("create-snapshot");
+        let snapshot_lsn_tx = self.last_snapshot_lsn_watch.0.clone();
+
         let inflight_create_snapshot_task = restate_core::task_center().spawn_unmanaged(
             TaskKind::PartitionSnapshotProducer,
             "create-snapshot",
                     )
                     .await;
 
+                    if let Ok(metadata) = result.as_ref() {
+                        // update the Partition Processor's internal state
+                        let _ = snapshot_lsn_tx.send(Some(metadata.min_applied_lsn));
+                    }
+
                     if let Some(tx) = maybe_sender {
                         tx.send(result.map(|metadata| metadata.snapshot_id)).ok();
                     }
diff --git a/crates/worker/src/partition/snapshot_producer.rs b/crates/worker/src/partition/snapshot_producer.rs
index 1a3ef4200..909f88196 100644
--- a/crates/worker/src/partition/snapshot_producer.rs
+++ b/crates/worker/src/partition/snapshot_producer.rs
@@ -12,10 +12,12 @@ use std::path::PathBuf;
 use std::time::SystemTime;
 
 use anyhow::bail;
-use tracing::{info, warn};
+use tracing::{debug, info, warn};
 
 use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
 use restate_partition_store::PartitionStore;
+use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
+use restate_storage_api::Transaction;
 use restate_types::identifiers::SnapshotId;
 
 /// Encapsulates producing a Restate partition snapshot out of a partition store.
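The hunk below persists the snapshot's minimum applied LSN as the partition's archived LSN, via the new FSM-table accessors introduced in `storage-api` above. A minimal sketch of that get/put pattern, assuming a hypothetical in-memory stand-in for the RocksDB-backed store (the names here are illustrative, not Restate's types):

```rust
// Hypothetical, simplified stand-in for the archived-LSN accessors; the real
// implementation reads and writes the FSM column family of a RocksDB store.
type Lsn = u64;

#[derive(Default)]
struct FsmState {
    archived_lsn: Option<Lsn>,
}

impl FsmState {
    fn get_archived_lsn(&self) -> Option<Lsn> {
        self.archived_lsn
    }

    fn put_archived_lsn(&mut self, lsn: Lsn) {
        self.archived_lsn = Some(lsn);
    }
}

fn main() {
    let mut fsm = FsmState::default();
    // A store that has never been snapshotted reports no archived LSN...
    assert_eq!(fsm.get_archived_lsn(), None);
    // ...and after a snapshot completes, its minimum applied LSN is recorded
    // as the new archived LSN.
    fsm.put_archived_lsn(42);
    assert_eq!(fsm.get_archived_lsn(), Some(42));
}
```

In the patch itself the write goes through `partition_store.transaction()` and `tx.commit()`, so the archived LSN is updated atomically with respect to other FSM writes.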
@@ -69,6 +71,16 @@ impl SnapshotProducer { "Partition snapshot written" ); + let previous_archived_snapshot_lsn = partition_store.get_archived_lsn().await?; + let mut tx = partition_store.transaction(); + tx.put_archived_lsn(snapshot.min_applied_lsn).await; + tx.commit().await?; + debug!( + previous_archived_lsn = ?previous_archived_snapshot_lsn, + updated_archived_lsn = ?snapshot.min_applied_lsn, + "Updated persisted archived snapshot LSN" + ); + Ok(snapshot_meta) } } diff --git a/crates/worker/src/partition_processor_manager/mod.rs b/crates/worker/src/partition_processor_manager/mod.rs index f56c63afa..ab017eaa3 100644 --- a/crates/worker/src/partition_processor_manager/mod.rs +++ b/crates/worker/src/partition_processor_manager/mod.rs @@ -13,14 +13,16 @@ mod persisted_lsn_watchdog; mod processor_state; mod spawn_processor_task; -use std::collections::BTreeMap; -use std::ops::RangeInclusive; -use std::sync::Arc; - use futures::stream::StreamExt; use metrics::gauge; +use std::collections::BTreeMap; +use std::ops::{Add, RangeInclusive}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::oneshot; use tokio::sync::{mpsc, watch}; use tokio::task::JoinSet; +use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, instrument, warn}; use crate::metric_definitions::NUM_ACTIVE_PARTITIONS; @@ -53,7 +55,7 @@ use restate_types::epoch::EpochMetadata; use restate_types::health::HealthStatus; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::live::Live; -use restate_types::logs::Lsn; +use restate_types::logs::{Lsn, SequenceNumber}; use restate_types::metadata_store::keys::partition_processor_epoch_key; use restate_types::net::metadata::MetadataKind; use restate_types::net::partition_processor::{ @@ -215,12 +217,18 @@ impl PartitionProcessorManager { let mut logs_version_watcher = self.metadata.watch(MetadataKind::Logs); let mut partition_table_version_watcher = self.metadata.watch(MetadataKind::PartitionTable); + let mut latest_snapshot_check_interval = tokio::time::interval(Duration::from_secs(5)); + latest_snapshot_check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + self.health_status.update(WorkerStatus::Ready); loop { tokio::select! 
{
                 Some(command) = self.rx.recv() => {
                     self.on_command(command);
                 }
+                _ = latest_snapshot_check_interval.tick() => {
+                    self.request_partition_snapshots();
+                }
                 Some(control_processors) = self.incoming_update_processors.next() => {
                     self.pending_control_processors = Some(control_processors.into_body());
                     self.on_control_processors();
@@ -285,7 +293,8 @@
         }
     }
 
-    #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))]
+    #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)
+    ))]
     fn on_asynchronous_event(&mut self, event: AsynchronousEvent) {
         let AsynchronousEvent {
             partition_id,
@@ -647,6 +656,52 @@
         }
     }
 
+    fn request_partition_snapshots(&mut self) {
+        let Some(records_per_snapshot) = self
+            .updateable_config
+            .live_load()
+            .worker
+            .snapshots
+            .snapshot_interval_num_records
+        else {
+            return;
+        };
+
+        for (partition_id, state) in self.processor_states.iter() {
+            let status = state.partition_processor_status();
+            match status {
+                Some(status)
+                    if status.replay_status == ReplayStatus::Active
+                        && status.last_applied_log_lsn.unwrap_or(Lsn::INVALID)
+                            >= status
+                                .last_archived_log_lsn
+                                .unwrap_or(Lsn::OLDEST)
+                                .add(Lsn::from(records_per_snapshot.get())) =>
+                {
+                    debug!(
+                        %partition_id,
+                        last_archived_lsn = %status.last_archived_log_lsn.unwrap_or(SequenceNumber::OLDEST),
+                        last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID),
+                        "Creating partition snapshot",
+                    );
+                    let (tx, _) = oneshot::channel();
+
+                    // ignore errors and don't request further snapshots if internal queue is full; we will try again later
+                    if self
+                        .tx
+                        .try_send(ProcessorsManagerCommand::CreateSnapshot(*partition_id, tx))
+                        .is_err()
+                    {
+                        break;
+                    }
+                }
+                _ => {
+                    continue;
+                }
+            }
+        }
+    }
+
     /// Creates a task that when started will spawn a new partition processor.
     ///
     /// This allows multiple partition processors to be started concurrently without holding
diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs
index 899b5681f..a44157725 100644
--- a/tools/restatectl/src/commands/partition/list.rs
+++ b/tools/restatectl/src/commands/partition/list.rs
@@ -41,9 +41,9 @@ use crate::util::grpc_connect;
 #[command(
     after_long_help = "In addition to partition processors, the command displays the current \
     sequencer for the partition's log when the reported applied LSN falls within the tail of a \
-    replicated segment, under the heading SEQNCR. If ANSI color is enabled, the leadership epoch \
-    and the active sequencer will be highlighted in green they are the most recent and co-located \
-    with the leader processor, respectively."
+    replicated segment. If ANSI color is enabled, the leadership epoch and the active sequencer \
+    will be highlighted in green if they are the most recent and co-located with the leader \
+    processor, respectively."
)] pub struct ListPartitionsOpts { /// Sort order @@ -139,10 +139,11 @@ pub async fn list_partitions( "STATUS", "LEADER", "EPOCH", - "SEQNCR", - "APPLIED", - "PERSISTED", - "SKIPPED", + "SEQUENCER", + "APPLIED-LSN", + "PERSISTED-LSN", + "SKIPPED-RECORDS", + "ARCHIVED-LSN", "LAST-UPDATE", ]); @@ -271,6 +272,13 @@ pub async fn list_partitions( .unwrap_or("-".to_owned()), ), Cell::new(processor.status.num_skipped_records), + Cell::new( + processor + .status + .last_archived_log_lsn + .map(|x| x.to_string()) + .unwrap_or("-".to_owned()), + ), render_as_duration(processor.status.updated_at, Tense::Past), ]); });
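Taken together, the manager-side check in `request_partition_snapshots` reduces to a small amount of LSN arithmetic. A self-contained sketch of that trigger condition, with plain `u64` values and constants standing in for `restate_types::logs::Lsn` and its `INVALID`/`OLDEST` markers:

```rust
type Lsn = u64;

const LSN_INVALID: Lsn = 0; // stand-in for Lsn::INVALID
const LSN_OLDEST: Lsn = 1; // stand-in for Lsn::OLDEST

/// Mirrors the guard above: a snapshot is due once the applied LSN has
/// advanced at least `records_per_snapshot` past the last archived LSN,
/// treating a never-archived partition as archived at the oldest LSN.
fn snapshot_due(
    last_applied: Option<Lsn>,
    last_archived: Option<Lsn>,
    records_per_snapshot: u64,
) -> bool {
    last_applied.unwrap_or(LSN_INVALID)
        >= last_archived.unwrap_or(LSN_OLDEST) + records_per_snapshot
}

fn main() {
    // Never archived, 1_500 records applied, 1_000-record interval: due.
    assert!(snapshot_due(Some(1_500), None, 1_000));
    // Just archived at the applied LSN: not due again yet.
    assert!(!snapshot_due(Some(1_500), Some(1_500), 1_000));
}
```

Enabling the behaviour is then a single configuration entry, e.g. `snapshot-interval-num-records = 100_000` under `[worker.snapshots]`, matching the example comment added in `worker.rs`; leaving it unset keeps automatic snapshots disabled.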