Skip to content

Commit

Permalink
[Bifrost] Remove unused structures
Browse files Browse the repository at this point in the history
Summary:
Removes `FindTailAttributes` since it was never actually needed and `write_set` from ReplicatedLogletParams which wasn't necessary/needed.

Test Plan:
Tests
  • Loading branch information
AhmedSoliman committed Nov 13, 2024
1 parent ecd19ac commit 84880ce
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 132 deletions.
3 changes: 0 additions & 3 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ fn build_new_replicated_loglet_configuration(
sequencer,
replication,
nodeset,
write_set: None,
}),

Err(NodeSelectionError::InsufficientWriteableNodes) => {
Expand All @@ -402,7 +401,6 @@ fn build_new_replicated_loglet_configuration(
sequencer,
replication,
nodeset: previous_configuration.expect("to exist").nodeset.clone(),
write_set: None,
}
})
}
Expand Down Expand Up @@ -1490,7 +1488,6 @@ pub mod tests {
sequencer: GenerationalNodeId::new(0, 1),
replication: ReplicationProperty::new(NonZeroU8::new(2).unwrap()),
nodeset: NodeSet::from([0, 1, 2]),
write_set: None,
};

let sequencer_replacement = LogletConfiguration::Replicated(ReplicatedLogletParams {
Expand Down
82 changes: 18 additions & 64 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::background_appender::BackgroundAppender;
use crate::loglet::LogletProvider;
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{Error, FindTailAttributes, InputRecord, LogReadStream, Result};
use crate::{Error, InputRecord, LogReadStream, Result};

/// Bifrost is Restate's durable interconnect system
///
Expand Down Expand Up @@ -151,15 +151,15 @@ impl Bifrost {
/// follows:
///
/// ```no_run
/// use restate_bifrost::{Bifrost, FindTailAttributes, LogReadStream};
/// use restate_bifrost::{Bifrost, LogReadStream};
/// use restate_types::logs::{KeyFilter, LogId, SequenceNumber};
///
/// async fn reader(bifrost: &Bifrost, log_id: LogId) -> LogReadStream {
/// bifrost.create_reader(
/// log_id,
/// KeyFilter::Any,
/// bifrost.get_trim_point(log_id).await.unwrap(),
/// bifrost.find_tail(log_id, FindTailAttributes::default()).await.unwrap().offset().prev(),
/// bifrost.find_tail(log_id).await.unwrap().offset().prev(),
/// ).unwrap()
/// }
/// ```
Expand Down Expand Up @@ -206,30 +206,16 @@ impl Bifrost {
/// If the log is empty, it returns TailState::Open(Lsn::OLDEST).
/// This should never return Err(Error::LogSealed). Sealed state is represented as
/// TailState::Sealed(..)
pub async fn find_tail(
&self,
log_id: LogId,
attributes: FindTailAttributes,
) -> Result<TailState> {
pub async fn find_tail(&self, log_id: LogId) -> Result<TailState> {
self.inner.fail_if_shutting_down()?;
Ok(self.inner.find_tail(log_id, attributes).await?.1)
Ok(self.inner.find_tail(log_id).await?.1)
}

// Get the loglet currently serving the tail of the chain, for use in integration tests.
#[cfg(any(test, feature = "test-util"))]
pub async fn find_tail_loglet(
&self,
log_id: LogId,
attributes: FindTailAttributes,
) -> Result<Arc<dyn crate::loglet::Loglet>> {
pub async fn find_tail_loglet(&self, log_id: LogId) -> Result<Arc<dyn crate::loglet::Loglet>> {
self.inner.fail_if_shutting_down()?;
Ok(self
.inner
.find_tail(log_id, attributes)
.await?
.0
.inner()
.clone())
Ok(self.inner.find_tail(log_id).await?.0.inner().clone())
}

/// The lsn of the slot **before** the first readable record (if it exists), or the offset
Expand All @@ -254,9 +240,7 @@ impl Bifrost {

self.inner.fail_if_shutting_down()?;

let current_tail = self
.find_tail(log_id, FindTailAttributes::default())
.await?;
let current_tail = self.find_tail(log_id).await?;

if current_tail.offset() <= Lsn::OLDEST {
return Ok(Vec::default());
Expand Down Expand Up @@ -330,9 +314,7 @@ impl BifrostInner {
from: Lsn,
) -> Result<Option<crate::LogEntry>> {
use futures::StreamExt;
let (_, tail_state) = self
.find_tail(log_id, FindTailAttributes::default())
.await?;
let (_, tail_state) = self.find_tail(log_id).await?;
if from >= tail_state.offset() {
// Can't use this function to read future records.
return Ok(None);
Expand All @@ -349,11 +331,7 @@ impl BifrostInner {
stream.next().await.transpose()
}

pub async fn find_tail(
&self,
log_id: LogId,
_attributes: FindTailAttributes,
) -> Result<(LogletWrapper, TailState)> {
pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> {
let loglet = self.writeable_loglet(log_id).await?;
let tail = loglet.find_tail().await?;
Ok((loglet, tail))
Expand Down Expand Up @@ -606,9 +584,7 @@ mod tests {
assert_eq!(max_lsn + Lsn::from(1), lsn);
max_lsn = lsn;

let tail = bifrost
.find_tail(LogId::new(0), FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LogId::new(0)).await?;
assert_eq!(max_lsn.next(), tail.offset());

// Initiate shutdown
Expand Down Expand Up @@ -664,13 +640,7 @@ mod tests {
&node_env.metadata_store_client,
);

assert_eq!(
Lsn::OLDEST,
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset()
);
assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset());

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

Expand All @@ -682,9 +652,7 @@ mod tests {

bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
assert_eq!(tail.offset(), Lsn::from(11));
assert!(!tail.is_sealed());
assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?);
Expand All @@ -706,13 +674,7 @@ mod tests {
// trimming beyond the release point will fall back to the release point
bifrost_admin.trim(LOG_ID, Lsn::MAX).await?;

assert_eq!(
Lsn::from(11),
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset()
);
assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
let new_trim_point = bifrost.get_trim_point(LOG_ID).await?;
assert_eq!(Lsn::from(10), new_trim_point);

Expand Down Expand Up @@ -765,9 +727,7 @@ mod tests {

// not sealed, tail is what we expect
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Open(eq(Lsn::new(6))))
);

Expand All @@ -784,9 +744,7 @@ mod tests {

// sealed, tail is what we expect
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

Expand Down Expand Up @@ -842,9 +800,7 @@ mod tests {
// find_tail() on the underlying loglet returns (6) but for bifrost it should be (5) after
// the new segment was created at tail of the chain with base_lsn=5
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Open(eq(Lsn::new(5))))
);

Expand All @@ -859,9 +815,7 @@ mod tests {

// tail is now 8 and open.
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

Expand Down
6 changes: 0 additions & 6 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};

run_in_test_env(params, |env| async move {
Expand Down Expand Up @@ -438,7 +437,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};

run_in_test_env(params, |env| async move {
Expand Down Expand Up @@ -480,7 +478,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::single_loglet_readstream(env.loglet)
Expand All @@ -496,7 +493,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::append_after_seal(env.loglet)
Expand All @@ -512,7 +508,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::append_after_seal_concurrent(env.loglet)
Expand All @@ -528,7 +523,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::seal_empty(env.loglet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ mod test {
nodeset: NodeSet::empty(),
replication: ReplicationProperty::new(1.try_into().unwrap()),
sequencer: GenerationalNodeId::new(1, 1),
write_set: None,
};
let known_global_tail = TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST));
let remote_sequencer = RemoteSequencer::new(
Expand Down
35 changes: 8 additions & 27 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ mod tests {
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::Versioned;

use crate::{setup_panic_handler, BifrostAdmin, BifrostService, FindTailAttributes};
use crate::{setup_panic_handler, BifrostAdmin, BifrostService};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[traced_test]
Expand All @@ -482,9 +482,7 @@ mod tests {
let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, read_from, Lsn::MAX)?;
let mut appender = bifrost.create_appender(LOG_ID)?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
assert!(!tail.is_sealed());
assert_eq!(Lsn::OLDEST, tail.offset());
Expand Down Expand Up @@ -585,13 +583,7 @@ mod tests {
// [1..5] trimmed. trim_point = 5
bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?;

assert_eq!(
Lsn::from(11),
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset(),
);
assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?);

let mut read_stream =
Expand All @@ -608,10 +600,7 @@ mod tests {
assert!(!read_stream.is_terminated());
assert_eq!(Lsn::from(8), read_stream.read_pointer());

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset();
let tail = bifrost.find_tail(LOG_ID).await?.offset();
// trimming beyond the release point will fall back to the release point
bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?;
let trim_point = bifrost.get_trim_point(LOG_ID).await?;
Expand Down Expand Up @@ -687,9 +676,7 @@ mod tests {
pat!(Poll::Pending)
);

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
assert!(!tail.is_sealed());
assert_eq!(Lsn::OLDEST, tail.offset());
Expand Down Expand Up @@ -751,9 +738,7 @@ mod tests {
pat!(Poll::Pending)
);

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;

assert!(tail.is_sealed());
assert_eq!(Lsn::from(11), tail.offset());
Expand Down Expand Up @@ -856,9 +841,7 @@ mod tests {

let mut appender = bifrost.create_appender(LOG_ID)?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
assert!(!tail.is_sealed());
assert_eq!(Lsn::OLDEST, tail.offset());
Expand All @@ -881,9 +864,7 @@ mod tests {
)
.await?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;

assert!(!tail.is_sealed());
assert_eq!(Lsn::from(11), tail.offset());
Expand Down
7 changes: 0 additions & 7 deletions crates/bifrost/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ where

impl LsnExt for Lsn {}

#[derive(Debug, Clone, Default)]
pub struct FindTailAttributes {
// Ensure that we are reading the most recent metadata. This should be used when
// linearizable metadata reads are required.
// TODO: consistent_read: bool,
}

/// A future that resolves to the Lsn of the last Lsn in a committed batch.
///
/// Note: dropping this future doesn't cancel or stop the underlying enqueued append.
Expand Down
7 changes: 0 additions & 7 deletions crates/types/src/replicated_loglet/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ pub struct ReplicatedLogletParams {
/// Replication properties of this loglet
pub replication: ReplicationProperty,
pub nodeset: NodeSet,
/// The set of nodes the sequencer has been considering for writes after the last
/// known_global_tail advance.
///
/// If unset, the entire nodeset is considered as part of the write set
/// If set, tail repair will attempt reading only from this set.
#[serde(skip_serializing_if = "Option::is_none")]
pub write_set: Option<NodeSet>,
}

impl ReplicatedLogletParams {
Expand Down
Loading

0 comments on commit 84880ce

Please sign in to comment.