From e6fc44d0669ea47fa7e2e37866a158d3b160ff22 Mon Sep 17 00:00:00 2001 From: Thom Wright Date: Mon, 29 Apr 2024 11:29:31 +0100 Subject: [PATCH] Fix span linking (#4) * Test spans * Fix following from the shared process span * Use beta of tracing-capture --- .gitignore | 2 + Cargo.toml | 2 + src/batch.rs | 19 ++++--- src/batcher.rs | 36 +++++++++++-- src/lib.rs | 107 +++++++++++++++++++++++++++++++++++++ src/worker.rs | 15 ++++-- tests/{mod.rs => tests.rs} | 31 ++--------- tests/types/mod.rs | 29 ++++++++++ 8 files changed, 198 insertions(+), 43 deletions(-) rename tests/{mod.rs => tests.rs} (93%) create mode 100644 tests/types/mod.rs diff --git a/.gitignore b/.gitignore index b22b4ec..2307bac 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ /target /Cargo.lock +.vscode + tmp diff --git a/Cargo.toml b/Cargo.toml index 6d30522..e19ee69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,5 @@ doc-comment = "0.3.3" futures = "0.3.29" tokio = { version = "1.28.1", features = ["rt", "macros", "test-util"] } tokio-test = "0.4.3" +tracing-capture = "0.2.0-beta.1" +tracing-subscriber = "0.3.18" diff --git a/src/batch.rs b/src/batch.rs index d03a9d4..0af6fd3 100644 --- a/src/batch.rs +++ b/src/batch.rs @@ -13,7 +13,7 @@ use tokio::{ task::JoinHandle, time::Instant, }; -use tracing::{debug, span, Instrument, Level}; +use tracing::{debug, span, Instrument, Level, Span}; use crate::{batcher::Processor, error::Result, worker::Message, BatchError}; @@ -22,11 +22,13 @@ pub(crate) struct BatchItem { pub key: K, pub input: I, /// Used to send the output back. - pub tx: oneshot::Sender>, + pub tx: SendOutput, /// This item was added to the batch as part of this span. - pub span_id: Option, + pub requesting_span: Span, } +type SendOutput = oneshot::Sender<(Result, Option)>; + /// A batch of items to process. #[derive(Debug)] pub(crate) struct Batch { @@ -178,13 +180,13 @@ where let mut items = Vec::new(); mem::swap(&mut self.items, &mut items); - let (inputs, txs): (Vec, Vec>>) = items + let (inputs, txs): (Vec, Vec>) = items .into_iter() .map(|item| { // Link the shared batch processing span to the span for each batch item. We // don't use a parent relationship because that's 1:many (parent:child), and // this is many:1. - span.follows_from(item.span_id); + span.follows_from(item.requesting_span.id()); (item.input, item.tx) }) @@ -194,7 +196,7 @@ where let result = processor .process(self.key.clone(), inputs.into_iter()) - .instrument(span) + .instrument(span.clone()) .await; let outputs: Vec<_> = match result { @@ -203,7 +205,10 @@ where }; for (tx, output) in txs.into_iter().zip(outputs) { - if tx.send(output.map_err(BatchError::BatchFailed)).is_err() { + if tx + .send((output.map_err(BatchError::BatchFailed), Some(span.clone()))) + .is_err() + { // Whatever was waiting for the output must have shut down. Presumably it // doesn't care anymore, but we log here anyway. There's not much else we can do // here. diff --git a/src/batcher.rs b/src/batcher.rs index 480dbd7..4a4a03f 100644 --- a/src/batcher.rs +++ b/src/batcher.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, hash::Hash, sync::Arc}; use async_trait::async_trait; use tokio::sync::{mpsc, oneshot}; -use tracing::Span; +use tracing::{span, Level, Span}; use crate::{ batch::BatchItem, @@ -71,7 +71,7 @@ where /// Add an item to the batch and await the result. pub async fn add(&self, key: K, input: I) -> Result { // Record the span ID so we can link the shared processing span. - let span_id = Span::current().id(); + let requesting_span = Span::current().clone(); let (tx, rx) = oneshot::channel(); self.item_tx @@ -79,11 +79,39 @@ where key, input, tx, - span_id, + requesting_span, }) .await?; - rx.await? + let (output, batch_span) = rx.await?; + + { + let link_back_span = span!(Level::INFO, "batch finished"); + if let Some(span) = batch_span { + // WARNING: It's very important that we don't drop the span until _after_ + // follows_from(). + // + // If we did e.g. `.follows_from(span)` then the span would get converted into an ID + // and dropped. Any attempt to look up the span by ID _inside_ follows_from() would + // then panic, because the span will have been closed and no longer exist. + // + // Don't ask me how long this took me to debug. + link_back_span.follows_from(&span); + link_back_span.in_scope(|| { + // Do nothing. This span is just here to work around a Honeycomb limitation: + // + // If the batch span is linked to a parent span like so: + // + // parent_span_1 <-link- batch_span + // + // then in Honeycomb, the link is only shown on the batch span. It it not possible + // to click through to the batch span from the parent. + // + // So, here we link back to the batch to make this easier. + }); + } + } + output } } diff --git a/src/lib.rs b/src/lib.rs index 50e4741..9698e96 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,3 +26,110 @@ mod worker; pub use batcher::{Batcher, Processor}; pub use error::BatchError; pub use policies::{BatchingPolicy, Limits, OnFull}; + +#[cfg(test)] +mod test { + use std::time::Duration; + + use async_trait::async_trait; + use tokio::join; + use tracing::{span, Instrument}; + + use crate::{Batcher, BatchingPolicy, Limits, Processor}; + + #[derive(Debug, Clone)] + pub struct SimpleBatchProcessor(pub Duration); + + #[async_trait] + impl Processor for SimpleBatchProcessor { + async fn process( + &self, + key: String, + inputs: impl Iterator + Send, + ) -> Result, String> { + tokio::time::sleep(self.0).await; + Ok(inputs.map(|s| s + " processed for " + &key).collect()) + } + } + + #[tokio::test] + async fn test_tracing() { + use tracing::Level; + use tracing_capture::{CaptureLayer, SharedStorage}; + use tracing_subscriber::layer::SubscriberExt; + + let subscriber = tracing_subscriber::fmt() + .pretty() + .with_max_level(Level::INFO) + .finish(); + // Add the capturing layer. + let storage = SharedStorage::default(); + let subscriber = subscriber.with(CaptureLayer::new(&storage)); + + // Capture tracing information. + let _guard = tracing::subscriber::set_default(subscriber); + + let batcher = Batcher::new( + SimpleBatchProcessor(Duration::ZERO), + Limits::default().max_batch_size(2), + BatchingPolicy::Size, + ); + + let h1 = { + tokio_test::task::spawn({ + let span = span!(Level::INFO, "test_handler_span1"); + + batcher + .add("A".to_string(), "1".to_string()) + .instrument(span) + }) + }; + let h2 = { + tokio_test::task::spawn({ + let span = span!(Level::INFO, "test_handler_span2"); + + batcher + .add("A".to_string(), "2".to_string()) + .instrument(span) + }) + }; + + let (_o1, _o2) = join!(h1, h2); + + let storage = storage.lock(); + + let process_span: Vec<_> = storage + .all_spans() + .filter(|span| span.metadata().name().contains("process batch")) + .collect(); + assert_eq!( + process_span.len(), + 1, + "should be a single span for processing the batch" + ); + + assert_eq!( + process_span.first().unwrap().follows_from().len(), + 2, + "should follow from both handler spans" + ); + + let link_back_spans: Vec<_> = storage + .all_spans() + .filter(|span| span.metadata().name().contains("batch finished")) + .collect(); + assert_eq!( + link_back_spans.len(), + 2, + "should be two spans for linking back to the process span" + ); + + for span in link_back_spans { + assert_eq!( + span.follows_from().len(), + 1, + "should follow from the process span" + ); + } + } +} diff --git a/src/worker.rs b/src/worker.rs index d5c26ec..d138394 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -102,7 +102,11 @@ where batch.push(item); } PreAdd::Reject(reason) => { - if item.tx.send(Err(BatchError::Rejected(reason))).is_err() { + if item + .tx + .send((Err(BatchError::Rejected(reason)), None)) + .is_err() + { // Whatever was waiting for the output must have shut down. Presumably it // doesn't care anymore, but we log here anyway. There's not much else we can do // here. @@ -173,6 +177,7 @@ impl Drop for WorkerHandle { mod test { use async_trait::async_trait; use tokio::sync::oneshot; + use tracing::Span; use super::*; @@ -205,7 +210,7 @@ mod test { key: "K1".to_string(), input: "I1".to_string(), tx, - span_id: None, + requesting_span: Span::none(), }) .await .unwrap(); @@ -220,7 +225,7 @@ mod test { key: "K1".to_string(), input: "I2".to_string(), tx, - span_id: None, + requesting_span: Span::none(), }) .await .unwrap(); @@ -228,8 +233,8 @@ mod test { rx }; - let o1 = rx1.await.unwrap().unwrap(); - let o2 = rx2.await.unwrap().unwrap(); + let o1 = rx1.await.unwrap().0.unwrap(); + let o2 = rx2.await.unwrap().0.unwrap(); assert_eq!(o1, "I1 processed".to_string()); assert_eq!(o2, "I2 processed".to_string()); diff --git a/tests/mod.rs b/tests/tests.rs similarity index 93% rename from tests/mod.rs rename to tests/tests.rs index 612417e..5c13ab3 100644 --- a/tests/mod.rs +++ b/tests/tests.rs @@ -1,35 +1,12 @@ -use std::{marker::Send, time::Duration}; +use std::time::Duration; -use async_trait::async_trait; -use batch_aint_one::{Batcher, BatchingPolicy, Limits, OnFull, Processor}; +use batch_aint_one::{Batcher, BatchingPolicy, Limits, OnFull}; use futures::future::join_all; use tokio::{join, time::Instant}; -// use tokio_test::assert_elapsed; - -#[derive(Debug, Clone)] -struct SimpleBatchProcessor(Duration); - -#[async_trait] -impl Processor for SimpleBatchProcessor { - async fn process( - &self, - key: String, - inputs: impl Iterator + Send, - ) -> Result, String> { - tokio::time::sleep(self.0).await; - Ok(inputs.map(|s| s + " processed for " + &key).collect()) - } -} -struct NotCloneable {} -type Cloneable = Batcher; +use crate::types::SimpleBatchProcessor; -/// A [Batcher] should be cloneable, even when the `I`s and `O`s are not. -#[derive(Clone)] -#[allow(unused)] -struct CanDeriveClone { - batcher: Cloneable, -} +mod types; #[tokio::test] async fn strategy_size() { diff --git a/tests/types/mod.rs b/tests/types/mod.rs new file mode 100644 index 0000000..b49ef83 --- /dev/null +++ b/tests/types/mod.rs @@ -0,0 +1,29 @@ +use std::time::Duration; + +use async_trait::async_trait; +use batch_aint_one::{Batcher, Processor}; + +#[derive(Debug, Clone)] +pub struct SimpleBatchProcessor(pub Duration); + +#[async_trait] +impl Processor for SimpleBatchProcessor { + async fn process( + &self, + key: String, + inputs: impl Iterator + Send, + ) -> Result, String> { + tokio::time::sleep(self.0).await; + Ok(inputs.map(|s| s + " processed for " + &key).collect()) + } +} + +struct NotCloneable {} +type Cloneable = Batcher; + +/// A [Batcher] should be cloneable, even when the `I`s and `O`s are not. +#[derive(Clone)] +#[allow(unused)] +struct CanDeriveClone { + batcher: Cloneable, +}