Skip to content

Commit

Permalink
Support both orders in the structured-data cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
bkirwi committed Sep 19, 2024
1 parent 15e6f76 commit 17c6550
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 39 deletions.
167 changes: 129 additions & 38 deletions src/persist-client/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::columnar::{ColumnDecoder, Schema2};
use mz_persist_types::{Codec, Codec64};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
Expand All @@ -35,14 +36,15 @@ use tokio::runtime::Handle;
use tracing::{debug_span, warn, Instrument};
use uuid::Uuid;

use crate::batch::{BLOB_TARGET_SIZE, STRUCTURED_ORDER};
use crate::cfg::RetryParameters;
use crate::fetch::{fetch_leased_part, FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart};
use crate::internal::encoding::Schemas;
use crate::internal::machine::{ExpireFn, Machine};
use crate::internal::metrics::Metrics;
use crate::internal::state::{BatchPart, HollowBatch};
use crate::internal::watch::StateWatch;
use crate::iter::{CodecSort, Consolidator};
use crate::iter::{CodecSort, Consolidator, StructuredSort};
use crate::schema::SchemaCache;
use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
use crate::{parse_id, GarbageCollector, PersistConfig, ShardId};
Expand Down Expand Up @@ -904,11 +906,23 @@ pub(crate) struct UnexpiredReadHandleState {
/// but it's also free to abandon the instance at any time if it eg. only needs a few entries.
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
consolidator: Consolidator<T, D>,
consolidator: CursorConsolidator<K, V, T, D>,
_lease: Lease,
read_schemas: Schemas<K, V>,
}

#[derive(Debug)]
enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
Codec {
consolidator: Consolidator<T, D, CodecSort<T, D>>,
},
Structured {
consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
max_len: usize,
max_bytes: usize,
},
}

impl<K, V, T, D> Cursor<K, V, T, D>
where
K: Debug + Codec + Ord,
Expand All @@ -919,18 +933,59 @@ where
/// Grab the next batch of consolidated data.
pub async fn next(
&mut self,
) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
let iter = self
.consolidator
.next()
.await
.expect("fetching a leased part")?;
let iter = iter.map(|((k, v), t, d)| {
let key = K::decode(k, &self.read_schemas.key);
let val = V::decode(v, &self.read_schemas.val);
((key, val), t, d)
});
Some(iter)
) -> Option<
Box<dyn Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + Send + Sync + '_>,
> {
match &mut self.consolidator {
CursorConsolidator::Structured {
consolidator,
max_len,
max_bytes,
} => {
let mut iter = consolidator
.next_chunk(*max_len, *max_bytes)
.await
.expect("fetching a leased part")?;
let structured = iter.get_or_make_structured::<K, V>(
self.read_schemas.key.as_ref(),
self.read_schemas.val.as_ref(),
);
let key_decoder = self
.read_schemas
.key
.decoder_any(structured.key.as_ref())
.expect("ok");
let val_decoder = self
.read_schemas
.val
.decoder_any(structured.val.as_ref())
.expect("ok");
let iter = (0..iter.len()).map(move |i| {
let mut k = K::default();
let mut v = V::default();
key_decoder.decode(i, &mut k);
val_decoder.decode(i, &mut v);
let t = T::decode(iter.records().timestamps().value(i).to_le_bytes());
let d = D::decode(iter.records().diffs().value(i).to_le_bytes());
((Ok(k), Ok(v)), t, d)
});

Some(Box::new(iter))
}
CursorConsolidator::Codec { consolidator } => {
let iter = consolidator
.next()
.await
.expect("fetching a leased part")?
.map(|((k, v), t, d)| {
let key = K::decode(k, &self.read_schemas.key);
let val = V::decode(v, &self.read_schemas.val);
((key, val), t, d)
});

Some(Box::new(iter))
}
}
}
}

Expand Down Expand Up @@ -993,32 +1048,68 @@ where
) -> Result<Cursor<K, V, T, D>, Since<T>> {
let batches = self.machine.snapshot(&as_of).await?;

let mut consolidator = Consolidator::new(
format!("{}[as_of={:?}]", self.shard_id(), as_of.elements()),
self.shard_id(),
CodecSort::default(),
Arc::clone(&self.blob),
Arc::clone(&self.metrics),
Arc::clone(&self.machine.applier.shard_metrics),
self.metrics.read.snapshot.clone(),
FetchBatchFilter::Snapshot {
as_of: as_of.clone(),
},
self.cfg.dynamic.compaction_memory_bound_bytes(),
);

let context = format!("{}[as_of={:?}]", self.shard_id(), as_of.elements());
let filter = FetchBatchFilter::Snapshot {
as_of: as_of.clone(),
};
let lease = self.lease_seqno();
for batch in batches {
for (meta, run) in batch.runs() {
consolidator.enqueue_run(
&batch.desc,
meta,
run.into_iter()
.filter(|p| should_fetch_part(p.stats()))
.cloned(),
);

let consolidator = if STRUCTURED_ORDER.get(&self.cfg) {
let mut consolidator = Consolidator::new(
context,
self.shard_id(),
StructuredSort::new(self.read_schemas.clone()),
Arc::clone(&self.blob),
Arc::clone(&self.metrics),
Arc::clone(&self.machine.applier.shard_metrics),
self.metrics.read.snapshot.clone(),
filter,
self.cfg.dynamic.compaction_memory_bound_bytes(),
);
for batch in batches {
for (meta, run) in batch.runs() {
consolidator.enqueue_run(
&batch.desc,
meta,
run.into_iter()
.filter(|p| should_fetch_part(p.stats()))
.cloned(),
);
}
}
}
CursorConsolidator::Structured {
consolidator,
// This default may end up consolidating more records than previously
// for cases like fast-path peeks, where only the first few entries are used.
// If this is a noticeable performance impact, thread the max-len in from the caller.
max_len: self.cfg.compaction_yield_after_n_updates,
max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
}
} else {
let mut consolidator = Consolidator::new(
context,
self.shard_id(),
CodecSort::default(),
Arc::clone(&self.blob),
Arc::clone(&self.metrics),
Arc::clone(&self.machine.applier.shard_metrics),
self.metrics.read.snapshot.clone(),
filter,
self.cfg.dynamic.compaction_memory_bound_bytes(),
);
for batch in batches {
for (meta, run) in batch.runs() {
consolidator.enqueue_run(
&batch.desc,
meta,
run.into_iter()
.filter(|p| should_fetch_part(p.stats()))
.cloned(),
);
}
}
CursorConsolidator::Codec { consolidator }
};

Ok(Cursor {
consolidator,
Expand Down
2 changes: 1 addition & 1 deletion src/persist-types/src/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub trait Schema2<T>: Debug + Send + Sync {
type Statistics: DynStats + 'static;

/// Type that is able to decode values of `T` from [`Self::ArrowColumn`].
type Decoder: ColumnDecoder<T> + Debug;
type Decoder: ColumnDecoder<T> + Debug + Send + Sync;
/// Type that is able to encoder values of `T`.
type Encoder: ColumnEncoder<T, FinishedColumn = Self::ArrowColumn> + Debug;

Expand Down

0 comments on commit 17c6550

Please sign in to comment.