Skip to content

Commit

Permalink
Refactoring: new "snapshots" module
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 12, 2024
1 parent 7e041fd commit 7f44686
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 143 deletions.
4 changes: 2 additions & 2 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ use crate::metric_definitions::{
};
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata};
use crate::partition::snapshot_producer::{SnapshotProducer, SnapshotRepository};
use crate::partition::snapshots::{SnapshotProducer, SnapshotRepository};
use crate::partition::state_machine::{ActionCollector, StateMachine};

mod cleaner;
pub mod invoker_storage_reader;
mod leadership;
pub mod shuffle;
mod snapshot_producer;
mod snapshots;
mod state_machine;
pub mod types;

Expand Down
2 changes: 2 additions & 0 deletions crates/worker/src/partition/snapshots/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod producer;

Check warning on line 1 in crates/worker/src/partition/snapshots/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

Diff in /home/runner/work/restate/restate/crates/worker/src/partition/snapshots/mod.rs
pub mod repository;
155 changes: 155 additions & 0 deletions crates/worker/src/partition/snapshots/producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;

use anyhow::{anyhow, bail, Context};
use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::AmazonS3Builder;
use object_store::{ObjectStore, PutPayload};
use tempfile::NamedTempFile;
use tracing::{debug, error, info, trace, trace_span, warn};
use url::Url;

use crate::partition::snapshots::repository::SnapshotRepository;
use restate_core::task_center;
use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_partition_store::PartitionStore;
use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_storage_api::Transaction;
use restate_types::config::{Configuration, SnapshotsOptions};
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;

/// Encapsulates exporting and publishing partition snapshots.
///
/// A producer is bound to a single partition store: it exports that store to a local
/// directory, writes a `metadata.json` next to the exported files, and hands the directory
/// to the snapshot repository.
#[derive(Clone)]
pub struct SnapshotProducer {
// Cluster and node names that are copied into every snapshot's metadata.
snapshot_source: SnapshotSource,
// Store that is exported, and in which the archived LSN is persisted after publishing.
partition_store: PartitionStore,
// Local base directory; each snapshot is exported into a sub-directory named after its id.
partition_snapshots_path: PathBuf,
// Destination that receives the exported snapshot via `put`.
snapshot_repository: SnapshotRepository,
}

/// Names of the cluster and node that produced a snapshot.
///
/// Both values are copied verbatim into the metadata of every snapshot created by a
/// `SnapshotProducer`.
#[derive(Clone, Debug)]
pub struct SnapshotSource {
    /// Name of the cluster, as reported by the node configuration.
    pub cluster_name: String,
    /// Name of the node, as reported by the node configuration.
    pub node_name: String,
}

impl SnapshotProducer {
pub async fn create(
partition_store: PartitionStore,
config: Live<Configuration>,
snapshot_repository: SnapshotRepository,
) -> anyhow::Result<Self> {
let config = config.pinned();
let partition_id = partition_store.partition_id();

Ok(SnapshotProducer {
snapshot_source: SnapshotSource {
cluster_name: config.common.cluster_name().into(),
node_name: config.common.node_name().into(),
},
partition_store,
snapshot_repository,
partition_snapshots_path: config.worker.snapshots.snapshots_dir(partition_id),
})
}

/// Exports a partition store snapshot and writes it to the snapshot repository.
///
/// The final snapshot key will follow the structure:
/// `[<prefix>/]<partition_id>/<sort_key>/<snapshot_id>_<lsn>.tar`.
pub async fn create_snapshot(&mut self) -> anyhow::Result<PartitionSnapshotMetadata> {
if let Err(e) = tokio::fs::create_dir_all(&self.partition_snapshots_path).await {
warn!(
path = ?self.partition_snapshots_path,
error = ?e,
"Failed to create partition snapshot directory"
);
bail!("Failed to create partition snapshot directory: {:?}", e);
}

let snapshot_id = SnapshotId::new();
let snapshot_path = self.partition_snapshots_path.join(snapshot_id.to_string());

trace!(%snapshot_id, "Creating partition snapshot export directory: {:?}", snapshot_path);
let snapshot = self
.partition_store
.export_snapshot(snapshot_path.clone())
.await
.context("Export partition snapshot")?;

let snapshot_metadata = PartitionSnapshotMetadata {
version: SnapshotFormatVersion::V1,
cluster_name: self.snapshot_source.cluster_name.clone(),
node_name: self.snapshot_source.node_name.clone(),
partition_id: self.partition_store.partition_id(),
created_at: humantime::Timestamp::from(SystemTime::now()),
snapshot_id,
key_range: self.partition_store.partition_key_range().clone(),
min_applied_lsn: snapshot.min_applied_lsn,
db_comparator_name: snapshot.db_comparator_name.clone(),
files: snapshot.files.clone(),
};
let metadata_json = serde_json::to_string_pretty(&snapshot_metadata)?;

let snapshot_lsn = snapshot_metadata.min_applied_lsn;
let metadata_path = snapshot_path.join("metadata.json");
tokio::fs::write(metadata_path.clone(), metadata_json)
.await
.context("Writing snapshot metadata failed")?;

self.snapshot_repository
.put(
self.partition_store.partition_id(),
&snapshot_metadata,
snapshot_path.as_path(),
)
.await
.context("Snapshot repository upload failed")?;

let previous_archived_snapshot_lsn = self.partition_store.get_archived_lsn().await?;
let mut tx = self.partition_store.transaction();
tx.put_archived_lsn(snapshot_lsn).await;
tx.commit()
.await
.context("Updating archived snapshot LSN")?;
trace!(
%snapshot_id,
previous_archived_lsn = ?previous_archived_snapshot_lsn,
updated_archived_lsn = ?snapshot_lsn,
"Updated persisted archived snapshot LSN"
);

let cleanup = tokio::fs::remove_dir_all(snapshot_path.clone()).await;
match cleanup {
Ok(_) => {
debug!(%snapshot_id, "Cleaned up snapshot export directory: {:?}", snapshot_path);
}
Err(e) => {
error!(%snapshot_id, "Failed to clean up snapshot directory: {}", e);
}
}

info!(
%snapshot_id,
partition_id = ?self.partition_store.partition_id(),
?snapshot_lsn,
"Successfully published partition snapshot"
);
Ok(snapshot_metadata)
}
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,21 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;

use anyhow::{anyhow, bail, Context};
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::AmazonS3Builder;
use object_store::{ObjectStore, PutPayload};
use tempfile::NamedTempFile;
use tracing::{debug, error, info, trace, trace_span, warn};
use tracing::{debug, trace_span, warn};
use url::Url;

use restate_core::task_center;
use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_partition_store::PartitionStore;
use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_storage_api::Transaction;
use restate_types::config::{Configuration, SnapshotsOptions};
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
use restate_partition_store::snapshots::PartitionSnapshotMetadata;
use restate_types::config::SnapshotsOptions;
use restate_types::identifiers::PartitionId;

/// Provides read and write access to the long-term partition snapshot storage destination.
#[derive(Clone)]
Expand Down Expand Up @@ -208,124 +193,3 @@ impl object_store::CredentialProvider for AwsSdkCredentialsProvider {
}))
}
}

/// Encapsulates exporting and publishing partition snapshots.
///
/// A producer is bound to a single partition store: it exports that store to a local
/// directory, writes a `metadata.json` next to the exported files, and hands the directory
/// to the snapshot repository.
#[derive(Clone)]
pub struct SnapshotProducer {
// Cluster and node names that are copied into every snapshot's metadata.
snapshot_source: SnapshotSource,
// Store that is exported, and in which the archived LSN is persisted after publishing.
partition_store: PartitionStore,
// Local base directory; each snapshot is exported into a sub-directory named after its id.
partition_snapshots_path: PathBuf,
// Destination that receives the exported snapshot via `put`.
snapshot_repository: SnapshotRepository,
}

/// Names of the cluster and node that produced a snapshot.
///
/// Both values are copied verbatim into the metadata of every snapshot created by a
/// `SnapshotProducer`.
#[derive(Clone)]
pub struct SnapshotSource {
/// Name of the cluster, as reported by the node configuration.
pub cluster_name: String,
/// Name of the node, as reported by the node configuration.
pub node_name: String,
}

impl SnapshotProducer {
/// Builds a producer for the partition backing `partition_store`.
///
/// The cluster name, node name and the partition's local snapshot directory are read from
/// `config` and copied into the producer. No failure path is visible here: the function
/// always returns `Ok` and never awaits.
pub async fn create(
partition_store: PartitionStore,
config: Live<Configuration>,
snapshot_repository: SnapshotRepository,
) -> anyhow::Result<Self> {
let config = config.pinned();
let partition_id = partition_store.partition_id();

Ok(SnapshotProducer {
snapshot_source: SnapshotSource {
cluster_name: config.common.cluster_name().into(),
node_name: config.common.node_name().into(),
},
partition_store,
snapshot_repository,
partition_snapshots_path: config.worker.snapshots.snapshots_dir(partition_id),
})
}

/// Exports a partition store snapshot and writes it to the snapshot repository.
///
/// The final snapshot key will follow the structure:
/// `[<prefix>/]<partition_id>/<sort_key>/<snapshot_id>_<lsn>.tar`.
///
/// Steps, in order: ensure the base directory exists, export the store into a directory
/// named after a fresh snapshot id, write `metadata.json` into it, upload it through the
/// repository, persist the snapshot LSN as the archived LSN, then remove the local export
/// directory.
///
/// # Errors
///
/// Returns an error if any step before the local cleanup fails. A cleanup failure is only
/// logged.
pub async fn create_snapshot(&mut self) -> anyhow::Result<PartitionSnapshotMetadata> {
if let Err(e) = tokio::fs::create_dir_all(&self.partition_snapshots_path).await {
warn!(
path = ?self.partition_snapshots_path,
error = ?e,
"Failed to create partition snapshot directory"
);
bail!("Failed to create partition snapshot directory: {:?}", e);
}

// Each snapshot gets its own export directory, named after its freshly generated id.
let snapshot_id = SnapshotId::new();
let snapshot_path = self.partition_snapshots_path.join(snapshot_id.to_string());

trace!(%snapshot_id, "Creating partition snapshot export directory: {:?}", snapshot_path);
let snapshot = self
.partition_store
.export_snapshot(snapshot_path.clone())
.await
.context("Export partition snapshot")?;

// Combine the producer's identity with the details reported by the export.
let snapshot_metadata = PartitionSnapshotMetadata {
version: SnapshotFormatVersion::V1,
cluster_name: self.snapshot_source.cluster_name.clone(),
node_name: self.snapshot_source.node_name.clone(),
partition_id: self.partition_store.partition_id(),
created_at: humantime::Timestamp::from(SystemTime::now()),
snapshot_id,
key_range: self.partition_store.partition_key_range().clone(),
min_applied_lsn: snapshot.min_applied_lsn,
db_comparator_name: snapshot.db_comparator_name.clone(),
files: snapshot.files.clone(),
};
let metadata_json = serde_json::to_string_pretty(&snapshot_metadata)?;

// The metadata file is written into the export directory before that directory is uploaded.
let snapshot_lsn = snapshot_metadata.min_applied_lsn;
let metadata_path = snapshot_path.join("metadata.json");
tokio::fs::write(metadata_path.clone(), metadata_json)
.await
.context("Writing snapshot metadata failed")?;

self.snapshot_repository
.put(
self.partition_store.partition_id(),
&snapshot_metadata,
snapshot_path.as_path(),
)
.await
.context("Snapshot repository upload failed")?;

// The archived LSN is only advanced after the upload has succeeded.
let previous_archived_snapshot_lsn = self.partition_store.get_archived_lsn().await?;
let mut tx = self.partition_store.transaction();
tx.put_archived_lsn(snapshot_lsn).await;
tx.commit()
.await
.context("Updating archived snapshot LSN")?;
trace!(
%snapshot_id,
previous_archived_lsn = ?previous_archived_snapshot_lsn,
updated_archived_lsn = ?snapshot_lsn,
"Updated persisted archived snapshot LSN"
);

// Best-effort cleanup: a failure here is logged but does not fail the snapshot.
// NOTE(review): this is only reached on success; every `?` above returns early and
// leaves the export directory on local disk.
let cleanup = tokio::fs::remove_dir_all(snapshot_path.clone()).await;
match cleanup {
Ok(_) => {
debug!(%snapshot_id, "Cleaned up snapshot export directory: {:?}", snapshot_path);
}
Err(e) => {
error!(%snapshot_id, "Failed to clean up snapshot directory: {}", e);
}
}

info!(
%snapshot_id,
partition_id = ?self.partition_store.partition_id(),
?snapshot_lsn,
"Successfully published partition snapshot"
);
Ok(snapshot_metadata)
}
}

0 comments on commit 7f44686

Please sign in to comment.