Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pageserver): filter keys with gc-compaction #9004

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,36 @@ impl Timeline {
Ok(total_size * BLCKSZ as u64)
}

/// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
/// for gc-compaction.
///
/// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
/// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
/// be kept only for a specific range of LSN.
///
/// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
/// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
/// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
/// determine which keys to retain/drop for gc-compaction.
///
/// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
/// to be retained for each of the branch LSN.
///
/// The return value is (dense keyspace, sparse keyspace).
pub(crate) async fn collect_gc_compaction_keyspace(
&self,
) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
let metadata_key_begin = Key::metadata_key_range().start;
let aux_v1_key = AUX_FILES_KEY;
let dense_keyspace = KeySpace {
ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
};
Ok((
dense_keyspace,
SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
))
}

///
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed maybe removed from the underlying storage (from
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Common traits and structs for layers

pub mod delta_layer;
pub mod filter_iterator;
pub mod image_layer;
pub mod inmemory_layer;
pub(crate) mod layer;
mod layer_desc;
mod layer_name;
pub mod merge_iterator;

pub mod split_writer;

use crate::context::{AccessStatsBehavior, RequestContext};
Expand Down
181 changes: 181 additions & 0 deletions pageserver/src/tenant/storage_layer/filter_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::ops::Range;

use anyhow::bail;
use pageserver_api::{
key::Key,
keyspace::{KeySpace, SparseKeySpace},
};
use utils::lsn::Lsn;

use crate::repository::Value;

use super::merge_iterator::MergeIterator;

/// A filter iterator over merge iterators (and can be easily extended to other types of iterators).
///
/// The iterator will skip any keys not included in the keyspace filter. In other words, the keyspace filter contains the keys
/// to be retained.
pub struct FilterIterator<'a> {
inner: MergeIterator<'a>,
retain_key_filters: Vec<Range<Key>>,
current_filter_idx: usize,
}

impl<'a> FilterIterator<'a> {
pub fn create(
inner: MergeIterator<'a>,
dense_keyspace: KeySpace,
sparse_keyspace: SparseKeySpace,
) -> anyhow::Result<Self> {
let mut retain_key_filters = Vec::new();
retain_key_filters.extend(dense_keyspace.ranges);
retain_key_filters.extend(sparse_keyspace.0.ranges);
retain_key_filters.sort_by(|a, b| a.start.cmp(&b.start));
// Verify key filters are non-overlapping and sorted
for i in 1..retain_key_filters.len() {
if retain_key_filters[i - 1].end > retain_key_filters[i].start {
bail!(
"Key filters are overlapping: {:?} and {:?}",
retain_key_filters[i - 1],
retain_key_filters[i]
);
}
}
Ok(Self {
inner,
retain_key_filters,
current_filter_idx: 0,
})
}

pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
while let Some(item) = self.inner.next().await? {
while self.current_filter_idx < self.retain_key_filters.len()
&& item.0 >= self.retain_key_filters[self.current_filter_idx].end
{
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
self.current_filter_idx += 1;
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
}
if self.current_filter_idx >= self.retain_key_filters.len() {
// We already exhausted all filters, so nothing needs to be filtered.
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter (nothing)
return Ok(Some(item));
}
if self.retain_key_filters[self.current_filter_idx].contains(&item.0) {
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
return Ok(Some(item));
}
// If the key is not contained in the key retaining filters, continue to the next item.
// [filter region] [filter region] [filter region]
// ^ item
// ^ current filter
}
Ok(None)
}
}

#[cfg(test)]
mod tests {
use super::*;

use itertools::Itertools;
use pageserver_api::key::Key;
use utils::lsn::Lsn;

use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::delta_layer::test::produce_delta_layer,
},
DEFAULT_PG_VERSION,
};

async fn assert_filter_iter_equal(
filter_iter: &mut FilterIterator<'_>,
expect: &[(Key, Lsn, Value)],
) {
let mut expect_iter = expect.iter();
loop {
let o1 = filter_iter.next().await.unwrap();
let o2 = expect_iter.next();
assert_eq!(o1.is_some(), o2.is_some());
if o1.is_none() && o2.is_none() {
break;
}
let (k1, l1, v1) = o1.unwrap();
let (k2, l2, v2) = o2.unwrap();
assert_eq!(&k1, k2);
assert_eq!(l1, *l2);
assert_eq!(&v1, v2);
}
}

#[tokio::test]
async fn filter_keyspace_iterator() {
use crate::repository::Value;
use bytes::Bytes;

let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;

let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
const N: usize = 100;
let test_deltas1 = (0..N)
.map(|idx| {
(
get_key(idx as u32),
Lsn(0x20 * ((idx as u64) % 10 + 1)),
Value::Image(Bytes::from(format!("img{idx:05}"))),
)
})
.collect_vec();
let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
.await
.unwrap();

let merge_iter = MergeIterator::create(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,
);

let mut filter_iter = FilterIterator::create(
merge_iter,
KeySpace {
ranges: vec![
get_key(5)..get_key(10),
get_key(20)..get_key(30),
get_key(90)..get_key(110),
get_key(1000)..get_key(2000),
],
},
SparseKeySpace(KeySpace::default()),
)
.unwrap();
let mut result = Vec::new();
result.extend(test_deltas1[5..10].iter().cloned());
result.extend(test_deltas1[20..30].iter().cloned());
result.extend(test_deltas1[90..100].iter().cloned());
assert_filter_iter_equal(&mut filter_iter, &result).await;
}
}
9 changes: 8 additions & 1 deletion pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::split_writer::{
SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
Expand Down Expand Up @@ -1769,6 +1770,7 @@ impl Timeline {
gc_cutoff,
lowest_retain_lsn
);

// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
Expand Down Expand Up @@ -1825,7 +1827,12 @@ impl Timeline {
image_layers.push(layer);
}
}
let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
let mut merge_iter = FilterIterator::create(
MergeIterator::create(&delta_layers, &image_layers, ctx),
dense_ks,
sparse_ks,
)?;
// Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
// Data of the same key.
let mut accumulated_values = Vec::new();
Expand Down
Loading