From fd0ae7afe5435255ba0af37ba2ec92d9f94c4b14 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 11 Sep 2024 11:53:10 +0200 Subject: [PATCH 1/5] wip --- relay-server/src/lib.rs | 8 ++++++ relay-server/src/service.rs | 20 ++++++++++++--- relay-server/src/services/buffer/mod.rs | 15 ++++++----- relay-server/src/services/cogs.rs | 8 +++--- relay-server/src/services/global_config.rs | 5 ++-- relay-server/src/services/health_check.rs | 9 ++++--- .../src/services/metrics/aggregator.rs | 14 ++++++++--- relay-server/src/services/metrics/router.rs | 5 ++-- relay-server/src/services/outcome.rs | 13 +++++----- .../src/services/outcome_aggregator.rs | 5 ++-- relay-server/src/services/processor.rs | 8 ++++-- relay-server/src/services/project_cache.rs | 7 ++++-- relay-server/src/services/project_local.rs | 5 ++-- relay-server/src/services/project_upstream.rs | 5 ++-- relay-server/src/services/relays.rs | 5 ++-- relay-server/src/services/server.rs | 15 +++++++---- relay-server/src/services/spooler/mod.rs | 14 ++++++++--- relay-server/src/services/stats.rs | 9 ++++--- relay-server/src/services/store.rs | 7 ++++-- relay-server/src/services/test_store.rs | 7 ++++-- relay-server/src/services/upstream.rs | 7 ++++-- relay-system/src/controller.rs | 2 +- relay-system/src/service.rs | 25 ++++++++++--------- 23 files changed, 143 insertions(+), 75 deletions(-) diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 986db2e5d4..923a78c56d 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -278,6 +278,7 @@ mod testutils; use std::sync::Arc; +use futures::StreamExt; use relay_config::Config; use relay_system::{Controller, Service}; @@ -303,6 +304,13 @@ pub fn run(config: Config) -> anyhow::Result<()> { Controller::start(config.shutdown_timeout()); let service = ServiceState::start(config.clone())?; HttpServer::new(config, service.clone())?.start(); + + for x in service.join_handles() {} + // while let Some(res) = service.join_handles().next() { + + // } + + // TODO: await simultaneously Controller::shutdown_handle().finished().await; anyhow::Ok(()) })?; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index f8fbdfe48c..10bc72fe92 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -9,12 +9,14 @@ use crate::services::stats::RelayStats; use anyhow::{Context, Result}; use axum::extract::FromRequestParts; use axum::http::request::Parts; +use futures::stream::FuturesUnordered; use rayon::ThreadPool; use relay_cogs::Cogs; use relay_config::{Config, RedisConnection, RedisPoolConfigs}; use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; use relay_system::{channel, Addr, Service}; use tokio::runtime::Runtime; +use tokio::task::JoinHandle; use crate::services::cogs::{CogsService, CogsServiceRecorder}; use crate::services::global_config::{GlobalConfigManager, GlobalConfigService}; @@ -141,6 +143,7 @@ struct StateInner { config: Arc, memory_checker: MemoryChecker, registry: Registry, + join_handles: Vec>, } /// Server state. @@ -221,7 +224,7 @@ impl ServiceState { let cogs = CogsService::new(&config); let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start())); - EnvelopeProcessorService::new( + let processor_handle = EnvelopeProcessorService::new( create_processor_pool(&config)?, config.clone(), global_config_handle, @@ -251,7 +254,7 @@ impl ServiceState { // Keep all the services in one context. let project_cache_services = Services { - envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr), + envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.addr()), aggregator: aggregator.clone(), envelope_processor: processor.clone(), outcome_aggregator: outcome_aggregator.clone(), @@ -301,13 +304,20 @@ impl ServiceState { global_config, project_cache, upstream_relay, - envelope_buffer, + envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.clone()), }; let state = StateInner { config: config.clone(), memory_checker: MemoryChecker::new(memory_stat, config.clone()), registry, + join_handles: { + let mut j = Vec::from_iter([processor_handle]); + if let Some((_, handle)) = envelope_buffer { + j.push(handle); + } + j + }, }; Ok(ServiceState { @@ -376,6 +386,10 @@ impl ServiceState { pub fn outcome_aggregator(&self) -> &Addr { &self.inner.registry.outcome_aggregator } + + pub fn join_handles(&self) -> &[JoinHandle<()>] { + self.inner.join_handles.as_slice() + } } fn create_redis_pool( diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 1b625e1b21..ad78d7d9e3 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -11,6 +11,7 @@ use relay_system::Request; use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; use tokio::sync::watch; +use tokio::task::JoinHandle; use crate::envelope::Envelope; use crate::services::buffer::envelope_buffer::Peek; @@ -128,12 +129,10 @@ impl EnvelopeBufferService { } /// Returns both the [`Addr`] to this service, and a reference to the capacity flag. - pub fn start_observable(self) -> ObservableEnvelopeBuffer { + pub fn start_observable(self) -> (ObservableEnvelopeBuffer, JoinHandle<()>) { let has_capacity = self.has_capacity.clone(); - ObservableEnvelopeBuffer { - addr: self.start(), - has_capacity, - } + let (addr, join_handle) = self.start_joinable(); + (ObservableEnvelopeBuffer { addr, has_capacity }, join_handle) } /// Wait for the configured amount of time and make sure the project cache is ready to receive. @@ -259,7 +258,7 @@ impl EnvelopeBufferService { impl Service for EnvelopeBufferService { type Interface = EnvelopeBuffer; - fn spawn_handler(mut self, mut rx: Receiver) { + fn spawn_handler(mut self, mut rx: Receiver) -> JoinHandle<()> { let config = self.config.clone(); let memory_checker = self.memory_checker.clone(); let mut global_config_rx = self.global_config_rx.clone(); @@ -312,7 +311,7 @@ impl Service for EnvelopeBufferService { } relay_log::info!("EnvelopeBufferService stop"); - }); + }) } } @@ -364,7 +363,7 @@ mod tests { service.has_capacity.store(false, Ordering::Relaxed); // Observable has correct value: - let ObservableEnvelopeBuffer { addr, has_capacity } = service.start_observable(); + let (ObservableEnvelopeBuffer { addr, has_capacity }, _) = service.start_observable(); assert!(!has_capacity.load(Ordering::Relaxed)); // Send a message to trigger update of `has_capacity` flag: diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index 19daf7516d..af75498e5a 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -1,8 +1,8 @@ -use std::sync::atomic::{AtomicBool, Ordering}; - use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId}; use relay_config::Config; use relay_system::{Addr, FromMessage, Interface, Service}; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::task::JoinHandle; use crate::statsd::RelayCounters; @@ -54,12 +54,12 @@ impl CogsService { impl Service for CogsService { type Interface = CogsReport; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_report(message); } - }); + }) } } diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index 7e401b626f..1ff14f7145 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -22,6 +22,7 @@ use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Serv use reqwest::Method; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::upstream::{ @@ -338,7 +339,7 @@ impl GlobalConfigService { impl Service for GlobalConfigService { type Interface = GlobalConfigManager; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut shutdown_handle = Controller::shutdown_handle(); @@ -384,7 +385,7 @@ impl Service for GlobalConfigService { } } relay_log::info!("global config service stopped"); - }); + }) } } diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index 2899fc14b3..b82c0dd808 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -4,6 +4,7 @@ use relay_config::Config; use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service}; use std::future::Future; use tokio::sync::watch; +use tokio::task::JoinHandle; use tokio::time::{timeout, Instant}; use crate::services::metrics::RouterHandle; @@ -189,13 +190,13 @@ impl HealthCheckService { impl Service for HealthCheckService { type Interface = HealthCheck; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy)); let check_interval = self.config.health_refresh_interval(); // Add 10% buffer to the internal timeouts to avoid race conditions. let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1); - tokio::spawn(async move { + let j1 = tokio::spawn(async move { let shutdown = Controller::shutdown_handle(); while shutdown.get().is_none() { @@ -212,7 +213,7 @@ impl Service for HealthCheckService { update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok(); }); - tokio::spawn(async move { + let _j2 = tokio::spawn(async move { while let Some(HealthCheck(message, sender)) = rx.recv().await { let update = update_rx.borrow(); @@ -225,6 +226,8 @@ impl Service for HealthCheckService { }); } }); + + j1 // TODO: should return j1 + j2 } } diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 707d0eec72..e7d1bfe21d 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -246,7 +246,10 @@ impl AggregatorService { impl Service for AggregatorService { type Interface = Aggregator; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); @@ -264,7 +267,7 @@ impl Service for AggregatorService { else => break, } } - }); + }) } } @@ -361,7 +364,10 @@ mod tests { impl Service for TestReceiver { type Interface = TestInterface; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; @@ -370,7 +376,7 @@ mod tests { self.add_buckets(buckets); } } - }); + }) } } diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 522eb729f1..2a14b2cdc2 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -5,6 +5,7 @@ use relay_config::aggregator::Condition; use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; use relay_system::{Addr, NoResponse, Recipient, Service}; +use tokio::task::JoinHandle; use crate::services::metrics::{ Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, @@ -53,7 +54,7 @@ impl RouterService { impl Service for RouterService { type Interface = Aggregator; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut router = StartedRouter::start(self); relay_log::info!("metrics router started"); @@ -72,7 +73,7 @@ impl Service for RouterService { } } relay_log::info!("metrics router stopped"); - }); + }) } } diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 55a4a8b73a..7298015f61 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -29,6 +29,7 @@ use relay_sampling::evaluation::MatchedRuleIds; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; #[cfg(feature = "processing")] use crate::service::ServiceError; @@ -682,7 +683,7 @@ impl HttpOutcomeProducer { impl Service for HttpOutcomeProducer { type Interface = TrackRawOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { @@ -694,7 +695,7 @@ impl Service for HttpOutcomeProducer { else => break, } } - }); + }) } } @@ -775,7 +776,7 @@ impl ClientReportOutcomeProducer { impl Service for ClientReportOutcomeProducer { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { @@ -787,7 +788,7 @@ impl Service for ClientReportOutcomeProducer { else => break, } } - }); + }) } } @@ -1034,7 +1035,7 @@ impl OutcomeProducerService { impl Service for OutcomeProducerService { type Interface = OutcomeProducer; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, inner } = self; tokio::spawn(async move { @@ -1045,7 +1046,7 @@ impl Service for OutcomeProducerService { broker.handle_message(message, &config); } relay_log::info!("OutcomeProducer stopped."); - }); + }) } } diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs index 0a13cbf361..ac24c64b79 100644 --- a/relay-server/src/services/outcome_aggregator.rs +++ b/relay-server/src/services/outcome_aggregator.rs @@ -10,6 +10,7 @@ use relay_config::{Config, EmitOutcomes}; use relay_quotas::{DataCategory, Scoping}; use relay_statsd::metric; use relay_system::{Addr, Controller, Service, Shutdown}; +use tokio::task::JoinHandle; use crate::services::outcome::{Outcome, OutcomeProducer, TrackOutcome}; use crate::statsd::RelayTimers; @@ -138,7 +139,7 @@ impl OutcomeAggregator { impl Service for OutcomeAggregator { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); relay_log::info!("outcome aggregator started"); @@ -157,6 +158,6 @@ impl Service for OutcomeAggregator { } relay_log::info!("outcome aggregator stopped"); - }); + }) } } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 1c9261b44c..97f9153545 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2890,7 +2890,11 @@ impl EnvelopeProcessorService { impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + #[must_use] + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let service = self.clone(); @@ -2899,7 +2903,7 @@ impl Service for EnvelopeProcessorService { .spawn(move || service.handle_message(message)) .await; } - }); + }) } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index ef2f3f813f..4185f07149 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1370,7 +1370,10 @@ impl ProjectCacheService { impl Service for ProjectCacheService { type Interface = ProjectCache; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { let Self { config, memory_checker, @@ -1506,7 +1509,7 @@ impl Service for ProjectCacheService { } relay_log::info!("project cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index 745203780a..ce6e9bbf73 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -7,6 +7,7 @@ use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_config::Config; use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Service}; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::project::{ParsedProjectState, ProjectState}; @@ -164,7 +165,7 @@ async fn spawn_poll_local_states( impl Service for LocalProjectSourceService { type Interface = LocalProjectSource; - fn spawn_handler(mut self, mut rx: Receiver) { + fn spawn_handler(mut self, mut rx: Receiver) -> JoinHandle<()> { // Use a channel with size 1. If the channel is full because the consumer does not // collect the result, the producer will block, which is acceptable. let (state_tx, mut state_rx) = mpsc::channel(1); @@ -185,7 +186,7 @@ impl Service for LocalProjectSourceService { } } relay_log::info!("project local cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 4d7d5338ed..c459261079 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -16,6 +16,7 @@ use relay_system::{ }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::project::state::UpstreamProjectState; @@ -592,7 +593,7 @@ impl UpstreamProjectSourceService { impl Service for UpstreamProjectSourceService { type Interface = UpstreamProjectSource; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { relay_log::info!("project upstream cache started"); loop { @@ -607,6 +608,6 @@ impl Service for UpstreamProjectSourceService { } } relay_log::info!("project upstream cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs index 922b1c7dda..d98811aeba 100644 --- a/relay-server/src/services/relays.rs +++ b/relay-server/src/services/relays.rs @@ -10,6 +10,7 @@ use relay_system::{ }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::services::upstream::{Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay}; use crate::utils::{RetryBackoff, SleepHandle}; @@ -334,7 +335,7 @@ impl RelayCacheService { impl Service for RelayCacheService { type Interface = RelayCache; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { relay_log::info!("key cache started"); @@ -351,6 +352,6 @@ impl Service for RelayCacheService { } relay_log::info!("key cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index d52fa76148..a842659603 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -167,7 +167,7 @@ impl Accept for KeepAliveAcceptor { } } -fn serve(listener: TcpListener, app: App, config: Arc) { +fn serve(listener: TcpListener, app: App, config: Arc) -> tokio::task::JoinHandle<()> { let handle = Handle::new(); let mut server = axum_server::from_tcp(listener) @@ -189,9 +189,9 @@ fn serve(listener: TcpListener, app: App, config: Arc) { .keep_alive_timeout(config.keepalive_timeout()); let service = ServiceExt::::into_make_service_with_connect_info::(app); - tokio::spawn(server.serve(service)); + let server_handle = tokio::spawn(server.serve(service)); - tokio::spawn(async move { + let shutdown_handle = tokio::spawn(async move { let Shutdown { timeout } = Controller::shutdown_handle().notified().await; relay_log::info!("Shutting down HTTP server"); @@ -200,6 +200,8 @@ fn serve(listener: TcpListener, app: App, config: Arc) { None => handle.shutdown(), } }); + + server_handle // TODO: return both } /// HTTP server service. @@ -227,7 +229,10 @@ impl HttpServer { impl Service for HttpServer { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + fn spawn_handler( + self, + _rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { let Self { config, service, @@ -239,6 +244,6 @@ impl Service for HttpServer { relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1); let app = make_app(service); - serve(listener, app, config); + serve(listener, app, config) } } diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 00265b0e92..f94adde8da 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -1272,7 +1272,10 @@ impl BufferService { impl Service for BufferService { type Interface = Buffer; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); @@ -1299,7 +1302,7 @@ impl Service for BufferService { else => break, } } - }); + }) } } @@ -1591,7 +1594,10 @@ mod tests { impl Service for TestHealthService { type Interface = TestHealth; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { @@ -1599,7 +1605,7 @@ mod tests { else => break, } } - }); + }) } } diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a7b5adc66e..0a4cb15119 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -136,9 +136,12 @@ impl RelayStats { impl Service for RelayStats { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + fn spawn_handler( + self, + _rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else { - return; + return tokio::spawn(async {}); }; tokio::spawn(async move { @@ -150,6 +153,6 @@ impl Service for RelayStats { ); ticker.tick().await; } - }); + }) } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1af723ed68..7ab2d77ae9 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1044,7 +1044,10 @@ impl StoreService { impl Service for StoreService { type Interface = Store; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { let this = Arc::new(self); tokio::spawn(async move { @@ -1058,7 +1061,7 @@ impl Service for StoreService { } relay_log::info!("store forwarder stopped"); - }); + }) } } diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index c61621fc0c..e57da07d1b 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -134,11 +134,14 @@ impl TestStoreService { impl relay_system::Service for TestStoreService { type Interface = TestStore; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_message(message); } - }); + }) } } diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index e44861a122..59b7ea4b76 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -1498,7 +1498,10 @@ impl UpstreamRelayService { impl Service for UpstreamRelayService { type Interface = UpstreamRelay; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + ) -> tokio::task::JoinHandle<()> { let Self { config } = self; let client = SharedClient::build(config.clone()); @@ -1540,6 +1543,6 @@ impl Service for UpstreamRelayService { else => break, } } - }); + }) // TODO: return both } } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 60c5c829a1..cfb3a817b8 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -129,7 +129,7 @@ impl ShutdownHandle { /// impl Service for MyService { /// type Interface = (); /// -/// fn spawn_handler(self, mut rx: relay_system::Receiver) { +/// fn spawn_handler(self, mut rx: relay_system::Receiver) -> tokio::task::JoinHandle<()> { /// tokio::spawn(async move { /// let mut shutdown = Controller::shutdown_handle(); /// diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index a09d019e13..865082594c 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -9,8 +9,8 @@ use std::time::Duration; use futures::future::Shared; use futures::FutureExt; -use tokio::runtime::Runtime; use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; use crate::statsd::SystemGauges; @@ -1001,19 +1001,20 @@ pub trait Service: Sized { /// /// Receives an inbound channel for all messages sent through the service's [`Addr`]. Note /// that this function is synchronous, so that this needs to spawn a task internally. - fn spawn_handler(self, rx: Receiver); + fn spawn_handler(self, rx: Receiver) -> JoinHandle<()>; - /// Starts the service in the current runtime and returns an address for it. - fn start(self) -> Addr { + /// Starts the service in the current runtime and returns the address and a join handle. + fn start_joinable(self) -> (Addr, JoinHandle<()>) { let (addr, rx) = channel(Self::name()); - self.spawn_handler(rx); - addr + (addr, self.spawn_handler(rx)) } - /// Starts the service in the given runtime and returns an address for it. - fn start_in(self, runtime: &Runtime) -> Addr { - let _guard = runtime.enter(); - self.start() + /// Starts the service in the current runtime and returns an address for it. + /// + /// The main task of the service is detached. + fn start(self) -> Addr { + let (addr, _join_handle) = self.start_joinable(); + addr } /// Returns a unique name for this service implementation. @@ -1046,12 +1047,12 @@ mod tests { impl Service for MockService { type Interface = MockMessage; - fn spawn_handler(self, mut rx: Receiver) { + fn spawn_handler(self, mut rx: Receiver) -> JoinHandle<()> { tokio::spawn(async move { while rx.recv().await.is_some() { tokio::time::sleep(BACKLOG_INTERVAL * 2).await; } - }); + }) } fn name() -> &'static str { From ffbf7dc86401ad3d3b3098264a9c5d9fc362331f Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 11 Sep 2024 15:35:46 +0200 Subject: [PATCH 2/5] resume_unwind --- relay-server/src/lib.rs | 29 ++++++++++++++++++++++------- relay-server/src/service.rs | 28 ++++++++++++---------------- relay-server/src/services/server.rs | 8 +++----- 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 923a78c56d..9bade220a0 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -281,6 +281,7 @@ use std::sync::Arc; use futures::StreamExt; use relay_config::Config; use relay_system::{Controller, Service}; +use tokio::select; use crate::service::ServiceState; use crate::services::server::HttpServer; @@ -302,16 +303,30 @@ pub fn run(config: Config) -> anyhow::Result<()> { // information on all services. main_runtime.block_on(async { Controller::start(config.shutdown_timeout()); - let service = ServiceState::start(config.clone())?; + let (service, mut join_handles) = ServiceState::start(config.clone())?; HttpServer::new(config, service.clone())?.start(); - for x in service.join_handles() {} - // while let Some(res) = service.join_handles().next() { + loop { + select! { + Some(res) = join_handles.next() => { + match res { + Ok(()) => { + relay_log::trace!("Service exited normally."); + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + } + _ = Controller::shutdown_handle().finished() => { + break + } + else => break + } + } - // } - - // TODO: await simultaneously - Controller::shutdown_handle().finished().await; anyhow::Ok(()) })?; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 10bc72fe92..55094bb4df 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -143,7 +143,6 @@ struct StateInner { config: Arc, memory_checker: MemoryChecker, registry: Registry, - join_handles: Vec>, } /// Server state. @@ -154,7 +153,7 @@ pub struct ServiceState { impl ServiceState { /// Starts all services and returns addresses to all of them. - pub fn start(config: Arc) -> Result { + pub fn start(config: Arc) -> Result<(Self, FuturesUnordered>)> { let upstream_relay = UpstreamRelayService::new(config.clone()).start(); let test_store = TestStoreService::new(config.clone()).start(); @@ -311,18 +310,19 @@ impl ServiceState { config: config.clone(), memory_checker: MemoryChecker::new(memory_stat, config.clone()), registry, - join_handles: { - let mut j = Vec::from_iter([processor_handle]); - if let Some((_, handle)) = envelope_buffer { - j.push(handle); - } - j - }, }; - Ok(ServiceState { - inner: Arc::new(state), - }) + let join_handles = FuturesUnordered::from_iter([processor_handle]); + if let Some((_, handle)) = envelope_buffer { + join_handles.push(handle); + }; + + Ok(( + ServiceState { + inner: Arc::new(state), + }, + join_handles, + )) } /// Returns a reference to the Relay configuration. @@ -386,10 +386,6 @@ impl ServiceState { pub fn outcome_aggregator(&self) -> &Addr { &self.inner.registry.outcome_aggregator } - - pub fn join_handles(&self) -> &[JoinHandle<()>] { - self.inner.join_handles.as_slice() - } } fn create_redis_pool( diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index a842659603..ec2670fbbd 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -189,9 +189,9 @@ fn serve(listener: TcpListener, app: App, config: Arc) -> tokio::task::J .keep_alive_timeout(config.keepalive_timeout()); let service = ServiceExt::::into_make_service_with_connect_info::(app); - let server_handle = tokio::spawn(server.serve(service)); + let _server_handle = tokio::spawn(server.serve(service)); - let shutdown_handle = tokio::spawn(async move { + tokio::spawn(async move { let Shutdown { timeout } = Controller::shutdown_handle().notified().await; relay_log::info!("Shutting down HTTP server"); @@ -199,9 +199,7 @@ fn serve(listener: TcpListener, app: App, config: Arc) -> tokio::task::J Some(timeout) => handle.graceful_shutdown(Some(timeout)), None => handle.shutdown(), } - }); - - server_handle // TODO: return both + }) // TODO: return both } /// HTTP server service. From bf61ede204c81313b5ce09dafa351a73ad58409b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 12 Sep 2024 07:53:46 +0200 Subject: [PATCH 3/5] ref --- relay-server/src/lib.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 9bade220a0..88bc976981 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -309,14 +309,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { loop { select! { Some(res) = join_handles.next() => { - match res { - Ok(()) => { - relay_log::trace!("Service exited normally."); - } - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } + if let Err(e) = res { + if e.is_panic() { + // Re-trigger panic to terminate the process: + std::panic::resume_unwind(e.into_panic()); } } } From 734c9cb34125d4df377d3df6e06346aa3af27779 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 12 Sep 2024 07:54:48 +0200 Subject: [PATCH 4/5] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30f0c68da3..ba33820816 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Use `matches_any_origin` to scrub HTTP hosts in spans. ([#3939](https://github.com/getsentry/relay/pull/3939)). - Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905)) +- Abort the process when a service panics. ([#4026](https://github.com/getsentry/relay/pull/4026)) **Features**: From 71137dea8b3ce26eda428cd31552aa0ef1015ee2 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 12 Sep 2024 07:59:14 +0200 Subject: [PATCH 5/5] ref --- relay-server/src/services/metrics/aggregator.rs | 12 ++++-------- relay-server/src/services/processor.rs | 6 ++---- relay-server/src/services/project_cache.rs | 6 ++---- relay-server/src/services/server.rs | 8 +++----- relay-server/src/services/spooler/mod.rs | 12 ++++-------- relay-server/src/services/stats.rs | 6 ++---- relay-server/src/services/store.rs | 6 ++---- relay-server/src/services/test_store.rs | 6 ++---- relay-server/src/services/upstream.rs | 8 +++----- 9 files changed, 24 insertions(+), 46 deletions(-) diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index e7d1bfe21d..5f11e152ac 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -8,6 +8,7 @@ use relay_config::AggregatorServiceConfig; use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket, UnixTimestamp}; use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; +use tokio::task::JoinHandle; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -246,10 +247,7 @@ impl AggregatorService { impl Service for AggregatorService { type Interface = Aggregator; - fn spawn_handler( - mut self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); @@ -324,6 +322,7 @@ mod tests { use relay_common::time::UnixTimestamp; use relay_metrics::{aggregator::AggregatorConfig, BucketMetadata, BucketValue}; + use tokio::task::JoinHandle; use super::*; @@ -364,10 +363,7 @@ mod tests { impl Service for TestReceiver { type Interface = TestInterface; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 97f9153545..2bfb6383e3 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -41,6 +41,7 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; use reqwest::header; use smallvec::{smallvec, SmallVec}; +use tokio::task::JoinHandle; #[cfg(feature = "processing")] use { @@ -2891,10 +2892,7 @@ impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; #[must_use] - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let service = self.clone(); diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 4185f07149..0abded5c9e 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -23,6 +23,7 @@ use relay_system::{Addr, FromMessage, Interface, Sender, Service}; #[cfg(feature = "processing")] use tokio::sync::Semaphore; use tokio::sync::{mpsc, watch}; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::metrics::{Aggregator, FlushBuckets}; @@ -1370,10 +1371,7 @@ impl ProjectCacheService { impl Service for ProjectCacheService { type Interface = ProjectCache; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, memory_checker, diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index ec2670fbbd..3f2cad4750 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -13,6 +13,7 @@ use relay_config::Config; use relay_system::{Controller, Service, Shutdown}; use socket2::TcpKeepalive; use tokio::net::{TcpSocket, TcpStream}; +use tokio::task::JoinHandle; use tower::ServiceBuilder; use tower_http::compression::predicate::SizeAbove; use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate}; @@ -167,7 +168,7 @@ impl Accept for KeepAliveAcceptor { } } -fn serve(listener: TcpListener, app: App, config: Arc) -> tokio::task::JoinHandle<()> { +fn serve(listener: TcpListener, app: App, config: Arc) -> JoinHandle<()> { let handle = Handle::new(); let mut server = axum_server::from_tcp(listener) @@ -227,10 +228,7 @@ impl HttpServer { impl Service for HttpServer { type Interface = (); - fn spawn_handler( - self, - _rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, _rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, service, diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index f94adde8da..1b98af2f7e 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -50,6 +50,7 @@ use sqlx::sqlite::{ use sqlx::{Pool, Row, Sqlite}; use tokio::fs::DirBuilder; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::envelope::{Envelope, EnvelopeError}; use crate::extractors::StartTime; @@ -1272,10 +1273,7 @@ impl BufferService { impl Service for BufferService { type Interface = Buffer; - fn spawn_handler( - mut self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); @@ -1331,6 +1329,7 @@ mod tests { use std::str::FromStr; use std::sync::Mutex; use std::time::{Duration, Instant}; + use tokio::task::JoinHandle; use uuid::Uuid; use crate::services::project_cache::SpoolHealth; @@ -1594,10 +1593,7 @@ mod tests { impl Service for TestHealthService { type Interface = TestHealth; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index 0a4cb15119..7e27d60ce1 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -5,6 +5,7 @@ use relay_config::{Config, RelayMode}; use relay_redis::{RedisPool, RedisPools}; use relay_statsd::metric; use relay_system::{Addr, Service}; +use tokio::task::JoinHandle; use tokio::time::interval; use crate::services::upstream::{IsNetworkOutage, UpstreamRelay}; @@ -136,10 +137,7 @@ impl RelayStats { impl Service for RelayStats { type Interface = (); - fn spawn_handler( - self, - _rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, _rx: relay_system::Receiver) -> JoinHandle<()> { let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else { return tokio::spawn(async {}); }; diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 7ab2d77ae9..8ea8bb54c5 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -7,6 +7,7 @@ use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; use std::time::Instant; +use tokio::task::JoinHandle; use bytes::Bytes; use relay_base_schema::data_category::DataCategory; @@ -1044,10 +1045,7 @@ impl StoreService { impl Service for StoreService { type Interface = Store; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let this = Arc::new(self); tokio::spawn(async move { diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index e57da07d1b..fbd02a8ccb 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; use relay_event_schema::protocol::EventId; use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender}; +use tokio::task::JoinHandle; use crate::envelope::Envelope; use crate::services::outcome::Outcome; @@ -134,10 +135,7 @@ impl TestStoreService { impl relay_system::Service for TestStoreService { type Interface = TestStore; - fn spawn_handler( - mut self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_message(message); diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index 59b7ea4b76..59fbec2a45 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -28,6 +28,7 @@ pub use reqwest::Method; use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::http::{HttpError, Request, RequestBuilder, Response, StatusCode}; @@ -1255,7 +1256,7 @@ enum ConnectionState { /// The connection is interrupted and reconnection is in progress. /// /// If the task has finished, connection should be considered `Connected`. - Reconnecting(tokio::task::JoinHandle<()>), + Reconnecting(JoinHandle<()>), } /// Maintains outage state of the connection to the upstream. @@ -1498,10 +1499,7 @@ impl UpstreamRelayService { impl Service for UpstreamRelayService { type Interface = UpstreamRelay; - fn spawn_handler( - self, - mut rx: relay_system::Receiver, - ) -> tokio::task::JoinHandle<()> { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config } = self; let client = SharedClient::build(config.clone());