Run embedded metadata store on node grpc server
By letting the embedded metadata store run on the node gRPC server,
we can get rid of the additional address that users need to configure.
Instead of being reachable on its own port 5123, the metadata store is
now reachable on the node's gRPC port 5122 by default.
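
For users of the embedded metadata store this means the client now points at the
node's advertised gRPC address rather than a dedicated metadata store address. A
minimal sketch of the client-side setting in Rust, using the configuration types
touched by this commit (module paths and the string parsing of AdvertisedAddress
are assumptions, not taken from this diff):

use restate_types::config::{Configuration, MetadataStoreClient}; // paths assumed
use restate_types::net::AdvertisedAddress;                       // path assumed

fn use_node_grpc_for_metadata(config: &mut Configuration) {
    // Point the embedded metadata store client at the node's gRPC endpoint
    // (default port 5122) instead of a separate metadata store port (5123 before).
    let address: AdvertisedAddress = "http://127.0.0.1:5122/"
        .parse()
        .expect("valid advertised address"); // FromStr impl assumed
    config.common.metadata_store_client.metadata_store_client =
        MetadataStoreClient::Embedded { address };
}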
tillrohrmann committed Oct 28, 2024
1 parent 5d23e54 commit e8fb789
Showing 19 changed files with 188 additions and 211 deletions.
10 changes: 1 addition & 9 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion crates/core/src/network/server_builder.rs
@@ -59,7 +59,7 @@ impl NetworkServerBuilder {
pub async fn run(
self,
node_health: HealthStatus<NodeStatus>,
axum_router: axum::routing::Router,
axum_router: Option<axum::routing::Router>,
bind_address: &BindAddress,
) -> Result<(), anyhow::Error> {
// Trace layer
@@ -68,6 +68,7 @@ impl NetworkServerBuilder {
.level(tracing::Level::ERROR);

let axum_router = axum_router
.unwrap_or_default()
.layer(TraceLayer::new_for_http().make_span_with(span_factory.clone()))
.fallback(handler_404);

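With this change the HTTP router becomes optional: components that only expose gRPC
services (such as the embedded metadata store after this commit) can pass None, and
the server falls back to an empty router that only serves the 404 handler. A
hypothetical call site, assuming node_health and bind_address already exist in the
caller's scope:

server_builder
    .run(node_health, None, &bind_address)
    .await?;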
72 changes: 44 additions & 28 deletions crates/local-cluster-runner/src/node/mod.rs
@@ -51,11 +51,12 @@ pub struct Node {
self.base_config.common.advertised_address = AdvertisedAddress::Uds(node_socket);
}
#[mutator(requires = [base_dir])]
pub fn with_socket_metadata(self) {
let metadata_socket: PathBuf = "metadata.sock".into();
self.base_config.metadata_store.bind_address = BindAddress::Uds(metadata_socket.clone());
self.base_config.common.metadata_store_client.metadata_store_client = MetadataStoreClient::Embedded { address: AdvertisedAddress::Uds(metadata_socket) }
pub fn with_local_embedded_metadata_store(self) {
self.base_config.common.metadata_store_client.metadata_store_client = MetadataStoreClient::Embedded { address: self.base_config.common.advertised_address.clone() }
}
pub fn with_embedded_metadata_store(self, address: AdvertisedAddress) {
self.base_config.common.metadata_store_client.metadata_store_client = MetadataStoreClient::Embedded { address }
}
pub fn with_random_ports(self) {
@@ -110,6 +111,14 @@ pub enum NodeStartError {
SpawnError(io::Error),
}

/// Configures where the metadata store runs.
pub enum MetadataStoreLocation {
// It should run as part of this node
Local,
// It runs remotely and can be accessed under this address
Remote(AdvertisedAddress),
}

impl Node {
pub fn node_name(&self) -> &str {
self.base_config.node_name()
@@ -131,16 +140,28 @@ impl Node {
base_config: Configuration,
binary_source: BinarySource,
roles: EnumSet<Role>,
metadata_store_location: MetadataStoreLocation,
) -> Self {
Self::builder()
let builder = Self::builder()
.binary_source(binary_source)
.base_config(base_config)
.with_node_name(node_name)
.with_node_socket()
.with_socket_metadata()
.with_random_ports()
.with_roles(roles)
.build()
.with_roles(roles);

let builder = match metadata_store_location {
MetadataStoreLocation::Local => {
assert!(
roles.contains(Role::MetadataStore),
"The MetadataStore role is required to run the metadata store locally"
);
builder.with_local_embedded_metadata_store()
}
MetadataStoreLocation::Remote(address) => builder.with_embedded_metadata_store(address),
};

builder.build()
}

// Creates a group of Nodes with a single metadata node "metadata-node" running the
@@ -155,18 +176,23 @@ impl Node {
) -> Vec<Self> {
let mut nodes = Vec::with_capacity((size + 1) as usize);

{
let mut base_config = base_config.clone();
let metadata_node_address = {
let mut node_config = base_config.clone();
// let any node write the initial NodesConfiguration
base_config.common.allow_bootstrap = true;
base_config.common.force_node_id = Some(PlainNodeId::new(1));
nodes.push(Self::new_test_node(
node_config.common.allow_bootstrap = true;
node_config.common.force_node_id = Some(PlainNodeId::new(1));
let metadata_node = Self::new_test_node(
"metadata-node",
base_config,
node_config,
binary_source.clone(),
enum_set!(Role::Admin | Role::MetadataStore),
));
}
MetadataStoreLocation::Local,
);
let metadata_node_address = metadata_node.config().common.advertised_address.clone();
nodes.push(metadata_node);

metadata_node_address
};

for node in 1..=size {
let mut base_config = base_config.clone();
@@ -176,6 +202,7 @@ impl Node {
base_config,
binary_source.clone(),
roles,
MetadataStoreLocation::Remote(metadata_node_address.clone()),
));
}

@@ -204,9 +231,6 @@ impl Node {
{
*file = base_dir.join(&*file)
}
if let BindAddress::Uds(file) = &mut self.base_config.metadata_store.bind_address {
*file = base_dir.join(&*file)
}
if let BindAddress::Uds(file) = &mut self.base_config.common.bind_address {
*file = base_dir.join(&*file)
}
@@ -614,14 +638,6 @@ impl StartedNode {
}
}

pub fn metadata_address(&self) -> Option<&BindAddress> {
if self.config().has_role(Role::MetadataStore) {
Some(&self.config().metadata_store.bind_address)
} else {
None
}
}

/// Obtain a stream of loglines matching this pattern. The stream will end
/// when the stdout and stderr files on the process close.
pub fn lines(&self, pattern: Regex) -> impl Stream<Item = String> + '_ {
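The new MetadataStoreLocation parameter makes the metadata store wiring explicit when
building test nodes. A sketch of how a cluster test might use it; new_test_node,
config(), and MetadataStoreLocation are taken from this diff, while
BinarySource::CargoTest and Role::Worker are assumed names from the surrounding crate:

use enumset::enum_set;

// Hypothetical test setup. The metadata node must carry the MetadataStore role,
// otherwise new_test_node asserts when MetadataStoreLocation::Local is passed.
let metadata_node = Node::new_test_node(
    "metadata-node",
    base_config.clone(),
    BinarySource::CargoTest, // variant assumed
    enum_set!(Role::Admin | Role::MetadataStore),
    MetadataStoreLocation::Local,
);

// Remaining nodes point their embedded metadata store client at the metadata node's
// advertised (gRPC) address instead of a separate metadata store address.
let worker = Node::new_test_node(
    "node-1",
    base_config,
    BinarySource::CargoTest, // variant assumed
    enum_set!(Role::Worker), // role name assumed
    MetadataStoreLocation::Remote(
        metadata_node.config().common.advertised_address.clone(),
    ),
);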
9 changes: 0 additions & 9 deletions crates/metadata-store/Cargo.toml
@@ -25,23 +25,14 @@ derive_builder = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
humantime = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["net"] }
tonic = { workspace = true, features = ["transport", "codegen", "prost"] }
tonic-reflection = { workspace = true }
tonic-health = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true, features = ["trace"] }
tracing = { workspace = true }

[dev-dependencies]
2 changes: 1 addition & 1 deletion crates/metadata-store/src/local/mod.rs
@@ -18,7 +18,7 @@ use restate_types::{
config::{MetadataStoreClient as MetadataStoreClientConfig, MetadataStoreClientOptions},
errors::GenericError,
};
pub use service::{Error, LocalMetadataStoreService};
pub use service::{BuildError, Error, LocalMetadataStoreService};

use crate::local::grpc::client::LocalMetadataStoreClient;

106 changes: 27 additions & 79 deletions crates/metadata-store/src/local/service.rs
@@ -8,17 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use http::Request;
use hyper::body::Incoming;
use hyper_util::service::TowerToHyperService;
use restate_types::health::HealthStatus;
use tonic::body::boxed;
use tonic::server::NamedService;
use tower::ServiceExt;
use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier};

use restate_core::network::net_util;
use restate_core::{task_center, ShutdownError, TaskKind};
use restate_core::network::NetworkServerBuilder;
use restate_core::ShutdownError;
use restate_rocksdb::RocksError;
use restate_types::config::{MetadataStoreOptions, RocksDbOptions};
use restate_types::live::BoxedLiveLoad;
@@ -31,99 +24,54 @@ use crate::local::store::LocalMetadataStore;

pub struct LocalMetadataStoreService {
health_status: HealthStatus<MetadataServerStatus>,
opts: BoxedLiveLoad<MetadataStoreOptions>,
rocksdb_options: BoxedLiveLoad<RocksDbOptions>,
store: LocalMetadataStore,
}

#[derive(Debug, thiserror::Error)]
pub enum BuildError {
#[error("building local metadata store failed: {0}")]
LocalMetadataStore(#[from] RocksError),
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed running grpc server: {0}")]
GrpcServer(#[from] net_util::Error),
#[error("error while running server server grpc reflection service: {0}")]
GrpcReflection(#[from] tonic_reflection::server::Error),
#[error("system is shutting down")]
Shutdown(#[from] ShutdownError),
#[error("rocksdb error: {0}")]
RocksDB(#[from] RocksError),
}

impl LocalMetadataStoreService {
pub fn from_options(
pub async fn create(
health_status: HealthStatus<MetadataServerStatus>,
opts: BoxedLiveLoad<MetadataStoreOptions>,
options: &MetadataStoreOptions,
rocksdb_options: BoxedLiveLoad<RocksDbOptions>,
) -> Self {
server_builder: &mut NetworkServerBuilder,
) -> Result<Self, BuildError> {
let store = LocalMetadataStore::create(options, rocksdb_options).await?;

server_builder.register_grpc_service(
MetadataStoreSvcServer::new(LocalMetadataStoreHandler::new(store.request_sender())),
grpc_svc::FILE_DESCRIPTOR_SET,
);

health_status.update(MetadataServerStatus::StartingUp);
Self {
health_status,
opts,
rocksdb_options,
}
}

pub fn grpc_service_name(&self) -> &str {
MetadataStoreSvcServer::<LocalMetadataStoreHandler>::NAME
Ok(Self {
health_status,
store,
})
}

pub async fn run(self) -> Result<(), Error> {
let LocalMetadataStoreService {
health_status,
mut opts,
rocksdb_options,
store,
} = self;
let options = opts.live_load();
let bind_address = options.bind_address.clone();
let store = LocalMetadataStore::create(options, rocksdb_options).await?;

let trace_layer = tower_http::trace::TraceLayer::new(SharedClassifier::new(
GrpcErrorsAsFailures::new().with_success(GrpcCode::FailedPrecondition),
))
.make_span_with(
tower_http::trace::DefaultMakeSpan::new()
.include_headers(true)
.level(tracing::Level::ERROR),
);

let reflection_service_builder = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(grpc_svc::FILE_DESCRIPTOR_SET);

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter
.set_serving::<MetadataStoreSvcServer<LocalMetadataStoreHandler>>()
.await;

let server_builder = tonic::transport::Server::builder()
.layer(trace_layer)
.add_service(health_service)
.add_service(MetadataStoreSvcServer::new(LocalMetadataStoreHandler::new(
store.request_sender(),
)))
.add_service(reflection_service_builder.build_v1()?);

let service = TowerToHyperService::new(
server_builder
.into_service()
.map_request(|req: Request<Incoming>| req.map(boxed)),
);

task_center().spawn_child(
TaskKind::RpcServer,
"metadata-store-grpc",
None,
async move {
net_util::run_hyper_server(
&bind_address,
service,
"metadata-store-grpc",
|| health_status.update(MetadataServerStatus::Ready),
|| health_status.update(MetadataServerStatus::Unknown),
)
.await?;
Ok(())
},
)?;

health_status.update(MetadataServerStatus::Ready);
store.run().await;
health_status.update(MetadataServerStatus::Unknown);

Ok(())
}
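After this change LocalMetadataStoreService no longer spins up its own tonic server:
create opens the RocksDB-backed store and registers the MetadataStoreSvc on the shared
NetworkServerBuilder, while run only drives the store's request loop and health status.
A rough sketch of the node-side wiring; everything except create and run is assumed to
exist in the caller's scope and is not part of this diff:

// Hypothetical wiring inside the node startup code.
let metadata_store = LocalMetadataStoreService::create(
    health_status,
    &metadata_store_options,
    rocksdb_options,
    &mut server_builder, // the node's shared gRPC/HTTP NetworkServerBuilder
)
.await?;

// The gRPC service registered above is served by the node's own server (port 5122
// by default); running the service now only processes metadata store requests.
metadata_store.run().await?;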
