Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] Remove unused structures #2284

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ fn build_new_replicated_loglet_configuration(
sequencer,
replication,
nodeset,
write_set: None,
}),

Err(NodeSelectionError::InsufficientWriteableNodes) => {
Expand All @@ -402,7 +401,6 @@ fn build_new_replicated_loglet_configuration(
sequencer,
replication,
nodeset: previous_configuration.expect("to exist").nodeset.clone(),
write_set: None,
}
})
}
Expand Down Expand Up @@ -1490,7 +1488,6 @@ pub mod tests {
sequencer: GenerationalNodeId::new(0, 1),
replication: ReplicationProperty::new(NonZeroU8::new(2).unwrap()),
nodeset: NodeSet::from([0, 1, 2]),
write_set: None,
};

let sequencer_replacement = LogletConfiguration::Replicated(ReplicatedLogletParams {
Expand Down
82 changes: 18 additions & 64 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::background_appender::BackgroundAppender;
use crate::loglet::LogletProvider;
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{Error, FindTailAttributes, InputRecord, LogReadStream, Result};
use crate::{Error, InputRecord, LogReadStream, Result};

/// Bifrost is Restate's durable interconnect system
///
Expand Down Expand Up @@ -151,15 +151,15 @@ impl Bifrost {
/// follows:
///
/// ```no_run
/// use restate_bifrost::{Bifrost, FindTailAttributes, LogReadStream};
/// use restate_bifrost::{Bifrost, LogReadStream};
/// use restate_types::logs::{KeyFilter, LogId, SequenceNumber};
///
/// async fn reader(bifrost: &Bifrost, log_id: LogId) -> LogReadStream {
/// bifrost.create_reader(
/// log_id,
/// KeyFilter::Any,
/// bifrost.get_trim_point(log_id).await.unwrap(),
/// bifrost.find_tail(log_id, FindTailAttributes::default()).await.unwrap().offset().prev(),
/// bifrost.find_tail(log_id).await.unwrap().offset().prev(),
/// ).unwrap()
/// }
/// ```
Expand Down Expand Up @@ -206,30 +206,16 @@ impl Bifrost {
/// If the log is empty, it returns TailState::Open(Lsn::OLDEST).
/// This should never return Err(Error::LogSealed). Sealed state is represented as
/// TailState::Sealed(..)
pub async fn find_tail(
&self,
log_id: LogId,
attributes: FindTailAttributes,
) -> Result<TailState> {
pub async fn find_tail(&self, log_id: LogId) -> Result<TailState> {
self.inner.fail_if_shutting_down()?;
Ok(self.inner.find_tail(log_id, attributes).await?.1)
Ok(self.inner.find_tail(log_id).await?.1)
}

// Get the loglet currently serving the tail of the chain, for use in integration tests.
#[cfg(any(test, feature = "test-util"))]
pub async fn find_tail_loglet(
&self,
log_id: LogId,
attributes: FindTailAttributes,
) -> Result<Arc<dyn crate::loglet::Loglet>> {
pub async fn find_tail_loglet(&self, log_id: LogId) -> Result<Arc<dyn crate::loglet::Loglet>> {
self.inner.fail_if_shutting_down()?;
Ok(self
.inner
.find_tail(log_id, attributes)
.await?
.0
.inner()
.clone())
Ok(self.inner.find_tail(log_id).await?.0.inner().clone())
}

/// The lsn of the slot **before** the first readable record (if it exists), or the offset
Expand All @@ -254,9 +240,7 @@ impl Bifrost {

self.inner.fail_if_shutting_down()?;

let current_tail = self
.find_tail(log_id, FindTailAttributes::default())
.await?;
let current_tail = self.find_tail(log_id).await?;

if current_tail.offset() <= Lsn::OLDEST {
return Ok(Vec::default());
Expand Down Expand Up @@ -330,9 +314,7 @@ impl BifrostInner {
from: Lsn,
) -> Result<Option<crate::LogEntry>> {
use futures::StreamExt;
let (_, tail_state) = self
.find_tail(log_id, FindTailAttributes::default())
.await?;
let (_, tail_state) = self.find_tail(log_id).await?;
if from >= tail_state.offset() {
// Can't use this function to read future records.
return Ok(None);
Expand All @@ -349,11 +331,7 @@ impl BifrostInner {
stream.next().await.transpose()
}

pub async fn find_tail(
&self,
log_id: LogId,
_attributes: FindTailAttributes,
) -> Result<(LogletWrapper, TailState)> {
pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> {
let loglet = self.writeable_loglet(log_id).await?;
let tail = loglet.find_tail().await?;
Ok((loglet, tail))
Expand Down Expand Up @@ -606,9 +584,7 @@ mod tests {
assert_eq!(max_lsn + Lsn::from(1), lsn);
max_lsn = lsn;

let tail = bifrost
.find_tail(LogId::new(0), FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LogId::new(0)).await?;
assert_eq!(max_lsn.next(), tail.offset());

// Initiate shutdown
Expand Down Expand Up @@ -664,13 +640,7 @@ mod tests {
&node_env.metadata_store_client,
);

assert_eq!(
Lsn::OLDEST,
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset()
);
assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset());

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

Expand All @@ -682,9 +652,7 @@ mod tests {

bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
assert_eq!(tail.offset(), Lsn::from(11));
assert!(!tail.is_sealed());
assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?);
Expand All @@ -706,13 +674,7 @@ mod tests {
// trimming beyond the release point will fall back to the release point
bifrost_admin.trim(LOG_ID, Lsn::MAX).await?;

assert_eq!(
Lsn::from(11),
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset()
);
assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
let new_trim_point = bifrost.get_trim_point(LOG_ID).await?;
assert_eq!(Lsn::from(10), new_trim_point);

Expand Down Expand Up @@ -765,9 +727,7 @@ mod tests {

// not sealed, tail is what we expect
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Open(eq(Lsn::new(6))))
);

Expand All @@ -784,9 +744,7 @@ mod tests {

// sealed, tail is what we expect
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

Expand Down Expand Up @@ -842,9 +800,7 @@ mod tests {
// find_tail() on the underlying loglet returns (6) but for bifrost it should be (5) after
// the new segment was created at tail of the chain with base_lsn=5
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Open(eq(Lsn::new(5))))
);

Expand All @@ -859,9 +815,7 @@ mod tests {

// tail is now 8 and open.
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?,
bifrost.find_tail(LOG_ID).await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

Expand Down
6 changes: 0 additions & 6 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};

run_in_test_env(params, |env| async move {
Expand Down Expand Up @@ -438,7 +437,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};

run_in_test_env(params, |env| async move {
Expand Down Expand Up @@ -480,7 +478,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::single_loglet_readstream(env.loglet)
Expand All @@ -496,7 +493,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::append_after_seal(env.loglet)
Expand All @@ -512,7 +508,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::append_after_seal_concurrent(env.loglet)
Expand All @@ -528,7 +523,6 @@ mod tests {
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};
run_in_test_env(params, |env| {
crate::loglet::loglet_tests::seal_empty(env.loglet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ mod test {
nodeset: NodeSet::empty(),
replication: ReplicationProperty::new(1.try_into().unwrap()),
sequencer: GenerationalNodeId::new(1, 1),
write_set: None,
};
let known_global_tail = TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST));
let remote_sequencer = RemoteSequencer::new(
Expand Down
35 changes: 8 additions & 27 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ mod tests {
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::Versioned;

use crate::{setup_panic_handler, BifrostAdmin, BifrostService, FindTailAttributes};
use crate::{setup_panic_handler, BifrostAdmin, BifrostService};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[traced_test]
Expand All @@ -482,9 +482,7 @@ mod tests {
let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, read_from, Lsn::MAX)?;
let mut appender = bifrost.create_appender(LOG_ID)?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
assert!(!tail.is_sealed());
assert_eq!(Lsn::OLDEST, tail.offset());
Expand Down Expand Up @@ -585,13 +583,7 @@ mod tests {
// [1..5] trimmed. trim_point = 5
bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?;

assert_eq!(
Lsn::from(11),
bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset(),
);
assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?);

let mut read_stream =
Expand All @@ -608,10 +600,7 @@ mod tests {
assert!(!read_stream.is_terminated());
assert_eq!(Lsn::from(8), read_stream.read_pointer());

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?
.offset();
let tail = bifrost.find_tail(LOG_ID).await?.offset();
// trimming beyond the release point will fall back to the release point
bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?;
let trim_point = bifrost.get_trim_point(LOG_ID).await?;
Expand Down Expand Up @@ -687,9 +676,7 @@ mod tests {
pat!(Poll::Pending)
);

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
assert!(!tail.is_sealed());
assert_eq!(Lsn::OLDEST, tail.offset());
Expand Down Expand Up @@ -751,9 +738,7 @@ mod tests {
pat!(Poll::Pending)
);

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;

assert!(tail.is_sealed());
assert_eq!(Lsn::from(11), tail.offset());
Expand Down Expand Up @@ -856,9 +841,7 @@ mod tests {

let mut appender = bifrost.create_appender(LOG_ID)?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
assert!(!tail.is_sealed());
assert_eq!(Lsn::OLDEST, tail.offset());
Expand All @@ -881,9 +864,7 @@ mod tests {
)
.await?;

let tail = bifrost
.find_tail(LOG_ID, FindTailAttributes::default())
.await?;
let tail = bifrost.find_tail(LOG_ID).await?;

assert!(!tail.is_sealed());
assert_eq!(Lsn::from(11), tail.offset());
Expand Down
7 changes: 0 additions & 7 deletions crates/bifrost/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ where

impl LsnExt for Lsn {}

#[derive(Debug, Clone, Default)]
pub struct FindTailAttributes {
// Ensure that we are reading the most recent metadata. This should be used when
// linearizable metadata reads are required.
// TODO: consistent_read: bool,
}

/// A future that resolves to the Lsn of the last Lsn in a committed batch.
///
/// Note: dropping this future doesn't cancel or stop the underlying enqueued append.
Expand Down
7 changes: 0 additions & 7 deletions crates/types/src/replicated_loglet/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ pub struct ReplicatedLogletParams {
/// Replication properties of this loglet
pub replication: ReplicationProperty,
pub nodeset: NodeSet,
/// The set of nodes the sequencer has been considering for writes after the last
/// known_global_tail advance.
///
/// If unset, the entire nodeset is considered as part of the write set
/// If set, tail repair will attempt reading only from this set.
#[serde(skip_serializing_if = "Option::is_none")]
pub write_set: Option<NodeSet>,
}

impl ReplicatedLogletParams {
Expand Down
Loading
Loading