Skip to content

Commit

Permalink
[Bifrost] Maintain a lookup index in Log metadata for replicated loglets
Browse files Browse the repository at this point in the history
This introduces a lookup table to find replicated loglets by id in Log metadata. This also promotes replicated loglet as a core feature in restate-types since it'll become the default soon.


In future follow-up we can explore deserializing the loglet params prematurely (or use typed structure instead of string) and maintain reference-counted container to avoid memory bloat. In this PR, we treat replicated loglet as a special case to speed up implementation and since this will be the de-facto loglet provider anyway.
  • Loading branch information
AhmedSoliman committed Nov 7, 2024
1 parent b5777bf commit 4f31f69
Show file tree
Hide file tree
Showing 17 changed files with 193 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ impl LogsControllerInner {
observed_cluster_state,
|seal_lsn, provider_kind, loglet_params| {
let mut chain_builder = logs_builder
.chain(log_id)
.chain(*log_id)
.expect("Log with '{log_id}' should be present");

chain_builder.append_segment(seal_lsn, provider_kind, loglet_params)
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ publish = false

[features]
default = []
replicated-loglet = ["restate-types/replicated-loglet"]
replicated-loglet = []
memory-loglet = ["restate-types/memory-loglet"]
test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"]

Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ mod tests {
let old_version = bifrost.inner.metadata.logs_version();

let mut builder = bifrost.inner.metadata.logs_ref().clone().into_builder();
let mut chain_builder = builder.chain(&LOG_ID).unwrap();
let mut chain_builder = builder.chain(LOG_ID).unwrap();
assert_eq!(1, chain_builder.num_segments());
let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory);
// deliberately skips Lsn::from(6) to create a zombie record in segment 1. Segment 1 now has 4 records.
Expand Down
3 changes: 1 addition & 2 deletions crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,7 @@ impl<'a> BifrostAdmin<'a> {
let logs = logs.ok_or(Error::UnknownLogId(log_id))?;

let mut builder = logs.into_builder();
let mut chain_builder =
builder.chain(&log_id).ok_or(Error::UnknownLogId(log_id))?;
let mut chain_builder = builder.chain(log_id).ok_or(Error::UnknownLogId(log_id))?;

if chain_builder.tail().index() != last_segment_index {
// tail is not what we expected.
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ mod tests {
// when it's implemented)
let old_version = bifrost.inner.metadata.logs_version();
let mut builder = bifrost.inner.metadata.logs_ref().clone().into_builder();
let mut chain_builder = builder.chain(&LOG_ID).unwrap();
let mut chain_builder = builder.chain(LOG_ID).unwrap();
assert_eq!(1, chain_builder.num_segments());
let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory);
chain_builder.append_segment(
Expand Down Expand Up @@ -982,7 +982,7 @@ mod tests {
// prepare a chain that starts from Lsn 10 (we expect trim from OLDEST -> 9)
let old_version = bifrost.inner.metadata.logs_version();
let mut builder = bifrost.inner.metadata.logs_ref().clone().into_builder();
let mut chain_builder = builder.chain(&LOG_ID).unwrap();
let mut chain_builder = builder.chain(LOG_ID).unwrap();
assert_eq!(1, chain_builder.num_segments());
let new_segment_params = new_single_node_loglet_params(ProviderKind::Local);
chain_builder.append_segment(Lsn::new(10), ProviderKind::Local, new_segment_params)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/local-cluster-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ default = []
[dependencies]
restate-metadata-store = { workspace = true }
# nb features here will also affect the compiled restate-server binary in integration tests
restate-types = { workspace = true, features = ["unsafe-mutable-config", "replicated-loglet"] }
restate-types = { workspace = true, features = ["unsafe-mutable-config"] }

arc-swap = { workspace = true }
clap = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/log-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-metadata-store = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true, features = ["replicated-loglet"] }
restate-types = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ publish = false
[features]
default = []

replicated-loglet = []
memory-loglet = []
schemars = ["dep:schemars", "restate-serde-util/schema"]
unsafe-mutable-config = []
Expand Down Expand Up @@ -60,6 +59,7 @@ serde_json = { workspace = true }
serde_path_to_error = { version = "0.1" }
serde_with = { workspace = true }
sha2 = { workspace = true }
smallvec = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
sync_wrapper = { workspace = true }
Expand Down
5 changes: 0 additions & 5 deletions crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ pub struct BifrostOptions {
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
/// Configuration of local loglet provider
pub local: LocalLogletOptions,
#[cfg(feature = "replicated-loglet")]
/// [IN DEVELOPMENT]
/// Configuration of replicated loglet provider
pub replicated_loglet: ReplicatedLogletOptions,

Expand Down Expand Up @@ -92,7 +90,6 @@ impl Default for BifrostOptions {
fn default() -> Self {
Self {
default_provider: ProviderKind::Local,
#[cfg(feature = "replicated-loglet")]
replicated_loglet: ReplicatedLogletOptions::default(),
local: LocalLogletOptions::default(),
read_retry_policy: RetryPolicy::exponential(
Expand Down Expand Up @@ -207,7 +204,6 @@ impl Default for LocalLogletOptions {
}
}

#[cfg(feature = "replicated-loglet")]
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
Expand Down Expand Up @@ -240,7 +236,6 @@ pub struct ReplicatedLogletOptions {
pub log_server_retry_policy: RetryPolicy,
}

#[cfg(feature = "replicated-loglet")]
impl Default for ReplicatedLogletOptions {
fn default() -> Self {
Self {
Expand Down
1 change: 0 additions & 1 deletion crates/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub mod net;
pub mod nodes_config;
pub mod partition_table;
pub mod protobuf;
#[cfg(feature = "replicated-loglet")]
pub mod replicated_loglet;
pub mod retries;
pub mod schema;
Expand Down
72 changes: 66 additions & 6 deletions crates/types/src/logs/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use std::num::NonZeroU32;
use std::ops::Deref;

use super::metadata::{
Chain, LogletConfig, LogletParams, Logs, MaybeSegment, ProviderKind, SegmentIndex,
Chain, LogletConfig, LogletParams, Logs, LookupIndex, MaybeSegment, ProviderKind, SegmentIndex,
};
use super::{LogId, Lsn};
use crate::replicated_loglet::ReplicatedLogletParams;
use crate::Version;

#[derive(Debug, Default, Clone)]
Expand All @@ -27,6 +28,8 @@ pub struct LogsBuilder {
pub enum BuilderError {
#[error("log {0} already exists")]
LogAlreadyExists(LogId),
#[error("loglet params could not be deserialized: {0}")]
ParamsSerde(#[from] serde_json::Error),
#[error("Segment conflicts with existing (base_lsn={0})")]
SegmentConflict(Lsn),
}
Expand All @@ -41,15 +44,29 @@ impl LogsBuilder {
if self.inner.logs.contains_key(&log_id) {
return Err(BuilderError::LogAlreadyExists(log_id));
}
for loglet_config in chain.chain.values() {
if let ProviderKind::Replicated = loglet_config.kind {
let params =
ReplicatedLogletParams::deserialize_from(loglet_config.params.as_bytes())?;
self.inner.lookup_index.add_replicated_loglet(
log_id,
loglet_config.index(),
params,
);
}
}
self.inner.logs.insert(log_id, chain);
// update replicated loglet index
self.modified = true;
Ok(self.chain(&log_id).unwrap())
Ok(self.chain(log_id).unwrap())
}

pub fn chain(&mut self, log_id: &LogId) -> Option<ChainBuilder<'_>> {
let chain = self.inner.logs.get_mut(log_id)?;
pub fn chain(&mut self, log_id: LogId) -> Option<ChainBuilder<'_>> {
let chain = self.inner.logs.get_mut(&log_id)?;
Some(ChainBuilder {
log_id,
inner: chain,
lookup_index: &mut self.inner.lookup_index,
modified: &mut self.modified,
})
}
Expand All @@ -59,6 +76,7 @@ impl LogsBuilder {
Logs {
version: self.inner.version.next(),
logs: self.inner.logs,
lookup_index: self.inner.lookup_index,
}
}

Expand All @@ -73,6 +91,7 @@ impl LogsBuilder {
Some(Logs {
version: self.inner.version.next(),
logs: self.inner.logs,
lookup_index: self.inner.lookup_index,
})
} else {
None
Expand All @@ -97,7 +116,9 @@ impl From<Logs> for LogsBuilder {

#[derive(Debug)]
pub struct ChainBuilder<'a> {
log_id: LogId,
inner: &'a mut Chain,
lookup_index: &'a mut LookupIndex,
modified: &'a mut bool,
}

Expand All @@ -114,7 +135,23 @@ impl<'a> ChainBuilder<'a> {
MaybeSegment::Trim { .. } => return,
};

self.inner.chain = self.inner.chain.split_off(&found_base_lsn);
let remaining = self.inner.chain.split_off(&found_base_lsn);
for loglet_config in self.inner.chain.values() {
if let ProviderKind::Replicated = loglet_config.kind {
// if it was inserted correctly before, we shouldn't fail to deserialize it.
// validation happens at original insert time.
let params =
ReplicatedLogletParams::deserialize_from(loglet_config.params.as_bytes())
.expect("params should be deserializable");
self.lookup_index.rm_replicated_loglet_reference(
self.log_id,
loglet_config.index(),
params.loglet_id,
);
}
}

self.inner.chain = remaining;
*self.modified = true;
}

Expand All @@ -133,10 +170,16 @@ impl<'a> ChainBuilder<'a> {
.chain
.last_entry()
.expect("chain have at least one segment");

match *last_entry.key() {
key if key < base_lsn => {
// append
let new_index = SegmentIndex(last_entry.get().index().0 + 1);
if let ProviderKind::Replicated = provider {
let params = ReplicatedLogletParams::deserialize_from(params.as_bytes())?;
self.lookup_index
.add_replicated_loglet(self.log_id, new_index, params);
}
self.inner
.chain
.insert(base_lsn, LogletConfig::new(new_index, provider, params));
Expand All @@ -145,7 +188,24 @@ impl<'a> ChainBuilder<'a> {
}
key if key == base_lsn => {
// Replace the last segment (empty segment)
{
// Let's remove the loglet from the index if it's a replicated loglet
let old = last_entry.get();
if let ProviderKind::Replicated = old.kind {
let params = ReplicatedLogletParams::deserialize_from(params.as_bytes())?;
self.lookup_index.rm_replicated_loglet_reference(
self.log_id,
old.index(),
params.loglet_id,
);
}
}
let new_index = SegmentIndex(last_entry.get().index().0 + 1);
if let ProviderKind::Replicated = provider {
let params = ReplicatedLogletParams::deserialize_from(params.as_bytes())?;
self.lookup_index
.add_replicated_loglet(self.log_id, new_index, params);
}
last_entry.insert(LogletConfig::new(new_index, provider, params));
*self.modified = true;
Ok(new_index)
Expand Down Expand Up @@ -451,7 +511,7 @@ mod tests {
LogletParams::from("test1".to_owned()),
),
)?;
let mut chain = builder.chain(&log_id).unwrap();
let mut chain = builder.chain(log_id).unwrap();
// removing the only segment is not allowed (no-op)
chain.trim_prefix(Lsn::new(10));
let segment = chain.tail();
Expand Down
Loading

0 comments on commit 4f31f69

Please sign in to comment.