Increase queueing capacity #5

Open · wants to merge 6 commits into base: main
10 changes: 5 additions & 5 deletions .github/workflows/ci.yaml
@@ -19,7 +19,7 @@ jobs:
- nightly
- macos
- win-msvc
# - win-gnu
- win-gnu
include:
- build: pinned
os: ubuntu-22.04
@@ -42,9 +42,9 @@ jobs:
- build: win-msvc
os: windows-2019
rust: stable
# - build: win-gnu
# os: windows-2019
# rust: stable-x86_64-gnu
- build: win-gnu
os: windows-2019
rust: stable-x86_64-gnu
steps:
- name: Check out repository
uses: actions/checkout@v3
@@ -57,7 +57,7 @@ jobs:

- run: cargo build --verbose
- run: cargo doc --verbose
- run: cargo test --verbose
- run: cargo test --verbose -- --test-threads 1

rustfmt:
name: rustfmt
20 changes: 14 additions & 6 deletions README.md
@@ -13,7 +13,13 @@ Batch up multiple items for processing as a single unit.

Sometimes it is more efficient to process many items at once rather than one at a time, especially when the processing step has overhead that can be shared between many items.

### Example: Inserting multiple rows into a database
Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many of these operations can be batched up into more efficient versions: _select many rows_ and _insert many rows_.

## How

A worker task is run in the background. Many client tasks (e.g. message handlers) can submit items to the worker and wait for them to be processed. The worker task batches together many items and processes them as one unit, before sending a result back to each calling task.
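
To illustrate the shape of this pattern (a conceptual sketch using plain `tokio` channels, not this library's actual API):

```rust
use tokio::sync::{mpsc, oneshot};

/// One submitted item plus a channel for sending its result back.
struct Submission {
    input: String,
    reply: oneshot::Sender<usize>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Submission>(100);

    // Worker task: gather whatever is queued into one batch, process it as a
    // single unit, then send a result back to each submitter.
    tokio::spawn(async move {
        while let Some(first) = rx.recv().await {
            let mut batch = vec![first];
            while let Ok(next) = rx.try_recv() {
                batch.push(next);
            }
            for submission in batch {
                let result = submission.input.len(); // stand-in for real batch processing
                let _ = submission.reply.send(result);
            }
        }
    });

    // Client task: submit an item and wait for its batched result.
    let (reply, wait) = oneshot::channel();
    tx.send(Submission {
        input: "hello".into(),
        reply,
    })
    .await
    .unwrap();
    assert_eq!(wait.await.unwrap(), 5);
}
```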

## Use case: Inserting multiple rows into a database

For example, each database operation – such as an `INSERT` – has the overhead of a round trip to the database.

@@ -23,7 +29,7 @@ Multi-row inserts can share this overhead between many items. This also allows u

![Batched example](./docs/images/example-insert-batched.png)
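
For illustration, a single multi-row insert can look like the following sketch (shown here with `sqlx`'s `QueryBuilder`; the `users` table and the Postgres driver are example choices, not something this library requires):

```rust
use sqlx::{PgPool, Postgres, QueryBuilder};

/// Insert many rows with one round trip by issuing a single multi-row INSERT.
async fn insert_names(pool: &PgPool, names: Vec<String>) -> Result<(), sqlx::Error> {
    let mut builder: QueryBuilder<Postgres> = QueryBuilder::new("INSERT INTO users (name) ");
    // `push_values` appends `VALUES (...), (...), ...` with one tuple per item.
    builder.push_values(names, |mut row, name| {
        row.push_bind(name);
    });
    builder.build().execute(pool).await?;
    Ok(())
}
```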

### Example: With transactions and locking
## Use case: With transactions and locking

Inserts into database tables can often be done concurrently. In some cases these must be done serially, enforced using locks. This can be a significant throughput bottleneck.

@@ -35,10 +41,6 @@ With batching, we can improve the throughput. Acquiring/releasing the lock and b

![Batched example](./docs/images/example-batched.png)

## How

A worker task is run in the background and items are submitted to it for batching. Batches are processed in their own tasks, concurrently.

## Example

```rust
@@ -103,6 +105,12 @@ tokio_test::block_on(async {
});
```

## FAQ

**If the worker needs to wait to receive multiple items, won't this increase latency?**

This depends on the batching policy used. `BatchingPolicy::Immediate` optimises for latency and processes items as soon as possible.

## Roadmap

- [x] Tests
101 changes: 66 additions & 35 deletions src/batch.rs
@@ -1,8 +1,9 @@
use std::{
cmp,
fmt::Display,
mem,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{self, AtomicUsize},
Arc,
},
time::Duration,
@@ -55,7 +56,7 @@ impl Generation {
}

impl<K, I, O, E: Display> Batch<K, I, O, E> {
pub(crate) fn new(key: K) -> Self {
pub(crate) fn new(key: K, processing: Arc<AtomicUsize>) -> Self {
Self {
key,
generation: Generation::default(),
@@ -64,7 +65,7 @@ impl<K, I, O, E: Display> Batch<K, I, O, E> {
timeout_deadline: None,
timeout_handle: None,

processing: Arc::<AtomicUsize>::default(),
processing,
}
}

@@ -85,10 +86,6 @@ impl<K, I, O, E: Display> Batch<K, I, O, E> {
self.len() == max - 1
}

pub(crate) fn processing(&self) -> usize {
self.processing.load(std::sync::atomic::Ordering::Acquire)
}

pub(crate) fn generation(&self) -> Generation {
self.generation
}
@@ -98,7 +95,16 @@ impl<K, I, O, E: Display> Batch<K, I, O, E> {
}

pub(crate) fn is_processable(&self) -> bool {
// To be processable, we must have some items to process...
self.len() > 0
// ... and if there is a timeout deadline, it must be in the past.
&& self
.timeout_deadline
.map_or(true, |deadline| match deadline.cmp(&Instant::now()){
cmp::Ordering::Less => true,
cmp::Ordering::Equal => true,
cmp::Ordering::Greater => false,
})
}

pub(crate) fn push(&mut self, item: BatchItem<K, I, O, E>) {
@@ -129,7 +135,7 @@ where
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
fn new_generation(&self) -> Self {
pub(crate) fn new_generation(&self) -> Self {
Self {
key: self.key.clone(),
generation: self.generation.next(),
@@ -142,43 +148,19 @@ where
}
}

/// If a batch exists for the given generation, returns it and replaces it with an empty
/// placeholder for the next generation.
pub fn take_generation_for_processing(
&mut self,
generation: Generation,
) -> Option<Batch<K, I, O, E>> {
if self.is_generation(generation) && self.is_processable() {
let batch = std::mem::replace(self, self.new_generation());

Some(batch)
} else {
None
}
}

pub fn take_batch_for_processing(&mut self) -> Option<Batch<K, I, O, E>> {
if self.is_processable() {
let batch = std::mem::replace(self, self.new_generation());

Some(batch)
} else {
None
}
}

pub(crate) fn process<F>(mut self, processor: F, on_finished: mpsc::Sender<Message<K>>)
where
F: 'static + Send + Clone + Processor<K, I, O, E>,
{
self.processing.fetch_add(1, Ordering::AcqRel);
self.processing.fetch_add(1, atomic::Ordering::AcqRel);

self.cancel_timeout();

// Spawn a new task so we can process multiple batches concurrently, without blocking the
// run loop.
tokio::spawn(async move {
let batch_size = self.items.len();

// Convert to u64 so tracing will treat this as an integer instead of a string.
let span = span!(Level::INFO, "process batch", batch_size = batch_size as u64);

@@ -224,7 +206,7 @@
}
}

self.processing.fetch_sub(1, Ordering::AcqRel);
self.processing.fetch_sub(1, atomic::Ordering::AcqRel);

// We're finished with this batch
if on_finished
@@ -275,3 +257,52 @@ impl<K, I, O, E: Display> Drop for Batch<K, I, O, E> {
}
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

use tokio::{
sync::{mpsc, oneshot},
time,
};
use tracing::Span;

use super::{Batch, BatchItem};

#[tokio::test]
async fn is_processable_timeout() {
time::pause();

let mut batch: Batch<String, String, String, String> =
Batch::new("key".to_string(), Arc::default());

let (tx, _rx) = oneshot::channel();

batch.push(BatchItem {
key: "key".to_string(),
input: "item".to_string(),
tx,
requesting_span: Span::none(),
});

let (tx, _rx) = mpsc::channel(1);
batch.time_out_after(Duration::from_millis(50), tx);

assert!(
!batch.is_processable(),
"should not be processable initially"
);

time::advance(Duration::from_millis(49)).await;

assert!(
!batch.is_processable(),
"should not be processable after 49ms",
);

time::advance(Duration::from_millis(1)).await;

assert!(batch.is_processable(), "should be processable after 50ms");
}
}
133 changes: 133 additions & 0 deletions src/batch_queue.rs
@@ -0,0 +1,133 @@
use std::{
collections::VecDeque,
fmt::Display,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};

use tokio::sync::mpsc;

use crate::{
batch::{Batch, BatchItem, Generation},
worker::Message,
Limits,
};

/// A double-ended queue for queueing up multiple batches for later processing.
pub(crate) struct BatchQueue<K, I, O, E: Display> {
queue: VecDeque<Batch<K, I, O, E>>,

limits: Limits,

/// The number of batches with this key that are currently processing.
processing: Arc<AtomicUsize>,
}

impl<K, I, O, E: Display> BatchQueue<K, I, O, E> {
pub(crate) fn new(key: K, limits: Limits) -> Self {
// The queue size is the same as the max processing capacity.
let mut queue = VecDeque::with_capacity(limits.max_key_concurrency);

let processing = Arc::<AtomicUsize>::default();
queue.push_back(Batch::new(key, processing.clone()));

Self {
queue,
limits,
processing,
}
}

pub(crate) fn is_next_batch_full(&self) -> bool {
let next = self.queue.front().expect("Should always be non-empty");
next.is_full(self.limits.max_batch_size)
}

pub(crate) fn is_full(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
self.queue.len() == self.limits.max_key_concurrency
&& back.len() == self.limits.max_batch_size
}

pub(crate) fn last_space_in_batch(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
back.has_single_space(self.limits.max_batch_size)
}

pub(crate) fn adding_to_new_batch(&self) -> bool {
let back = self.queue.back().expect("Should always be non-empty");
back.is_new_batch()
}

pub(crate) fn at_max_processing_capacity(&self) -> bool {
self.processing.load(std::sync::atomic::Ordering::Acquire)
>= self.limits.max_key_concurrency
}
}

impl<K, I, O, E> BatchQueue<K, I, O, E>
where
K: 'static + Send + Clone,
I: 'static + Send,
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
pub(crate) fn push(&mut self, item: BatchItem<K, I, O, E>) {
let back = self.queue.back_mut().expect("Should always be non-empty");

if back.is_full(self.limits.max_batch_size) {
let mut new_back = back.new_generation();
new_back.push(item);
self.queue.push_back(new_back);
} else {
back.push(item);
}
}

pub(crate) fn take_next_batch(&mut self) -> Option<Batch<K, I, O, E>> {
let batch = self.queue.front().expect("Should always be non-empty");
if batch.is_processable() {
let batch = self.queue.pop_front().expect("Should always be non-empty");
if self.queue.is_empty() {
self.queue.push_back(batch.new_generation())
}
return Some(batch);
}

None
}

pub(crate) fn take_generation(&mut self, generation: Generation) -> Option<Batch<K, I, O, E>> {
for (index, batch) in self.queue.iter().enumerate() {
if batch.is_generation(generation) {
if batch.is_processable() {
let batch = self
.queue
.remove(index)
.expect("Should exist, we just found it");

if self.queue.is_empty() {
self.queue.push_back(batch.new_generation())
}

return Some(batch);
} else {
return None;
}
}
}

None
}
}

impl<K, I, O, E> BatchQueue<K, I, O, E>
where
K: 'static + Send + Clone,
E: Display,
{
pub(crate) fn time_out_after(&mut self, duration: Duration, tx: mpsc::Sender<Message<K>>) {
let back = self.queue.back_mut().expect("Should always be non-empty");
back.time_out_after(duration, tx);
}
}
2 changes: 2 additions & 0 deletions src/error.rs
@@ -33,7 +33,9 @@ pub enum BatchError<E: Display> {
#[derive(Debug)]
#[non_exhaustive]
pub enum RejectionReason {
/// The batch is full and still waiting to be processed.
BatchFull,
/// The batch is full and no more batches can be processed concurrently.
MaxConcurrency,
}
