Increase queueing capacity
Originally only one batch could be queued up at a time.

If the available processing concurrency is e.g. 10 batches of 10 items,
then the worst case is that all batches finish at the same time and only
one queued batch of 10 items is ready to replace them, using only 10% of
the available capacity.

If we can queue up the same number of items as we can process, then we
would be able to use 100% of the processing capacity in this case.

To that end, we now queue multiple batches ready for processing, instead
of just one.
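
As a rough sketch of the worst-case arithmetic (using the hypothetical 10×10 numbers above, not values read from the crate's configuration):

```rust
fn main() {
    // Hypothetical numbers from the message above.
    let max_concurrency = 10; // batches that can be processed at once
    let max_batch_size = 10; // items per batch
    let total_capacity = max_concurrency * max_batch_size; // 100 items

    // Before: only one batch could be queued, so when all 10 in-flight
    // batches finish together, only 10 items are ready to replace them.
    let old_worst_case_utilisation = max_batch_size as f64 / total_capacity as f64;
    assert_eq!(old_worst_case_utilisation, 0.1); // 10%

    // After: the queue can hold as many items as can be processed, so all
    // 10 processing slots can be refilled at once.
    let new_worst_case_utilisation = total_capacity as f64 / total_capacity as f64;
    assert_eq!(new_worst_case_utilisation, 1.0); // 100%
}
```
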
ThomWright committed Aug 25, 2024
1 parent 556be4d commit 60a4ac2
Showing 10 changed files with 426 additions and 94 deletions.
20 changes: 14 additions & 6 deletions README.md
@@ -13,7 +13,13 @@ Batch up multiple items for processing as a single unit.

Sometimes it is more efficient to process many items at once rather than one at a time. Especially when the processing step has overheads which can be shared between many items.

### Example: Inserting multiple rows into a database
Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many of these operations can be batched up into more efficient versions: _select many rows_ and _insert many rows_.

## How

A worker task is run in the background. Many client tasks (e.g. message handlers) can submit items to the worker and wait for them to be processed. The worker task batches together many items and processes them as one unit, before sending a result back to each calling task.
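
The shape of this, independent of this crate's API, is roughly the following (a minimal sketch using plain Tokio channels; the `Submission` type and the batch-of-10 limit are illustrative assumptions, and `recv_many` needs Tokio 1.37+):

```rust
use tokio::sync::{mpsc, oneshot};

/// One submitted item plus a channel to send its result back on.
struct Submission {
    item: String,
    reply: oneshot::Sender<String>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Submission>(100);

    // Background worker: drain up to 10 queued items and handle them as one unit.
    tokio::spawn(async move {
        let mut batch = Vec::new();
        while rx.recv_many(&mut batch, 10).await > 0 {
            // "Process" the whole batch at once (e.g. one multi-row INSERT),
            // then send a result back to each calling task.
            let size = batch.len();
            for submission in batch.drain(..) {
                let _ = submission
                    .reply
                    .send(format!("{} processed in a batch of {size}", submission.item));
            }
        }
    });

    // A client task (e.g. a message handler): submit an item, await its result.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Submission { item: "hello".into(), reply: reply_tx })
        .await
        .expect("worker is running");
    println!("{}", reply_rx.await.unwrap());
}
```

In this crate the processing step is abstracted behind the `Processor` trait and controlled by a `BatchingPolicy`; see the example below.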

## Use case: Inserting multiple rows into a database

For example, each database operation – such as an `INSERT` – has the overhead of a round trip to the database.

@@ -23,7 +29,7 @@ Multi-row inserts can share this overhead between many items. This also allows u

![Batched example](./docs/images/example-insert-batched.png)
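
For illustration, one statement can carry many rows so the round trip is paid once. A minimal sketch of building such a statement (the `users` table and Postgres-style `$n` placeholders are assumptions, and no particular database driver is implied):

```rust
/// Build one multi-row INSERT so a single round trip covers all the rows.
fn multi_row_insert(rows: &[(i64, &str)]) -> (String, Vec<String>) {
    let mut placeholders = Vec::new();
    let mut params = Vec::new();

    for (i, (id, name)) in rows.iter().enumerate() {
        // Two parameters per row: ($1, $2), ($3, $4), ...
        placeholders.push(format!("(${}, ${})", i * 2 + 1, i * 2 + 2));
        params.push(id.to_string());
        params.push(name.to_string());
    }

    let sql = format!(
        "INSERT INTO users (id, name) VALUES {}",
        placeholders.join(", ")
    );
    (sql, params)
}

fn main() {
    let (sql, params) = multi_row_insert(&[(1, "alice"), (2, "bob")]);
    // INSERT INTO users (id, name) VALUES ($1, $2), ($3, $4)
    println!("{sql} -- params: {params:?}");
}
```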

### Example: With transactions and locking
## Use case: With transactions and locking

Inserts into database tables can often be done concurrently. In some cases these must be done serially, enforced using locks. This can be a significant throughput bottleneck.

@@ -35,10 +41,6 @@ With batching, we can improve the throughput. Acquiring/releasing the lock and b

![Batched example](./docs/images/example-batched.png)
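
As a sketch of the effect (the lock and timings here are hypothetical stand-ins for the database lock and round trip described above, not part of this library):

```rust
use std::time::Duration;
use tokio::{sync::Mutex, time::sleep};

// Hypothetical cost model: taking the serialising lock and doing the round
// trip dominates; each extra item in the batch is comparatively cheap.
const PER_TRIP_OVERHEAD: Duration = Duration::from_millis(5);
const PER_ITEM_COST: Duration = Duration::from_micros(100);

/// One lock acquisition and one round trip per item.
async fn process_one_at_a_time(lock: &Mutex<()>, items: &[u64]) {
    for _item in items {
        let _guard = lock.lock().await;
        sleep(PER_TRIP_OVERHEAD + PER_ITEM_COST).await;
    }
}

/// One lock acquisition and one round trip for the whole batch.
async fn process_as_batch(lock: &Mutex<()>, items: &[u64]) {
    let _guard = lock.lock().await;
    sleep(PER_TRIP_OVERHEAD + PER_ITEM_COST * items.len() as u32).await;
}

#[tokio::main]
async fn main() {
    let lock = Mutex::new(());
    let items: Vec<u64> = (0..10).collect();

    let start = std::time::Instant::now();
    process_one_at_a_time(&lock, &items).await;
    println!("serial:  {:?}", start.elapsed()); // roughly 10 * (5ms + 0.1ms)

    let start = std::time::Instant::now();
    process_as_batch(&lock, &items).await;
    println!("batched: {:?}", start.elapsed()); // roughly 5ms + 10 * 0.1ms
}
```

The fixed cost of taking the lock and making the round trip is paid once per batch instead of once per item, which is where the throughput gain comes from.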

## How

A worker task is run in the background and items are submitted to it for batching. Batches are processed in their own tasks, concurrently.

## Example

```rust
@@ -103,6 +105,12 @@ tokio_test::block_on(async {
});
```

## FAQ

**If the worker needs to wait to receive multiple items, won't this increase latency?**

This depends on the batching policy used. `BatchingPolicy::Immediate` optimises for latency and processes items as soon as possible.

## Roadmap

- [x] Tests
96 changes: 64 additions & 32 deletions src/batch.rs
@@ -55,7 +55,7 @@ impl Generation {
}

impl<K, I, O, E: Display> Batch<K, I, O, E> {
pub(crate) fn new(key: K) -> Self {
pub(crate) fn new(key: K, processing: Arc<AtomicUsize>) -> Self {
Self {
key,
generation: Generation::default(),
Expand All @@ -64,7 +64,7 @@ impl<K, I, O, E: Display> Batch<K, I, O, E> {
timeout_deadline: None,
timeout_handle: None,

processing: Arc::<AtomicUsize>::default(),
processing,
}
}

@@ -85,10 +85,6 @@ impl<K, I, O, E: Display> Batch<K, I, O, E> {
self.len() == max - 1
}

pub(crate) fn processing(&self) -> usize {
self.processing.load(std::sync::atomic::Ordering::Acquire)
}

pub(crate) fn generation(&self) -> Generation {
self.generation
}
@@ -98,7 +94,12 @@ impl<K, I, O, E: Display> Batch<K, I, O, E> {
}

pub(crate) fn is_processable(&self) -> bool {
// To be processable, we must have some items to process...
self.len() > 0
// ... and if there is a timeout deadline, it must be in the past.
&& self
.timeout_deadline
.map_or(true, |deadline| deadline.checked_duration_since(Instant::now()).is_none())
}

pub(crate) fn push(&mut self, item: BatchItem<K, I, O, E>) {
@@ -129,7 +130,7 @@ where
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
fn new_generation(&self) -> Self {
pub(crate) fn new_generation(&self) -> Self {
Self {
key: self.key.clone(),
generation: self.generation.next(),
@@ -142,31 +143,6 @@ where
}
}

/// If a batch exists for the given generation, returns it and replaces it with an empty
/// placeholder for the next generation.
pub fn take_generation_for_processing(
&mut self,
generation: Generation,
) -> Option<Batch<K, I, O, E>> {
if self.is_generation(generation) && self.is_processable() {
let batch = std::mem::replace(self, self.new_generation());

Some(batch)
} else {
None
}
}

pub fn take_batch_for_processing(&mut self) -> Option<Batch<K, I, O, E>> {
if self.is_processable() {
let batch = std::mem::replace(self, self.new_generation());

Some(batch)
} else {
None
}
}

pub(crate) fn process<F>(mut self, processor: F, on_finished: mpsc::Sender<Message<K>>)
where
F: 'static + Send + Clone + Processor<K, I, O, E>,
@@ -275,3 +251,59 @@ impl<K, I, O, E: Display> Drop for Batch<K, I, O, E> {
}
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

use tokio::{
sync::{mpsc, oneshot},
time,
};
use tracing::Span;

use super::{Batch, BatchItem};

#[tokio::test]
async fn is_processable_timeout() {
time::pause();

let mut batch: Batch<String, String, String, String> =
Batch::new("key".to_string(), Arc::default());

let (tx, _rx) = oneshot::channel();

batch.push(BatchItem {
key: "key".to_string(),
input: "item".to_string(),
tx,
requesting_span: Span::none(),
});

let (tx, _rx) = mpsc::channel(1);
batch.time_out_after(Duration::from_millis(50), tx);

assert!(
!batch.is_processable(),
"should not be processable initially"
);

time::advance(Duration::from_millis(49)).await;

assert!(
!batch.is_processable(),
"should not be processable after 49ms",
);

time::advance(Duration::from_millis(1)).await;

assert!(
!batch.is_processable(),
"should not be processable after 50ms"
);

time::advance(Duration::from_millis(1)).await;

assert!(batch.is_processable(), "should be processable after 51ms");
}
}
133 changes: 133 additions & 0 deletions src/batch_queue.rs
@@ -0,0 +1,133 @@
use std::{
collections::VecDeque,
fmt::Display,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};

use tokio::sync::mpsc;

use crate::{
batch::{Batch, BatchItem, Generation},
worker::Message,
Limits,
};

/// A double-ended queue for queueing up multiple batches for later processing.
pub(crate) struct BatchQueue<K, I, O, E: Display> {
queue: VecDeque<Batch<K, I, O, E>>,

limits: Limits,

/// The number of batches with this key that are currently processing.
processing: Arc<AtomicUsize>,
}

impl<K, I, O, E: Display> BatchQueue<K, I, O, E> {
pub(crate) fn new(key: K, limits: Limits) -> Self {
// The queue size is the same as the max processing capacity.
let mut queue = VecDeque::with_capacity(limits.max_key_concurrency);

let processing = Arc::<AtomicUsize>::default();
queue.push_back(Batch::new(key, processing.clone()));

Self {
queue,
limits,
processing,
}
}

pub(crate) fn is_next_batch_full(&self) -> bool {
let next = self.queue.front().expect("Should always be non-empty");
next.is_full(self.limits.max_batch_size)
}

pub(crate) fn is_full(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
self.queue.len() == self.limits.max_key_concurrency
&& back.len() == self.limits.max_batch_size
}

pub(crate) fn last_space_in_batch(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
back.has_single_space(self.limits.max_batch_size)
}

pub(crate) fn adding_to_new_batch(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
back.is_new_batch()
}

pub(crate) fn at_max_processing_capacity(&self) -> bool {
self.processing.load(std::sync::atomic::Ordering::Acquire)
>= self.limits.max_key_concurrency
}
}

impl<K, I, O, E> BatchQueue<K, I, O, E>
where
K: 'static + Send + Clone,
I: 'static + Send,
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
pub(crate) fn push(&mut self, item: BatchItem<K, I, O, E>) {
let back = self.queue.back_mut().expect("Should always be non-empty");

if back.is_full(self.limits.max_batch_size) {
let mut new_back = back.new_generation();
new_back.push(item);
self.queue.push_back(new_back);
} else {
back.push(item);
}
}

pub(crate) fn take_next_batch(&mut self) -> Option<Batch<K, I, O, E>> {
let batch = self.queue.front().expect("Should always be non-empty");
if batch.is_processable() {
let batch = self.queue.pop_front().expect("Should always be non-empty");
if self.queue.is_empty() {
self.queue.push_back(batch.new_generation())
}
return Some(batch);
}

None
}

pub(crate) fn take_generation(&mut self, generation: Generation) -> Option<Batch<K, I, O, E>> {
for (index, batch) in self.queue.iter().enumerate() {
if batch.is_generation(generation) {
if batch.is_processable() {
let batch = self
.queue
.remove(index)
.expect("Should exist, we just found it");

if self.queue.is_empty() {
self.queue.push_back(batch.new_generation())
}

return Some(batch);
} else {
return None;
}
}
}

None
}
}

impl<K, I, O, E> BatchQueue<K, I, O, E>
where
K: 'static + Send + Clone,
E: Display,
{
pub(crate) fn time_out_after(&mut self, duration: Duration, tx: mpsc::Sender<Message<K>>) {
let back = self.queue.back_mut().expect("Should always be non-empty");
back.time_out_after(duration, tx);
}
}
2 changes: 2 additions & 0 deletions src/error.rs
@@ -33,7 +33,9 @@ pub enum BatchError<E: Display> {
#[derive(Debug)]
#[non_exhaustive]
pub enum RejectionReason {
/// The batch is full and still waiting to be processed.
BatchFull,
/// The batch is full and no more batches can be processed concurrently.
MaxConcurrency,
}

23 changes: 16 additions & 7 deletions src/lib.rs
@@ -5,8 +5,13 @@
//! Sometimes it is more efficient to process many items at once rather than one at a time.
//! Especially when the processing step has overheads which can be shared between many items.
//!
//! A worker task is run in the background and items are submitted to it for batching. Batches are
//! processed in their own tasks, concurrently.
//! Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many
//! of these operations can be batched up into more efficient versions: _select many rows_ and
//! _insert many rows_.
//!
//! A worker task is run in the background. Many client tasks (e.g. message handlers) can submit
//! items to the worker and wait for them to be processed. The worker task batches together many
//! items and processes them as one unit, before sending a result back to each calling task.
//!
//! See the README for an example.
@@ -18,6 +23,7 @@ use doc_comment::doctest;
doctest!("../README.md");

mod batch;
mod batch_queue;
mod batcher;
mod error;
mod policies;
@@ -28,7 +34,7 @@ pub use error::BatchError;
pub use policies::{BatchingPolicy, Limits, OnFull};

#[cfg(test)]
mod test {
mod tests {
use std::time::Duration;

use async_trait::async_trait;
@@ -94,11 +100,12 @@ mod test {
})
};

let (_o1, _o2) = join!(h1, h2);
let (o1, o2) = join!(h1, h2);

let storage = storage.lock();
assert!(o1.is_ok());
assert!(o2.is_ok());

assert_eq!(storage.all_spans().len(), 5, "should be 5 spans in total");
let storage = storage.lock();

let process_spans: Vec<_> = storage
.all_spans()
@@ -137,8 +144,10 @@ assert_eq!(
assert_eq!(
span.follows_from().len(),
1,
"should follow from the process span"
"link back spans should follow from the process span"
);
}

assert_eq!(storage.all_spans().len(), 5, "should be 5 spans in total");
}
}