Skip to content

Commit

Permalink
Separate Ingress from Worker role
Browse files Browse the repository at this point in the history
This commit separates the ingress from the worker role. To support backward
compatibility this feature is not yet turned on by default and users need
to enable it explicity via `experimental-feature-enable-separate-ingress-role`
to be able to configure the ingress role separately. When enabled, users can
place the ingress role on a set of freely chosen nodes.

This fixes restatedev#1312.
  • Loading branch information
tillrohrmann committed Nov 4, 2024
1 parent 3a293a4 commit 1e55f80
Show file tree
Hide file tree
Showing 16 changed files with 220 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/ingress-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
mod handler;
mod layers;
mod metric_definitions;
pub mod rpc_request_dispatcher;
mod server;

pub use server::{HyperServerIngress, IngressServerError, StartSignal};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// Copyright (c) 2024-2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
Expand All @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::{RequestDispatcher, RequestDispatcherError};
use anyhow::Context;
use restate_core::network::partition_processor_rpc_client::{
AttachInvocationResponse, GetInvocationOutputResponse,
Expand All @@ -16,7 +17,6 @@ use restate_core::network::partition_processor_rpc_client::{
PartitionProcessorRpcClient, PartitionProcessorRpcClientError,
};
use restate_core::network::TransportConnect;
use restate_ingress_http::{RequestDispatcher, RequestDispatcherError};
use restate_types::identifiers::PartitionProcessorRpcRequestId;
use restate_types::invocation::{InvocationQuery, InvocationRequest, InvocationResponse};
use restate_types::net::partition_processor::{InvocationOutput, SubmittedInvocationNotification};
Expand Down
13 changes: 13 additions & 0 deletions crates/ingress-http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use hyper_util::rt::TokioIo;
use hyper_util::server::conn::auto;
use restate_core::{cancellation_watcher, task_center, TaskKind};
use restate_types::config::IngressOptions;
use restate_types::health::HealthStatus;
use restate_types::live::Live;
use restate_types::protobuf::common::IngressStatus;
use restate_types::schema::invocation_target::InvocationTargetResolver;
use restate_types::schema::service::ServiceMetadataResolver;
use std::convert::Infallible;
Expand Down Expand Up @@ -60,6 +62,7 @@ pub struct HyperServerIngress<Schemas, Dispatcher> {

// Signals
start_signal_tx: oneshot::Sender<SocketAddr>,
health: HealthStatus<IngressStatus>,
}

impl<Schemas, Dispatcher> HyperServerIngress<Schemas, Dispatcher>
Expand All @@ -71,13 +74,15 @@ where
ingress_options: &IngressOptions,
dispatcher: Dispatcher,
schemas: Live<Schemas>,
health: HealthStatus<IngressStatus>,
) -> HyperServerIngress<Schemas, Dispatcher> {
crate::metric_definitions::describe_metrics();
let (hyper_ingress_server, _) = HyperServerIngress::new(
ingress_options.bind_address,
ingress_options.concurrent_api_requests_limit(),
schemas,
dispatcher,
health,
);

hyper_ingress_server
Expand All @@ -94,14 +99,17 @@ where
concurrency_limit: usize,
schemas: Live<Schemas>,
dispatcher: Dispatcher,
health: HealthStatus<IngressStatus>,
) -> (Self, StartSignal) {
health.update(IngressStatus::StartingUp);
let (start_signal_tx, start_signal_rx) = oneshot::channel();

let ingress = Self {
listening_addr,
concurrency_limit,
schemas,
dispatcher,
health,
start_signal_tx,
};

Expand All @@ -114,6 +122,7 @@ where
concurrency_limit,
schemas,
dispatcher,
health,
start_signal_tx,
} = self;

Expand Down Expand Up @@ -151,6 +160,7 @@ where

// Send start signal
let _ = start_signal_tx.send(local_addr);
health.update(IngressStatus::Ready);

// We start a loop to continuously accept incoming connections
loop {
Expand Down Expand Up @@ -239,6 +249,7 @@ mod tests {
use hyper_util::rt::TokioExecutor;
use restate_core::{TaskCenter, TaskKind, TestCoreEnv};
use restate_test_util::assert_eq;
use restate_types::health::Health;
use restate_types::identifiers::WithInvocationId;
use restate_types::invocation::InvocationTarget;
use restate_types::net::partition_processor::IngressResponseResult;
Expand Down Expand Up @@ -328,13 +339,15 @@ mod tests {
mock_request_dispatcher: MockRequestDispatcher,
) -> (SocketAddr, TestHandle) {
let node_env = TestCoreEnv::create_with_single_node(1, 1).await;
let health = Health::default();

// Create the ingress and start it
let (ingress, start_signal) = HyperServerIngress::new(
"0.0.0.0:0".parse().unwrap(),
Semaphore::MAX_PERMITS,
Live::from_value(mock_schemas()),
Arc::new(mock_request_dispatcher),
health.ingress_status(),
);
node_env
.tc
Expand Down
11 changes: 11 additions & 0 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ impl Node {
for node in 1..=size {
let mut base_config = base_config.clone();
base_config.common.force_node_id = Some(PlainNodeId::new(node));

// Create a separate ingress role when running a worker
let roles = if roles.contains(Role::Worker) {
base_config
.ingress
.experimental_feature_enable_separate_ingress_role = true;
roles | Role::Ingress
} else {
roles
};

nodes.push(Self::new_test_node(
format!("node-{node}"),
base_config,
Expand Down
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ restate-admin = { workspace = true }
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-ingress-http = { workspace = true }
restate-log-server = { workspace = true }
restate-metadata-store = { workspace = true }
restate-rocksdb = { workspace = true }
Expand Down
48 changes: 44 additions & 4 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use restate_types::logs::RecordCache;
use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
use restate_types::protobuf::common::{
AdminStatus, LogServerStatus, MetadataServerStatus, NodeStatus, WorkerStatus,
AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeStatus, WorkerStatus,
};
use restate_types::Version;

use crate::cluster_marker::ClusterValidationError;
use crate::network_server::NetworkServer;
use crate::roles::{AdminRole, BaseRole, WorkerRole};
use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole};

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
Expand Down Expand Up @@ -112,6 +112,7 @@ pub struct Node {
base_role: BaseRole,
admin_role: Option<AdminRole<GrpcConnector>>,
worker_role: Option<WorkerRole<GrpcConnector>>,
ingress_role: Option<IngressRole<GrpcConnector>>,
#[cfg(feature = "replicated-loglet")]
log_server: Option<LogServerService>,
networking: Networking<GrpcConnector>,
Expand Down Expand Up @@ -156,7 +157,6 @@ impl Node {
metadata_manager.register_in_message_router(&mut router_builder);
let partition_routing_refresher =
PartitionRoutingRefresher::new(metadata_store_client.clone());
let updating_schema_information = metadata.updateable_schema();

#[cfg(feature = "replicated-loglet")]
let record_cache = RecordCache::new(
Expand Down Expand Up @@ -212,20 +212,41 @@ impl Node {
Some(
WorkerRole::create(
health.worker_status(),
health.ingress_status(),
metadata.clone(),
updateable_config.clone(),
&mut router_builder,
networking.clone(),
bifrost_svc.handle(),
metadata_store_client.clone(),
updating_schema_information,
)
.await?,
)
} else {
None
};

let ingress_role = if config
.ingress
.experimental_feature_enable_separate_ingress_role
&& config.has_role(Role::Ingress)
{
Some(IngressRole::create(
updateable_config
.clone()
.map(|config| &config.ingress)
.boxed(),
health.ingress_status(),
networking.clone(),
metadata.updateable_schema(),
metadata.updateable_partition_table(),
partition_routing_refresher.partition_routing(),
&mut router_builder,
))
} else {
None
};

let admin_role = if config.has_role(Role::Admin) {
Some(
AdminRole::create(
Expand Down Expand Up @@ -273,6 +294,7 @@ impl Node {
metadata_store_client,
base_role,
admin_role,
ingress_role,
worker_role,
#[cfg(feature = "replicated-loglet")]
log_server,
Expand Down Expand Up @@ -429,6 +451,17 @@ impl Node {
)?;
}

if let Some(ingress_role) = self.ingress_role {
assert!(
config
.ingress
.experimental_feature_enable_separate_ingress_role,
"Need to enable separate ingress role feature to start a separate ingress role"
);

tc.spawn_child(TaskKind::Ingress, "ingress-http", None, ingress_role.run())?;
}

tc.spawn(TaskKind::RpcServer, "node-rpc-server", None, {
let health = self.health.clone();
let common_options = config.common.clone();
Expand Down Expand Up @@ -486,6 +519,13 @@ impl Node {
.await;
trace!("Log-server is reporting ready");
}
Role::Ingress => {
self.health
.ingress_status()
.wait_for_value(IngressStatus::Ready)
.await;
trace!("Ingress is reporting ready");
}
}
}
info!("Restate server is ready");
Expand Down
61 changes: 61 additions & 0 deletions crates/node/src/roles/ingress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_core::network::partition_processor_rpc_client::PartitionProcessorRpcClient;
use restate_core::network::rpc_router::ConnectionAwareRpcRouter;
use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect};
use restate_core::routing_info::PartitionRouting;
use restate_ingress_http::rpc_request_dispatcher::RpcRequestDispatcher;
use restate_ingress_http::HyperServerIngress;
use restate_types::config::IngressOptions;
use restate_types::health::HealthStatus;
use restate_types::live::{BoxedLiveLoad, Live};
use restate_types::partition_table::PartitionTable;
use restate_types::protobuf::common::IngressStatus;
use restate_types::schema::Schema;

type IngressHttp<T> = HyperServerIngress<Schema, RpcRequestDispatcher<T>>;

pub struct IngressRole<T> {
ingress_http: IngressHttp<T>,
}

impl<T: TransportConnect> IngressRole<T> {
pub fn create(
mut ingress_options: BoxedLiveLoad<IngressOptions>,
health: HealthStatus<IngressStatus>,
networking: Networking<T>,
schema: Live<Schema>,
partition_table: Live<PartitionTable>,
partition_routing: PartitionRouting,
router_builder: &mut MessageRouterBuilder,
) -> Self {
let rpc_router = ConnectionAwareRpcRouter::new(router_builder);

let dispatcher = RpcRequestDispatcher::new(PartitionProcessorRpcClient::new(
networking,
rpc_router,
partition_table,
partition_routing,
));
let ingress_http = HyperServerIngress::from_options(
ingress_options.live_load(),
dispatcher,
schema,
health,
);

Self { ingress_http }
}

pub async fn run(self) -> anyhow::Result<()> {
self.ingress_http.run().await
}
}
2 changes: 2 additions & 0 deletions crates/node/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

mod admin;
mod base;
mod ingress;
mod worker;

pub use admin::{AdminRole, AdminRoleBuildError};
pub use base::BaseRole;
pub use ingress::IngressRole;
pub use worker::{WorkerRole, WorkerRoleBuildError};
7 changes: 3 additions & 4 deletions crates/node/src/roles/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use restate_storage_query_datafusion::context::QueryContext;
use restate_types::config::Configuration;
use restate_types::health::HealthStatus;
use restate_types::live::Live;
use restate_types::protobuf::common::WorkerStatus;
use restate_types::protobuf::common::{IngressStatus, WorkerStatus};
use restate_types::schema::subscriptions::SubscriptionResolver;
use restate_types::schema::Schema;
use restate_types::Version;
use restate_worker::SubscriptionController;
use restate_worker::Worker;
Expand Down Expand Up @@ -72,22 +71,22 @@ impl<T: TransportConnect> WorkerRole<T> {
#[allow(clippy::too_many_arguments)]
pub async fn create(
health_status: HealthStatus<WorkerStatus>,
ingress_health_status: HealthStatus<IngressStatus>,
metadata: Metadata,
updateable_config: Live<Configuration>,
router_builder: &mut MessageRouterBuilder,
networking: Networking<T>,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
updating_schema_information: Live<Schema>,
) -> Result<Self, WorkerRoleBuildError> {
let worker = Worker::create(
updateable_config,
health_status,
ingress_health_status,
metadata.clone(),
networking,
bifrost,
router_builder,
updating_schema_information,
metadata_store_client,
)
.await?;
Expand Down
6 changes: 6 additions & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,9 @@ enum MetadataServerStatus {
MetadataServerStatus_READY = 1;
MetadataServerStatus_STARTING_UP = 2;
}

enum IngressStatus {
IngressStatus_UNKNOWN = 0;
IngressStatus_READY = 1;
IngressStatus_STARTING_UP = 2;
}
Loading

0 comments on commit 1e55f80

Please sign in to comment.