Create snapshots on leading Partition Processors #2253

Open · wants to merge 1 commit into base: main
16 changes: 16 additions & 0 deletions crates/storage-api/src/fsm_table/mod.rs
@@ -25,6 +25,7 @@ mod fsm_variable {
pub(crate) const OUTBOX_SEQ_NUMBER: u64 = 1;

pub(crate) const APPLIED_LSN: u64 = 2;
pub(crate) const ARCHIVED_LSN: u64 = 3;
}

pub trait ReadOnlyFsmTable {
@@ -49,6 +50,14 @@ pub trait ReadOnlyFsmTable {
.map(|seq_number| seq_number.map(|seq_number| Lsn::from(u64::from(seq_number))))
})
}

fn get_archived_lsn(&mut self) -> impl Future<Output = Result<Option<Lsn>>> + Send + '_ {
self.get::<SequenceNumber>(fsm_variable::ARCHIVED_LSN)
.map(|result| {
result
.map(|seq_number| seq_number.map(|seq_number| Lsn::from(u64::from(seq_number))))
})
}
}

pub trait FsmTable: ReadOnlyFsmTable {
@@ -67,6 +76,13 @@ pub trait FsmTable: ReadOnlyFsmTable {
)
}

fn put_archived_lsn(&mut self, lsn: Lsn) -> impl Future<Output = ()> + Send {
self.put(
fsm_variable::ARCHIVED_LSN,
SequenceNumber::from(u64::from(lsn)),
)
}

fn put_inbox_seq_number(
&mut self,
seq_number: MessageIndex,
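For orientation, here is a minimal usage sketch of the accessors added above. The `record_snapshot_lsn` helper, its `anyhow` error type, and the monotonicity assertion are illustrative assumptions, not code from this PR; in the actual change the write goes through a partition-store transaction (see snapshot_producer.rs further down).

```rust
// Hypothetical helper exercising the new FSM-table accessors.
use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_types::logs::Lsn;

async fn record_snapshot_lsn<S>(store: &mut S, snapshot_lsn: Lsn) -> anyhow::Result<()>
where
    S: FsmTable + Send,
{
    // Read the previously persisted archived LSN, if any.
    let previous: Option<Lsn> = store.get_archived_lsn().await?;

    // Persist the LSN covered by the newly created snapshot.
    store.put_archived_lsn(snapshot_lsn).await;

    // Illustrative sanity check: the archived LSN should only move forward.
    if let Some(prev) = previous {
        debug_assert!(snapshot_lsn >= prev, "archived LSN should be monotonic");
    }
    Ok(())
}
```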
1 change: 1 addition & 0 deletions crates/types/protobuf/restate/cluster.proto
@@ -65,6 +65,7 @@ message PartitionProcessorStatus {
uint64 num_skipped_records = 8;
ReplayStatus replay_status = 9;
optional restate.common.Lsn last_persisted_log_lsn = 10;
optional restate.common.Lsn last_archived_log_lsn = 12;
// Set if replay_status is CATCHING_UP
optional restate.common.Lsn target_tail_lsn = 11;
}
2 changes: 2 additions & 0 deletions crates/types/src/cluster/cluster_state.rs
@@ -128,6 +128,7 @@ pub struct PartitionProcessorStatus {
pub num_skipped_records: u64,
pub replay_status: ReplayStatus,
pub last_persisted_log_lsn: Option<Lsn>,
pub last_archived_log_lsn: Option<Lsn>,
// Set if replay_status is CatchingUp
pub target_tail_lsn: Option<Lsn>,
}
@@ -145,6 +146,7 @@ impl Default for PartitionProcessorStatus {
num_skipped_records: 0,
replay_status: ReplayStatus::Starting,
last_persisted_log_lsn: None,
last_archived_log_lsn: None,
target_tail_lsn: None,
}
}
9 changes: 5 additions & 4 deletions crates/types/src/config/common.rs
@@ -233,7 +233,7 @@ static HOSTNAME: Lazy<String> = Lazy::new(|| {
});

impl CommonOptions {
pub fn shutdown_grace_period(&self) -> std::time::Duration {
pub fn shutdown_grace_period(&self) -> Duration {
self.shutdown_timeout.into()
}
// todo: It's imperative that the node doesn't change its name after start. Move this to a
@@ -353,7 +353,7 @@ impl Default for CommonOptions {
histogram_inactivity_timeout: None,
disable_prometheus: false,
service_client: Default::default(),
shutdown_timeout: std::time::Duration::from_secs(60).into(),
shutdown_timeout: Duration::from_secs(60).into(),
tracing: TracingOptions::default(),
log_filter: "warn,restate=info".to_string(),
log_format: Default::default(),
@@ -365,11 +365,11 @@ impl Default for CommonOptions {
rocksdb_total_memory_size: NonZeroUsize::new(6_000_000_000).unwrap(), // 4GB
rocksdb_bg_threads: None,
rocksdb_high_priority_bg_threads: NonZeroU32::new(2).unwrap(),
rocksdb_write_stall_threshold: std::time::Duration::from_secs(3).into(),
rocksdb_write_stall_threshold: Duration::from_secs(3).into(),
rocksdb_enable_stall_on_memory_limit: false,
rocksdb_perf_level: PerfStatsLevel::EnableCount,
rocksdb: Default::default(),
metadata_update_interval: std::time::Duration::from_secs(3).into(),
metadata_update_interval: Duration::from_secs(3).into(),
network_error_retry_policy: RetryPolicy::exponential(
Duration::from_millis(10),
2.0,
@@ -598,6 +598,7 @@ impl Default for TracingOptions {
}
}
}

#[cfg(test)]
mod tests {
use crate::nodes_config::Role;
26 changes: 23 additions & 3 deletions crates/types/src/config/worker.rs
@@ -10,7 +10,7 @@

use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::num::{NonZeroU16, NonZeroUsize};
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
use std::path::PathBuf;
use std::time::Duration;
use tracing::warn;
@@ -61,7 +61,10 @@ pub struct WorkerOptions {
/// value is, the higher the throughput and latency are.
max_command_batch_size: NonZeroUsize,

#[serde(flatten)]
/// # Snapshots
///
/// Snapshots provide a mechanism for safely trimming the log and efficient bootstrapping of new
/// worker nodes.
pub snapshots: SnapshotsOptions,
}

@@ -355,7 +358,24 @@ impl Default for StorageOptions {
#[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct SnapshotsOptions {}
pub struct SnapshotsOptions {
/// # Automatic snapshot creation frequency
///
/// Number of log records that trigger a snapshot to be created.
///
/// As snapshots are created asynchronously, the actual number of new records that will trigger
/// a snapshot will vary. The counter for the subsequent snapshot begins from the LSN at which
/// the previous snapshot export was initiated. Only leader Partition Processors will take
/// snapshots for a given partition.
///
/// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
///
/// Default: `None` - automatic snapshots are disabled by default
pub snapshot_interval_num_records: Option<NonZeroU64>,
}

// [worker.snapshots]
// snapshot-interval-num-records = 100_000

impl SnapshotsOptions {
pub fn snapshots_base_dir(&self) -> PathBuf {
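To make the `snapshot_interval_num_records` doc comment above concrete, here is a small sketch of the trigger predicate it describes. The function name and the plain u64 arithmetic are assumptions for illustration only; the actual check lives in the PartitionProcessorManager further down this diff.

```rust
// Illustrative predicate for the snapshot_interval_num_records setting:
// a snapshot becomes due once the applied LSN is at least `interval` records
// past the LSN of the last archived snapshot (or past Lsn::OLDEST if none exists).
use restate_types::logs::{Lsn, SequenceNumber};

fn snapshot_due(last_applied: Lsn, last_archived: Option<Lsn>, interval: u64) -> bool {
    let base = last_archived.unwrap_or(Lsn::OLDEST);
    last_applied >= Lsn::from(u64::from(base) + interval)
}

// Example: with snapshot-interval-num-records = 100_000 and a previous snapshot
// taken at LSN 1_000_000, the next snapshot is requested once the applied LSN
// reaches 1_100_000.
```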
13 changes: 13 additions & 0 deletions crates/worker/src/partition/mod.rs
@@ -212,6 +212,7 @@ where
status_watch_tx,
status,
inflight_create_snapshot_task: None,
last_snapshot_lsn_watch: watch::channel(None),
})
}

@@ -255,6 +256,7 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
max_command_batch_size: usize,
partition_store: PartitionStore,
inflight_create_snapshot_task: Option<TaskHandle<anyhow::Result<()>>>,
last_snapshot_lsn_watch: (watch::Sender<Option<Lsn>>, watch::Receiver<Option<Lsn>>),

Review comment (Contributor, PR author): Is there a better way to do this? I want to update the LSN from a background task spawned to manage the snapshot creation (and, soon, the upload to a blob store).

(A generic sketch of this watch-channel pattern follows this file's diff.)

}

impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
@@ -291,6 +293,9 @@ where
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);

self.status.last_applied_log_lsn = Some(last_applied_lsn);
self.last_snapshot_lsn_watch
.0
.send(partition_store.get_archived_lsn().await?)?;

// propagate errors and let the PPM handle error retries
let current_tail = self
@@ -399,6 +404,7 @@ where
self.on_rpc(rpc, &mut partition_store).await;
}
_ = status_update_timer.tick() => {
self.status.last_archived_log_lsn = *self.last_snapshot_lsn_watch.1.borrow();
self.status_watch_tx.send_modify(|old| {
old.clone_from(&self.status);
old.updated_at = MillisSinceEpoch::now();
@@ -524,6 +530,8 @@ where
let snapshot_base_path = config.worker.snapshots.snapshots_dir(self.partition_id);
let partition_store = self.partition_store.clone();
let snapshot_span = tracing::info_span!("create-snapshot");
let snapshot_lsn_tx = self.last_snapshot_lsn_watch.0.clone();

let inflight_create_snapshot_task = restate_core::task_center().spawn_unmanaged(
TaskKind::PartitionSnapshotProducer,
"create-snapshot",
@@ -536,6 +544,11 @@
)
.await;

if let Ok(metadata) = result.as_ref() {
// update the Partition Processor's internal state
let _ = snapshot_lsn_tx.send(Some(metadata.min_applied_lsn));
}

if let Some(tx) = maybe_sender {
tx.send(result.map(|metadata| metadata.snapshot_id)).ok();
}
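Regarding the inline question about updating the LSN from a background task: the mechanism used here is a tokio watch channel. The sketch below is a generic illustration of that pattern under stated assumptions (the task body and placeholder LSN are invented), not code from this PR. An alternative would be an atomic cell or returning the value through the task handle, but a watch channel keeps the read side non-blocking and always yields the latest published value.

```rust
// Generic sketch of the watch-channel pattern: a background snapshot task
// publishes the latest snapshot LSN, and the processor's status loop reads
// the most recent value without awaiting the task.
use restate_types::logs::Lsn;
use tokio::sync::watch;

fn spawn_snapshot_task() -> watch::Receiver<Option<Lsn>> {
    let (tx, rx) = watch::channel(None);

    tokio::spawn(async move {
        // ... produce the snapshot here ...
        let min_applied_lsn = Lsn::from(42u64); // placeholder for the snapshot's LSN

        // send() only fails if every receiver has been dropped, which is safe to ignore here.
        let _ = tx.send(Some(min_applied_lsn));
    });

    rx
}

// Reader side, analogous to the status-update tick above:
// status.last_archived_log_lsn = *rx.borrow();
```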
14 changes: 13 additions & 1 deletion crates/worker/src/partition/snapshot_producer.rs
@@ -12,10 +12,12 @@ use std::path::PathBuf;
use std::time::SystemTime;

use anyhow::bail;
use tracing::{info, warn};
use tracing::{debug, info, warn};

use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_partition_store::PartitionStore;
use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_storage_api::Transaction;
use restate_types::identifiers::SnapshotId;

/// Encapsulates producing a Restate partition snapshot out of a partition store.
Expand Down Expand Up @@ -69,6 +71,16 @@ impl SnapshotProducer {
"Partition snapshot written"
);

let previous_archived_snapshot_lsn = partition_store.get_archived_lsn().await?;
let mut tx = partition_store.transaction();
tx.put_archived_lsn(snapshot.min_applied_lsn).await;

Review comment (Contributor): I assume you are relying on the PartitionProcessor to ensure that there is no concurrency between SnapshotProducers. Otherwise there is a risk that an older snapshot overwrites a newer one here, because there can now be multiple writers.

Review comment (Contributor): Note that partition_store.transaction() only gives you transaction-like guarantees if there is a single writer. Concurrent writers can overwrite each other's changes without the system noticing.

Review comment (Contributor, PR author): Yes, indeed, that is the case. I'll have to think about this a bit more carefully when we move this responsibility up into the PartitionProcessorManager.

(A sketch of a conditional update that avoids regressing the archived LSN follows this file's diff.)

tx.commit().await?;
debug!(
previous_archived_lsn = ?previous_archived_snapshot_lsn,
updated_archived_lsn = ?snapshot.min_applied_lsn,
"Updated persisted archived snapshot LSN"
);

Ok(snapshot_meta)
}
}
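Following up on the review thread above: if the single-writer assumption ever changes, one option is to make the update conditional so that an older snapshot cannot regress the archived LSN. This is a hedged sketch only, not part of this PR; note that a read-then-write is itself only safe with a single writer, so true concurrency would additionally need an atomic compare-and-set at the storage layer.

```rust
// Hypothetical guard (not in this PR): only advance the archived LSN.
// Under the current design the leading PartitionProcessor serializes snapshot
// production, so the unconditional put in the diff above is sufficient.
use restate_partition_store::PartitionStore;
use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_storage_api::Transaction;
use restate_types::logs::Lsn;

async fn update_archived_lsn_if_newer(
    partition_store: &mut PartitionStore,
    new_lsn: Lsn,
) -> anyhow::Result<bool> {
    let current = partition_store.get_archived_lsn().await?;
    if current.is_some_and(|cur| cur >= new_lsn) {
        // A newer (or equal) snapshot has already been recorded; skip the write.
        return Ok(false);
    }

    let mut tx = partition_store.transaction();
    tx.put_archived_lsn(new_lsn).await;
    tx.commit().await?;
    Ok(true)
}
```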
67 changes: 61 additions & 6 deletions crates/worker/src/partition_processor_manager/mod.rs
@@ -13,14 +13,16 @@ mod persisted_lsn_watchdog;
mod processor_state;
mod spawn_processor_task;

use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::sync::Arc;

use futures::stream::StreamExt;
use metrics::gauge;
use std::collections::BTreeMap;
use std::ops::{Add, RangeInclusive};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinSet;
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, instrument, warn};

use crate::metric_definitions::NUM_ACTIVE_PARTITIONS;
@@ -53,7 +55,7 @@ use restate_types::epoch::EpochMetadata;
use restate_types::health::HealthStatus;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
use restate_types::live::Live;
use restate_types::logs::Lsn;
use restate_types::logs::{Lsn, SequenceNumber};
use restate_types::metadata_store::keys::partition_processor_epoch_key;
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor::{
@@ -215,12 +217,18 @@ impl PartitionProcessorManager {
let mut logs_version_watcher = self.metadata.watch(MetadataKind::Logs);
let mut partition_table_version_watcher = self.metadata.watch(MetadataKind::PartitionTable);

let mut latest_snapshot_check_interval = tokio::time::interval(Duration::from_secs(5));
latest_snapshot_check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

self.health_status.update(WorkerStatus::Ready);
loop {
tokio::select! {
Some(command) = self.rx.recv() => {
self.on_command(command);
}
_ = latest_snapshot_check_interval.tick() => {
self.request_partition_snapshots();
}
Some(control_processors) = self.incoming_update_processors.next() => {
self.pending_control_processors = Some(control_processors.into_body());
self.on_control_processors();
@@ -285,7 +293,8 @@
}
}

#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))]
#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)
))]
fn on_asynchronous_event(&mut self, event: AsynchronousEvent) {
let AsynchronousEvent {
partition_id,
@@ -647,6 +656,52 @@ impl PartitionProcessorManager {
}
}

fn request_partition_snapshots(&mut self) {
let Some(records_per_snapshot) = self
.updateable_config
.live_load()
.worker
.snapshots
.snapshot_interval_num_records
else {
return;
};

for (partition_id, state) in self.processor_states.iter() {
let status = state.partition_processor_status();
match status {
Some(status)
if status.replay_status == ReplayStatus::Active
&& status.last_applied_log_lsn.unwrap_or(Lsn::INVALID)
>= status
.last_archived_log_lsn
.unwrap_or(Lsn::OLDEST)
.add(Lsn::from(records_per_snapshot.get())) =>
{
debug!(
%partition_id,
last_archived_lsn = %status.last_archived_log_lsn.unwrap_or(SequenceNumber::OLDEST),
last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID),
"Creating partition snapshot",
);
let (tx, _) = oneshot::channel();

// ignore errors and don't request further snapshots if internal queue is full; we will try again later
if self
.tx
.try_send(ProcessorsManagerCommand::CreateSnapshot(*partition_id, tx))
.is_err()
{
break;
}
}
_ => {
continue;
}
}
}
}

/// Creates a task that when started will spawn a new partition processor.
///
/// This allows multiple partition processors to be started concurrently without holding
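For contrast with the periodic check above, here is a caller-side sketch of an explicit snapshot request (the `restatectl`-triggered path mentioned in the config docs). The function, its error handling, and the exact shape of the reply are assumptions inferred from the `CreateSnapshot` handling shown earlier in this diff, not an established API.

```rust
// Hypothetical caller: request a snapshot for a single partition and wait for
// the resulting SnapshotId. Assumes ProcessorsManagerCommand::CreateSnapshot
// carries a oneshot sender for the result, as the try_send call above suggests,
// and that the worker-internal ProcessorsManagerCommand enum is in scope.
use restate_types::identifiers::{PartitionId, SnapshotId};
use tokio::sync::{mpsc, oneshot};

async fn create_snapshot(
    manager_tx: &mpsc::Sender<ProcessorsManagerCommand>,
    partition_id: PartitionId,
) -> anyhow::Result<SnapshotId> {
    let (tx, rx) = oneshot::channel();
    manager_tx
        .send(ProcessorsManagerCommand::CreateSnapshot(partition_id, tx))
        .await
        .map_err(|_| anyhow::anyhow!("partition processor manager has shut down"))?;

    // The processor replies with the snapshot ID (or an error) once the
    // snapshot has been produced.
    rx.await?
}
```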
22 changes: 15 additions & 7 deletions tools/restatectl/src/commands/partition/list.rs
@@ -41,9 +41,9 @@ use crate::util::grpc_connect;
#[command(
after_long_help = "In addition to partition processors, the command displays the current \
sequencer for the partition's log when the reported applied LSN falls within the tail a \
replicated segment, under the heading SEQNCR. If ANSI color is enabled, the leadership epoch \
and the active sequencer will be highlighted in green they are the most recent and co-located \
with the leader processor, respectively."
replicated segment. If ANSI color is enabled, the leadership epoch and the active sequencer \
will be highlighted in green if they are the most recent and co-located with the leader \
processor, respectively."
)]
pub struct ListPartitionsOpts {
/// Sort order
@@ -139,10 +139,11 @@ pub async fn list_partitions(
"STATUS",
"LEADER",
"EPOCH",
"SEQNCR",
"APPLIED",
"PERSISTED",
"SKIPPED",
"SEQUENCER",
"APPLIED-LSN",
"PERSISTED-LSN",
"SKIPPED-RECORDS",
"ARCHIVED-LSN",
"LAST-UPDATE",
]);

@@ -271,6 +272,13 @@
.unwrap_or("-".to_owned()),
),
Cell::new(processor.status.num_skipped_records),
Cell::new(
processor
.status
.last_archived_log_lsn
.map(|x| x.to_string())
.unwrap_or("-".to_owned()),
),
render_as_duration(processor.status.updated_at, Tense::Past),
]);
});