Skip to content

Commit

Permalink
Fix span linking (#4)
Browse files Browse the repository at this point in the history
* Test spans

* Fix following from the shared process span

* Use beta of tracing-capture
  • Loading branch information
ThomWright authored Apr 29, 2024
1 parent 35af5d0 commit e6fc44d
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 43 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/target
/Cargo.lock

.vscode

tmp
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ doc-comment = "0.3.3"
futures = "0.3.29"
tokio = { version = "1.28.1", features = ["rt", "macros", "test-util"] }
tokio-test = "0.4.3"
tracing-capture = "0.2.0-beta.1"
tracing-subscriber = "0.3.18"
19 changes: 12 additions & 7 deletions src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::{
task::JoinHandle,
time::Instant,
};
use tracing::{debug, span, Instrument, Level};
use tracing::{debug, span, Instrument, Level, Span};

use crate::{batcher::Processor, error::Result, worker::Message, BatchError};

Expand All @@ -22,11 +22,13 @@ pub(crate) struct BatchItem<K, I, O, E: Display> {
pub key: K,
pub input: I,
/// Used to send the output back.
pub tx: oneshot::Sender<Result<O, E>>,
pub tx: SendOutput<O, E>,
/// This item was added to the batch as part of this span.
pub span_id: Option<span::Id>,
pub requesting_span: Span,
}

/// Sender half of the per-item reply channel.
///
/// The payload pairs the item's result with the span the batch was processed in. The span is
/// `None` when the item never reached processing (for example when it was rejected before
/// being added to a batch).
type SendOutput<O, E> = oneshot::Sender<(Result<O, E>, Option<Span>)>;

/// A batch of items to process.
#[derive(Debug)]
pub(crate) struct Batch<K, I, O, E: Display> {
Expand Down Expand Up @@ -178,13 +180,13 @@ where
let mut items = Vec::new();
mem::swap(&mut self.items, &mut items);

let (inputs, txs): (Vec<I>, Vec<oneshot::Sender<Result<O, E>>>) = items
let (inputs, txs): (Vec<I>, Vec<SendOutput<O, E>>) = items
.into_iter()
.map(|item| {
// Link the shared batch processing span to the span for each batch item. We
// don't use a parent relationship because that's 1:many (parent:child), and
// this is many:1.
span.follows_from(item.span_id);
span.follows_from(item.requesting_span.id());

(item.input, item.tx)
})
Expand All @@ -194,7 +196,7 @@ where

let result = processor
.process(self.key.clone(), inputs.into_iter())
.instrument(span)
.instrument(span.clone())
.await;

let outputs: Vec<_> = match result {
Expand All @@ -203,7 +205,10 @@ where
};

for (tx, output) in txs.into_iter().zip(outputs) {
if tx.send(output.map_err(BatchError::BatchFailed)).is_err() {
if tx
.send((output.map_err(BatchError::BatchFailed), Some(span.clone())))
.is_err()
{
// Whatever was waiting for the output must have shut down. Presumably it
// doesn't care anymore, but we log here anyway. There's not much else we can do
// here.
Expand Down
36 changes: 32 additions & 4 deletions src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt::Display, hash::Hash, sync::Arc};

use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use tracing::Span;
use tracing::{span, Level, Span};

use crate::{
batch::BatchItem,
Expand Down Expand Up @@ -71,19 +71,47 @@ where
/// Add an item to the batch and await the result.
pub async fn add(&self, key: K, input: I) -> Result<O, E> {
// Capture the requesting span so we can link the shared processing span to it.
let span_id = Span::current().id();
let requesting_span = Span::current().clone();

let (tx, rx) = oneshot::channel();
self.item_tx
.send(BatchItem {
key,
input,
tx,
span_id,
requesting_span,
})
.await?;

rx.await?
let (output, batch_span) = rx.await?;

{
let link_back_span = span!(Level::INFO, "batch finished");
if let Some(span) = batch_span {
// WARNING: It's very important that we don't drop the span until _after_
// follows_from().
//
// If we did e.g. `.follows_from(span)` then the span would get converted into an ID
// and dropped. Any attempt to look up the span by ID _inside_ follows_from() would
// then panic, because the span will have been closed and no longer exist.
//
// Don't ask me how long this took me to debug.
link_back_span.follows_from(&span);
link_back_span.in_scope(|| {
// Do nothing. This span is just here to work around a Honeycomb limitation:
//
// If the batch span is linked to a parent span like so:
//
// parent_span_1 <-link- batch_span
//
// then in Honeycomb, the link is only shown on the batch span. It is not possible
// to click through to the batch span from the parent.
//
// So, here we link back to the batch to make this easier.
});
}
}
output
}
}

Expand Down
107 changes: 107 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,110 @@ mod worker;
pub use batcher::{Batcher, Processor};
pub use error::BatchError;
pub use policies::{BatchingPolicy, Limits, OnFull};

#[cfg(test)]
mod test {
use std::time::Duration;

use async_trait::async_trait;
use tokio::join;
use tracing::{span, Instrument};

use crate::{Batcher, BatchingPolicy, Limits, Processor};

/// Test processor: waits for the wrapped duration, then tags every input with its batch key.
#[derive(Debug, Clone)]
pub struct SimpleBatchProcessor(pub Duration);

#[async_trait]
impl Processor<String, String, String> for SimpleBatchProcessor {
    /// Sleeps for the configured duration, then maps each input `x` to
    /// `"x processed for <key>"`, preserving input order.
    async fn process(
        &self,
        key: String,
        inputs: impl Iterator<Item = String> + Send,
    ) -> Result<Vec<String>, String> {
        let delay = self.0;
        tokio::time::sleep(delay).await;

        let processed: Vec<String> = inputs
            .map(|input| format!("{input} processed for {key}"))
            .collect();
        Ok(processed)
    }
}

/// Checks the span links produced when two items are batched together:
///
/// * exactly one "process batch" span exists and it follows from both requesting spans;
/// * each caller gets its own "batch finished" span that follows from one span.
#[tokio::test]
async fn test_tracing() {
    use tracing::Level;
    use tracing_capture::{CaptureLayer, SharedStorage};
    use tracing_subscriber::layer::SubscriberExt;

    // Storage the capture layer records spans into, inspected at the end of the test.
    let captured = SharedStorage::default();
    let fmt_subscriber = tracing_subscriber::fmt()
        .pretty()
        .with_max_level(Level::INFO)
        .finish();
    // Stack the capturing layer on top of the pretty-printing subscriber.
    let capturing_subscriber = fmt_subscriber.with(CaptureLayer::new(&captured));

    // Install the subscriber; the guard is held until the end of the test.
    let _guard = tracing::subscriber::set_default(capturing_subscriber);

    // A size-based policy with a maximum batch size of two: the assertions below expect both
    // items to be handled by a single processing span.
    let processor = SimpleBatchProcessor(Duration::ZERO);
    let limits = Limits::default().max_batch_size(2);
    let batcher = Batcher::new(processor, limits, BatchingPolicy::Size);

    // Each request runs inside its own handler span.
    let first_span = span!(Level::INFO, "test_handler_span1");
    let first_request = tokio_test::task::spawn(
        batcher
            .add("A".to_string(), "1".to_string())
            .instrument(first_span),
    );

    let second_span = span!(Level::INFO, "test_handler_span2");
    let second_request = tokio_test::task::spawn(
        batcher
            .add("A".to_string(), "2".to_string())
            .instrument(second_span),
    );

    let (_first_output, _second_output) = join!(first_request, second_request);

    let captured = captured.lock();

    let batch_spans: Vec<_> = captured
        .all_spans()
        .filter(|candidate| candidate.metadata().name().contains("process batch"))
        .collect();
    assert_eq!(
        batch_spans.len(),
        1,
        "should be a single span for processing the batch"
    );

    let batch_span = batch_spans.first().unwrap();
    assert_eq!(
        batch_span.follows_from().len(),
        2,
        "should follow from both handler spans"
    );

    let finished_spans: Vec<_> = captured
        .all_spans()
        .filter(|candidate| candidate.metadata().name().contains("batch finished"))
        .collect();
    assert_eq!(
        finished_spans.len(),
        2,
        "should be two spans for linking back to the process span"
    );

    for finished in finished_spans {
        assert_eq!(
            finished.follows_from().len(),
            1,
            "should follow from the process span"
        );
    }
}
}
15 changes: 10 additions & 5 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ where
batch.push(item);
}
PreAdd::Reject(reason) => {
if item.tx.send(Err(BatchError::Rejected(reason))).is_err() {
if item
.tx
.send((Err(BatchError::Rejected(reason)), None))
.is_err()
{
// Whatever was waiting for the output must have shut down. Presumably it
// doesn't care anymore, but we log here anyway. There's not much else we can do
// here.
Expand Down Expand Up @@ -173,6 +177,7 @@ impl Drop for WorkerHandle {
mod test {
use async_trait::async_trait;
use tokio::sync::oneshot;
use tracing::Span;

use super::*;

Expand Down Expand Up @@ -205,7 +210,7 @@ mod test {
key: "K1".to_string(),
input: "I1".to_string(),
tx,
span_id: None,
requesting_span: Span::none(),
})
.await
.unwrap();
Expand All @@ -220,16 +225,16 @@ mod test {
key: "K1".to_string(),
input: "I2".to_string(),
tx,
span_id: None,
requesting_span: Span::none(),
})
.await
.unwrap();

rx
};

let o1 = rx1.await.unwrap().unwrap();
let o2 = rx2.await.unwrap().unwrap();
let o1 = rx1.await.unwrap().0.unwrap();
let o2 = rx2.await.unwrap().0.unwrap();

assert_eq!(o1, "I1 processed".to_string());
assert_eq!(o2, "I2 processed".to_string());
Expand Down
31 changes: 4 additions & 27 deletions tests/mod.rs → tests/tests.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,12 @@
use std::{marker::Send, time::Duration};
use std::time::Duration;

use async_trait::async_trait;
use batch_aint_one::{Batcher, BatchingPolicy, Limits, OnFull, Processor};
use batch_aint_one::{Batcher, BatchingPolicy, Limits, OnFull};
use futures::future::join_all;
use tokio::{join, time::Instant};
// use tokio_test::assert_elapsed;

#[derive(Debug, Clone)]
struct SimpleBatchProcessor(Duration);

#[async_trait]
impl Processor<String, String, String> for SimpleBatchProcessor {
async fn process(
&self,
key: String,
inputs: impl Iterator<Item = String> + Send,
) -> Result<Vec<String>, String> {
tokio::time::sleep(self.0).await;
Ok(inputs.map(|s| s + " processed for " + &key).collect())
}
}

struct NotCloneable {}
type Cloneable = Batcher<String, NotCloneable, NotCloneable>;
use crate::types::SimpleBatchProcessor;

/// A [Batcher] should be cloneable, even when the `I`s and `O`s are not.
#[derive(Clone)]
#[allow(unused)]
struct CanDeriveClone {
batcher: Cloneable,
}
mod types;

#[tokio::test]
async fn strategy_size() {
Expand Down
29 changes: 29 additions & 0 deletions tests/types/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::time::Duration;

use async_trait::async_trait;
use batch_aint_one::{Batcher, Processor};

/// Shared test processor: waits for the wrapped duration, then tags every input with its
/// batch key.
#[derive(Debug, Clone)]
pub struct SimpleBatchProcessor(pub Duration);

#[async_trait]
impl Processor<String, String, String> for SimpleBatchProcessor {
    /// Sleeps for the configured duration, then maps each input `x` to
    /// `"x processed for <key>"`, preserving input order.
    async fn process(
        &self,
        key: String,
        inputs: impl Iterator<Item = String> + Send,
    ) -> Result<Vec<String>, String> {
        let delay = self.0;
        tokio::time::sleep(delay).await;

        let processed: Vec<String> = inputs
            .map(|input| format!("{input} processed for {key}"))
            .collect();
        Ok(processed)
    }
}

// Deliberately has no `Clone` implementation.
struct NotCloneable {}
// A `Batcher` keyed by `String` whose remaining explicit type parameters are not `Clone`.
type Cloneable = Batcher<String, NotCloneable, NotCloneable>;

/// A [Batcher] should be cloneable, even when the `I`s and `O`s are not.
///
/// This is a compile-time check only: `#[derive(Clone)]` requires every field to be `Clone`,
/// so this struct stops compiling if `Batcher` ever requires its type parameters to be
/// `Clone` as well.
#[derive(Clone)]
// Never constructed; it only needs to type-check.
#[allow(unused)]
struct CanDeriveClone {
batcher: Cloneable,
}

0 comments on commit e6fc44d

Please sign in to comment.