feat(server): abort on panic #4026

Draft · wants to merge 5 commits into master
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

- Use `matches_any_origin` to scrub HTTP hosts in spans. ([#3939](https://github.com/getsentry/relay/pull/3939)).
- Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905))
- Abort the process when a service panics. ([#4026](https://github.com/getsentry/relay/pull/4026))

**Features**:

23 changes: 21 additions & 2 deletions relay-server/src/lib.rs
Expand Up @@ -278,8 +278,10 @@ mod testutils;

use std::sync::Arc;

use futures::StreamExt;
use relay_config::Config;
use relay_system::{Controller, Service};
use tokio::select;

use crate::service::ServiceState;
use crate::services::server::HttpServer;
@@ -301,9 +303,26 @@ pub fn run(config: Config) -> anyhow::Result<()> {
// information on all services.
main_runtime.block_on(async {
Controller::start(config.shutdown_timeout());
let service = ServiceState::start(config.clone())?;
let (service, mut join_handles) = ServiceState::start(config.clone())?;
HttpServer::new(config, service.clone())?.start();
Controller::shutdown_handle().finished().await;

loop {
select! {
Some(res) = join_handles.next() => {
if let Err(e) = res {
if e.is_panic() {
// Re-trigger panic to terminate the process:
std::panic::resume_unwind(e.into_panic());
}
}
}
_ = Controller::shutdown_handle().finished() => {
[Review comment, author] Note: when every service implements a shutdown listener, awaiting on `finished` becomes unnecessary: we can simply await all the `join_handles` and guarantee that every service finished its main task. (A sketch of that shape follows this diff.)

break
}
else => break
}
}

anyhow::Ok(())
})?;

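The review comment above sketches a possible follow-up: once every service terminates on shutdown by itself, the `select!` with the explicit shutdown branch collapses into simply draining the join handles. A minimal, illustrative sketch of that shape, assuming the same `FuturesUnordered<JoinHandle<()>>` that `ServiceState::start` returns in this PR (not code from the PR itself):

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task::JoinHandle;

/// Hypothetical follow-up shape: drain all service join handles and abort
/// the process on the first panic, with no separate shutdown branch.
async fn wait_for_services(mut join_handles: FuturesUnordered<JoinHandle<()>>) {
    while let Some(res) = join_handles.next().await {
        if let Err(e) = res {
            if e.is_panic() {
                // Re-raise the panic so the process terminates instead of
                // limping on without the crashed service.
                std::panic::resume_unwind(e.into_panic());
            }
        }
    }
}
```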
24 changes: 17 additions & 7 deletions relay-server/src/service.rs
@@ -9,12 +9,14 @@ use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use futures::stream::FuturesUnordered;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
@@ -151,7 +153,7 @@ pub struct ServiceState {

impl ServiceState {
/// Starts all services and returns addresses to all of them.
pub fn start(config: Arc<Config>) -> Result<Self> {
pub fn start(config: Arc<Config>) -> Result<(Self, FuturesUnordered<JoinHandle<()>>)> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start();
let test_store = TestStoreService::new(config.clone()).start();

@@ -221,7 +223,7 @@ impl ServiceState {
let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));

EnvelopeProcessorService::new(
let processor_handle = EnvelopeProcessorService::new(
create_processor_pool(&config)?,
config.clone(),
global_config_handle,
@@ -251,7 +253,7 @@

// Keep all the services in one context.
let project_cache_services = Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.addr()),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
@@ -301,7 +303,7 @@ impl ServiceState {
global_config,
project_cache,
upstream_relay,
envelope_buffer,
envelope_buffer: envelope_buffer.as_ref().map(|(b, _)| b.clone()),
};

let state = StateInner {
@@ -310,9 +312,17 @@
registry,
};

Ok(ServiceState {
inner: Arc::new(state),
})
let join_handles = FuturesUnordered::from_iter([processor_handle]);
if let Some((_, handle)) = envelope_buffer {
join_handles.push(handle);
};

Ok((
ServiceState {
inner: Arc::new(state),
},
join_handles,
))
}

/// Returns a reference to the Relay configuration.
15 changes: 7 additions & 8 deletions relay-server/src/services/buffer/mod.rs
@@ -11,6 +11,7 @@ use relay_system::Request;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use tokio::sync::watch;
use tokio::task::JoinHandle;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
@@ -128,12 +129,10 @@ impl EnvelopeBufferService {
}

/// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
pub fn start_observable(self) -> ObservableEnvelopeBuffer {
pub fn start_observable(self) -> (ObservableEnvelopeBuffer, JoinHandle<()>) {
let has_capacity = self.has_capacity.clone();
ObservableEnvelopeBuffer {
addr: self.start(),
has_capacity,
}
let (addr, join_handle) = self.start_joinable();
(ObservableEnvelopeBuffer { addr, has_capacity }, join_handle)
}

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
@@ -259,7 +258,7 @@ impl EnvelopeBufferService {
impl Service for EnvelopeBufferService {
type Interface = EnvelopeBuffer;

fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) -> JoinHandle<()> {
let config = self.config.clone();
let memory_checker = self.memory_checker.clone();
let mut global_config_rx = self.global_config_rx.clone();
Expand Down Expand Up @@ -312,7 +311,7 @@ impl Service for EnvelopeBufferService {
}

relay_log::info!("EnvelopeBufferService stop");
});
})
}
}

@@ -364,7 +363,7 @@ mod tests {
service.has_capacity.store(false, Ordering::Relaxed);

// Observable has correct value:
let ObservableEnvelopeBuffer { addr, has_capacity } = service.start_observable();
let (ObservableEnvelopeBuffer { addr, has_capacity }, _) = service.start_observable();
assert!(!has_capacity.load(Ordering::Relaxed));

// Send a message to trigger update of `has_capacity` flag:
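The `start_joinable` call used in `start_observable` above is not part of this excerpt; it presumably comes from the accompanying `relay_system` change that makes `spawn_handler` return a `JoinHandle<()>`. As a self-contained, hypothetical illustration of that pattern (the trait, message type, and channel below are stand-ins, not the real `relay_system` API):

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Stand-in for the pattern this PR relies on: the service's main task is
/// spawned by `spawn_handler`, which hands back its `JoinHandle` so callers
/// can observe panics or await termination.
trait JoinableService: Sized {
    type Message: Send + 'static;

    /// Spawns the service loop and returns the handle of the spawned task.
    fn spawn_handler(self, rx: mpsc::UnboundedReceiver<Self::Message>) -> JoinHandle<()>;

    /// Starts the service and returns both a sender (the "address") and the
    /// join handle of its main task.
    fn start_joinable(self) -> (mpsc::UnboundedSender<Self::Message>, JoinHandle<()>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (tx, self.spawn_handler(rx))
    }
}
```

With a shape like this, the existing `start()` can keep its signature by simply dropping the handle, while call sites such as `start_observable` forward it to the caller.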
8 changes: 4 additions & 4 deletions relay-server/src/services/cogs.rs
@@ -1,8 +1,8 @@
use std::sync::atomic::{AtomicBool, Ordering};

use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
use relay_config::Config;
use relay_system::{Addr, FromMessage, Interface, Service};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::JoinHandle;

use crate::statsd::RelayCounters;

@@ -54,12 +54,12 @@ impl CogsService {
impl Service for CogsService {
type Interface = CogsReport;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
self.handle_report(message);
}
});
})
}
}

5 changes: 3 additions & 2 deletions relay-server/src/services/global_config.rs
@@ -22,6 +22,7 @@ use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Serv
use reqwest::Method;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tokio::time::Instant;

use crate::services::upstream::{
@@ -338,7 +339,7 @@ impl GlobalConfigService {
impl Service for GlobalConfigService {
type Interface = GlobalConfigManager;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut shutdown_handle = Controller::shutdown_handle();

@@ -384,7 +385,7 @@ impl Service for GlobalConfigService {
}
}
relay_log::info!("global config service stopped");
});
})
}
}

9 changes: 6 additions & 3 deletions relay-server/src/services/health_check.rs
@@ -4,6 +4,7 @@ use relay_config::Config;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use std::future::Future;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::{timeout, Instant};

use crate::services::metrics::RouterHandle;
@@ -189,13 +190,13 @@ impl HealthCheckService {
impl Service for HealthCheckService {
type Interface = HealthCheck;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy));
let check_interval = self.config.health_refresh_interval();
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

tokio::spawn(async move {
let j1 = tokio::spawn(async move {
let shutdown = Controller::shutdown_handle();

while shutdown.get().is_none() {
@@ -212,7 +213,7 @@ impl Service for HealthCheckService {
update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
});

tokio::spawn(async move {
let _j2 = tokio::spawn(async move {
while let Some(HealthCheck(message, sender)) = rx.recv().await {
let update = update_rx.borrow();

Expand All @@ -225,6 +226,8 @@ impl Service for HealthCheckService {
});
}
});

j1 // TODO: should return j1 + j2
[Review comment, author] We have a few places where the spawn handler spawns more than one task. In a follow-up, we should transform these to something like

tokio::spawn(async {
    let subtask = tokio::spawn(async { /* ... */ });
    // ...
    let _ = subtask.await;
});

(A sketch that merges the two handles in the meantime follows this diff.)

}
}

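The `// TODO: should return j1 + j2` above, together with the review comment, points at the same gap: only the first task's handle is returned, so a panic in the second spawned task would never reach the new abort logic. One way to bridge that until the suggested restructuring lands could be a small helper that merges the two handles; `join_both` below is hypothetical and not part of this PR:

```rust
use tokio::task::JoinHandle;

/// Hypothetical helper: combine two join handles into one, so a panic in
/// either child task surfaces as a panic of the returned handle.
fn join_both(j1: JoinHandle<()>, j2: JoinHandle<()>) -> JoinHandle<()> {
    tokio::spawn(async move {
        let (r1, r2) = tokio::join!(j1, j2);
        for res in [r1, r2] {
            if let Err(e) = res {
                if e.is_panic() {
                    // Unwinding here marks this supervising task, and thus the
                    // merged handle, as panicked.
                    std::panic::resume_unwind(e.into_panic());
                }
            }
        }
    })
}
```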
10 changes: 6 additions & 4 deletions relay-server/src/services/metrics/aggregator.rs
@@ -8,6 +8,7 @@ use relay_config::AggregatorServiceConfig;
use relay_metrics::aggregator::AggregateMetricsError;
use relay_metrics::{aggregator, Bucket, UnixTimestamp};
use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown};
use tokio::task::JoinHandle;

use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};

@@ -246,7 +247,7 @@ impl AggregatorService {
impl Service for AggregatorService {
type Interface = Aggregator;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
let mut shutdown = Controller::shutdown_handle();
@@ -264,7 +265,7 @@ impl Service for AggregatorService {
else => break,
}
}
});
})
}
}

@@ -321,6 +322,7 @@

use relay_common::time::UnixTimestamp;
use relay_metrics::{aggregator::AggregatorConfig, BucketMetadata, BucketValue};
use tokio::task::JoinHandle;

use super::*;

@@ -361,7 +363,7 @@ impl Service for TestReceiver {
impl Service for TestReceiver {
type Interface = TestInterface;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
let buckets = message.0.buckets;
Expand All @@ -370,7 +372,7 @@ mod tests {
self.add_buckets(buckets);
}
}
});
})
}
}

5 changes: 3 additions & 2 deletions relay-server/src/services/metrics/router.rs
@@ -5,6 +5,7 @@ use relay_config::aggregator::Condition;
use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
use relay_metrics::MetricNamespace;
use relay_system::{Addr, NoResponse, Recipient, Service};
use tokio::task::JoinHandle;

use crate::services::metrics::{
Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
@@ -53,7 +54,7 @@ impl RouterService {
impl Service for RouterService {
type Interface = Aggregator;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut router = StartedRouter::start(self);
relay_log::info!("metrics router started");
@@ -72,7 +73,7 @@ impl Service for RouterService {
}
}
relay_log::info!("metrics router stopped");
});
})
}
}

Expand Down