feat(server): abort on panic #4026

Closed · wants to merge 5 commits · changes shown from 2 commits
27 changes: 25 additions & 2 deletions relay-server/src/lib.rs
@@ -278,8 +278,10 @@ mod testutils;

use std::sync::Arc;

use futures::StreamExt;
use relay_config::Config;
use relay_system::{Controller, Service};
use tokio::select;

use crate::service::ServiceState;
use crate::services::server::HttpServer;
@@ -301,9 +303,30 @@ pub fn run(config: Config) -> anyhow::Result<()> {
// information on all services.
main_runtime.block_on(async {
Controller::start(config.shutdown_timeout());
let service = ServiceState::start(config.clone())?;
let (service, mut join_handles) = ServiceState::start(config.clone())?;
HttpServer::new(config, service.clone())?.start();
Controller::shutdown_handle().finished().await;

loop {
select! {
Some(res) = join_handles.next() => {
match res {
Ok(()) => {
relay_log::trace!("Service exited normally.");
}
Err(e) => {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
Member

Do we maybe want to define a respawn behavior for services in a future iteration? It might be tricky to make sure existing channels are re-established.

Member Author

This actually re-triggers the panic and makes the process terminate. Respawning services is another option I would like to discuss on Monday, but it has its drawbacks (what if the service keeps panicking on every respawn?).

Member

Yes, this is something I thought of. I feel like for that we should have some global retry counters or heuristics to know when it is no longer possible to restart a service.
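A bounded respawn policy along these lines could look roughly as follows. This is only a sketch: the `supervise` helper, its `spawn` closure, and the `max_restarts` budget are all hypothetical, and re-creating a service's channels on each respawn remains the hard part noted above.

use std::time::Duration;

use tokio::task::JoinHandle;

/// Hypothetical bounded-respawn supervisor: `spawn` is any closure that
/// (re)creates the service task, `max_restarts` is a made-up retry budget.
async fn supervise<F>(mut spawn: F, max_restarts: usize)
where
    F: FnMut() -> JoinHandle<()>,
{
    let mut restarts = 0;
    loop {
        match spawn().await {
            // The service finished its main loop normally.
            Ok(()) => break,
            // A panicked service is restarted while budget remains.
            Err(e) if e.is_panic() && restarts < max_restarts => {
                restarts += 1;
                // Brief backoff so a service that panics immediately on
                // startup cannot respawn in a hot loop.
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            // Budget exhausted: re-raise the panic so the process aborts,
            // matching the behavior this PR introduces.
            Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
            // The task was cancelled; nothing left to restart.
            Err(_) => break,
        }
    }
}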

}
}
}
}
_ = Controller::shutdown_handle().finished() => {
Member Author

Note: when every service implements a shutdown listener, awaiting on finished becomes unnecessary: we can simply await all the join_handles and guarantee that every service has finished its main task.
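In that state, the run loop could reduce to draining the handles; a rough sketch, reusing the `join_handles` stream and the panic handling from the loop above:

// Sketch: once services exit on shutdown by themselves, no select! is needed.
while let Some(res) = join_handles.next().await {
    if let Err(e) = res {
        if e.is_panic() {
            // Same abort-on-panic behavior as above.
            std::panic::resume_unwind(e.into_panic());
        }
    }
}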

break
}
else => break
}
}

anyhow::Ok(())
})?;

24 changes: 17 additions & 7 deletions relay-server/src/service.rs
@@ -9,12 +9,14 @@ use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use futures::stream::FuturesUnordered;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
@@ -151,7 +153,7 @@ pub struct ServiceState {

impl ServiceState {
/// Starts all services and returns addresses to all of them.
pub fn start(config: Arc<Config>) -> Result<Self> {
pub fn start(config: Arc<Config>) -> Result<(Self, FuturesUnordered<JoinHandle<()>>)> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start();
let test_store = TestStoreService::new(config.clone()).start();

@@ -221,7 +223,7 @@ impl ServiceState {
let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));

EnvelopeProcessorService::new(
let processor_handle = EnvelopeProcessorService::new(
create_processor_pool(&config)?,
config.clone(),
global_config_handle,
@@ -251,7 +253,7 @@ impl ServiceState {

// Keep all the services in one context.
let project_cache_services = Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.addr()),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
@@ -301,7 +303,7 @@ impl ServiceState {
global_config,
project_cache,
upstream_relay,
envelope_buffer,
envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.clone()),
};

let state = StateInner {
@@ -310,9 +312,17 @@
registry,
};

Ok(ServiceState {
inner: Arc::new(state),
})
let join_handles = FuturesUnordered::from_iter([processor_handle]);
if let Some((_, handle)) = envelope_buffer {
join_handles.push(handle);
};

Ok((
ServiceState {
inner: Arc::new(state),
},
join_handles,
))
}

/// Returns a reference to the Relay configuration.
15 changes: 7 additions & 8 deletions relay-server/src/services/buffer/mod.rs
@@ -11,6 +11,7 @@ use relay_system::Request;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use tokio::sync::watch;
use tokio::task::JoinHandle;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
@@ -128,12 +129,10 @@ impl EnvelopeBufferService {
}

/// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
pub fn start_observable(self) -> ObservableEnvelopeBuffer {
pub fn start_observable(self) -> (ObservableEnvelopeBuffer, JoinHandle<()>) {
let has_capacity = self.has_capacity.clone();
ObservableEnvelopeBuffer {
addr: self.start(),
has_capacity,
}
let (addr, join_handle) = self.start_joinable();
(ObservableEnvelopeBuffer { addr, has_capacity }, join_handle)
}

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
@@ -259,7 +258,7 @@ impl EnvelopeBufferService {
impl Service for EnvelopeBufferService {
type Interface = EnvelopeBuffer;

fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) -> JoinHandle<()> {
let config = self.config.clone();
let memory_checker = self.memory_checker.clone();
let mut global_config_rx = self.global_config_rx.clone();
@@ -312,7 +311,7 @@ impl Service for EnvelopeBufferService {
}

relay_log::info!("EnvelopeBufferService stop");
});
})
}
}

@@ -364,7 +363,7 @@ mod tests {
service.has_capacity.store(false, Ordering::Relaxed);

// Observable has correct value:
let ObservableEnvelopeBuffer { addr, has_capacity } = service.start_observable();
let (ObservableEnvelopeBuffer { addr, has_capacity }, _) = service.start_observable();
assert!(!has_capacity.load(Ordering::Relaxed));

// Send a message to trigger update of `has_capacity` flag:
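The `start_joinable` method called in `start_observable` is not part of this diff. A plausible shape, assuming it mirrors the existing `Service::start` and is built on `relay_system::channel`; the details here are guesses, not the PR's actual implementation:

// Hypothetical sketch of `Service::start_joinable`: like `start`, but it also
// hands back the JoinHandle of the spawned handler task. The channel name is
// illustrative.
fn start_joinable(self) -> (Addr<Self::Interface>, JoinHandle<()>)
where
    Self: Sized,
{
    let (addr, rx) = channel("envelope-buffer");
    let handle = self.spawn_handler(rx);
    (addr, handle)
}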
8 changes: 4 additions & 4 deletions relay-server/src/services/cogs.rs
@@ -1,8 +1,8 @@
use std::sync::atomic::{AtomicBool, Ordering};

use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
use relay_config::Config;
use relay_system::{Addr, FromMessage, Interface, Service};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::JoinHandle;

use crate::statsd::RelayCounters;

@@ -54,12 +54,12 @@ impl CogsService {
impl Service for CogsService {
type Interface = CogsReport;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
self.handle_report(message);
}
});
})
}
}

5 changes: 3 additions & 2 deletions relay-server/src/services/global_config.rs
@@ -22,6 +22,7 @@ use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Serv
use reqwest::Method;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tokio::time::Instant;

use crate::services::upstream::{
@@ -338,7 +339,7 @@ impl GlobalConfigService {
impl Service for GlobalConfigService {
type Interface = GlobalConfigManager;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut shutdown_handle = Controller::shutdown_handle();

@@ -384,7 +385,7 @@ impl Service for GlobalConfigService {
}
}
relay_log::info!("global config service stopped");
});
})
}
}

9 changes: 6 additions & 3 deletions relay-server/src/services/health_check.rs
@@ -4,6 +4,7 @@ use relay_config::Config;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use std::future::Future;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::{timeout, Instant};

use crate::services::metrics::RouterHandle;
@@ -189,13 +190,13 @@ impl HealthCheckService {
impl Service for HealthCheckService {
type Interface = HealthCheck;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy));
let check_interval = self.config.health_refresh_interval();
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

tokio::spawn(async move {
let j1 = tokio::spawn(async move {
let shutdown = Controller::shutdown_handle();

while shutdown.get().is_none() {
@@ -212,7 +213,7 @@ impl Service for HealthCheckService {
update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
});

tokio::spawn(async move {
let _j2 = tokio::spawn(async move {
while let Some(HealthCheck(message, sender)) = rx.recv().await {
let update = update_rx.borrow();

@@ -225,6 +226,8 @@
});
}
});

j1 // TODO: should return j1 + j2
Member Author

We have a few places where the spawn handler spawns more than one task. In a follow-up, we should transform these to something like:

tokio::spawn(async {
    let subtask = tokio::spawn(async { /* ... */ });
    // ...
    subtask.await.ok();
});
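Alternatively, the two subtasks could be folded into the single handle that `spawn_handler` must return; a sketch that re-raises a panic from either subtask, mirroring the run loop in lib.rs:

// Sketch: merge two task handles into one JoinHandle, propagating panics.
fn join_both(j1: JoinHandle<()>, j2: JoinHandle<()>) -> JoinHandle<()> {
    tokio::spawn(async move {
        for res in [j1.await, j2.await] {
            if let Err(e) = res {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                }
            }
        }
    })
}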

}
}

14 changes: 10 additions & 4 deletions relay-server/src/services/metrics/aggregator.rs
@@ -246,7 +246,10 @@ impl AggregatorService {
impl Service for AggregatorService {
type Interface = Aggregator;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(
mut self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
let mut shutdown = Controller::shutdown_handle();
@@ -264,7 +267,7 @@ impl Service for AggregatorService {
else => break,
}
}
});
})
}
}

@@ -361,7 +364,10 @@ mod tests {
impl Service for TestReceiver {
type Interface = TestInterface;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(
self,
mut rx: relay_system::Receiver<Self::Interface>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
let buckets = message.0.buckets;
@@ -370,7 +376,7 @@ mod tests {
self.add_buckets(buckets);
}
}
});
})
}
}

5 changes: 3 additions & 2 deletions relay-server/src/services/metrics/router.rs
@@ -5,6 +5,7 @@ use relay_config::aggregator::Condition;
use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
use relay_metrics::MetricNamespace;
use relay_system::{Addr, NoResponse, Recipient, Service};
use tokio::task::JoinHandle;

use crate::services::metrics::{
Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
@@ -53,7 +54,7 @@ impl RouterService {
impl Service for RouterService {
type Interface = Aggregator;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut router = StartedRouter::start(self);
relay_log::info!("metrics router started");
@@ -72,7 +73,7 @@ impl Service for RouterService {
}
}
relay_log::info!("metrics router stopped");
});
})
}
}

13 changes: 7 additions & 6 deletions relay-server/src/services/outcome.rs
@@ -29,6 +29,7 @@ use relay_sampling::evaluation::MatchedRuleIds;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;

#[cfg(feature = "processing")]
use crate::service::ServiceError;
@@ -682,7 +683,7 @@ impl HttpOutcomeProducer {
impl Service for HttpOutcomeProducer {
type Interface = TrackRawOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
@@ -694,7 +695,7 @@ impl Service for HttpOutcomeProducer {
else => break,
}
}
});
})
}
}

@@ -775,7 +776,7 @@ impl ClientReportOutcomeProducer {
impl Service for ClientReportOutcomeProducer {
type Interface = TrackOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
@@ -787,7 +788,7 @@ impl Service for ClientReportOutcomeProducer {
else => break,
}
}
});
})
}
}

@@ -1034,7 +1035,7 @@ impl OutcomeProducerService {
impl Service for OutcomeProducerService {
type Interface = OutcomeProducer;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let Self { config, inner } = self;

tokio::spawn(async move {
@@ -1045,7 +1046,7 @@ impl Service for OutcomeProducerService {
broker.handle_message(message, &config);
}
relay_log::info!("OutcomeProducer stopped.");
});
})
}
}
