Introduce config option to automatically create partition snapshots
A new configurable snapshot interval is added, expressed as a minimum number of records after which a snapshot of the partition store will be automatically taken.
pcholakov committed Nov 14, 2024
1 parent 484a0df commit 1d97374
Showing 9 changed files with 149 additions and 21 deletions.
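
For reference, a minimal sketch of how the new option could be set in a node configuration file. The `[worker.snapshots]` section and kebab-case key mirror the commented example in `crates/types/src/config/worker.rs` below; the interval value is illustrative only:

```toml
# Take a snapshot roughly every 100,000 newly applied records per partition.
# Only leader partition processors take snapshots, and because snapshot
# creation is asynchronous the effective interval can vary.
[worker.snapshots]
snapshot-interval-num-records = 100_000
```

Omitting the key (the default) leaves automatic snapshots disabled.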
16 changes: 16 additions & 0 deletions crates/storage-api/src/fsm_table/mod.rs
@@ -25,6 +25,7 @@ mod fsm_variable {
pub(crate) const OUTBOX_SEQ_NUMBER: u64 = 1;

pub(crate) const APPLIED_LSN: u64 = 2;
pub(crate) const ARCHIVED_LSN: u64 = 3;
}

pub trait ReadOnlyFsmTable {
@@ -49,6 +50,14 @@ pub trait ReadOnlyFsmTable {
.map(|seq_number| seq_number.map(|seq_number| Lsn::from(u64::from(seq_number))))
})
}

fn get_archived_lsn(&mut self) -> impl Future<Output = Result<Option<Lsn>>> + Send + '_ {
self.get::<SequenceNumber>(fsm_variable::ARCHIVED_LSN)
.map(|result| {
result
.map(|seq_number| seq_number.map(|seq_number| Lsn::from(u64::from(seq_number))))
})
}
}

pub trait FsmTable: ReadOnlyFsmTable {
@@ -67,6 +76,13 @@ pub trait FsmTable: ReadOnlyFsmTable {
)
}

fn put_archived_lsn(&mut self, lsn: Lsn) -> impl Future<Output = ()> + Send {
self.put(
fsm_variable::ARCHIVED_LSN,
SequenceNumber::from(u64::from(lsn)),
)
}

fn put_inbox_seq_number(
&mut self,
seq_number: MessageIndex,
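A minimal usage sketch of the two new accessors, modeled on the `SnapshotProducer` change later in this commit. The `record_archived_lsn` helper is hypothetical; it assumes `PartitionStore` implements `ReadOnlyFsmTable` and that its transactions implement `FsmTable`, as the diffs below suggest:

```rust
use restate_partition_store::PartitionStore;
use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_storage_api::Transaction;
use restate_types::logs::Lsn;

// Hypothetical helper: persist the LSN at which a snapshot export began.
async fn record_archived_lsn(store: &mut PartitionStore, lsn: Lsn) -> anyhow::Result<()> {
    // Returns None until an archived LSN has been recorded at least once.
    let previous = store.get_archived_lsn().await?;

    // put_archived_lsn stages the write; commit makes it durable.
    let mut tx = store.transaction();
    tx.put_archived_lsn(lsn).await;
    tx.commit().await?;

    tracing::debug!(?previous, updated = ?lsn, "archived LSN advanced");
    Ok(())
}
```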
1 change: 1 addition & 0 deletions crates/types/protobuf/restate/cluster.proto
@@ -65,6 +65,7 @@ message PartitionProcessorStatus {
uint64 num_skipped_records = 8;
ReplayStatus replay_status = 9;
optional restate.common.Lsn last_persisted_log_lsn = 10;
optional restate.common.Lsn last_archived_log_lsn = 12;
// Set if replay_status is CATCHING_UP
optional restate.common.Lsn target_tail_lsn = 11;
}
2 changes: 2 additions & 0 deletions crates/types/src/cluster/cluster_state.rs
@@ -128,6 +128,7 @@ pub struct PartitionProcessorStatus {
pub num_skipped_records: u64,
pub replay_status: ReplayStatus,
pub last_persisted_log_lsn: Option<Lsn>,
pub last_archived_log_lsn: Option<Lsn>,
// Set if replay_status is CatchingUp
pub target_tail_lsn: Option<Lsn>,
}
@@ -145,6 +146,7 @@ impl Default for PartitionProcessorStatus {
num_skipped_records: 0,
replay_status: ReplayStatus::Starting,
last_persisted_log_lsn: None,
last_archived_log_lsn: None,
target_tail_lsn: None,
}
}
9 changes: 5 additions & 4 deletions crates/types/src/config/common.rs
@@ -233,7 +233,7 @@ static HOSTNAME: Lazy<String> = Lazy::new(|| {
});

impl CommonOptions {
pub fn shutdown_grace_period(&self) -> std::time::Duration {
pub fn shutdown_grace_period(&self) -> Duration {
self.shutdown_timeout.into()
}
// todo: It's imperative that the node doesn't change its name after start. Move this to a
@@ -353,7 +353,7 @@ impl Default for CommonOptions {
histogram_inactivity_timeout: None,
disable_prometheus: false,
service_client: Default::default(),
shutdown_timeout: std::time::Duration::from_secs(60).into(),
shutdown_timeout: Duration::from_secs(60).into(),
tracing: TracingOptions::default(),
log_filter: "warn,restate=info".to_string(),
log_format: Default::default(),
@@ -365,11 +365,11 @@
rocksdb_total_memory_size: NonZeroUsize::new(6_000_000_000).unwrap(), // 6GB
rocksdb_bg_threads: None,
rocksdb_high_priority_bg_threads: NonZeroU32::new(2).unwrap(),
rocksdb_write_stall_threshold: std::time::Duration::from_secs(3).into(),
rocksdb_write_stall_threshold: Duration::from_secs(3).into(),
rocksdb_enable_stall_on_memory_limit: false,
rocksdb_perf_level: PerfStatsLevel::EnableCount,
rocksdb: Default::default(),
metadata_update_interval: std::time::Duration::from_secs(3).into(),
metadata_update_interval: Duration::from_secs(3).into(),
network_error_retry_policy: RetryPolicy::exponential(
Duration::from_millis(10),
2.0,
@@ -598,6 +598,7 @@ impl Default for TracingOptions {
}
}
}

#[cfg(test)]
mod tests {
use crate::nodes_config::Role;
26 changes: 23 additions & 3 deletions crates/types/src/config/worker.rs
@@ -10,7 +10,7 @@

use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::num::{NonZeroU16, NonZeroUsize};
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
use std::path::PathBuf;
use std::time::Duration;
use tracing::warn;
@@ -61,7 +61,10 @@ pub struct WorkerOptions {
/// value is, the higher the throughput and latency are.
max_command_batch_size: NonZeroUsize,

#[serde(flatten)]
/// # Snapshots
///
/// When using the replicated loglet in a distributed cluster, snapshots provide a mechanism for
/// safely trimming the log and for bootstrapping new worker nodes.
pub snapshots: SnapshotsOptions,
}

@@ -355,7 +358,24 @@ impl Default for StorageOptions {
#[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct SnapshotsOptions {}
pub struct SnapshotsOptions {
/// # Automatic snapshot creation frequency
///
/// Number of log records that trigger a snapshot to be created.
///
/// As snapshots are created asynchronously, the actual number of new records that will trigger
/// a snapshot will vary. The counter for the subsequent snapshot begins from the LSN at which
/// the previous snapshot export was initiated. Only leader Partition Processors will take
/// snapshots for a given partition.
///
/// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
///
/// Default: `None` (automatic snapshots are disabled)
pub snapshot_interval_num_records: Option<NonZeroU64>,
}

// [worker.snapshots]
// snapshot-interval-num-records = 100_000

impl SnapshotsOptions {
pub fn snapshots_base_dir(&self) -> PathBuf {
13 changes: 13 additions & 0 deletions crates/worker/src/partition/mod.rs
@@ -212,6 +212,7 @@
status_watch_tx,
status,
inflight_create_snapshot_task: None,
last_snapshot_lsn_watch: watch::channel(None),
})
}

@@ -255,6 +256,7 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
max_command_batch_size: usize,
partition_store: PartitionStore,
inflight_create_snapshot_task: Option<TaskHandle<anyhow::Result<()>>>,
last_snapshot_lsn_watch: (watch::Sender<Option<Lsn>>, watch::Receiver<Option<Lsn>>),
}

impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
@@ -291,6 +293,9 @@ where
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);

self.status.last_applied_log_lsn = Some(last_applied_lsn);
self.last_snapshot_lsn_watch
.0
.send(partition_store.get_archived_lsn().await?)?;

// propagate errors and let the PPM handle error retries
let current_tail = self
@@ -399,6 +404,7 @@ where
self.on_rpc(rpc, &mut partition_store).await;
}
_ = status_update_timer.tick() => {
self.status.last_archived_log_lsn = *self.last_snapshot_lsn_watch.1.borrow();
self.status_watch_tx.send_modify(|old| {
old.clone_from(&self.status);
old.updated_at = MillisSinceEpoch::now();
@@ -524,6 +530,8 @@ where
let snapshot_base_path = config.worker.snapshots.snapshots_dir(self.partition_id);
let partition_store = self.partition_store.clone();
let snapshot_span = tracing::info_span!("create-snapshot");
let snapshot_lsn_tx = self.last_snapshot_lsn_watch.0.clone();

let inflight_create_snapshot_task = restate_core::task_center().spawn_unmanaged(
TaskKind::PartitionSnapshotProducer,
"create-snapshot",
@@ -536,6 +544,11 @@
)
.await;

if let Ok(metadata) = result.as_ref() {
// update the Partition Processor's internal state
let _ = snapshot_lsn_tx.send(Some(metadata.min_applied_lsn));
}

if let Some(tx) = maybe_sender {
tx.send(result.map(|metadata| metadata.snapshot_id)).ok();
}
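The `last_snapshot_lsn_watch` pair added above is a standard `tokio::sync::watch` pattern: the detached snapshot task publishes the LSN it exported at, and the periodic status update reads the latest value without blocking. A self-contained sketch with illustrative names, using a plain `u64` in place of `Lsn`:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Holds the most recently archived LSN; None until the first snapshot.
    let (tx, rx) = watch::channel::<Option<u64>>(None);

    // The snapshot task publishes its result, ignoring the send error that
    // occurs if the receiving side has already been dropped.
    tokio::spawn(async move {
        let min_applied_lsn: u64 = 42;
        let _ = tx.send(Some(min_applied_lsn));
    })
    .await
    .expect("snapshot task panicked");

    // The status-update tick reads the latest value, mirroring
    // `*self.last_snapshot_lsn_watch.1.borrow()` above.
    assert_eq!(*rx.borrow(), Some(42));
}
```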
14 changes: 13 additions & 1 deletion crates/worker/src/partition/snapshot_producer.rs
@@ -12,10 +12,12 @@ use std::path::PathBuf;
use std::time::SystemTime;

use anyhow::bail;
use tracing::{info, warn};
use tracing::{debug, info, warn};

use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_partition_store::PartitionStore;
use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_storage_api::Transaction;
use restate_types::identifiers::SnapshotId;

/// Encapsulates producing a Restate partition snapshot out of a partition store.
@@ -69,6 +71,16 @@ impl SnapshotProducer {
"Partition snapshot written"
);

let previous_archived_snapshot_lsn = partition_store.get_archived_lsn().await?;
let mut tx = partition_store.transaction();
tx.put_archived_lsn(snapshot.min_applied_lsn).await;
tx.commit().await?;
debug!(
previous_archived_lsn = ?previous_archived_snapshot_lsn,
updated_archived_lsn = ?snapshot.min_applied_lsn,
"Updated persisted archived snapshot LSN"
);

Ok(snapshot_meta)
}
}
67 changes: 61 additions & 6 deletions crates/worker/src/partition_processor_manager/mod.rs
@@ -13,14 +13,16 @@ mod persisted_lsn_watchdog;
mod processor_state;
mod spawn_processor_task;

use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::sync::Arc;

use futures::stream::StreamExt;
use metrics::gauge;
use std::collections::BTreeMap;
use std::ops::{Add, RangeInclusive};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinSet;
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, instrument, warn};

use crate::metric_definitions::NUM_ACTIVE_PARTITIONS;
@@ -53,7 +55,7 @@ use restate_types::epoch::EpochMetadata;
use restate_types::health::HealthStatus;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
use restate_types::live::Live;
use restate_types::logs::Lsn;
use restate_types::logs::{Lsn, SequenceNumber};
use restate_types::metadata_store::keys::partition_processor_epoch_key;
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor::{
@@ -215,12 +217,18 @@ impl PartitionProcessorManager {
let mut logs_version_watcher = self.metadata.watch(MetadataKind::Logs);
let mut partition_table_version_watcher = self.metadata.watch(MetadataKind::PartitionTable);

let mut latest_snapshot_check_interval = tokio::time::interval(Duration::from_secs(5));
latest_snapshot_check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

self.health_status.update(WorkerStatus::Ready);
loop {
tokio::select! {
Some(command) = self.rx.recv() => {
self.on_command(command);
}
_ = latest_snapshot_check_interval.tick() => {
self.request_partition_snapshots();
}
Some(control_processors) = self.incoming_update_processors.next() => {
self.pending_control_processors = Some(control_processors.into_body());
self.on_control_processors();
@@ -285,7 +293,8 @@
}
}

#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))]
#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)
))]
fn on_asynchronous_event(&mut self, event: AsynchronousEvent) {
let AsynchronousEvent {
partition_id,
@@ -647,6 +656,52 @@
}
}

fn request_partition_snapshots(&mut self) {
let Some(records_per_snapshot) = self
.updateable_config
.live_load()
.worker
.snapshots
.snapshot_interval_num_records
else {
return;
};

for (partition_id, state) in self.processor_states.iter() {
let status = state.partition_processor_status();
match status {
Some(status)
if status.replay_status == ReplayStatus::Active
&& status.last_applied_log_lsn.unwrap_or(Lsn::INVALID)
>= status
.last_archived_log_lsn
.unwrap_or(Lsn::OLDEST)
.add(Lsn::from(records_per_snapshot.get())) =>
{
debug!(
%partition_id,
last_archived_lsn = %status.last_archived_log_lsn.unwrap_or(SequenceNumber::OLDEST),
last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID),
"Creating partition snapshot",
);
let (tx, _) = oneshot::channel();

// ignore errors and don't request further snapshots if internal queue is full; we will try again later
if self
.tx
.try_send(ProcessorsManagerCommand::CreateSnapshot(*partition_id, tx))
.is_err()
{
break;
}
}
_ => {
continue;
}
}
}
}

/// Creates a task that when started will spawn a new partition processor.
///
/// This allows multiple partition processors to be started concurrently without holding
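To make the trigger condition in `request_partition_snapshots` concrete: with an interval of 100_000 and a last archived LSN of 500_000, a snapshot is requested once the applied LSN reaches 600_000, and a never-archived partition counts from `Lsn::OLDEST`. A standalone sketch of the predicate, assuming restate's sequence-number convention of `INVALID` = 0 and `OLDEST` = 1 and substituting plain `u64`s for `Lsn`:

```rust
/// Sketch of the snapshot-trigger predicate, extracted for illustration.
fn snapshot_due(
    last_applied_lsn: Option<u64>,
    last_archived_lsn: Option<u64>,
    records_per_snapshot: u64,
) -> bool {
    const INVALID: u64 = 0; // assumed value of Lsn::INVALID
    const OLDEST: u64 = 1; // assumed value of Lsn::OLDEST
    last_applied_lsn.unwrap_or(INVALID)
        >= last_archived_lsn.unwrap_or(OLDEST) + records_per_snapshot
}

fn main() {
    assert!(snapshot_due(Some(600_000), Some(500_000), 100_000));
    assert!(!snapshot_due(Some(599_999), Some(500_000), 100_000));
    // A never-archived partition becomes due once roughly the first
    // `records_per_snapshot` records have been applied.
    assert!(snapshot_due(Some(100_001), None, 100_000));
}
```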
22 changes: 15 additions & 7 deletions tools/restatectl/src/commands/partition/list.rs
@@ -41,9 +41,9 @@ use crate::util::grpc_connect;
#[command(
after_long_help = "In addition to partition processors, the command displays the current \
sequencer for the partition's log when the reported applied LSN falls within the tail of a \
replicated segment, under the heading SEQNCR. If ANSI color is enabled, the leadership epoch \
and the active sequencer will be highlighted in green they are the most recent and co-located \
with the leader processor, respectively."
replicated segment. If ANSI color is enabled, the leadership epoch and the active sequencer \
will be highlighted in green when they are the most recent and co-located with the leader \
processor, respectively."
)]
pub struct ListPartitionsOpts {
/// Sort order
@@ -139,10 +139,11 @@ pub async fn list_partitions(
"STATUS",
"LEADER",
"EPOCH",
"SEQNCR",
"APPLIED",
"PERSISTED",
"SKIPPED",
"SEQUENCER",
"APPLIED-LSN",
"PERSISTED-LSN",
"SKIPPED-RECORDS",
"ARCHIVED-LSN",
"LAST-UPDATE",
]);

@@ -271,6 +272,13 @@
.unwrap_or("-".to_owned()),
),
Cell::new(processor.status.num_skipped_records),
Cell::new(
processor
.status
.last_archived_log_lsn
.map(|x| x.to_string())
.unwrap_or("-".to_owned()),
),
render_as_duration(processor.status.updated_at, Tense::Past),
]);
});
