From 1e523bead504888c144833b8dac9d3e876318a8e Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 5 Aug 2024 14:36:11 +0200 Subject: [PATCH 1/6] Make grpc server reusable by other metadata store implementations --- .../src/{local => }/grpc/client.rs | 10 +-- .../src/{local => }/grpc/handler.rs | 12 +-- .../src/{local => }/grpc/mod.rs | 2 + crates/metadata-store/src/grpc/server.rs | 62 ++++++++++++++ .../src/grpc/service_builder.rs | 81 ++++++++++++++++++ crates/metadata-store/src/lib.rs | 62 ++++++++++++++ crates/metadata-store/src/local/mod.rs | 7 +- crates/metadata-store/src/local/service.rs | 56 ++++--------- crates/metadata-store/src/local/store.rs | 82 ++++--------------- crates/metadata-store/src/local/tests.rs | 10 +-- 10 files changed, 257 insertions(+), 127 deletions(-) rename crates/metadata-store/src/{local => }/grpc/client.rs (93%) rename crates/metadata-store/src/{local => }/grpc/handler.rs (93%) rename crates/metadata-store/src/{local => }/grpc/mod.rs (99%) create mode 100644 crates/metadata-store/src/grpc/server.rs create mode 100644 crates/metadata-store/src/grpc/service_builder.rs diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/grpc/client.rs similarity index 93% rename from crates/metadata-store/src/local/grpc/client.rs rename to crates/metadata-store/src/grpc/client.rs index 601be26e9..1272cf673 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/grpc/client.rs @@ -20,16 +20,16 @@ use restate_core::network::net_util::create_tonic_channel_from_advertised_addres use restate_types::net::AdvertisedAddress; use restate_types::Version; +use crate::grpc::pb_conversions::ConversionError; use crate::grpc_svc::metadata_store_svc_client::MetadataStoreSvcClient; use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest}; -use crate::local::grpc::pb_conversions::ConversionError; -/// Client end to interact with the [`LocalMetadataStore`]. 
+/// Client end to interact with the metadata store. #[derive(Debug, Clone)] -pub struct LocalMetadataStoreClient { +pub struct GrpcMetadataStoreClient { svc_client: MetadataStoreSvcClient, } -impl LocalMetadataStoreClient { +impl GrpcMetadataStoreClient { pub fn new(metadata_store_address: AdvertisedAddress) -> Self { let channel = create_tonic_channel_from_advertised_address(metadata_store_address) .expect("should not fail"); @@ -41,7 +41,7 @@ impl LocalMetadataStoreClient { } #[async_trait] -impl MetadataStore for LocalMetadataStoreClient { +impl MetadataStore for GrpcMetadataStoreClient { async fn get(&self, key: ByteString) -> Result, ReadError> { let response = self .svc_client diff --git a/crates/metadata-store/src/local/grpc/handler.rs b/crates/metadata-store/src/grpc/handler.rs similarity index 93% rename from crates/metadata-store/src/local/grpc/handler.rs rename to crates/metadata-store/src/grpc/handler.rs index 79d948ddd..c125c8cc5 100644 --- a/crates/metadata-store/src/local/grpc/handler.rs +++ b/crates/metadata-store/src/grpc/handler.rs @@ -8,28 +8,28 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::grpc::pb_conversions::ConversionError; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvc; use crate::grpc_svc::{DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest}; -use crate::local::grpc::pb_conversions::ConversionError; -use crate::local::store::{Error, MetadataStoreRequest, RequestSender}; +use crate::{Error, MetadataStoreRequest, RequestSender}; use async_trait::async_trait; use tokio::sync::oneshot; use tonic::{Request, Response, Status}; -/// Grpc svc handler for the [`LocalMetadataStore`]. +/// Grpc svc handler for the metadata store. 
#[derive(Debug)] -pub struct LocalMetadataStoreHandler { +pub struct MetadataStoreHandler { request_tx: RequestSender, } -impl LocalMetadataStoreHandler { +impl MetadataStoreHandler { pub fn new(request_tx: RequestSender) -> Self { Self { request_tx } } } #[async_trait] -impl MetadataStoreSvc for LocalMetadataStoreHandler { +impl MetadataStoreSvc for MetadataStoreHandler { async fn get(&self, request: Request) -> Result, Status> { let (result_tx, result_rx) = oneshot::channel(); diff --git a/crates/metadata-store/src/local/grpc/mod.rs b/crates/metadata-store/src/grpc/mod.rs similarity index 99% rename from crates/metadata-store/src/local/grpc/mod.rs rename to crates/metadata-store/src/grpc/mod.rs index 6d34ffad7..fdf08f7b1 100644 --- a/crates/metadata-store/src/local/grpc/mod.rs +++ b/crates/metadata-store/src/grpc/mod.rs @@ -10,6 +10,8 @@ pub mod client; pub mod handler; +pub mod server; +pub mod service_builder; pub mod pb_conversions { use crate::grpc_svc; diff --git a/crates/metadata-store/src/grpc/server.rs b/crates/metadata-store/src/grpc/server.rs new file mode 100644 index 000000000..1e669751f --- /dev/null +++ b/crates/metadata-store/src/grpc/server.rs @@ -0,0 +1,62 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use http::Request; +use hyper::body::Incoming; +use hyper_util::service::TowerToHyperService; +use restate_core::network::net_util; +use restate_core::ShutdownError; +use restate_types::net::BindAddress; +use tonic::body::boxed; +use tonic::service::Routes; +use tower::ServiceExt; + +pub struct GrpcServer { + bind_address: BindAddress, + routes: Routes, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed running grpc server: {0}")] + GrpcServer(#[from] net_util::Error), + #[error("system is shutting down")] + Shutdown(#[from] ShutdownError), +} + +impl GrpcServer { + pub fn new(bind_address: BindAddress, routes: Routes) -> Self { + Self { + bind_address, + routes, + } + } + + pub async fn run(self) -> Result<(), Error> { + // Trace layer + let span_factory = tower_http::trace::DefaultMakeSpan::new() + .include_headers(true) + .level(tracing::Level::ERROR); + + let server_builder = tonic::transport::Server::builder() + .layer(tower_http::trace::TraceLayer::new_for_grpc().make_span_with(span_factory)) + .add_routes(self.routes); + + let service = TowerToHyperService::new( + server_builder + .into_service() + .map_request(|req: Request| req.map(boxed)), + ); + + net_util::run_hyper_server(&self.bind_address, service, "metadata-store-grpc").await?; + + Ok(()) + } +} diff --git a/crates/metadata-store/src/grpc/service_builder.rs b/crates/metadata-store/src/grpc/service_builder.rs new file mode 100644 index 000000000..80d55131a --- /dev/null +++ b/crates/metadata-store/src/grpc/service_builder.rs @@ -0,0 +1,81 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use http::{Request, Response}; +use std::convert::Infallible; +use tonic::body::BoxBody; +use tonic::server::NamedService; +use tonic::service::{Routes, RoutesBuilder}; +use tonic_health::ServingStatus; +use tower::Service; + +#[derive(Debug)] +pub struct GrpcServiceBuilder<'a> { + reflection_service_builder: Option>, + routes_builder: RoutesBuilder, + svc_names: Vec<&'static str>, +} + +impl<'a> Default for GrpcServiceBuilder<'a> { + fn default() -> Self { + let routes_builder = RoutesBuilder::default(); + + Self { + reflection_service_builder: Some(tonic_reflection::server::Builder::configure()), + routes_builder, + svc_names: Vec::default(), + } + } +} + +impl<'a> GrpcServiceBuilder<'a> { + pub fn add_service(&mut self, svc: S) + where + S: Service, Response = Response, Error = Infallible> + + NamedService + + Clone + + Send + + 'static, + S::Future: Send + 'static, + { + self.svc_names.push(S::NAME); + self.routes_builder.add_service(svc); + } + + pub fn register_file_descriptor_set_for_reflection<'b: 'a>( + &mut self, + encoded_file_descriptor_set: &'b [u8], + ) { + self.reflection_service_builder = Some( + self.reflection_service_builder + .take() + .expect("be present") + .register_encoded_file_descriptor_set(encoded_file_descriptor_set), + ); + } + + pub async fn build(mut self) -> Result { + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + + for svc_name in self.svc_names { + health_reporter + .set_service_status(svc_name, ServingStatus::Serving) + .await; + } + + self.routes_builder.add_service(health_service); + self.routes_builder.add_service( + self.reflection_service_builder + .expect("be present") + .build()?, + ); + Ok(self.routes_builder.routes()) + } +} diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 198840cc9..483444b93 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -8,9 +8,71 @@ // the Business Source License, use of this 
software will be governed // by the Apache License, Version 2.0. +mod grpc; mod grpc_svc; pub mod local; +use bytestring::ByteString; +use restate_core::metadata_store::VersionedValue; pub use restate_core::metadata_store::{ MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError, }; +use restate_types::errors::GenericError; +use restate_types::storage::{StorageDecodeError, StorageEncodeError}; +use restate_types::Version; +use tokio::sync::{mpsc, oneshot}; + +pub type RequestSender = mpsc::Sender; +pub type RequestReceiver = mpsc::Receiver; + +type Result = std::result::Result; + +#[derive(Debug)] +pub enum MetadataStoreRequest { + Get { + key: ByteString, + result_tx: oneshot::Sender>>, + }, + GetVersion { + key: ByteString, + result_tx: oneshot::Sender>>, + }, + Put { + key: ByteString, + value: VersionedValue, + precondition: Precondition, + result_tx: oneshot::Sender>, + }, + Delete { + key: ByteString, + precondition: Precondition, + result_tx: oneshot::Sender>, + }, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("storage error: {0}")] + Storage(#[from] GenericError), + #[error("failed precondition: {0}")] + FailedPrecondition(String), + #[error("invalid argument: {0}")] + InvalidArgument(String), + #[error("encode error: {0}")] + Encode(#[from] StorageEncodeError), + #[error("decode error: {0}")] + Decode(#[from] StorageDecodeError), +} + +impl Error { + fn kv_pair_exists() -> Self { + Error::FailedPrecondition("key-value pair already exists".to_owned()) + } + + fn version_mismatch(expected: Version, actual: Option) -> Self { + Error::FailedPrecondition(format!( + "Expected version '{}' but found version '{:?}'", + expected, actual + )) + } +} diff --git a/crates/metadata-store/src/local/mod.rs b/crates/metadata-store/src/local/mod.rs index 67f5df7a5..f8a68e797 100644 --- a/crates/metadata-store/src/local/mod.rs +++ b/crates/metadata-store/src/local/mod.rs @@ -8,7 +8,6 @@ // the Business Source License, use of 
this software will be governed // by the Apache License, Version 2.0. -mod grpc; mod store; mod service; @@ -20,9 +19,9 @@ use restate_types::{ }; pub use service::LocalMetadataStoreService; -use crate::local::grpc::client::LocalMetadataStoreClient; +use crate::grpc::client::GrpcMetadataStoreClient; -/// Creates a [`MetadataStoreClient`] for the [`LocalMetadataStoreService`]. +/// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataStoreClient`]. pub async fn create_client( metadata_store_client_options: MetadataStoreClientOptions, ) -> Result { @@ -30,7 +29,7 @@ pub async fn create_client( let client = match metadata_store_client_options.metadata_store_client { MetadataStoreClientConfig::Embedded { address } => { - let store = LocalMetadataStoreClient::new(address); + let store = GrpcMetadataStoreClient::new(address); MetadataStoreClient::new(store, backoff_policy) } MetadataStoreClientConfig::Etcd { addresses } => { diff --git a/crates/metadata-store/src/local/service.rs b/crates/metadata-store/src/local/service.rs index f0f4d9444..225e8e922 100644 --- a/crates/metadata-store/src/local/service.rs +++ b/crates/metadata-store/src/local/service.rs @@ -8,21 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use crate::grpc::handler::MetadataStoreHandler; +use crate::grpc::server::GrpcServer; +use crate::grpc::service_builder::GrpcServiceBuilder; use crate::grpc_svc; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; -use crate::local::grpc::handler::LocalMetadataStoreHandler; use crate::local::store::LocalMetadataStore; -use http::Request; -use hyper::body::Incoming; -use hyper_util::service::TowerToHyperService; -use restate_core::network::net_util; +use futures::TryFutureExt; use restate_core::{task_center, ShutdownError, TaskKind}; use restate_rocksdb::RocksError; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use tonic::body::boxed; +#[cfg(test)] use tonic::server::NamedService; -use tower::ServiceExt; pub struct LocalMetadataStoreService { opts: BoxedLiveLoad, @@ -31,9 +29,7 @@ pub struct LocalMetadataStoreService { #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("failed running grpc server: {0}")] - GrpcServer(#[from] net_util::Error), - #[error("error while running server server grpc reflection service: {0}")] + #[error("error while running server grpc reflection service: {0}")] GrpcReflection(#[from] tonic_reflection::server::Error), #[error("system is shutting down")] Shutdown(#[from] ShutdownError), @@ -52,8 +48,9 @@ impl LocalMetadataStoreService { } } - pub fn grpc_service_name(&self) -> &str { - MetadataStoreSvcServer::::NAME + #[cfg(test)] + pub fn grpc_service_name() -> &'static str { + MetadataStoreSvcServer::::NAME } pub async fn run(self) -> Result<(), Error> { @@ -64,41 +61,20 @@ impl LocalMetadataStoreService { let options = opts.live_load(); let bind_address = options.bind_address.clone(); let store = LocalMetadataStore::create(options, rocksdb_options).await?; - // Trace layer - let span_factory = tower_http::trace::DefaultMakeSpan::new() - .include_headers(true) - .level(tracing::Level::ERROR); + let mut builder = GrpcServiceBuilder::default(); - let 
reflection_service_builder = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(grpc_svc::FILE_DESCRIPTOR_SET); + builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( + store.request_sender(), + ))); - let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); - health_reporter - .set_serving::>() - .await; - - let server_builder = tonic::transport::Server::builder() - .layer(tower_http::trace::TraceLayer::new_for_grpc().make_span_with(span_factory)) - .add_service(health_service) - .add_service(MetadataStoreSvcServer::new(LocalMetadataStoreHandler::new( - store.request_sender(), - ))) - .add_service(reflection_service_builder.build()?); - - let service = TowerToHyperService::new( - server_builder - .into_service() - .map_request(|req: Request| req.map(boxed)), - ); + let grpc_server = GrpcServer::new(bind_address, builder.build().await?); task_center().spawn_child( TaskKind::RpcServer, "metadata-store-grpc", None, - async move { - net_util::run_hyper_server(&bind_address, service, "metadata-store-grpc").await?; - Ok(()) - }, + grpc_server.run().map_err(Into::into), )?; store.run().await; diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 632f456b1..0748c7677 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use crate::{Error, MetadataStoreRequest, RequestReceiver, RequestSender, Result}; use bytes::BytesMut; use bytestring::ByteString; use restate_core::cancellation_watcher; @@ -18,75 +19,16 @@ use restate_rocksdb::{ }; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use restate_types::storage::{ - StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, -}; +use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; use restate_types::Version; use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB}; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tracing::{debug, trace}; -pub type RequestSender = mpsc::Sender; -pub type RequestReceiver = mpsc::Receiver; - -type Result = std::result::Result; - const DB_NAME: &str = "local-metadata-store"; const KV_PAIRS: &str = "kv_pairs"; -#[derive(Debug)] -pub enum MetadataStoreRequest { - Get { - key: ByteString, - result_tx: oneshot::Sender>>, - }, - GetVersion { - key: ByteString, - result_tx: oneshot::Sender>>, - }, - Put { - key: ByteString, - value: VersionedValue, - precondition: Precondition, - result_tx: oneshot::Sender>, - }, - Delete { - key: ByteString, - precondition: Precondition, - result_tx: oneshot::Sender>, - }, -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("storage error: {0}")] - Storage(#[from] rocksdb::Error), - #[error("rocksdb error: {0}")] - RocksDb(#[from] RocksError), - #[error("failed precondition: {0}")] - FailedPrecondition(String), - #[error("invalid argument: {0}")] - InvalidArgument(String), - #[error("encode error: {0}")] - Encode(#[from] StorageEncodeError), - #[error("decode error: {0}")] - Decode(#[from] StorageDecodeError), -} - -impl Error { - fn kv_pair_exists() -> Self { - Error::FailedPrecondition("key-value pair already exists".to_owned()) - } - - fn version_mismatch(expected: Version, actual: Option) -> 
Self { - Error::FailedPrecondition(format!( - "Expected version '{}' but found version '{:?}'", - expected, actual - )) - } -} - /// Single node metadata store which stores the key value pairs in RocksDB. /// /// In order to avoid issues arising from concurrency, we run the metadata @@ -217,7 +159,10 @@ impl LocalMetadataStore { fn get(&self, key: &ByteString) -> Result> { let cf_handle = self.kv_cf_handle(); - let slice = self.db.get_pinned_cf(&cf_handle, key)?; + let slice = self + .db + .get_pinned_cf(&cf_handle, key) + .map_err(|err| Error::Storage(err.into()))?; if let Some(bytes) = slice { Ok(Some(Self::decode(bytes)?)) @@ -228,7 +173,10 @@ impl LocalMetadataStore { fn get_version(&self, key: &ByteString) -> Result> { let cf_handle = self.kv_cf_handle(); - let slice = self.db.get_pinned_cf(&cf_handle, key)?; + let slice = self + .db + .get_pinned_cf(&cf_handle, key) + .map_err(|err| Error::Storage(err.into()))?; if let Some(bytes) = slice { // todo only deserialize the version part @@ -278,8 +226,7 @@ impl LocalMetadataStore { let cf_handle = self.kv_cf_handle(); let mut wb = WriteBatch::default(); wb.put_cf(&cf_handle, key, self.buffer.as_ref()); - Ok(self - .rocksdb + self.rocksdb .write_batch( "local-metadata-write-batch", Priority::High, @@ -287,7 +234,8 @@ impl LocalMetadataStore { write_options, wb, ) - .await?) 
+ .await + .map_err(|err| Error::Storage(err.into())) } fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> { @@ -320,7 +268,7 @@ impl LocalMetadataStore { let write_options = self.write_options(); self.db .delete_cf_opt(&self.kv_cf_handle(), key, &write_options) - .map_err(Into::into) + .map_err(|err| Error::Storage(err.into())) } fn encode(value: &T, buf: &mut BytesMut) -> Result<()> { diff --git a/crates/metadata-store/src/local/tests.rs b/crates/metadata-store/src/local/tests.rs index ea23b3e6a..e1b90bac6 100644 --- a/crates/metadata-store/src/local/tests.rs +++ b/crates/metadata-store/src/local/tests.rs @@ -30,7 +30,7 @@ use restate_types::net::{AdvertisedAddress, BindAddress}; use restate_types::retries::RetryPolicy; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; -use crate::local::grpc::client::LocalMetadataStoreClient; +use crate::grpc::client::GrpcMetadataStoreClient; use crate::local::service::LocalMetadataStoreService; use crate::{MetadataStoreClient, Precondition, WriteError}; @@ -309,7 +309,7 @@ async fn durable_storage() -> anyhow::Result<()> { Ok(()) } -/// Creates a test environment with the [`RocksDBMetadataStore`] and a [`MetadataStoreClient`] +/// Creates a test environment with the [`RocksDBMetadataStore`] and a [`GrpcMetadataStoreClient`] /// connected to it. 
async fn create_test_environment( opts: &MetadataStoreOptions, @@ -354,7 +354,7 @@ async fn start_metadata_store( task_center: &TaskCenter, ) -> anyhow::Result { let service = LocalMetadataStoreService::from_options(opts, updateables_rocksdb_options); - let grpc_service_name = service.grpc_service_name().to_owned(); + let grpc_service_name = LocalMetadataStoreService::grpc_service_name().to_owned(); task_center.spawn( TaskKind::MetadataStore, @@ -387,9 +387,9 @@ async fn start_metadata_store( }) .await?; - let rocksdb_client = LocalMetadataStoreClient::new(address); + let grpc_client = GrpcMetadataStoreClient::new(address); let client = MetadataStoreClient::new( - rocksdb_client, + grpc_client, Some(metadata_store_client_options.metadata_store_client_backoff_policy), ); From c096c57bdb500c3fbdfb0f4c221310d7f61b4d12 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 5 Aug 2024 15:48:50 +0200 Subject: [PATCH 2/6] Make it configurable which metadata store to run This commit makes it configurable which metadata store will be run by the Node when starting the Restate server. 
--- crates/metadata-store/src/grpc/handler.rs | 8 +- crates/metadata-store/src/lib.rs | 86 ++++++++++++++++------ crates/metadata-store/src/local/service.rs | 24 +++--- crates/metadata-store/src/local/store.rs | 36 ++++----- crates/metadata-store/src/local/tests.rs | 2 +- crates/node/src/lib.rs | 38 +++++++--- crates/types/src/config/metadata_store.rs | 15 ++++ 7 files changed, 138 insertions(+), 71 deletions(-) diff --git a/crates/metadata-store/src/grpc/handler.rs b/crates/metadata-store/src/grpc/handler.rs index c125c8cc5..8c5237a83 100644 --- a/crates/metadata-store/src/grpc/handler.rs +++ b/crates/metadata-store/src/grpc/handler.rs @@ -11,7 +11,7 @@ use crate::grpc::pb_conversions::ConversionError; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvc; use crate::grpc_svc::{DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest}; -use crate::{Error, MetadataStoreRequest, RequestSender}; +use crate::{MetadataStoreRequest, RequestError, RequestSender}; use async_trait::async_trait; use tokio::sync::oneshot; use tonic::{Request, Response, Status}; @@ -129,10 +129,10 @@ impl MetadataStoreSvc for MetadataStoreHandler { } } -impl From for Status { - fn from(err: Error) -> Self { +impl From for Status { + fn from(err: RequestError) -> Self { match err { - Error::FailedPrecondition(msg) => Status::failed_precondition(msg), + RequestError::FailedPrecondition(msg) => Status::failed_precondition(msg), err => Status::internal(err.to_string()), } } diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 483444b93..2f50c5d91 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -17,60 +17,102 @@ use restate_core::metadata_store::VersionedValue; pub use restate_core::metadata_store::{ MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError, }; +use restate_core::ShutdownError; use restate_types::errors::GenericError; use 
restate_types::storage::{StorageDecodeError, StorageEncodeError}; use restate_types::Version; use tokio::sync::{mpsc, oneshot}; +pub type BoxedMetadataStoreService = Box; + pub type RequestSender = mpsc::Sender; pub type RequestReceiver = mpsc::Receiver; -type Result = std::result::Result; +#[derive(Debug, thiserror::Error)] +pub enum RequestError { + #[error("storage error: {0}")] + Storage(#[from] GenericError), + #[error("failed precondition: {0}")] + FailedPrecondition(String), + #[error("invalid argument: {0}")] + InvalidArgument(String), + #[error("encode error: {0}")] + Encode(#[from] StorageEncodeError), + #[error("decode error: {0}")] + Decode(#[from] StorageDecodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("error while running server grpc reflection service: {0}")] + GrpcReflection(#[from] tonic_reflection::server::Error), + #[error("system is shutting down")] + Shutdown(#[from] ShutdownError), + #[error(transparent)] + Generic(#[from] GenericError), +} + +#[async_trait::async_trait] +pub trait MetadataStoreServiceBoxed { + async fn run_boxed(self: Box) -> Result<(), Error>; +} + +#[async_trait::async_trait] +impl MetadataStoreServiceBoxed for T { + async fn run_boxed(self: Box) -> Result<(), Error> { + (*self).run().await + } +} + +#[async_trait::async_trait] +pub trait MetadataStoreService: MetadataStoreServiceBoxed + Send { + async fn run(self) -> Result<(), Error>; + + fn boxed(self) -> BoxedMetadataStoreService + where + Self: Sized + 'static, + { + Box::new(self) + } +} + +#[async_trait::async_trait] +impl MetadataStoreService for Box { + async fn run(self) -> Result<(), Error> { + self.run_boxed().await + } +} #[derive(Debug)] pub enum MetadataStoreRequest { Get { key: ByteString, - result_tx: oneshot::Sender>>, + result_tx: oneshot::Sender, RequestError>>, }, GetVersion { key: ByteString, - result_tx: oneshot::Sender>>, + result_tx: oneshot::Sender, RequestError>>, }, Put { key: ByteString, value: VersionedValue, 
precondition: Precondition, - result_tx: oneshot::Sender>, + result_tx: oneshot::Sender>, }, Delete { key: ByteString, precondition: Precondition, - result_tx: oneshot::Sender>, + result_tx: oneshot::Sender>, }, } -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("storage error: {0}")] - Storage(#[from] GenericError), - #[error("failed precondition: {0}")] - FailedPrecondition(String), - #[error("invalid argument: {0}")] - InvalidArgument(String), - #[error("encode error: {0}")] - Encode(#[from] StorageEncodeError), - #[error("decode error: {0}")] - Decode(#[from] StorageDecodeError), -} - -impl Error { +impl RequestError { fn kv_pair_exists() -> Self { - Error::FailedPrecondition("key-value pair already exists".to_owned()) + RequestError::FailedPrecondition("key-value pair already exists".to_owned()) } fn version_mismatch(expected: Version, actual: Option) -> Self { - Error::FailedPrecondition(format!( + RequestError::FailedPrecondition(format!( "Expected version '{}' but found version '{:?}'", expected, actual )) diff --git a/crates/metadata-store/src/local/service.rs b/crates/metadata-store/src/local/service.rs index 225e8e922..394cb8cc8 100644 --- a/crates/metadata-store/src/local/service.rs +++ b/crates/metadata-store/src/local/service.rs @@ -11,12 +11,11 @@ use crate::grpc::handler::MetadataStoreHandler; use crate::grpc::server::GrpcServer; use crate::grpc::service_builder::GrpcServiceBuilder; -use crate::grpc_svc; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; use crate::local::store::LocalMetadataStore; +use crate::{grpc_svc, Error, MetadataStoreService}; use futures::TryFutureExt; -use restate_core::{task_center, ShutdownError, TaskKind}; -use restate_rocksdb::RocksError; +use restate_core::{task_center, TaskKind}; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; #[cfg(test)] @@ -27,16 +26,6 @@ pub struct LocalMetadataStoreService { rocksdb_options: 
BoxedLiveLoad, } -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("error while running server grpc reflection service: {0}")] - GrpcReflection(#[from] tonic_reflection::server::Error), - #[error("system is shutting down")] - Shutdown(#[from] ShutdownError), - #[error("rocksdb error: {0}")] - RocksDB(#[from] RocksError), -} - impl LocalMetadataStoreService { pub fn from_options( opts: BoxedLiveLoad, @@ -52,15 +41,20 @@ impl LocalMetadataStoreService { pub fn grpc_service_name() -> &'static str { MetadataStoreSvcServer::::NAME } +} - pub async fn run(self) -> Result<(), Error> { +#[async_trait::async_trait] +impl MetadataStoreService for LocalMetadataStoreService { + async fn run(self) -> Result<(), Error> { let LocalMetadataStoreService { mut opts, rocksdb_options, } = self; let options = opts.live_load(); let bind_address = options.bind_address.clone(); - let store = LocalMetadataStore::create(options, rocksdb_options).await?; + let store = LocalMetadataStore::create(options, rocksdb_options) + .await + .map_err(|err| Error::Generic(err.into()))?; let mut builder = GrpcServiceBuilder::default(); builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 0748c7677..834b42d59 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use crate::{Error, MetadataStoreRequest, RequestReceiver, RequestSender, Result}; +use crate::{MetadataStoreRequest, RequestError, RequestReceiver, RequestSender}; use bytes::BytesMut; use bytestring::ByteString; use restate_core::cancellation_watcher; @@ -157,12 +157,12 @@ impl LocalMetadataStore { }; } - fn get(&self, key: &ByteString) -> Result> { + fn get(&self, key: &ByteString) -> Result, RequestError> { let cf_handle = self.kv_cf_handle(); let slice = self .db .get_pinned_cf(&cf_handle, key) - .map_err(|err| Error::Storage(err.into()))?; + .map_err(|err| RequestError::Storage(err.into()))?; if let Some(bytes) = slice { Ok(Some(Self::decode(bytes)?)) @@ -171,12 +171,12 @@ impl LocalMetadataStore { } } - fn get_version(&self, key: &ByteString) -> Result> { + fn get_version(&self, key: &ByteString) -> Result, RequestError> { let cf_handle = self.kv_cf_handle(); let slice = self .db .get_pinned_cf(&cf_handle, key) - .map_err(|err| Error::Storage(err.into()))?; + .map_err(|err| RequestError::Storage(err.into()))?; if let Some(bytes) = slice { // todo only deserialize the version part @@ -192,7 +192,7 @@ impl LocalMetadataStore { key: &ByteString, value: &VersionedValue, precondition: Precondition, - ) -> Result<()> { + ) -> Result<(), RequestError> { match precondition { Precondition::None => Ok(self.write_versioned_kv_pair(key, value).await?), Precondition::DoesNotExist => { @@ -200,7 +200,7 @@ impl LocalMetadataStore { if current_version.is_none() { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(Error::kv_pair_exists()) + Err(RequestError::kv_pair_exists()) } } Precondition::MatchesVersion(version) => { @@ -208,7 +208,7 @@ impl LocalMetadataStore { if current_version == Some(version) { Ok(self.write_versioned_kv_pair(key, value).await?) 
} else { - Err(Error::version_mismatch(version, current_version)) + Err(RequestError::version_mismatch(version, current_version)) } } } @@ -218,7 +218,7 @@ impl LocalMetadataStore { &mut self, key: &ByteString, value: &VersionedValue, - ) -> Result<()> { + ) -> Result<(), RequestError> { self.buffer.clear(); Self::encode(value, &mut self.buffer)?; @@ -235,10 +235,10 @@ impl LocalMetadataStore { wb, ) .await - .map_err(|err| Error::Storage(err.into())) + .map_err(|err| RequestError::Storage(err.into())) } - fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> { + fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<(), RequestError> { match precondition { Precondition::None => self.delete_kv_pair(key), // this condition does not really make sense for the delete operation @@ -249,7 +249,7 @@ impl LocalMetadataStore { // nothing to do Ok(()) } else { - Err(Error::kv_pair_exists()) + Err(RequestError::kv_pair_exists()) } } Precondition::MatchesVersion(version) => { @@ -258,30 +258,30 @@ impl LocalMetadataStore { if current_version == Some(version) { self.delete_kv_pair(key) } else { - Err(Error::version_mismatch(version, current_version)) + Err(RequestError::version_mismatch(version, current_version)) } } } } - fn delete_kv_pair(&mut self, key: &ByteString) -> Result<()> { + fn delete_kv_pair(&mut self, key: &ByteString) -> Result<(), RequestError> { let write_options = self.write_options(); self.db .delete_cf_opt(&self.kv_cf_handle(), key, &write_options) - .map_err(|err| Error::Storage(err.into())) + .map_err(|err| RequestError::Storage(err.into())) } - fn encode(value: &T, buf: &mut BytesMut) -> Result<()> { + fn encode(value: &T, buf: &mut BytesMut) -> Result<(), RequestError> { StorageCodec::encode(value, buf)?; Ok(()) } - fn decode(buf: impl AsRef<[u8]>) -> Result { + fn decode(buf: impl AsRef<[u8]>) -> Result { let value = StorageCodec::decode(&mut buf.as_ref())?; Ok(value) } - fn log_error(result: &Result, 
request: &str) { + fn log_error(result: &Result, request: &str) { if let Err(err) = &result { debug!("failed to process request '{}': '{}'", request, err) } diff --git a/crates/metadata-store/src/local/tests.rs b/crates/metadata-store/src/local/tests.rs index e1b90bac6..43e6b6866 100644 --- a/crates/metadata-store/src/local/tests.rs +++ b/crates/metadata-store/src/local/tests.rs @@ -32,7 +32,7 @@ use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; use crate::grpc::client::GrpcMetadataStoreClient; use crate::local::service::LocalMetadataStoreService; -use crate::{MetadataStoreClient, Precondition, WriteError}; +use crate::{MetadataStoreClient, MetadataStoreService, Precondition, WriteError}; #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] struct Value { diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index c28fd95e4..f4eac1e91 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -31,9 +31,11 @@ use restate_core::{task_center, TaskKind}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; -use restate_metadata_store::MetadataStoreClient; +use restate_metadata_store::{ + BoxedMetadataStoreService, MetadataStoreClient, MetadataStoreService, +}; use restate_types::cluster_controller::SchedulingPlan; -use restate_types::config::{CommonOptions, Configuration}; +use restate_types::config::{CommonOptions, Configuration, Kind}; use restate_types::live::Live; use restate_types::logs::metadata::{bootstrap_logs_metadata, Logs}; use restate_types::metadata_store::keys::{ @@ -109,7 +111,7 @@ pub struct Node { metadata_manager: MetadataManager, metadata_store_client: MetadataStoreClient, bifrost: BifrostService, - metadata_store_role: Option, + metadata_store_role: Option, admin_role: Option, worker_role: Option, #[cfg(feature = "replicated-loglet")] @@ -118,7 +120,7 @@ pub struct Node { } impl Node { - pub async fn 
create(updateable_config: Live) -> Result { + pub async fn create(mut updateable_config: Live) -> Result { let tc = task_center(); let config = updateable_config.pinned(); // ensure we have cluster admin role if bootstrapping. @@ -138,13 +140,7 @@ impl Node { cluster_marker::validate_and_update_cluster_marker(config.common.cluster_name())?; let metadata_store_role = if config.has_role(Role::MetadataStore) { - Some(LocalMetadataStoreService::from_options( - updateable_config.clone().map(|c| &c.metadata_store).boxed(), - updateable_config - .clone() - .map(|config| &config.metadata_store.rocksdb) - .boxed(), - )) + Some(Self::create_metadata_store(&mut updateable_config)) } else { None }; @@ -270,6 +266,26 @@ impl Node { }) } + fn create_metadata_store( + updateable_config: &mut Live, + ) -> BoxedMetadataStoreService { + let config = updateable_config.live_load(); + + match config.metadata_store.kind { + Kind::Local => LocalMetadataStoreService::from_options( + updateable_config.clone().map(|c| &c.metadata_store).boxed(), + updateable_config + .clone() + .map(|config| &config.metadata_store.rocksdb) + .boxed(), + ) + .boxed(), + Kind::Raft => { + unimplemented!("not yet supported") + } + } + } + pub async fn start(self) -> Result<(), anyhow::Error> { let tc = task_center(); diff --git a/crates/types/src/config/metadata_store.rs b/crates/types/src/config/metadata_store.rs index 370f6094e..e034444ff 100644 --- a/crates/types/src/config/metadata_store.rs +++ b/crates/types/src/config/metadata_store.rs @@ -60,6 +60,20 @@ pub struct MetadataStoreOptions { /// /// The RocksDB options which will be used to configure the metadata store's RocksDB instance. pub rocksdb: RocksDbOptions, + + /// # Type of metadata store to start + /// + /// The type of metadata store to start when running the metadata store role. 
+ pub kind: Kind, +} + +#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub enum Kind { + #[default] + Local, + Raft, } impl MetadataStoreOptions { @@ -112,6 +126,7 @@ impl Default for MetadataStoreOptions { rocksdb_memory_budget: None, rocksdb_memory_ratio: 0.01, rocksdb, + kind: Kind::default(), } } } From d620ae9abd8616759402198ae95ced91da9df496 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 6 Aug 2024 09:47:41 +0200 Subject: [PATCH 3/6] Add single node Raft metadata store with memory storage This commit adds the skeleton of the Raft metadata store. At the moment only a single node with memory storage is supported. This fixes #1785. --- Cargo.lock | 204 +++++++++ crates/metadata-store/Cargo.toml | 6 + crates/metadata-store/src/grpc/handler.rs | 2 +- crates/metadata-store/src/lib.rs | 47 +- crates/metadata-store/src/local/store.rs | 26 +- crates/metadata-store/src/raft/mod.rs | 12 + crates/metadata-store/src/raft/service.rs | 59 +++ crates/metadata-store/src/raft/store.rs | 530 ++++++++++++++++++++++ crates/node/src/lib.rs | 8 +- 9 files changed, 865 insertions(+), 29 deletions(-) create mode 100644 crates/metadata-store/src/raft/mod.rs create mode 100644 crates/metadata-store/src/raft/service.rs create mode 100644 crates/metadata-store/src/raft/store.rs diff --git a/Cargo.lock b/Cargo.lock index c2c38a6de..004caf32a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2566,6 +2566,16 @@ dependencies = [ "dirs-sys", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + [[package]] name = "dirs-sys" version = "0.4.1" @@ -2578,6 +2588,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -2957,6 +2978,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2980,6 +3010,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getset" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "gimli" version = "0.28.1" @@ -5098,6 +5140,36 @@ dependencies = [ "prost 0.13.1", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +dependencies = [ + "bytes", +] + +[[package]] +name = "protobuf-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2df9942df2981178a930a72d442de47e2f0df18ad68e50a30f816f1848215ad0" +dependencies = [ + "bitflags 1.3.2", + "protobuf", + "protobuf-codegen", + "regex", +] + +[[package]] +name = "protobuf-codegen" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6" +dependencies = [ + "protobuf", +] + [[package]] name = "quanta" version = "0.12.3" @@ -5188,6 +5260,36 @@ dependencies = [ "nibble_vec", ] +[[package]] +name = "raft" +version = "0.7.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12688b23a649902762d4c11d854d73c49c9b93138f2de16403ef9f571ad5bae" +dependencies = [ + "bytes", + "fxhash", + "getset", + "protobuf", + "raft-proto", + "rand", + "slog", + "slog-envlogger", + "slog-stdlog", + "slog-term", + "thiserror", +] + +[[package]] +name = "raft-proto" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb6884896294f553e8d5cfbdb55080b9f5f2f43394afff59c9f077e0f4b46d6b" +dependencies = [ + "bytes", + "protobuf", + "protobuf-build", +] + [[package]] name = "rand" version = "0.8.5" @@ -5939,12 +6041,15 @@ dependencies = [ "hyper-util", "prost 0.13.1", "prost-types 0.13.1", + "protobuf", + "raft", "restate-core", "restate-rocksdb", "restate-types", "rocksdb", "schemars", "serde", + "slog", "static_assertions", "tempfile", "test-log", @@ -5959,7 +6064,9 @@ dependencies = [ "tower", "tower-http 0.5.2", "tracing", + "tracing-slog", "tracing-subscriber", + "ulid", ] [[package]] @@ -7091,6 +7198,74 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slog" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06" + +[[package]] +name = "slog-async" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72c8038f898a2c79507940990f05386455b3a317d8f18d4caea7cbc3d5096b84" +dependencies = [ + "crossbeam-channel", + "slog", + "take_mut", + "thread_local", +] + +[[package]] +name = "slog-envlogger" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "906a1a0bc43fed692df4b82a5e2fbfc3733db8dad8bb514ab27a4f23ad04f5c0" +dependencies = [ + "log", + "regex", + "slog", + "slog-async", + "slog-scope", + "slog-stdlog", + "slog-term", +] + +[[package]] +name = "slog-scope" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "2f95a4b4c3274cd2869549da82b57ccc930859bdbf5bcea0424bc5f140b3c786" +dependencies = [ + "arc-swap", + "lazy_static", + "slog", +] + +[[package]] +name = "slog-stdlog" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6706b2ace5bbae7291d3f8d2473e2bfab073ccd7d03670946197aec98471fa3e" +dependencies = [ + "log", + "slog", + "slog-scope", +] + +[[package]] +name = "slog-term" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6e022d0b998abfe5c3782c1f03551a596269450ccd677ea51c56f8b214610e8" +dependencies = [ + "is-terminal", + "slog", + "term", + "thread_local", + "time", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -7331,6 +7506,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + [[package]] name = "tempfile" version = "3.10.1" @@ -7343,6 +7524,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "termcolor" version = "1.4.1" @@ -7910,6 +8102,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-slog" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9306d2ca06aa9dfc8aa729ff884e9dca181f588a298cc5c59d4fdd91372570bf" +dependencies = [ + "once_cell", + "slog", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -8029,6 +8232,7 @@ checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" dependencies = [ "getrandom", "rand", + "serde", "web-time", ] diff --git a/crates/metadata-store/Cargo.toml 
b/crates/metadata-store/Cargo.toml index b04d107c8..b1bf2753b 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -18,6 +18,7 @@ restate-rocksdb = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } +assert2 = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } @@ -29,9 +30,12 @@ hyper = { workspace = true } hyper-util = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +protobuf = "2.28.0" +raft = { version = "0.7.0" } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } +slog = { version = "2.7.0" } static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } @@ -43,6 +47,8 @@ tonic-health = { workspace = true } tower = { workspace = true } tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } +tracing-slog = { version = "0.3.0" } +ulid = { workspace = true, features = ["serde"] } [dev-dependencies] restate-core = { workspace = true, features = ["test-util"] } diff --git a/crates/metadata-store/src/grpc/handler.rs b/crates/metadata-store/src/grpc/handler.rs index 8c5237a83..938e09d8e 100644 --- a/crates/metadata-store/src/grpc/handler.rs +++ b/crates/metadata-store/src/grpc/handler.rs @@ -132,7 +132,7 @@ impl MetadataStoreSvc for MetadataStoreHandler { impl From for Status { fn from(err: RequestError) -> Self { match err { - RequestError::FailedPrecondition(msg) => Status::failed_precondition(msg), + RequestError::FailedPrecondition(msg) => Status::failed_precondition(msg.to_string()), err => Status::internal(err.to_string()), } } diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 2f50c5d91..3ab083883 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -11,6 +11,7 @@ mod grpc; mod grpc_svc; 
pub mod local; +pub mod raft; use bytestring::ByteString; use restate_core::metadata_store::VersionedValue; @@ -30,10 +31,10 @@ pub type RequestReceiver = mpsc::Receiver; #[derive(Debug, thiserror::Error)] pub enum RequestError { - #[error("storage error: {0}")] - Storage(#[from] GenericError), + #[error("internal error: {0}")] + Internal(#[from] GenericError), #[error("failed precondition: {0}")] - FailedPrecondition(String), + FailedPrecondition(#[from] PreconditionViolation), #[error("invalid argument: {0}")] InvalidArgument(String), #[error("encode error: {0}")] @@ -42,6 +43,27 @@ pub enum RequestError { Decode(#[from] StorageDecodeError), } +#[derive(Debug, thiserror::Error)] +pub enum PreconditionViolation { + #[error("key-value pair already exists")] + Exists, + #[error("expected version '{expected}' but found version '{actual:?}'")] + VersionMismatch { + expected: Version, + actual: Option, + }, +} + +impl PreconditionViolation { + fn kv_pair_exists() -> Self { + PreconditionViolation::Exists + } + + fn version_mismatch(expected: Version, actual: Option) -> Self { + PreconditionViolation::VersionMismatch { expected, actual } + } +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("error while running server grpc reflection service: {0}")] @@ -52,6 +74,12 @@ pub enum Error { Generic(#[from] GenericError), } +impl Error { + pub fn generic(err: impl Into) -> Error { + Error::Generic(err.into()) + } +} + #[async_trait::async_trait] pub trait MetadataStoreServiceBoxed { async fn run_boxed(self: Box) -> Result<(), Error>; @@ -105,16 +133,3 @@ pub enum MetadataStoreRequest { result_tx: oneshot::Sender>, }, } - -impl RequestError { - fn kv_pair_exists() -> Self { - RequestError::FailedPrecondition("key-value pair already exists".to_owned()) - } - - fn version_mismatch(expected: Version, actual: Option) -> Self { - RequestError::FailedPrecondition(format!( - "Expected version '{}' but found version '{:?}'", - expected, actual - )) - } -} diff --git 
a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 834b42d59..55a6b33e7 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -8,7 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::{MetadataStoreRequest, RequestError, RequestReceiver, RequestSender}; +use crate::{ + MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; use bytes::BytesMut; use bytestring::ByteString; use restate_core::cancellation_watcher; @@ -162,7 +164,7 @@ impl LocalMetadataStore { let slice = self .db .get_pinned_cf(&cf_handle, key) - .map_err(|err| RequestError::Storage(err.into()))?; + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { Ok(Some(Self::decode(bytes)?)) @@ -176,7 +178,7 @@ impl LocalMetadataStore { let slice = self .db .get_pinned_cf(&cf_handle, key) - .map_err(|err| RequestError::Storage(err.into()))?; + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { // todo only deserialize the version part @@ -200,7 +202,7 @@ impl LocalMetadataStore { if current_version.is_none() { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(RequestError::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -208,7 +210,10 @@ impl LocalMetadataStore { if current_version == Some(version) { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(RequestError::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? 
} } } @@ -235,7 +240,7 @@ impl LocalMetadataStore { wb, ) .await - .map_err(|err| RequestError::Storage(err.into())) + .map_err(|err| RequestError::Internal(err.into())) } fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<(), RequestError> { @@ -249,7 +254,7 @@ impl LocalMetadataStore { // nothing to do Ok(()) } else { - Err(RequestError::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -258,7 +263,10 @@ impl LocalMetadataStore { if current_version == Some(version) { self.delete_kv_pair(key) } else { - Err(RequestError::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? } } } @@ -268,7 +276,7 @@ impl LocalMetadataStore { let write_options = self.write_options(); self.db .delete_cf_opt(&self.kv_cf_handle(), key, &write_options) - .map_err(|err| RequestError::Storage(err.into())) + .map_err(|err| RequestError::Internal(err.into())) } fn encode(value: &T, buf: &mut BytesMut) -> Result<(), RequestError> { diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs new file mode 100644 index 000000000..707997220 --- /dev/null +++ b/crates/metadata-store/src/raft/mod.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod service; +mod store; diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs new file mode 100644 index 000000000..496320c67 --- /dev/null +++ b/crates/metadata-store/src/raft/service.rs @@ -0,0 +1,59 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. 
+// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::grpc::handler::MetadataStoreHandler; +use crate::grpc::server::GrpcServer; +use crate::grpc::service_builder::GrpcServiceBuilder; +use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::raft::store::RaftMetadataStore; +use crate::{grpc_svc, Error, MetadataStoreService}; +use futures::TryFutureExt; +use restate_core::{task_center, TaskKind}; +use restate_types::config::MetadataStoreOptions; +use restate_types::live::BoxedLiveLoad; + +pub struct RaftMetadataStoreService { + options: BoxedLiveLoad, +} + +impl RaftMetadataStoreService { + pub fn new(options: BoxedLiveLoad) -> Self { + Self { options } + } +} + +#[async_trait::async_trait] +impl MetadataStoreService for RaftMetadataStoreService { + async fn run(mut self) -> Result<(), Error> { + let store_options = self.options.live_load(); + let store = RaftMetadataStore::new().map_err(Error::generic)?; + + let mut builder = GrpcServiceBuilder::default(); + + builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( + store.request_sender(), + ))); + + let grpc_server = + GrpcServer::new(store_options.bind_address.clone(), builder.build().await?); + + task_center().spawn_child( + TaskKind::RpcServer, + "metadata-store-grpc", + None, + grpc_server.run().map_err(Into::into), + )?; + + store.run().await.map_err(Error::generic)?; + + Ok(()) + } +} diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs new file mode 100644 index 000000000..94292a2d3 --- /dev/null +++ b/crates/metadata-store/src/raft/store.rs @@ -0,0 +1,530 @@ +// 
Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::{ + MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; +use assert2::let_assert; +use bytes::{Bytes, BytesMut}; +use bytestring::ByteString; +use protobuf::{Message as ProtobufMessage, ProtobufError}; +use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Message}; +use raft::storage::MemStorage; +use raft::{Config, RawNode}; +use restate_core::cancellation_watcher; +use restate_core::metadata_store::{Precondition, VersionedValue}; +use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; +use restate_types::{flexbuffers_storage_encode_decode, Version}; +use slog::o; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use tokio::time::MissedTickBehavior; +use tracing::{debug, info, warn}; +use tracing_slog::TracingSlogDrain; +use ulid::Ulid; + +#[derive(Debug, thiserror::Error)] +#[error("failed creating raft node: {0}")] +pub struct BuildError(#[from] raft::Error); + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed appending entries: {0}")] + Append(#[from] raft::Error), + #[error("failed deserializing raft serialized requests: {0}")] + DecodeRequest(StorageDecodeError), + #[error("failed deserializing conf change: {0}")] + DecodeConf(ProtobufError), + #[error("failed applying conf change: {0}")] + ApplyConfChange(raft::Error), +} + +pub struct RaftMetadataStore { + _logger: slog::Logger, + raw_node: RawNode, + tick_interval: time::Interval, + + callbacks: HashMap, + kv_entries: HashMap, + + request_tx: 
RequestSender, + request_rx: RequestReceiver, +} + +impl RaftMetadataStore { + pub fn new() -> Result { + let (request_tx, request_rx) = mpsc::channel(2); + + let config = Config { + id: 1, + ..Default::default() + }; + + let store = MemStorage::new_with_conf_state(ConfState::from((vec![1], vec![]))); + let drain = TracingSlogDrain; + let logger = slog::Logger::root(drain, o!()); + + let raw_node = RawNode::new(&config, store, &logger)?; + + let mut tick_interval = time::interval(Duration::from_millis(100)); + tick_interval.set_missed_tick_behavior(MissedTickBehavior::Burst); + + Ok(Self { + // we only need to keep it alive + _logger: logger, + raw_node, + tick_interval, + callbacks: HashMap::default(), + kv_entries: HashMap::default(), + request_rx, + request_tx, + }) + } + + pub fn request_sender(&self) -> RequestSender { + self.request_tx.clone() + } + + pub async fn run(mut self) -> Result<(), Error> { + let mut cancellation = std::pin::pin!(cancellation_watcher()); + + loop { + tokio::select! { + _ = &mut cancellation => { + break; + } + Some(request) = self.request_rx.recv() => { + // todo: Unclear whether every replica should be allowed to propose. Maybe + // only the leader should propose and respond to clients. 
+ let (callback, request) = Self::split_request(request); + + if let Err(err) = request + .encode_to_vec() + .map_err(Into::into) + .and_then(|request| self.raw_node + .propose(vec![], request) + .map_err(|err| RequestError::Internal(err.into()))) { + info!("Failed processing request: {err}"); + callback.fail(err); + continue; + } + + self.register_callback(callback); + } + _ = self.tick_interval.tick() => { + self.raw_node.tick(); + } + } + + self.on_ready()?; + } + + debug!("Stop running RaftMetadataStore."); + + Ok(()) + } + + fn on_ready(&mut self) -> Result<(), Error> { + if !self.raw_node.has_ready() { + return Ok(()); + } + + let mut ready = self.raw_node.ready(); + + // first need to send outgoing messages + if !ready.messages().is_empty() { + self.send_messages(ready.take_messages()); + } + + // apply snapshot if one was sent + if !ready.snapshot().is_empty() { + if let Err(err) = self + .raw_node + .store() + .wl() + .apply_snapshot(ready.snapshot().clone()) + { + warn!("failed applying snapshot: {err}"); + } + } + + // then handle committed entries + self.handle_committed_entries(ready.take_committed_entries())?; + + // append new Raft entries to storage + self.raw_node.store().wl().append(ready.entries())?; + + // update the hard state if an update was produced (e.g. 
vote has happened) + if let Some(hs) = ready.hs() { + self.raw_node.store().wl().set_hardstate(hs.clone()); + } + + // send persisted messages (after entries were appended and hard state was updated) + if !ready.persisted_messages().is_empty() { + self.send_messages(ready.take_persisted_messages()); + } + + // advance the raft node + let mut light_ready = self.raw_node.advance(ready); + + // update the commit index if it changed + if let Some(commit) = light_ready.commit_index() { + self.raw_node + .store() + .wl() + .mut_hard_state() + .set_commit(commit); + } + + // send outgoing messages + if !light_ready.messages().is_empty() { + self.send_messages(light_ready.take_messages()); + } + + // handle committed entries + if !light_ready.committed_entries().is_empty() { + self.handle_committed_entries(light_ready.take_committed_entries())?; + } + + self.raw_node.advance_apply(); + + Ok(()) + } + + fn register_callback(&mut self, callback: Callback) { + self.callbacks.insert(callback.request_id, callback); + } + + fn send_messages(&self, _messages: Vec) { + // todo: Send messages to other peers + } + + fn handle_committed_entries(&mut self, committed_entries: Vec) -> Result<(), Error> { + for entry in committed_entries { + if entry.data.is_empty() { + // new leader was elected + continue; + } + + match entry.get_entry_type() { + EntryType::EntryNormal => self.handle_normal_entry(entry)?, + EntryType::EntryConfChange => self.handle_conf_change(entry)?, + EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry)?, + } + } + + Ok(()) + } + + fn handle_normal_entry(&mut self, entry: Entry) -> Result<(), Error> { + let request = Request::decode_from_bytes(entry.data).map_err(Error::DecodeRequest)?; + self.handle_request(request); + + Ok(()) + } + + fn handle_request(&mut self, request: Request) { + match request.kind { + RequestKind::Get { key } => { + let result = self.get(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + 
callback.complete_get(result); + } + } + RequestKind::GetVersion { key } => { + let result = self.get_version(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_get_version(result); + } + } + RequestKind::Put { + key, + value, + precondition, + } => { + let result = self.put(key, value, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_put(result.map_err(Into::into)); + } + } + RequestKind::Delete { key, precondition } => { + let result = self.delete(key, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_delete(result.map_err(Into::into)); + } + } + } + } + + fn get(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).cloned() + } + + fn get_version(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).map(|entry| entry.version) + } + + fn put( + &mut self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.insert(key, value); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return Err(PreconditionViolation::kv_pair_exists()); + } + + self.kv_entries.insert(key, value); + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.insert(key, value); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + fn delete( + &mut self, + key: ByteString, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.remove(&key); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return 
Err(PreconditionViolation::kv_pair_exists()); + } + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.remove(&key); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChange::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.store().wl().set_conf_state(cs); + Ok(()) + } + + fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChangeV2::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.store().wl().set_conf_state(cs); + Ok(()) + } + + fn split_request(request: MetadataStoreRequest) -> (Callback, Request) { + let (request_kind, callback_kind) = match request { + MetadataStoreRequest::Get { key, result_tx } => { + (RequestKind::Get { key }, CallbackKind::Get { result_tx }) + } + MetadataStoreRequest::GetVersion { key, result_tx } => ( + RequestKind::GetVersion { key }, + CallbackKind::GetVersion { result_tx }, + ), + MetadataStoreRequest::Put { + key, + value, + precondition, + result_tx, + } => ( + RequestKind::Put { + key, + value, + precondition, + }, + CallbackKind::Put { result_tx }, + ), + MetadataStoreRequest::Delete { + key, + precondition, + result_tx, + } => ( + RequestKind::Delete { key, precondition }, + CallbackKind::Delete { result_tx }, + ), + }; + + let request_id = Ulid::new(); + + let callback = Callback { + request_id, + kind: callback_kind, + }; + + let request = Request { + request_id, + kind: request_kind, + }; + + 
(callback, request) + } +} + +struct Callback { + request_id: Ulid, + kind: CallbackKind, +} + +impl Callback { + fn fail(self, err: impl Into) { + match self.kind { + CallbackKind::Get { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::GetVersion { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Put { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Delete { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + }; + } + + fn complete_get(self, result: Option) { + let_assert!( + CallbackKind::Get { result_tx } = self.kind, + "expected 'Get' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_get_version(self, result: Option) { + let_assert!( + CallbackKind::GetVersion { result_tx } = self.kind, + "expected 'GetVersion' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_put(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Put { result_tx } = self.kind, + "expected 'Put' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } + + fn complete_delete(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Delete { result_tx } = self.kind, + "expected 'Delete' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } +} + +enum CallbackKind { + Get { + result_tx: oneshot::Sender, RequestError>>, + }, + GetVersion { + result_tx: oneshot::Sender, RequestError>>, + }, + Put { + result_tx: oneshot::Sender>, + }, + Delete { + result_tx: oneshot::Sender>, + }, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Request { + request_id: Ulid, + kind: RequestKind, +} + 
+flexbuffers_storage_encode_decode!(Request); + +impl Request { + fn encode_to_vec(&self) -> Result, StorageEncodeError> { + let mut buffer = BytesMut::new(); + // todo: Removing support for BufMut requires an extra copy from BytesMut to Vec :-( + StorageCodec::encode(self, &mut buffer)?; + Ok(buffer.to_vec()) + } + + fn decode_from_bytes(mut bytes: Bytes) -> Result { + StorageCodec::decode::(&mut bytes) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +enum RequestKind { + Get { + key: ByteString, + }, + GetVersion { + key: ByteString, + }, + Put { + key: ByteString, + value: VersionedValue, + precondition: Precondition, + }, + Delete { + key: ByteString, + precondition: Precondition, + }, +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index f4eac1e91..eecd0477c 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -31,6 +31,7 @@ use restate_core::{task_center, TaskKind}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; +use restate_metadata_store::raft::service::RaftMetadataStoreService; use restate_metadata_store::{ BoxedMetadataStoreService, MetadataStoreClient, MetadataStoreService, }; @@ -280,9 +281,10 @@ impl Node { .boxed(), ) .boxed(), - Kind::Raft => { - unimplemented!("not yet supported") - } + Kind::Raft => RaftMetadataStoreService::new( + updateable_config.clone().map(|c| &c.metadata_store).boxed(), + ) + .boxed(), } } From 2e4b33135fc8db6b9caf0a388e99e71cabf39b97 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 6 Aug 2024 10:07:39 +0200 Subject: [PATCH 4/6] Classify ProposalDropped as Unavailable Tonic status The raft metadata store does not accept new proposals if there is no known leader. In this situation, requests failed with an internal ProposalDropped error. This commit changes the behavior so that a ProposalDropped error will be translated into an unavailable Tonic status. 
That way, the request will get automatically retried. --- crates/metadata-store/src/grpc/handler.rs | 3 ++- crates/metadata-store/src/lib.rs | 4 +++- crates/metadata-store/src/raft/store.rs | 11 ++++++++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/crates/metadata-store/src/grpc/handler.rs b/crates/metadata-store/src/grpc/handler.rs index 938e09d8e..54a40224f 100644 --- a/crates/metadata-store/src/grpc/handler.rs +++ b/crates/metadata-store/src/grpc/handler.rs @@ -132,7 +132,8 @@ impl MetadataStoreSvc for MetadataStoreHandler { impl From for Status { fn from(err: RequestError) -> Self { match err { - RequestError::FailedPrecondition(msg) => Status::failed_precondition(msg.to_string()), + RequestError::FailedPrecondition(err) => Status::failed_precondition(err.to_string()), + RequestError::Unavailable(err) => Status::unavailable(err.to_string()), err => Status::internal(err.to_string()), } } diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 3ab083883..be7c13968 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -32,7 +32,9 @@ pub type RequestReceiver = mpsc::Receiver; #[derive(Debug, thiserror::Error)] pub enum RequestError { #[error("internal error: {0}")] - Internal(#[from] GenericError), + Internal(GenericError), + #[error("service currently unavailable: {0}")] + Unavailable(GenericError), #[error("failed precondition: {0}")] FailedPrecondition(#[from] PreconditionViolation), #[error("invalid argument: {0}")] diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs index 94292a2d3..915597d2e 100644 --- a/crates/metadata-store/src/raft/store.rs +++ b/crates/metadata-store/src/raft/store.rs @@ -112,7 +112,7 @@ impl RaftMetadataStore { .map_err(Into::into) .and_then(|request| self.raw_node .propose(vec![], request) - .map_err(|err| RequestError::Internal(err.into()))) { + .map_err(RequestError::from)) { info!("Failed processing 
request: {err}"); callback.fail(err); continue; @@ -528,3 +528,12 @@ enum RequestKind { precondition: Precondition, }, } + +impl From for RequestError { + fn from(value: raft::Error) -> Self { + match value { + err @ raft::Error::ProposalDropped => RequestError::Unavailable(err.into()), + err => RequestError::Internal(err.into()), + } + } +} From a6d99f757a98f6a82a689344c487b3c9fa14a8f8 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 6 Aug 2024 15:52:07 +0200 Subject: [PATCH 5/6] Implement simple durable Raft storage based on RocksDB This commit adds RocksDbStorage which implements raft::Storage. The RocksDbStorage is a durable storage implementation which is used by the RaftMetadataStore to store the raft state durably. This fixes #1791. --- crates/metadata-store/src/lib.rs | 1 + crates/metadata-store/src/local/store.rs | 62 +--- crates/metadata-store/src/raft/mod.rs | 1 + crates/metadata-store/src/raft/service.rs | 2 +- crates/metadata-store/src/raft/storage.rs | 420 ++++++++++++++++++++++ crates/metadata-store/src/raft/store.rs | 76 ++-- crates/metadata-store/src/util.rs | 52 +++ 7 files changed, 537 insertions(+), 77 deletions(-) create mode 100644 crates/metadata-store/src/raft/storage.rs create mode 100644 crates/metadata-store/src/util.rs diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index be7c13968..da43f361e 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -12,6 +12,7 @@ mod grpc; mod grpc_svc; pub mod local; pub mod raft; +mod util; use bytestring::ByteString; use restate_core::metadata_store::VersionedValue; diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 55a6b33e7..fc874bc6b 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. 
use crate::{ - MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, + util, MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, }; use bytes::BytesMut; use bytestring::ByteString; @@ -23,7 +23,7 @@ use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; use restate_types::Version; -use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB}; +use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions, DB}; use std::sync::Arc; use tokio::sync::mpsc; use tracing::{debug, trace}; @@ -56,13 +56,17 @@ impl LocalMetadataStore { let db_name = DbName::new(DB_NAME); let db_manager = RocksDbManager::get(); let cfs = vec![CfName::new(KV_PAIRS)]; - let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options)) - .add_cf_pattern( - CfPrefixPattern::ANY, - cf_options(options.rocksdb_memory_budget()), - ) - .ensure_column_families(cfs) - .build_as_db(); + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build_as_db(); let db = db_manager .open_db(updateable_rocksdb_options.clone(), db_spec) @@ -295,43 +299,3 @@ impl LocalMetadataStore { } } } - -fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { - rocksdb::Options::default() -} - -fn cf_options( - memory_budget: usize, -) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { - move |mut opts| { - set_memory_related_opts(&mut opts, memory_budget); - opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - opts.set_num_levels(3); - - opts.set_compression_per_level(&[ - DBCompressionType::None, - DBCompressionType::None, - DBCompressionType::Zstd, - 
]); - - // - opts - } -} - -fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { - // We set the budget to allow 1 mutable + 3 immutable. - opts.set_write_buffer_size(memtables_budget / 4); - - // merge 2 memtables when flushing to L0 - opts.set_min_write_buffer_number_to_merge(2); - opts.set_max_write_buffer_number(4); - // start flushing L0->L1 as soon as possible. each file on level0 is - // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than - // memtable_memory_budget. - opts.set_level_zero_file_num_compaction_trigger(2); - // doesn't really matter much, but we don't want to create too many files - opts.set_target_file_size_base(memtables_budget as u64 / 8); - // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast - opts.set_max_bytes_for_level_base(memtables_budget as u64); -} diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs index 707997220..7c14a5b27 100644 --- a/crates/metadata-store/src/raft/mod.rs +++ b/crates/metadata-store/src/raft/mod.rs @@ -9,4 +9,5 @@ // by the Apache License, Version 2.0. 
pub mod service; +mod storage; mod store; diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs index 496320c67..e788c58c7 100644 --- a/crates/metadata-store/src/raft/service.rs +++ b/crates/metadata-store/src/raft/service.rs @@ -33,7 +33,7 @@ impl RaftMetadataStoreService { impl MetadataStoreService for RaftMetadataStoreService { async fn run(mut self) -> Result<(), Error> { let store_options = self.options.live_load(); - let store = RaftMetadataStore::new().map_err(Error::generic)?; + let store = RaftMetadataStore::create().await.map_err(Error::generic)?; let mut builder = GrpcServiceBuilder::default(); diff --git a/crates/metadata-store/src/raft/storage.rs b/crates/metadata-store/src/raft/storage.rs new file mode 100644 index 000000000..0e37bbbce --- /dev/null +++ b/crates/metadata-store/src/raft/storage.rs @@ -0,0 +1,420 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use crate::util; +use protobuf::{Message, ProtobufError}; +use raft::eraftpb::{ConfState, Entry, Snapshot}; +use raft::prelude::HardState; +use raft::{GetEntriesContext, RaftState, Storage, StorageError}; +use restate_rocksdb::{ + CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager, + RocksError, +}; +use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; +use rocksdb::{BoundColumnFamily, ReadOptions, WriteBatch, WriteOptions, DB}; +use std::mem::size_of; +use std::sync::Arc; +use std::{error, mem}; + +const DB_NAME: &str = "raft-metadata-store"; +const RAFT_CF: &str = "raft"; + +const FIRST_RAFT_INDEX: u64 = 1; + +const RAFT_ENTRY_DISCRIMINATOR: u8 = 0x01; +const HARD_STATE_DISCRIMINATOR: u8 = 0x02; +const CONF_STATE_DISCRIMINATOR: u8 = 0x03; + +const RAFT_ENTRY_KEY_LENGTH: usize = 9; + +#[derive(Debug, thiserror::Error)] +pub enum BuildError { + #[error("failed creating RocksDb: {0}")] + RocksDb(#[from] RocksError), +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed reading/writing from/to RocksDb: {0}")] + RocksDb(#[from] RocksError), + #[error("failed reading/writing from/to raw RocksDb: {0}")] + RocksDbRaw(#[from] rocksdb::Error), + #[error("failed encoding value: {0}")] + Encode(#[from] ProtobufError), + #[error("index '{index}' is out of bounds; last index is '{last_index}'")] + IndexOutOfBounds { index: u64, last_index: u64 }, + #[error("raft log has been compacted; first index is {0}")] + Compacted(u64), +} + +/// Map our internal error type to [`raft::Error`] to fit the [`Storage`] trait definition. +impl From for raft::Error { + fn from(value: Error) -> Self { + match value { + err @ Error::RocksDb(_) + | err @ Error::RocksDbRaw(_) + | err @ Error::IndexOutOfBounds { .. 
} => storage_error(err), + Error::Encode(err) => raft::Error::CodecError(err), + Error::Compacted(_) => raft::Error::Store(StorageError::Compacted), + } + } +} + +pub struct RocksDbStorage { + db: Arc, + rocksdb: Arc, + + last_index: u64, + buffer: Vec, +} + +impl RocksDbStorage { + pub async fn create( + options: &MetadataStoreOptions, + rocksdb_options: BoxedLiveLoad, + ) -> Result { + let db_name = DbName::new(DB_NAME); + let db_manager = RocksDbManager::get(); + let cfs = vec![CfName::new(RAFT_CF)]; + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build_as_db(); + + let db = db_manager.open_db(rocksdb_options, db_spec).await?; + let rocksdb = db_manager + .get_db(db_name) + .expect("raft metadata store db is open"); + + let last_index = Self::find_last_index(&db); + + Ok(Self { + db, + rocksdb, + last_index, + buffer: Vec::with_capacity(1024), + }) + } +} + +impl RocksDbStorage { + fn write_options(&self) -> WriteOptions { + let mut write_opts = WriteOptions::default(); + write_opts.disable_wal(false); + // always sync to not lose data + write_opts.set_sync(true); + write_opts + } + + fn find_last_index(db: &DB) -> u64 { + let cf = db.cf_handle(RAFT_CF).expect("RAFT_CF exists"); + let start = Self::raft_entry_key(0); + // end is exclusive so switch to the next discriminator + let mut end = [0; 9]; + end[0] = RAFT_ENTRY_DISCRIMINATOR + 1; + + let mut options = ReadOptions::default(); + options.set_async_io(true); + options.set_iterate_range(start..end); + let mut iterator = db.raw_iterator_cf_opt(&cf, options); + + iterator.seek_to_last(); + + if iterator.valid() { + let key_bytes = iterator.key().expect("key should be present"); + assert_eq!( + key_bytes.len(), + RAFT_ENTRY_KEY_LENGTH, + "raft entry keys must consist of '{}' bytes", + RAFT_ENTRY_KEY_LENGTH + ); + 
u64::from_be_bytes( + key_bytes[1..(1 + size_of::())] + .try_into() + .expect("buffer should be long enough"), + ) + } else { + // the first valid raft index starts at 1, so 0 means there are no replicated raft entries + 0 + } + } + + pub fn get_hard_state(&self) -> Result { + let key = Self::hard_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_hard_state(&mut self, hard_state: HardState) -> Result<(), Error> { + let key = Self::hard_state_key(); + self.put_value(key, hard_state).await + } + + pub fn get_conf_state(&self) -> Result { + let key = Self::conf_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_conf_state(&mut self, conf_state: ConfState) -> Result<(), Error> { + let key = Self::conf_state_key(); + self.put_value(key, conf_state).await + } + + pub fn get_entry(&self, idx: u64) -> Result, Error> { + let key = Self::raft_entry_key(idx); + self.get_value(key) + } + + fn get_value(&self, key: impl AsRef<[u8]>) -> Result, Error> { + let cf = self.get_cf_handle(); + let bytes = self.db.get_pinned_cf(&cf, key)?; + + if let Some(bytes) = bytes { + let mut value = T::default(); + value.merge_from_bytes(bytes.as_ref())?; + Ok(Some(value)) + } else { + Ok(None) + } + } + + async fn put_value( + &mut self, + key: impl AsRef<[u8]>, + value: T, + ) -> Result<(), Error> { + self.buffer.clear(); + value.write_to_vec(&mut self.buffer)?; + + let cf = self.get_cf_handle(); + let mut write_batch = WriteBatch::default(); + write_batch.put_cf(&cf, key.as_ref(), &self.buffer); + self.rocksdb + .write_batch( + "put_value", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into) + } + + pub async fn append(&mut self, entries: &Vec) -> Result<(), Error> { + let mut write_batch = WriteBatch::default(); + let mut buffer = mem::take(&mut self.buffer); + let mut last_index = self.last_index; + + { + 
let cf = self.get_cf_handle(); + + for entry in entries { + assert_eq!(last_index + 1, entry.index, "Expect raft log w/o holes"); + let key = Self::raft_entry_key(entry.index); + + buffer.clear(); + entry.write_to_vec(&mut buffer)?; + + write_batch.put_cf(&cf, key, &buffer); + last_index = entry.index; + } + } + + let result = self + .rocksdb + .write_batch( + "append", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into); + + self.buffer = buffer; + self.last_index = last_index; + + result + } + + fn get_cf_handle(&self) -> Arc { + self.db.cf_handle(RAFT_CF).expect("RAFT_CF exists") + } + + fn raft_entry_key(idx: u64) -> [u8; RAFT_ENTRY_KEY_LENGTH] { + let mut key = [0; RAFT_ENTRY_KEY_LENGTH]; + key[0] = RAFT_ENTRY_DISCRIMINATOR; + key[1..9].copy_from_slice(&idx.to_be_bytes()); + key + } + + fn hard_state_key() -> [u8; 1] { + [HARD_STATE_DISCRIMINATOR] + } + + fn conf_state_key() -> [u8; 1] { + [CONF_STATE_DISCRIMINATOR] + } + + fn check_index(&self, idx: u64) -> Result<(), Error> { + if idx < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } else if idx > self.last_index() { + return Err(Error::IndexOutOfBounds { + index: idx, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn check_range(&self, low: u64, high: u64) -> Result<(), Error> { + assert!(low < high, "Low '{low}' must be smaller than high '{high}'"); + + if low < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } + + if high > self.last_index() + 1 { + return Err(Error::IndexOutOfBounds { + index: high, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn last_index(&self) -> u64 { + self.last_index + } + + fn first_index(&self) -> u64 { + FIRST_RAFT_INDEX + } + + pub fn apply_snapshot(&mut self, _snapshot: Snapshot) -> Result<(), Error> { + unimplemented!("snapshots are currently not supported"); + } +} + +impl Storage for RocksDbStorage { + fn initial_state(&self) 
-> raft::Result { + let hard_state = self.get_hard_state()?; + Ok(RaftState::new(hard_state, self.get_conf_state()?)) + } + + fn entries( + &self, + low: u64, + high: u64, + max_size: impl Into>, + _context: GetEntriesContext, + ) -> raft::Result> { + self.check_range(low, high)?; + let start_key = Self::raft_entry_key(low); + let end_key = Self::raft_entry_key(high); + + let cf = self.get_cf_handle(); + let mut opts = ReadOptions::default(); + opts.set_iterate_range(start_key..end_key); + opts.set_async_io(true); + + let mut iterator = self.db.raw_iterator_cf_opt(&cf, opts); + iterator.seek(start_key); + + let mut result = + Vec::with_capacity(usize::try_from(high - low).expect("u64 fits into usize")); + + let max_size = + usize::try_from(max_size.into().unwrap_or(u64::MAX)).expect("u64 fits into usize"); + let mut size = 0; + let mut expected_idx = low; + + while iterator.valid() { + if size > 0 && size >= max_size { + break; + } + + if let Some(value) = iterator.value() { + let mut entry = Entry::default(); + entry.merge_from_bytes(value)?; + + if expected_idx != entry.index { + if expected_idx == low { + Err(StorageError::Compacted)?; + } else { + // missing raft entries :-( + Err(StorageError::Unavailable)?; + } + } + + result.push(entry); + expected_idx += 1; + size += value.len(); + } + + iterator.next(); + } + + // check for an occurred error + iterator + .status() + .map_err(|err| StorageError::Other(err.into()))?; + + Ok(result) + } + + fn term(&self, idx: u64) -> raft::Result { + // todo handle first_index - 1 once truncation is supported + if idx == 0 { + return Ok(0); + } + + self.check_index(idx)?; + self.get_entry(idx) + .map(|entry| entry.expect("should exist").term) + .map_err(Into::into) + } + + fn first_index(&self) -> raft::Result { + Ok(self.first_index()) + } + + fn last_index(&self) -> raft::Result { + Ok(self.last_index()) + } + + fn snapshot(&self, _request_index: u64, _to: u64) -> raft::Result { + // time is relative as some clever people 
figured out + Err(raft::Error::Store( + StorageError::SnapshotTemporarilyUnavailable, + )) + } +} + +pub fn storage_error(error: E) -> raft::Error +where + E: Into>, +{ + raft::Error::Store(StorageError::Other(error.into())) +} diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs index 915597d2e..e859b89f8 100644 --- a/crates/metadata-store/src/raft/store.rs +++ b/crates/metadata-store/src/raft/store.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::raft::storage; +use crate::raft::storage::RocksDbStorage; use crate::{ MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, }; @@ -16,10 +18,10 @@ use bytes::{Bytes, BytesMut}; use bytestring::ByteString; use protobuf::{Message as ProtobufMessage, ProtobufError}; use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Message}; -use raft::storage::MemStorage; use raft::{Config, RawNode}; use restate_core::cancellation_watcher; use restate_core::metadata_store::{Precondition, VersionedValue}; +use restate_types::config::Configuration; use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version}; use slog::o; @@ -33,8 +35,14 @@ use tracing_slog::TracingSlogDrain; use ulid::Ulid; #[derive(Debug, thiserror::Error)] -#[error("failed creating raft node: {0}")] -pub struct BuildError(#[from] raft::Error); +pub enum BuildError { + #[error("failed creating raft node: {0}")] + Raft(#[from] raft::Error), + #[error("failed creating raft storage: {0}")] + Storage(#[from] storage::BuildError), + #[error("failed bootstrapping conf state: {0}")] + BootstrapConfState(#[from] storage::Error), +} #[derive(Debug, thiserror::Error)] pub enum Error { @@ -46,11 +54,13 @@ pub enum Error { DecodeConf(ProtobufError), #[error("failed applying conf change: {0}")] 
ApplyConfChange(raft::Error), + #[error("failed reading/writing from/to storage: {0}")] + Storage(#[from] storage::Error), } pub struct RaftMetadataStore { _logger: slog::Logger, - raw_node: RawNode, + raw_node: RawNode, tick_interval: time::Interval, callbacks: HashMap, @@ -61,7 +71,7 @@ pub struct RaftMetadataStore { } impl RaftMetadataStore { - pub fn new() -> Result { + pub async fn create() -> Result { let (request_tx, request_rx) = mpsc::channel(2); let config = Config { @@ -69,7 +79,16 @@ impl RaftMetadataStore { ..Default::default() }; - let store = MemStorage::new_with_conf_state(ConfState::from((vec![1], vec![]))); + let rocksdb_options = Configuration::updateable() + .map(|configuration| &configuration.common.rocksdb) + .boxed(); + let mut metadata_store_options = + Configuration::updateable().map(|configuration| &configuration.metadata_store); + let mut store = + RocksDbStorage::create(metadata_store_options.live_load(), rocksdb_options).await?; + let conf_state = ConfState::from((vec![1], vec![])); + store.store_conf_state(conf_state).await?; + let drain = TracingSlogDrain; let logger = slog::Logger::root(drain, o!()); @@ -125,7 +144,7 @@ impl RaftMetadataStore { } } - self.on_ready()?; + self.on_ready().await?; } debug!("Stop running RaftMetadataStore."); @@ -133,7 +152,7 @@ impl RaftMetadataStore { Ok(()) } - fn on_ready(&mut self) -> Result<(), Error> { + async fn on_ready(&mut self) -> Result<(), Error> { if !self.raw_node.has_ready() { return Ok(()); } @@ -149,8 +168,7 @@ impl RaftMetadataStore { if !ready.snapshot().is_empty() { if let Err(err) = self .raw_node - .store() - .wl() + .mut_store() .apply_snapshot(ready.snapshot().clone()) { warn!("failed applying snapshot: {err}"); @@ -158,14 +176,18 @@ impl RaftMetadataStore { } // then handle committed entries - self.handle_committed_entries(ready.take_committed_entries())?; + self.handle_committed_entries(ready.take_committed_entries()) + .await?; // append new Raft entries to storage - 
self.raw_node.store().wl().append(ready.entries())?; + self.raw_node.mut_store().append(ready.entries()).await?; // update the hard state if an update was produced (e.g. vote has happened) if let Some(hs) = ready.hs() { - self.raw_node.store().wl().set_hardstate(hs.clone()); + self.raw_node + .mut_store() + .store_hard_state(hs.clone()) + .await?; } // send persisted messages (after entries were appended and hard state was updated) @@ -177,12 +199,8 @@ impl RaftMetadataStore { let mut light_ready = self.raw_node.advance(ready); // update the commit index if it changed - if let Some(commit) = light_ready.commit_index() { - self.raw_node - .store() - .wl() - .mut_hard_state() - .set_commit(commit); + if let Some(_commit) = light_ready.commit_index() { + // update commit index in cached hard_state; no need to persist it though } // send outgoing messages @@ -192,7 +210,8 @@ impl RaftMetadataStore { // handle committed entries if !light_ready.committed_entries().is_empty() { - self.handle_committed_entries(light_ready.take_committed_entries())?; + self.handle_committed_entries(light_ready.take_committed_entries()) + .await?; } self.raw_node.advance_apply(); @@ -208,7 +227,10 @@ impl RaftMetadataStore { // todo: Send messages to other peers } - fn handle_committed_entries(&mut self, committed_entries: Vec) -> Result<(), Error> { + async fn handle_committed_entries( + &mut self, + committed_entries: Vec, + ) -> Result<(), Error> { for entry in committed_entries { if entry.data.is_empty() { // new leader was elected @@ -217,8 +239,8 @@ impl RaftMetadataStore { match entry.get_entry_type() { EntryType::EntryNormal => self.handle_normal_entry(entry)?, - EntryType::EntryConfChange => self.handle_conf_change(entry)?, - EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry)?, + EntryType::EntryConfChange => self.handle_conf_change(entry).await?, + EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry).await?, } } @@ -338,7 +360,7 @@ impl RaftMetadataStore 
{ Ok(()) } - fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { + async fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { let mut cc = ConfChange::default(); cc.merge_from_bytes(&entry.data) .map_err(Error::DecodeConf)?; @@ -346,11 +368,11 @@ impl RaftMetadataStore { .raw_node .apply_conf_change(&cc) .map_err(Error::ApplyConfChange)?; - self.raw_node.store().wl().set_conf_state(cs); + self.raw_node.mut_store().store_conf_state(cs).await?; Ok(()) } - fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { + async fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { let mut cc = ConfChangeV2::default(); cc.merge_from_bytes(&entry.data) .map_err(Error::DecodeConf)?; @@ -358,7 +380,7 @@ impl RaftMetadataStore { .raw_node .apply_conf_change(&cc) .map_err(Error::ApplyConfChange)?; - self.raw_node.store().wl().set_conf_state(cs); + self.raw_node.mut_store().store_conf_state(cs).await?; Ok(()) } diff --git a/crates/metadata-store/src/util.rs b/crates/metadata-store/src/util.rs new file mode 100644 index 000000000..41b971da4 --- /dev/null +++ b/crates/metadata-store/src/util.rs @@ -0,0 +1,52 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use restate_types::config::MetadataStoreOptions; +use rocksdb::DBCompressionType; + +pub fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { + rocksdb::Options::default() +} + +pub fn cf_options( + memory_budget: usize, +) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { + move |mut opts| { + set_memory_related_opts(&mut opts, memory_budget); + opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + opts.set_num_levels(3); + + opts.set_compression_per_level(&[ + DBCompressionType::None, + DBCompressionType::None, + DBCompressionType::Zstd, + ]); + + // + opts + } +} + +pub fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { + // We set the budget to allow 1 mutable + 3 immutable. + opts.set_write_buffer_size(memtables_budget / 4); + + // merge 2 memtables when flushing to L0 + opts.set_min_write_buffer_number_to_merge(2); + opts.set_max_write_buffer_number(4); + // start flushing L0->L1 as soon as possible. each file on level0 is + // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than + // memtable_memory_budget. + opts.set_level_zero_file_num_compaction_trigger(2); + // doesn't really matter much, but we don't want to create too many files + opts.set_target_file_size_base(memtables_budget as u64 / 8); + // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast + opts.set_max_bytes_for_level_base(memtables_budget as u64); +} From cc49b413f940748d32201886718cd72317b32f8a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 19 Aug 2024 13:09:55 +0200 Subject: [PATCH 6/6] Implement simple networking layer for RaftMetadataStore This fixes #1803. 
--- Cargo.lock | 1 + .../admin/src/cluster_controller/service.rs | 4 +- crates/metadata-store/Cargo.toml | 1 + crates/metadata-store/build.rs | 6 + .../proto/raft_metadata_store_svc.proto | 24 +++ .../src/raft/connection_manager.rs | 192 ++++++++++++++++++ crates/metadata-store/src/raft/grpc_svc.rs | 14 ++ crates/metadata-store/src/raft/handler.rs | 67 ++++++ crates/metadata-store/src/raft/mod.rs | 4 + crates/metadata-store/src/raft/networking.rs | 145 +++++++++++++ crates/metadata-store/src/raft/service.rs | 39 +++- crates/metadata-store/src/raft/storage.rs | 4 +- crates/metadata-store/src/raft/store.rs | 51 +++-- crates/node/src/lib.rs | 6 +- crates/types/src/config/metadata_store.rs | 18 +- 15 files changed, 551 insertions(+), 25 deletions(-) create mode 100644 crates/metadata-store/proto/raft_metadata_store_svc.proto create mode 100644 crates/metadata-store/src/raft/connection_manager.rs create mode 100644 crates/metadata-store/src/raft/grpc_svc.rs create mode 100644 crates/metadata-store/src/raft/handler.rs create mode 100644 crates/metadata-store/src/raft/networking.rs diff --git a/Cargo.lock b/Cargo.lock index 004caf32a..1a2ab6b64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6032,6 +6032,7 @@ dependencies = [ "bytestring", "codederror", "derive_builder", + "derive_more", "flexbuffers", "futures", "googletest", diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 1d4a824fd..3524c26c1 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -241,7 +241,9 @@ where } } Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => { - scheduler.on_cluster_state_update(cluster_state).await?; + if let Err(err) = scheduler.on_cluster_state_update(cluster_state).await { + warn!("Could not perform scheduling operation: {err}"); + } } Some(cmd) = self.command_rx.recv() => { self.on_cluster_cmd(cmd, bifrost_admin).await; diff --git 
a/crates/metadata-store/Cargo.toml b/crates/metadata-store/Cargo.toml index b1bf2753b..40ea78a0e 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -23,6 +23,7 @@ async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } derive_builder = { workspace = true } +derive_more = { workspace = true } futures = { workspace = true } http = { workspace = true } humantime = { workspace = true } diff --git a/crates/metadata-store/build.rs b/crates/metadata-store/build.rs index b2b100490..cdb69918f 100644 --- a/crates/metadata-store/build.rs +++ b/crates/metadata-store/build.rs @@ -21,5 +21,11 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .compile(&["./proto/metadata_store_svc.proto"], &["proto"])?; + tonic_build::configure() + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("raft_metadata_store_svc.bin")) + .protoc_arg("--experimental_allow_proto3_optional") + .compile(&["./proto/raft_metadata_store_svc.proto"], &["proto"])?; + Ok(()) } diff --git a/crates/metadata-store/proto/raft_metadata_store_svc.proto b/crates/metadata-store/proto/raft_metadata_store_svc.proto new file mode 100644 index 000000000..84a4f4f07 --- /dev/null +++ b/crates/metadata-store/proto/raft_metadata_store_svc.proto @@ -0,0 +1,24 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package dev.restate.raft_metadata_store_svc; + +// Grpc service definition for the RaftMetadataStore implementation. 
+service RaftMetadataStoreSvc { + rpc Raft(stream RaftMessage) returns (stream RaftMessage); +} + +message RaftMessage { + bytes message = 1; +} + diff --git a/crates/metadata-store/src/raft/connection_manager.rs b/crates/metadata-store/src/raft/connection_manager.rs new file mode 100644 index 000000000..f579fc81c --- /dev/null +++ b/crates/metadata-store/src/raft/connection_manager.rs @@ -0,0 +1,192 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::grpc_svc::RaftMessage; +use futures::StreamExt; +use protobuf::Message as ProtobufMessage; +use raft::prelude::Message; +use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::BoxStream; +use tracing::{debug, instrument}; + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("internal error: {0}")] + Internal(String), + #[error(transparent)] + Shutdown(#[from] ShutdownError), +} + +#[derive(Clone, derive_more::Debug)] +pub struct ConnectionManager { + inner: Arc, + #[debug(skip)] + task_center: TaskCenter, +} + +impl ConnectionManager { + pub fn new(task_center: TaskCenter, identity: u64, router: mpsc::Sender) -> Self { + ConnectionManager { + inner: Arc::new(ConnectionManagerInner::new(identity, router)), + task_center, + } + } + + pub fn identity(&self) -> u64 { + self.inner.identity + } + + pub fn accept_connection( + &self, + raft_peer: u64, + incoming_rx: tonic::Streaming, + ) -> Result, ConnectionError> { + let (outgoing_tx, 
outgoing_rx) = mpsc::channel(128); + self.run_connection(raft_peer, outgoing_tx, incoming_rx)?; + + let outgoing_stream = ReceiverStream::new(outgoing_rx) + .map(Result::<_, tonic::Status>::Ok) + .boxed(); + Ok(outgoing_stream) + } + + pub fn run_connection( + &self, + remote_peer: u64, + outgoing_tx: mpsc::Sender, + incoming_rx: tonic::Streaming, + ) -> Result<(), ConnectionError> { + let mut guard = self.inner.connections.lock().unwrap(); + + if guard.contains_key(&remote_peer) { + // we already have a connection established to remote peer + return Ok(()); + } + + let connection = Connection::new(outgoing_tx); + guard.insert(remote_peer, connection); + + let reactor = ConnectionReactor { + remote_peer, + connection_manager: Arc::clone(&self.inner), + }; + + let _task_id = self.task_center.spawn_child( + TaskKind::ConnectionReactor, + "raft-connection-reactor", + None, + reactor.run(incoming_rx), + )?; + + Ok(()) + } + + pub fn get_connection(&self, target: u64) -> Option { + self.inner.connections.lock().unwrap().get(&target).cloned() + } +} + +struct ConnectionReactor { + remote_peer: u64, + connection_manager: Arc, +} + +impl ConnectionReactor { + #[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))] + async fn run(self, mut incoming_rx: tonic::Streaming) -> anyhow::Result<()> { + let mut shutdown = std::pin::pin!(cancellation_watcher()); + debug!("Run connection reactor"); + + loop { + tokio::select! 
{ + _ = &mut shutdown => { + break; + }, + message = incoming_rx.next() => { + match message { + Some(message) => { + match message { + Ok(message) => { + let message = Message::parse_from_carllerche_bytes(&message.message)?; + + assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity); + + if self.connection_manager.router.send(message).await.is_err() { + // system is shutting down + debug!("System is shutting down; closing connection"); + break; + } + } + Err(err) => { + debug!("Closing connection because received error: {err}"); + break; + } + } + } + None => { + debug!("Remote peer closed connection"); + break + }, + } + } + } + } + + Ok(()) + } +} + +impl Drop for ConnectionReactor { + fn drop(&mut self) { + debug!(remote_peer = %self.remote_peer, "Close connection"); + self.connection_manager + .connections + .lock() + .expect("shouldn't be poisoned") + .remove(&self.remote_peer); + } +} + +#[derive(Debug)] +struct ConnectionManagerInner { + identity: u64, + connections: Mutex>, + router: mpsc::Sender, +} + +impl ConnectionManagerInner { + pub fn new(identity: u64, router: mpsc::Sender) -> Self { + ConnectionManagerInner { + identity, + router, + connections: Mutex::default(), + } + } +} + +#[derive(Debug, Clone)] +pub struct Connection { + tx: mpsc::Sender, +} + +impl Connection { + pub fn new(tx: mpsc::Sender) -> Self { + Connection { tx } + } + + pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError> { + self.tx.try_send(message) + } +} diff --git a/crates/metadata-store/src/raft/grpc_svc.rs b/crates/metadata-store/src/raft/grpc_svc.rs new file mode 100644 index 000000000..a9064c261 --- /dev/null +++ b/crates/metadata-store/src/raft/grpc_svc.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +tonic::include_proto!("dev.restate.raft_metadata_store_svc"); + +pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("raft_metadata_store_svc"); diff --git a/crates/metadata-store/src/raft/handler.rs b/crates/metadata-store/src/raft/handler.rs new file mode 100644 index 000000000..da9995ccb --- /dev/null +++ b/crates/metadata-store/src/raft/handler.rs @@ -0,0 +1,67 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::connection_manager::{ConnectionError, ConnectionManager}; +use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc; +use crate::raft::grpc_svc::RaftMessage; +use std::str::FromStr; +use tonic::codegen::BoxStream; +use tonic::{Request, Response, Status, Streaming}; + +pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer"; + +#[derive(Debug)] +pub struct RaftMetadataStoreHandler { + connection_manager: ConnectionManager, +} + +impl RaftMetadataStoreHandler { + pub fn new(connection_manager: ConnectionManager) -> Self { + Self { connection_manager } + } +} + +#[async_trait::async_trait] +impl RaftMetadataStoreSvc for RaftMetadataStoreHandler { + type RaftStream = BoxStream; + + async fn raft( + &self, + request: Request>, + ) -> Result, Status> { + let raft_peer_metadata = + request + .metadata() + .get(RAFT_PEER_METADATA_KEY) + .ok_or(Status::invalid_argument(format!( + "'{}' is missing", + RAFT_PEER_METADATA_KEY + )))?; + let raft_peer = u64::from_str( + 
raft_peer_metadata + .to_str() + .map_err(|err| Status::invalid_argument(err.to_string()))?, + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + let outgoing_rx = self + .connection_manager + .accept_connection(raft_peer, request.into_inner())?; + Ok(Response::new(outgoing_rx)) + } +} + +impl From for Status { + fn from(value: ConnectionError) -> Self { + match value { + ConnectionError::Internal(err) => Status::internal(err), + ConnectionError::Shutdown(err) => Status::aborted(err.to_string()), + } + } +} diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs index 7c14a5b27..f39cee2ce 100644 --- a/crates/metadata-store/src/raft/mod.rs +++ b/crates/metadata-store/src/raft/mod.rs @@ -8,6 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod connection_manager; +pub mod grpc_svc; +mod handler; +mod networking; pub mod service; mod storage; mod store; diff --git a/crates/metadata-store/src/raft/networking.rs b/crates/metadata-store/src/raft/networking.rs new file mode 100644 index 000000000..62b64561b --- /dev/null +++ b/crates/metadata-store/src/raft/networking.rs @@ -0,0 +1,145 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use crate::raft::connection_manager::ConnectionManager; +use crate::raft::grpc_svc::RaftMessage; +use crate::raft::handler::RAFT_PEER_METADATA_KEY; +use bytes::{BufMut, BytesMut}; +use futures::FutureExt; +use protobuf::Message as ProtobufMessage; +use raft::prelude::Message; +use restate_core::network::net_util; +use restate_core::TaskCenter; +use restate_types::errors::GenericError; +use restate_types::net::AdvertisedAddress; +use std::collections::HashMap; +use std::mem; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::ReceiverStream; +use tonic::metadata::MetadataValue; +use tonic::IntoStreamingRequest; +use tracing::{debug, trace}; + +#[derive(Debug, thiserror::Error)] +pub enum TrySendError { + #[error("failed sending message")] + Send(T), + #[error("failed encoding message")] + Encode(T, GenericError), + #[error("unknown peer: {0}")] + UnknownPeer(u64), +} + +#[derive(derive_more::Debug)] +pub struct Networking { + connection_manager: ConnectionManager, + addresses: HashMap, + connection_attempts: HashMap>>, + serde_buffer: BytesMut, + #[debug(skip)] + task_center: TaskCenter, +} + +impl Networking { + pub fn new(task_center: TaskCenter, connection_manager: ConnectionManager) -> Self { + Networking { + connection_manager, + addresses: HashMap::default(), + connection_attempts: HashMap::default(), + serde_buffer: BytesMut::with_capacity(1024), + task_center, + } + } + + pub fn register_address(&mut self, peer: u64, address: AdvertisedAddress) { + self.addresses.insert(peer, address); + } + + pub fn try_send(&mut self, message: Message) -> Result<(), TrySendError> { + let target = message.to; + + if let Some(connection) = self.connection_manager.get_connection(target) { + let mut writer = mem::take(&mut self.serde_buffer).writer(); + message + .write_to_writer(&mut writer) + .map_err(|err| TrySendError::Encode(message.clone(), err.into()))?; + self.serde_buffer = writer.into_inner(); + + // todo: Maybe send message directly 
w/o indirection through RaftMessage + let raft_message = RaftMessage { + message: self.serde_buffer.split().freeze(), + }; + + connection + .try_send(raft_message) + .map_err(|_err| TrySendError::Send(message))?; + } else if let Some(address) = self.addresses.get(&target) { + if let Some(join_handle) = self.connection_attempts.remove(&target) { + if !join_handle.is_finished() { + return Ok(()); + } else { + match join_handle.now_or_never().expect("should be finished") { + Ok(result) => { + match result { + Ok(_) => trace!("Previous connection attempt to '{target}' succeeded but connection was closed in meantime."), + Err(err) => trace!("Previous connection attempt to '{target}' failed: {}", err) + } + + } + Err(err) => { + trace!("Previous connection attempt to '{target}' panicked: {}", err) + } + } + } + } + + self.connection_attempts.insert( + target, + Self::try_connecting_to( + self.task_center.clone(), + self.connection_manager.clone(), + target, + address.clone(), + ), + ); + } else { + return Err(TrySendError::UnknownPeer(target)); + } + + Ok(()) + } + + fn try_connecting_to( + task_center: TaskCenter, + connection_manager: ConnectionManager, + target: u64, + address: AdvertisedAddress, + ) -> JoinHandle> { + tokio::spawn(async move { + task_center.run_in_scope("raft-connection-attempt", None, async move { + trace!(%target, "Try connecting to raft peer"); + let channel = + net_util::create_tonic_channel_from_advertised_address(address.clone())?; + let mut raft_client = crate::raft::grpc_svc::raft_metadata_store_svc_client::RaftMetadataStoreSvcClient::new(channel); + let (outgoing_tx, outgoing_rx) = mpsc::channel(128); + + let mut request = ReceiverStream::new(outgoing_rx).into_streaming_request(); + // send our identity alongside with the request to the target + request.metadata_mut().insert(RAFT_PEER_METADATA_KEY, MetadataValue::try_from(connection_manager.identity().to_string())?); + let incoming_rx = raft_client.raft(request).await?; + + 
connection_manager.run_connection(target, outgoing_tx, incoming_rx.into_inner())?; + + Ok(()) + }).await + }) + } +} diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs index e788c58c7..6a488fb36 100644 --- a/crates/metadata-store/src/raft/service.rs +++ b/crates/metadata-store/src/raft/service.rs @@ -12,20 +12,33 @@ use crate::grpc::handler::MetadataStoreHandler; use crate::grpc::server::GrpcServer; use crate::grpc::service_builder::GrpcServiceBuilder; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::raft::connection_manager::ConnectionManager; +use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvcServer; +use crate::raft::handler::RaftMetadataStoreHandler; +use crate::raft::networking::Networking; use crate::raft::store::RaftMetadataStore; use crate::{grpc_svc, Error, MetadataStoreService}; +use assert2::let_assert; use futures::TryFutureExt; use restate_core::{task_center, TaskKind}; -use restate_types::config::MetadataStoreOptions; +use restate_types::config::{Kind, MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; +use tokio::sync::mpsc; pub struct RaftMetadataStoreService { options: BoxedLiveLoad, + rocksdb_options: BoxedLiveLoad, } impl RaftMetadataStoreService { - pub fn new(options: BoxedLiveLoad) -> Self { - Self { options } + pub fn new( + options: BoxedLiveLoad, + rocksdb_options: BoxedLiveLoad, + ) -> Self { + Self { + options, + rocksdb_options, + } } } @@ -33,7 +46,20 @@ impl RaftMetadataStoreService { impl MetadataStoreService for RaftMetadataStoreService { async fn run(mut self) -> Result<(), Error> { let store_options = self.options.live_load(); - let store = RaftMetadataStore::create().await.map_err(Error::generic)?; + let_assert!(Kind::Raft(raft_options) = &store_options.kind); + + let (router_tx, router_rx) = mpsc::channel(128); + let task_center = task_center(); + let connection_manager = + 
ConnectionManager::new(task_center.clone(), raft_options.id, router_tx); + let store = RaftMetadataStore::create( + raft_options, + self.rocksdb_options, + Networking::new(task_center.clone(), connection_manager.clone()), + router_rx, + ) + .await + .map_err(Error::generic)?; let mut builder = GrpcServiceBuilder::default(); @@ -41,11 +67,14 @@ impl MetadataStoreService for RaftMetadataStoreService { builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( store.request_sender(), ))); + builder.add_service(RaftMetadataStoreSvcServer::new( + RaftMetadataStoreHandler::new(connection_manager), + )); let grpc_server = GrpcServer::new(store_options.bind_address.clone(), builder.build().await?); - task_center().spawn_child( + task_center.spawn_child( TaskKind::RpcServer, "metadata-store-grpc", None, diff --git a/crates/metadata-store/src/raft/storage.rs b/crates/metadata-store/src/raft/storage.rs index 0e37bbbce..835e4cb35 100644 --- a/crates/metadata-store/src/raft/storage.rs +++ b/crates/metadata-store/src/raft/storage.rs @@ -124,9 +124,7 @@ impl RocksDbStorage { fn find_last_index(db: &DB) -> u64 { let cf = db.cf_handle(RAFT_CF).expect("RAFT_CF exists"); let start = Self::raft_entry_key(0); - // end is exclusive so switch to the next discriminator - let mut end = [0; 9]; - end[0] = RAFT_ENTRY_DISCRIMINATOR + 1; + let end = Self::raft_entry_key(u64::MAX); let mut options = ReadOptions::default(); options.set_async_io(true); diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs index e859b89f8..2f8e7bea7 100644 --- a/crates/metadata-store/src/raft/store.rs +++ b/crates/metadata-store/src/raft/store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use crate::raft::networking::Networking; use crate::raft::storage; use crate::raft::storage::RocksDbStorage; use crate::{ @@ -21,7 +22,8 @@ use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Messa use raft::{Config, RawNode}; use restate_core::cancellation_watcher; use restate_core::metadata_store::{Precondition, VersionedValue}; -use restate_types::config::Configuration; +use restate_types::config::{Configuration, RaftOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version}; use slog::o; @@ -61,6 +63,8 @@ pub enum Error { pub struct RaftMetadataStore { _logger: slog::Logger, raw_node: RawNode, + networking: Networking, + raft_rx: mpsc::Receiver, tick_interval: time::Interval, callbacks: HashMap, @@ -71,28 +75,38 @@ pub struct RaftMetadataStore { } impl RaftMetadataStore { - pub async fn create() -> Result { + pub async fn create( + raft_options: &RaftOptions, + rocksdb_options: BoxedLiveLoad, + mut networking: Networking, + raft_rx: mpsc::Receiver, + ) -> Result { let (request_tx, request_rx) = mpsc::channel(2); let config = Config { - id: 1, + id: raft_options.id, ..Default::default() }; - let rocksdb_options = Configuration::updateable() - .map(|configuration| &configuration.common.rocksdb) - .boxed(); let mut metadata_store_options = Configuration::updateable().map(|configuration| &configuration.metadata_store); - let mut store = + let mut storage = RocksDbStorage::create(metadata_store_options.live_load(), rocksdb_options).await?; - let conf_state = ConfState::from((vec![1], vec![])); - store.store_conf_state(conf_state).await?; + + // todo: Only write configuration on initialization + let voters: Vec<_> = raft_options.peers.keys().cloned().collect(); + let conf_state = ConfState::from((voters, vec![])); + storage.store_conf_state(conf_state).await?; + + // todo: Persist address 
information with configuration + for (peer, address) in &raft_options.peers { + networking.register_address(*peer, address.clone()); + } let drain = TracingSlogDrain; let logger = slog::Logger::root(drain, o!()); - let raw_node = RawNode::new(&config, store, &logger)?; + let raw_node = RawNode::new(&config, storage, &logger)?; let mut tick_interval = time::interval(Duration::from_millis(100)); tick_interval.set_missed_tick_behavior(MissedTickBehavior::Burst); @@ -101,6 +115,8 @@ impl RaftMetadataStore { // we only need to keep it alive _logger: logger, raw_node, + raft_rx, + networking, tick_interval, callbacks: HashMap::default(), kv_entries: HashMap::default(), @@ -120,6 +136,13 @@ impl RaftMetadataStore { tokio::select! { _ = &mut cancellation => { break; + }, + raft = self.raft_rx.recv() => { + if let Some(raft) = raft { + self.raw_node.step(raft)?; + } else { + break; + } } Some(request) = self.request_rx.recv() => { // todo: Unclear whether every replica should be allowed to propose. 
Maybe @@ -223,8 +246,12 @@ impl RaftMetadataStore { self.callbacks.insert(callback.request_id, callback); } - fn send_messages(&self, _messages: Vec) { - // todo: Send messages to other peers + fn send_messages(&mut self, messages: Vec) { + for message in messages { + if let Err(err) = self.networking.try_send(message) { + info!("failed sending message: {err}"); + } + } } async fn handle_committed_entries( diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index eecd0477c..6aaada3a0 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -281,8 +281,12 @@ impl Node { .boxed(), ) .boxed(), - Kind::Raft => RaftMetadataStoreService::new( + Kind::Raft(_) => RaftMetadataStoreService::new( updateable_config.clone().map(|c| &c.metadata_store).boxed(), + updateable_config + .clone() + .map(|config| &config.metadata_store.rocksdb) + .boxed(), ) .boxed(), } diff --git a/crates/types/src/config/metadata_store.rs b/crates/types/src/config/metadata_store.rs index e034444ff..c0d07f73b 100644 --- a/crates/types/src/config/metadata_store.rs +++ b/crates/types/src/config/metadata_store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::collections::HashMap; use std::num::NonZeroUsize; use std::path::PathBuf; @@ -18,7 +19,7 @@ use restate_serde_util::NonZeroByteCount; use tracing::warn; use super::{data_dir, CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; -use crate::net::BindAddress; +use crate::net::{AdvertisedAddress, BindAddress}; /// # Metadata store options #[serde_as] @@ -67,13 +68,13 @@ pub struct MetadataStoreOptions { pub kind: Kind, } -#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[serde(rename_all = "kebab-case")] pub enum Kind { #[default] Local, - Raft, + Raft(RaftOptions), } impl MetadataStoreOptions { @@ -130,3 +131,14 @@ impl Default for MetadataStoreOptions { } } } + +#[serde_as] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub struct RaftOptions { + pub id: u64, + #[cfg_attr(feature = "schemars", schemars(with = "Vec<(u64, String)>"))] + #[serde_as(as = "serde_with::Seq<(_, _)>")] + pub peers: HashMap, +}