diff --git a/CHANGELOG.md b/CHANGELOG.md index 30f0c68da3..ba33820816 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Use `matches_any_origin` to scrub HTTP hosts in spans. ([#3939](https://github.com/getsentry/relay/pull/3939)). - Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905)) +- Abort the process when a service panics. ([#4026](https://github.com/getsentry/relay/pull/4026)) **Features**: diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 986db2e5d4..88bc976981 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -278,8 +278,10 @@ mod testutils; use std::sync::Arc; +use futures::StreamExt; use relay_config::Config; use relay_system::{Controller, Service}; +use tokio::select; use crate::service::ServiceState; use crate::services::server::HttpServer; @@ -301,9 +303,26 @@ pub fn run(config: Config) -> anyhow::Result<()> { // information on all services. main_runtime.block_on(async { Controller::start(config.shutdown_timeout()); - let service = ServiceState::start(config.clone())?; + let (service, mut join_handles) = ServiceState::start(config.clone())?; HttpServer::new(config, service.clone())?.start(); - Controller::shutdown_handle().finished().await; + + loop { + select! { + Some(res) = join_handles.next() => { + if let Err(e) = res { + if e.is_panic() { + // Re-trigger panic to terminate the process: + std::panic::resume_unwind(e.into_panic()); + } + } + } + _ = Controller::shutdown_handle().finished() => { + break + } + else => break + } + } + anyhow::Ok(()) })?; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index f8fbdfe48c..55094bb4df 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -9,12 +9,14 @@ use crate::services::stats::RelayStats; use anyhow::{Context, Result}; use axum::extract::FromRequestParts; use axum::http::request::Parts; +use futures::stream::FuturesUnordered; use rayon::ThreadPool; use relay_cogs::Cogs; use relay_config::{Config, RedisConnection, RedisPoolConfigs}; use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; use relay_system::{channel, Addr, Service}; use tokio::runtime::Runtime; +use tokio::task::JoinHandle; use crate::services::cogs::{CogsService, CogsServiceRecorder}; use crate::services::global_config::{GlobalConfigManager, GlobalConfigService}; @@ -151,7 +153,7 @@ pub struct ServiceState { impl ServiceState { /// Starts all services and returns addresses to all of them. - pub fn start(config: Arc) -> Result { + pub fn start(config: Arc) -> Result<(Self, FuturesUnordered>)> { let upstream_relay = UpstreamRelayService::new(config.clone()).start(); let test_store = TestStoreService::new(config.clone()).start(); @@ -221,7 +223,7 @@ impl ServiceState { let cogs = CogsService::new(&config); let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start())); - EnvelopeProcessorService::new( + let processor_handle = EnvelopeProcessorService::new( create_processor_pool(&config)?, config.clone(), global_config_handle, @@ -251,7 +253,7 @@ impl ServiceState { // Keep all the services in one context. let project_cache_services = Services { - envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr), + envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.addr()), aggregator: aggregator.clone(), envelope_processor: processor.clone(), outcome_aggregator: outcome_aggregator.clone(), @@ -301,7 +303,7 @@ impl ServiceState { global_config, project_cache, upstream_relay, - envelope_buffer, + envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.clone()), }; let state = StateInner { @@ -310,9 +312,17 @@ impl ServiceState { registry, }; - Ok(ServiceState { - inner: Arc::new(state), - }) + let join_handles = FuturesUnordered::from_iter([processor_handle]); + if let Some((_, handle)) = envelope_buffer { + join_handles.push(handle); + }; + + Ok(( + ServiceState { + inner: Arc::new(state), + }, + join_handles, + )) } /// Returns a reference to the Relay configuration. diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 1b625e1b21..ad78d7d9e3 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -11,6 +11,7 @@ use relay_system::Request; use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; use tokio::sync::watch; +use tokio::task::JoinHandle; use crate::envelope::Envelope; use crate::services::buffer::envelope_buffer::Peek; @@ -128,12 +129,10 @@ impl EnvelopeBufferService { } /// Returns both the [`Addr`] to this service, and a reference to the capacity flag. - pub fn start_observable(self) -> ObservableEnvelopeBuffer { + pub fn start_observable(self) -> (ObservableEnvelopeBuffer, JoinHandle<()>) { let has_capacity = self.has_capacity.clone(); - ObservableEnvelopeBuffer { - addr: self.start(), - has_capacity, - } + let (addr, join_handle) = self.start_joinable(); + (ObservableEnvelopeBuffer { addr, has_capacity }, join_handle) } /// Wait for the configured amount of time and make sure the project cache is ready to receive. @@ -259,7 +258,7 @@ impl EnvelopeBufferService { impl Service for EnvelopeBufferService { type Interface = EnvelopeBuffer; - fn spawn_handler(mut self, mut rx: Receiver) { + fn spawn_handler(mut self, mut rx: Receiver) -> JoinHandle<()> { let config = self.config.clone(); let memory_checker = self.memory_checker.clone(); let mut global_config_rx = self.global_config_rx.clone(); @@ -312,7 +311,7 @@ impl Service for EnvelopeBufferService { } relay_log::info!("EnvelopeBufferService stop"); - }); + }) } } @@ -364,7 +363,7 @@ mod tests { service.has_capacity.store(false, Ordering::Relaxed); // Observable has correct value: - let ObservableEnvelopeBuffer { addr, has_capacity } = service.start_observable(); + let (ObservableEnvelopeBuffer { addr, has_capacity }, _) = service.start_observable(); assert!(!has_capacity.load(Ordering::Relaxed)); // Send a message to trigger update of `has_capacity` flag: diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index 19daf7516d..af75498e5a 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -1,8 +1,8 @@ -use std::sync::atomic::{AtomicBool, Ordering}; - use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId}; use relay_config::Config; use relay_system::{Addr, FromMessage, Interface, Service}; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::task::JoinHandle; use crate::statsd::RelayCounters; @@ -54,12 +54,12 @@ impl CogsService { impl Service for CogsService { type Interface = CogsReport; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_report(message); } - }); + }) } } diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index 7e401b626f..1ff14f7145 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -22,6 +22,7 @@ use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Serv use reqwest::Method; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::upstream::{ @@ -338,7 +339,7 @@ impl GlobalConfigService { impl Service for GlobalConfigService { type Interface = GlobalConfigManager; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut shutdown_handle = Controller::shutdown_handle(); @@ -384,7 +385,7 @@ impl Service for GlobalConfigService { } } relay_log::info!("global config service stopped"); - }); + }) } } diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index 2899fc14b3..b82c0dd808 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -4,6 +4,7 @@ use relay_config::Config; use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service}; use std::future::Future; use tokio::sync::watch; +use tokio::task::JoinHandle; use tokio::time::{timeout, Instant}; use crate::services::metrics::RouterHandle; @@ -189,13 +190,13 @@ impl HealthCheckService { impl Service for HealthCheckService { type Interface = HealthCheck; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy)); let check_interval = self.config.health_refresh_interval(); // Add 10% buffer to the internal timeouts to avoid race conditions. let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1); - tokio::spawn(async move { + let j1 = tokio::spawn(async move { let shutdown = Controller::shutdown_handle(); while shutdown.get().is_none() { @@ -212,7 +213,7 @@ impl Service for HealthCheckService { update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok(); }); - tokio::spawn(async move { + let _j2 = tokio::spawn(async move { while let Some(HealthCheck(message, sender)) = rx.recv().await { let update = update_rx.borrow(); @@ -225,6 +226,8 @@ impl Service for HealthCheckService { }); } }); + + j1 // TODO: should return j1 + j2 } } diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 707d0eec72..5f11e152ac 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -8,6 +8,7 @@ use relay_config::AggregatorServiceConfig; use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket, UnixTimestamp}; use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; +use tokio::task::JoinHandle; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -246,7 +247,7 @@ impl AggregatorService { impl Service for AggregatorService { type Interface = Aggregator; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); @@ -264,7 +265,7 @@ impl Service for AggregatorService { else => break, } } - }); + }) } } @@ -321,6 +322,7 @@ mod tests { use relay_common::time::UnixTimestamp; use relay_metrics::{aggregator::AggregatorConfig, BucketMetadata, BucketValue}; + use tokio::task::JoinHandle; use super::*; @@ -361,7 +363,7 @@ mod tests { impl Service for TestReceiver { type Interface = TestInterface; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; @@ -370,7 +372,7 @@ mod tests { self.add_buckets(buckets); } } - }); + }) } } diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 522eb729f1..2a14b2cdc2 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -5,6 +5,7 @@ use relay_config::aggregator::Condition; use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; use relay_system::{Addr, NoResponse, Recipient, Service}; +use tokio::task::JoinHandle; use crate::services::metrics::{ Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, @@ -53,7 +54,7 @@ impl RouterService { impl Service for RouterService { type Interface = Aggregator; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut router = StartedRouter::start(self); relay_log::info!("metrics router started"); @@ -72,7 +73,7 @@ impl Service for RouterService { } } relay_log::info!("metrics router stopped"); - }); + }) } } diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 55a4a8b73a..7298015f61 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -29,6 +29,7 @@ use relay_sampling::evaluation::MatchedRuleIds; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; #[cfg(feature = "processing")] use crate::service::ServiceError; @@ -682,7 +683,7 @@ impl HttpOutcomeProducer { impl Service for HttpOutcomeProducer { type Interface = TrackRawOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { @@ -694,7 +695,7 @@ impl Service for HttpOutcomeProducer { else => break, } } - }); + }) } } @@ -775,7 +776,7 @@ impl ClientReportOutcomeProducer { impl Service for ClientReportOutcomeProducer { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { @@ -787,7 +788,7 @@ impl Service for ClientReportOutcomeProducer { else => break, } } - }); + }) } } @@ -1034,7 +1035,7 @@ impl OutcomeProducerService { impl Service for OutcomeProducerService { type Interface = OutcomeProducer; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, inner } = self; tokio::spawn(async move { @@ -1045,7 +1046,7 @@ impl Service for OutcomeProducerService { broker.handle_message(message, &config); } relay_log::info!("OutcomeProducer stopped."); - }); + }) } } diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs index 0a13cbf361..ac24c64b79 100644 --- a/relay-server/src/services/outcome_aggregator.rs +++ b/relay-server/src/services/outcome_aggregator.rs @@ -10,6 +10,7 @@ use relay_config::{Config, EmitOutcomes}; use relay_quotas::{DataCategory, Scoping}; use relay_statsd::metric; use relay_system::{Addr, Controller, Service, Shutdown}; +use tokio::task::JoinHandle; use crate::services::outcome::{Outcome, OutcomeProducer, TrackOutcome}; use crate::statsd::RelayTimers; @@ -138,7 +139,7 @@ impl OutcomeAggregator { impl Service for OutcomeAggregator { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); relay_log::info!("outcome aggregator started"); @@ -157,6 +158,6 @@ impl Service for OutcomeAggregator { } relay_log::info!("outcome aggregator stopped"); - }); + }) } } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 1c9261b44c..2bfb6383e3 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -41,6 +41,7 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; use reqwest::header; use smallvec::{smallvec, SmallVec}; +use tokio::task::JoinHandle; #[cfg(feature = "processing")] use { @@ -2890,7 +2891,8 @@ impl EnvelopeProcessorService { impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + #[must_use] + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { let service = self.clone(); @@ -2899,7 +2901,7 @@ impl Service for EnvelopeProcessorService { .spawn(move || service.handle_message(message)) .await; } - }); + }) } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index ef2f3f813f..0abded5c9e 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -23,6 +23,7 @@ use relay_system::{Addr, FromMessage, Interface, Sender, Service}; #[cfg(feature = "processing")] use tokio::sync::Semaphore; use tokio::sync::{mpsc, watch}; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::metrics::{Aggregator, FlushBuckets}; @@ -1370,7 +1371,7 @@ impl ProjectCacheService { impl Service for ProjectCacheService { type Interface = ProjectCache; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, memory_checker, @@ -1506,7 +1507,7 @@ impl Service for ProjectCacheService { } relay_log::info!("project cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index 745203780a..ce6e9bbf73 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -7,6 +7,7 @@ use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_config::Config; use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Service}; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::project::{ParsedProjectState, ProjectState}; @@ -164,7 +165,7 @@ async fn spawn_poll_local_states( impl Service for LocalProjectSourceService { type Interface = LocalProjectSource; - fn spawn_handler(mut self, mut rx: Receiver) { + fn spawn_handler(mut self, mut rx: Receiver) -> JoinHandle<()> { // Use a channel with size 1. If the channel is full because the consumer does not // collect the result, the producer will block, which is acceptable. let (state_tx, mut state_rx) = mpsc::channel(1); @@ -185,7 +186,7 @@ impl Service for LocalProjectSourceService { } } relay_log::info!("project local cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 4d7d5338ed..c459261079 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -16,6 +16,7 @@ use relay_system::{ }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::services::project::state::UpstreamProjectState; @@ -592,7 +593,7 @@ impl UpstreamProjectSourceService { impl Service for UpstreamProjectSourceService { type Interface = UpstreamProjectSource; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { relay_log::info!("project upstream cache started"); loop { @@ -607,6 +608,6 @@ impl Service for UpstreamProjectSourceService { } } relay_log::info!("project upstream cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs index 922b1c7dda..d98811aeba 100644 --- a/relay-server/src/services/relays.rs +++ b/relay-server/src/services/relays.rs @@ -10,6 +10,7 @@ use relay_system::{ }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::services::upstream::{Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay}; use crate::utils::{RetryBackoff, SleepHandle}; @@ -334,7 +335,7 @@ impl RelayCacheService { impl Service for RelayCacheService { type Interface = RelayCache; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { relay_log::info!("key cache started"); @@ -351,6 +352,6 @@ impl Service for RelayCacheService { } relay_log::info!("key cache stopped"); - }); + }) } } diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index d52fa76148..3f2cad4750 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -13,6 +13,7 @@ use relay_config::Config; use relay_system::{Controller, Service, Shutdown}; use socket2::TcpKeepalive; use tokio::net::{TcpSocket, TcpStream}; +use tokio::task::JoinHandle; use tower::ServiceBuilder; use tower_http::compression::predicate::SizeAbove; use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate}; @@ -167,7 +168,7 @@ impl Accept for KeepAliveAcceptor { } } -fn serve(listener: TcpListener, app: App, config: Arc) { +fn serve(listener: TcpListener, app: App, config: Arc) -> JoinHandle<()> { let handle = Handle::new(); let mut server = axum_server::from_tcp(listener) @@ -189,7 +190,7 @@ fn serve(listener: TcpListener, app: App, config: Arc) { .keep_alive_timeout(config.keepalive_timeout()); let service = ServiceExt::::into_make_service_with_connect_info::(app); - tokio::spawn(server.serve(service)); + let _server_handle = tokio::spawn(server.serve(service)); tokio::spawn(async move { let Shutdown { timeout } = Controller::shutdown_handle().notified().await; @@ -199,7 +200,7 @@ fn serve(listener: TcpListener, app: App, config: Arc) { Some(timeout) => handle.graceful_shutdown(Some(timeout)), None => handle.shutdown(), } - }); + }) // TODO: return both } /// HTTP server service. @@ -227,7 +228,7 @@ impl HttpServer { impl Service for HttpServer { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + fn spawn_handler(self, _rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config, service, @@ -239,6 +240,6 @@ impl Service for HttpServer { relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1); let app = make_app(service); - serve(listener, app, config); + serve(listener, app, config) } } diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 00265b0e92..1b98af2f7e 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -50,6 +50,7 @@ use sqlx::sqlite::{ use sqlx::{Pool, Row, Sqlite}; use tokio::fs::DirBuilder; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::envelope::{Envelope, EnvelopeError}; use crate::extractors::StartTime; @@ -1272,7 +1273,7 @@ impl BufferService { impl Service for BufferService { type Interface = Buffer; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); @@ -1299,7 +1300,7 @@ impl Service for BufferService { else => break, } } - }); + }) } } @@ -1328,6 +1329,7 @@ mod tests { use std::str::FromStr; use std::sync::Mutex; use std::time::{Duration, Instant}; + use tokio::task::JoinHandle; use uuid::Uuid; use crate::services::project_cache::SpoolHealth; @@ -1591,7 +1593,7 @@ mod tests { impl Service for TestHealthService { type Interface = TestHealth; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { @@ -1599,7 +1601,7 @@ mod tests { else => break, } } - }); + }) } } diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a7b5adc66e..7e27d60ce1 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -5,6 +5,7 @@ use relay_config::{Config, RelayMode}; use relay_redis::{RedisPool, RedisPools}; use relay_statsd::metric; use relay_system::{Addr, Service}; +use tokio::task::JoinHandle; use tokio::time::interval; use crate::services::upstream::{IsNetworkOutage, UpstreamRelay}; @@ -136,9 +137,9 @@ impl RelayStats { impl Service for RelayStats { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + fn spawn_handler(self, _rx: relay_system::Receiver) -> JoinHandle<()> { let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else { - return; + return tokio::spawn(async {}); }; tokio::spawn(async move { @@ -150,6 +151,6 @@ impl Service for RelayStats { ); ticker.tick().await; } - }); + }) } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1af723ed68..8ea8bb54c5 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -7,6 +7,7 @@ use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; use std::time::Instant; +use tokio::task::JoinHandle; use bytes::Bytes; use relay_base_schema::data_category::DataCategory; @@ -1044,7 +1045,7 @@ impl StoreService { impl Service for StoreService { type Interface = Store; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let this = Arc::new(self); tokio::spawn(async move { @@ -1058,7 +1059,7 @@ impl Service for StoreService { } relay_log::info!("store forwarder stopped"); - }); + }) } } diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index c61621fc0c..fbd02a8ccb 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; use relay_event_schema::protocol::EventId; use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender}; +use tokio::task::JoinHandle; use crate::envelope::Envelope; use crate::services::outcome::Outcome; @@ -134,11 +135,11 @@ impl TestStoreService { impl relay_system::Service for TestStoreService { type Interface = TestStore; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + fn spawn_handler(mut self, mut rx: relay_system::Receiver) -> JoinHandle<()> { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_message(message); } - }); + }) } } diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index e44861a122..59fbec2a45 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -28,6 +28,7 @@ pub use reqwest::Method; use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Instant; use crate::http::{HttpError, Request, RequestBuilder, Response, StatusCode}; @@ -1255,7 +1256,7 @@ enum ConnectionState { /// The connection is interrupted and reconnection is in progress. /// /// If the task has finished, connection should be considered `Connected`. - Reconnecting(tokio::task::JoinHandle<()>), + Reconnecting(JoinHandle<()>), } /// Maintains outage state of the connection to the upstream. @@ -1498,7 +1499,7 @@ impl UpstreamRelayService { impl Service for UpstreamRelayService { type Interface = UpstreamRelay; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler(self, mut rx: relay_system::Receiver) -> JoinHandle<()> { let Self { config } = self; let client = SharedClient::build(config.clone()); @@ -1540,6 +1541,6 @@ impl Service for UpstreamRelayService { else => break, } } - }); + }) // TODO: return both } } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 60c5c829a1..cfb3a817b8 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -129,7 +129,7 @@ impl ShutdownHandle { /// impl Service for MyService { /// type Interface = (); /// -/// fn spawn_handler(self, mut rx: relay_system::Receiver) { +/// fn spawn_handler(self, mut rx: relay_system::Receiver) -> tokio::task::JoinHandle<()> { /// tokio::spawn(async move { /// let mut shutdown = Controller::shutdown_handle(); /// diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index a09d019e13..865082594c 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -9,8 +9,8 @@ use std::time::Duration; use futures::future::Shared; use futures::FutureExt; -use tokio::runtime::Runtime; use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; use crate::statsd::SystemGauges; @@ -1001,19 +1001,20 @@ pub trait Service: Sized { /// /// Receives an inbound channel for all messages sent through the service's [`Addr`]. Note /// that this function is synchronous, so that this needs to spawn a task internally. - fn spawn_handler(self, rx: Receiver); + fn spawn_handler(self, rx: Receiver) -> JoinHandle<()>; - /// Starts the service in the current runtime and returns an address for it. - fn start(self) -> Addr { + /// Starts the service in the current runtime and returns the address and a join handle. + fn start_joinable(self) -> (Addr, JoinHandle<()>) { let (addr, rx) = channel(Self::name()); - self.spawn_handler(rx); - addr + (addr, self.spawn_handler(rx)) } - /// Starts the service in the given runtime and returns an address for it. - fn start_in(self, runtime: &Runtime) -> Addr { - let _guard = runtime.enter(); - self.start() + /// Starts the service in the current runtime and returns an address for it. + /// + /// The main task of the service is detached. + fn start(self) -> Addr { + let (addr, _join_handle) = self.start_joinable(); + addr } /// Returns a unique name for this service implementation. @@ -1046,12 +1047,12 @@ mod tests { impl Service for MockService { type Interface = MockMessage; - fn spawn_handler(self, mut rx: Receiver) { + fn spawn_handler(self, mut rx: Receiver) -> JoinHandle<()> { tokio::spawn(async move { while rx.recv().await.is_some() { tokio::time::sleep(BACKLOG_INTERVAL * 2).await; } - }); + }) } fn name() -> &'static str {