Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix span linking #4

Merged
merged 4 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/target
/Cargo.lock

.vscode

tmp
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ doc-comment = "0.3.3"
futures = "0.3.29"
tokio = { version = "1.28.1", features = ["rt", "macros", "test-util"] }
tokio-test = "0.4.3"
tracing-capture = "0.2.0-beta.1"
tracing-subscriber = "0.3.18"
19 changes: 12 additions & 7 deletions src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::{
task::JoinHandle,
time::Instant,
};
use tracing::{debug, span, Instrument, Level};
use tracing::{debug, span, Instrument, Level, Span};

use crate::{batcher::Processor, error::Result, worker::Message, BatchError};

Expand All @@ -22,11 +22,13 @@ pub(crate) struct BatchItem<K, I, O, E: Display> {
pub key: K,
pub input: I,
/// Used to send the output back.
pub tx: oneshot::Sender<Result<O, E>>,
pub tx: SendOutput<O, E>,
/// This item was added to the batch as part of this span.
pub span_id: Option<span::Id>,
pub requesting_span: Span,
}

type SendOutput<O, E> = oneshot::Sender<(Result<O, E>, Option<Span>)>;

/// A batch of items to process.
#[derive(Debug)]
pub(crate) struct Batch<K, I, O, E: Display> {
Expand Down Expand Up @@ -178,13 +180,13 @@ where
let mut items = Vec::new();
mem::swap(&mut self.items, &mut items);

let (inputs, txs): (Vec<I>, Vec<oneshot::Sender<Result<O, E>>>) = items
let (inputs, txs): (Vec<I>, Vec<SendOutput<O, E>>) = items
.into_iter()
.map(|item| {
// Link the shared batch processing span to the span for each batch item. We
// don't use a parent relationship because that's 1:many (parent:child), and
// this is many:1.
span.follows_from(item.span_id);
span.follows_from(item.requesting_span.id());

(item.input, item.tx)
})
Expand All @@ -194,7 +196,7 @@ where

let result = processor
.process(self.key.clone(), inputs.into_iter())
.instrument(span)
.instrument(span.clone())
.await;

let outputs: Vec<_> = match result {
Expand All @@ -203,7 +205,10 @@ where
};

for (tx, output) in txs.into_iter().zip(outputs) {
if tx.send(output.map_err(BatchError::BatchFailed)).is_err() {
if tx
.send((output.map_err(BatchError::BatchFailed), Some(span.clone())))
.is_err()
{
// Whatever was waiting for the output must have shut down. Presumably it
// doesn't care anymore, but we log here anyway. There's not much else we can do
// here.
Expand Down
36 changes: 32 additions & 4 deletions src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt::Display, hash::Hash, sync::Arc};

use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use tracing::Span;
use tracing::{span, Level, Span};

use crate::{
batch::BatchItem,
Expand Down Expand Up @@ -71,19 +71,47 @@ where
/// Add an item to the batch and await the result.
pub async fn add(&self, key: K, input: I) -> Result<O, E> {
// Record the span ID so we can link the shared processing span.
let span_id = Span::current().id();
let requesting_span = Span::current().clone();

let (tx, rx) = oneshot::channel();
self.item_tx
.send(BatchItem {
key,
input,
tx,
span_id,
requesting_span,
})
.await?;

rx.await?
let (output, batch_span) = rx.await?;

{
let link_back_span = span!(Level::INFO, "batch finished");
if let Some(span) = batch_span {
// WARNING: It's very important that we don't drop the span until _after_
// follows_from().
//
// If we did e.g. `.follows_from(span)` then the span would get converted into an ID
// and dropped. Any attempt to look up the span by ID _inside_ follows_from() would
// then panic, because the span will have been closed and no longer exist.
//
// Don't ask me how long this took me to debug.
link_back_span.follows_from(&span);
link_back_span.in_scope(|| {
// Do nothing. This span is just here to work around a Honeycomb limitation:
//
// If the batch span is linked to a parent span like so:
//
// parent_span_1 <-link- batch_span
//
// then in Honeycomb, the link is only shown on the batch span. It is not possible
// to click through to the batch span from the parent.
//
// So, here we link back to the batch to make this easier.
});
}
}
output
}
}

Expand Down
107 changes: 107 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,110 @@ mod worker;
pub use batcher::{Batcher, Processor};
pub use error::BatchError;
pub use policies::{BatchingPolicy, Limits, OnFull};

#[cfg(test)]
mod test {
    use std::time::Duration;

    use async_trait::async_trait;
    use tokio::join;
    use tracing::{span, Instrument};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Test processor: sleeps for the wrapped duration, then tags every input
    /// with the key of the batch it was processed in.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    #[async_trait]
    impl Processor<String, String, String> for SimpleBatchProcessor {
        /// Waits for the configured delay and returns one output per input,
        /// formatted as `"<input> processed for <key>"`.
        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
        ) -> Result<Vec<String>, String> {
            let delay = self.0;
            tokio::time::sleep(delay).await;

            let outputs: Vec<String> = inputs
                .map(|input| format!("{} processed for {}", input, key))
                .collect();
            Ok(outputs)
        }
    }

    /// Checks the span links created around a batch:
    ///
    /// * the single "process batch" span follows from both handler spans, and
    /// * each caller gets a "batch finished" span that follows from one span.
    #[tokio::test]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        // Pretty-printing subscriber with a capturing layer on top, so the
        // spans emitted during the test can be inspected afterwards.
        let storage = SharedStorage::default();
        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .finish()
            .with(CaptureLayer::new(&storage));

        // Install the subscriber for the duration of this test only.
        let _guard = tracing::subscriber::set_default(subscriber);

        // A batch size of two means both `add` calls below share one batch.
        let batcher = Batcher::new(
            SimpleBatchProcessor(Duration::ZERO),
            Limits::default().max_batch_size(2),
            BatchingPolicy::Size,
        );

        // Each request runs inside its own handler span.
        let first_span = span!(Level::INFO, "test_handler_span1");
        let first_request = tokio_test::task::spawn(
            batcher
                .add(String::from("A"), String::from("1"))
                .instrument(first_span),
        );

        let second_span = span!(Level::INFO, "test_handler_span2");
        let second_request = tokio_test::task::spawn(
            batcher
                .add(String::from("A"), String::from("2"))
                .instrument(second_span),
        );

        // Only the recorded spans are inspected; the outputs are not checked.
        let _outputs = join!(first_request, second_request);

        let captured = storage.lock();

        let batch_spans: Vec<_> = captured
            .all_spans()
            .filter(|candidate| candidate.metadata().name().contains("process batch"))
            .collect();
        assert_eq!(
            batch_spans.len(),
            1,
            "should be a single span for processing the batch"
        );

        let batch_span = batch_spans.first().unwrap();
        assert_eq!(
            batch_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        let finished_spans: Vec<_> = captured
            .all_spans()
            .filter(|candidate| candidate.metadata().name().contains("batch finished"))
            .collect();
        assert_eq!(
            finished_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for finished in finished_spans {
            assert_eq!(
                finished.follows_from().len(),
                1,
                "should follow from the process span"
            );
        }
    }
}
15 changes: 10 additions & 5 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ where
batch.push(item);
}
PreAdd::Reject(reason) => {
if item.tx.send(Err(BatchError::Rejected(reason))).is_err() {
if item
.tx
.send((Err(BatchError::Rejected(reason)), None))
.is_err()
{
// Whatever was waiting for the output must have shut down. Presumably it
// doesn't care anymore, but we log here anyway. There's not much else we can do
// here.
Expand Down Expand Up @@ -173,6 +177,7 @@ impl Drop for WorkerHandle {
mod test {
use async_trait::async_trait;
use tokio::sync::oneshot;
use tracing::Span;

use super::*;

Expand Down Expand Up @@ -205,7 +210,7 @@ mod test {
key: "K1".to_string(),
input: "I1".to_string(),
tx,
span_id: None,
requesting_span: Span::none(),
})
.await
.unwrap();
Expand All @@ -220,16 +225,16 @@ mod test {
key: "K1".to_string(),
input: "I2".to_string(),
tx,
span_id: None,
requesting_span: Span::none(),
})
.await
.unwrap();

rx
};

let o1 = rx1.await.unwrap().unwrap();
let o2 = rx2.await.unwrap().unwrap();
let o1 = rx1.await.unwrap().0.unwrap();
let o2 = rx2.await.unwrap().0.unwrap();

assert_eq!(o1, "I1 processed".to_string());
assert_eq!(o2, "I2 processed".to_string());
Expand Down
31 changes: 4 additions & 27 deletions tests/mod.rs → tests/tests.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,12 @@
use std::{marker::Send, time::Duration};
use std::time::Duration;

use async_trait::async_trait;
use batch_aint_one::{Batcher, BatchingPolicy, Limits, OnFull, Processor};
use batch_aint_one::{Batcher, BatchingPolicy, Limits, OnFull};
use futures::future::join_all;
use tokio::{join, time::Instant};
// use tokio_test::assert_elapsed;

#[derive(Debug, Clone)]
struct SimpleBatchProcessor(Duration);

#[async_trait]
impl Processor<String, String, String> for SimpleBatchProcessor {
async fn process(
&self,
key: String,
inputs: impl Iterator<Item = String> + Send,
) -> Result<Vec<String>, String> {
tokio::time::sleep(self.0).await;
Ok(inputs.map(|s| s + " processed for " + &key).collect())
}
}

struct NotCloneable {}
type Cloneable = Batcher<String, NotCloneable, NotCloneable>;
use crate::types::SimpleBatchProcessor;

/// A [Batcher] should be cloneable, even when the `I`s and `O`s are not.
#[derive(Clone)]
#[allow(unused)]
struct CanDeriveClone {
batcher: Cloneable,
}
mod types;

#[tokio::test]
async fn strategy_size() {
Expand Down
29 changes: 29 additions & 0 deletions tests/types/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::time::Duration;

use async_trait::async_trait;
use batch_aint_one::{Batcher, Processor};

/// Test processor: sleeps for the wrapped duration, then tags every input with
/// the key of the batch it was processed in.
#[derive(Debug, Clone)]
pub struct SimpleBatchProcessor(pub Duration);

#[async_trait]
impl Processor<String, String, String> for SimpleBatchProcessor {
    /// Waits for the configured delay and returns one output per input,
    /// formatted as `"<input> processed for <key>"`.
    async fn process(
        &self,
        key: String,
        inputs: impl Iterator<Item = String> + Send,
    ) -> Result<Vec<String>, String> {
        let delay = self.0;
        tokio::time::sleep(delay).await;

        let outputs: Vec<String> = inputs
            .map(|input| format!("{} processed for {}", input, key))
            .collect();
        Ok(outputs)
    }
}

// Marker type that deliberately does not implement `Clone`.
struct NotCloneable {}
// A batcher parameterised with the non-`Clone` marker type; presumably these
// are the input (`I`) and output (`O`) type parameters — confirm against
// `Batcher`'s definition.
type Cloneable = Batcher<String, NotCloneable, NotCloneable>;

/// A [Batcher] should be cloneable, even when the `I`s and `O`s are not.
///
/// This struct is never used at runtime (hence `allow(unused)`). It acts as a
/// compile-time check: `#[derive(Clone)]` only compiles if the `batcher` field
/// is `Clone`, so it fails if [Batcher] needs its type parameters to be `Clone`.
#[derive(Clone)]
#[allow(unused)]
struct CanDeriveClone {
batcher: Cloneable,
}
Loading