diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 3092734cc..6deee6b1f 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -379,7 +379,6 @@ fn build_new_replicated_loglet_configuration( sequencer, replication, nodeset, - write_set: None, }), Err(NodeSelectionError::InsufficientWriteableNodes) => { @@ -402,7 +401,6 @@ fn build_new_replicated_loglet_configuration( sequencer, replication, nodeset: previous_configuration.expect("to exist").nodeset.clone(), - write_set: None, } }) } @@ -1490,7 +1488,6 @@ pub mod tests { sequencer: GenerationalNodeId::new(0, 1), replication: ReplicationProperty::new(NonZeroU8::new(2).unwrap()), nodeset: NodeSet::from([0, 1, 2]), - write_set: None, }; let sequencer_replacement = LogletConfiguration::Replicated(ReplicatedLogletParams { diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index d214c9276..2c945fede 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -26,7 +26,7 @@ use crate::background_appender::BackgroundAppender; use crate::loglet::LogletProvider; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; -use crate::{Error, FindTailAttributes, InputRecord, LogReadStream, Result}; +use crate::{Error, InputRecord, LogReadStream, Result}; /// Bifrost is Restate's durable interconnect system /// @@ -151,7 +151,7 @@ impl Bifrost { /// follows: /// /// ```no_run - /// use restate_bifrost::{Bifrost, FindTailAttributes, LogReadStream}; + /// use restate_bifrost::{Bifrost, LogReadStream}; /// use restate_types::logs::{KeyFilter, LogId, SequenceNumber}; /// /// async fn reader(bifrost: &Bifrost, log_id: LogId) -> LogReadStream { @@ -159,7 +159,7 @@ impl Bifrost { /// log_id, /// KeyFilter::Any, /// bifrost.get_trim_point(log_id).await.unwrap(), - /// bifrost.find_tail(log_id, FindTailAttributes::default()).await.unwrap().offset().prev(), + /// bifrost.find_tail(log_id).await.unwrap().offset().prev(), /// ).unwrap() /// } /// ``` @@ -206,30 +206,16 @@ impl Bifrost { /// If the log is empty, it returns TailState::Open(Lsn::OLDEST). /// This should never return Err(Error::LogSealed). Sealed state is represented as /// TailState::Sealed(..) - pub async fn find_tail( - &self, - log_id: LogId, - attributes: FindTailAttributes, - ) -> Result { + pub async fn find_tail(&self, log_id: LogId) -> Result { self.inner.fail_if_shutting_down()?; - Ok(self.inner.find_tail(log_id, attributes).await?.1) + Ok(self.inner.find_tail(log_id).await?.1) } // Get the loglet currently serving the tail of the chain, for use in integration tests. #[cfg(any(test, feature = "test-util"))] - pub async fn find_tail_loglet( - &self, - log_id: LogId, - attributes: FindTailAttributes, - ) -> Result> { + pub async fn find_tail_loglet(&self, log_id: LogId) -> Result> { self.inner.fail_if_shutting_down()?; - Ok(self - .inner - .find_tail(log_id, attributes) - .await? - .0 - .inner() - .clone()) + Ok(self.inner.find_tail(log_id).await?.0.inner().clone()) } /// The lsn of the slot **before** the first readable record (if it exists), or the offset @@ -254,9 +240,7 @@ impl Bifrost { self.inner.fail_if_shutting_down()?; - let current_tail = self - .find_tail(log_id, FindTailAttributes::default()) - .await?; + let current_tail = self.find_tail(log_id).await?; if current_tail.offset() <= Lsn::OLDEST { return Ok(Vec::default()); @@ -330,9 +314,7 @@ impl BifrostInner { from: Lsn, ) -> Result> { use futures::StreamExt; - let (_, tail_state) = self - .find_tail(log_id, FindTailAttributes::default()) - .await?; + let (_, tail_state) = self.find_tail(log_id).await?; if from >= tail_state.offset() { // Can't use this function to read future records. return Ok(None); @@ -349,11 +331,7 @@ impl BifrostInner { stream.next().await.transpose() } - pub async fn find_tail( - &self, - log_id: LogId, - _attributes: FindTailAttributes, - ) -> Result<(LogletWrapper, TailState)> { + pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> { let loglet = self.writeable_loglet(log_id).await?; let tail = loglet.find_tail().await?; Ok((loglet, tail)) @@ -606,9 +584,7 @@ mod tests { assert_eq!(max_lsn + Lsn::from(1), lsn); max_lsn = lsn; - let tail = bifrost - .find_tail(LogId::new(0), FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(LogId::new(0)).await?; assert_eq!(max_lsn.next(), tail.offset()); // Initiate shutdown @@ -664,13 +640,7 @@ mod tests { &node_env.metadata_store_client, ); - assert_eq!( - Lsn::OLDEST, - bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await? - .offset() - ); + assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset()); assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); @@ -682,9 +652,7 @@ mod tests { bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; - let tail = bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(LOG_ID).await?; assert_eq!(tail.offset(), Lsn::from(11)); assert!(!tail.is_sealed()); assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?); @@ -706,13 +674,7 @@ mod tests { // trimming beyond the release point will fall back to the release point bifrost_admin.trim(LOG_ID, Lsn::MAX).await?; - assert_eq!( - Lsn::from(11), - bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await? - .offset() - ); + assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); let new_trim_point = bifrost.get_trim_point(LOG_ID).await?; assert_eq!(Lsn::from(10), new_trim_point); @@ -765,9 +727,7 @@ mod tests { // not sealed, tail is what we expect assert_that!( - bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?, + bifrost.find_tail(LOG_ID).await?, pat!(TailState::Open(eq(Lsn::new(6)))) ); @@ -784,9 +744,7 @@ mod tests { // sealed, tail is what we expect assert_that!( - bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?, + bifrost.find_tail(LOG_ID).await?, pat!(TailState::Sealed(eq(Lsn::new(6)))) ); @@ -842,9 +800,7 @@ mod tests { // find_tail() on the underlying loglet returns (6) but for bifrost it should be (5) after // the new segment was created at tail of the chain with base_lsn=5 assert_that!( - bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?, + bifrost.find_tail(LOG_ID).await?, pat!(TailState::Open(eq(Lsn::new(5)))) ); @@ -859,9 +815,7 @@ mod tests { // tail is now 8 and open. assert_that!( - bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?, + bifrost.find_tail(LOG_ID).await?, pat!(TailState::Open(eq(Lsn::new(8)))) ); diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index ebba2c26a..5040c32c8 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -400,7 +400,6 @@ mod tests { sequencer: GenerationalNodeId::new(1, 1), replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), nodeset: NodeSet::from_single(PlainNodeId::new(1)), - write_set: None, }; run_in_test_env(params, |env| async move { @@ -438,7 +437,6 @@ mod tests { sequencer: GenerationalNodeId::new(1, 1), replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), nodeset: NodeSet::from_single(PlainNodeId::new(1)), - write_set: None, }; run_in_test_env(params, |env| async move { @@ -480,7 +478,6 @@ mod tests { sequencer: GenerationalNodeId::new(1, 1), replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), nodeset: NodeSet::from_single(PlainNodeId::new(1)), - write_set: None, }; run_in_test_env(params, |env| { crate::loglet::loglet_tests::single_loglet_readstream(env.loglet) @@ -496,7 +493,6 @@ mod tests { sequencer: GenerationalNodeId::new(1, 1), replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), nodeset: NodeSet::from_single(PlainNodeId::new(1)), - write_set: None, }; run_in_test_env(params, |env| { crate::loglet::loglet_tests::append_after_seal(env.loglet) @@ -512,7 +508,6 @@ mod tests { sequencer: GenerationalNodeId::new(1, 1), replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), nodeset: NodeSet::from_single(PlainNodeId::new(1)), - write_set: None, }; run_in_test_env(params, |env| { crate::loglet::loglet_tests::append_after_seal_concurrent(env.loglet) @@ -528,7 +523,6 @@ mod tests { sequencer: GenerationalNodeId::new(1, 1), replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), nodeset: NodeSet::from_single(PlainNodeId::new(1)), - write_set: None, }; run_in_test_env(params, |env| { crate::loglet::loglet_tests::seal_empty(env.loglet) diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs index 2c486da84..05926df86 100644 --- a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs @@ -583,7 +583,6 @@ mod test { nodeset: NodeSet::empty(), replication: ReplicationProperty::new(1.try_into().unwrap()), sequencer: GenerationalNodeId::new(1, 1), - write_set: None, }; let known_global_tail = TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)); let remote_sequencer = RemoteSequencer::new( diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index aa974e4b3..1d895b716 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -455,7 +455,7 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{setup_panic_handler, BifrostAdmin, BifrostService, FindTailAttributes}; + use crate::{setup_panic_handler, BifrostAdmin, BifrostService}; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] @@ -482,9 +482,7 @@ mod tests { let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, read_from, Lsn::MAX)?; let mut appender = bifrost.create_appender(LOG_ID)?; - let tail = bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(LOG_ID).await?; // no records have been written assert!(!tail.is_sealed()); assert_eq!(Lsn::OLDEST, tail.offset()); @@ -585,13 +583,7 @@ mod tests { // [1..5] trimmed. trim_point = 5 bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; - assert_eq!( - Lsn::from(11), - bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await? - .offset(), - ); + assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?); let mut read_stream = @@ -608,10 +600,7 @@ mod tests { assert!(!read_stream.is_terminated()); assert_eq!(Lsn::from(8), read_stream.read_pointer()); - let tail = bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await? - .offset(); + let tail = bifrost.find_tail(LOG_ID).await?.offset(); // trimming beyond the release point will fall back to the release point bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?; let trim_point = bifrost.get_trim_point(LOG_ID).await?; @@ -687,9 +676,7 @@ mod tests { pat!(Poll::Pending) ); - let tail = bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(LOG_ID).await?; // no records have been written assert!(!tail.is_sealed()); assert_eq!(Lsn::OLDEST, tail.offset()); @@ -751,9 +738,7 @@ mod tests { pat!(Poll::Pending) ); - let tail = bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(LOG_ID).await?; assert!(tail.is_sealed()); assert_eq!(Lsn::from(11), tail.offset()); @@ -856,9 +841,7 @@ mod tests { let mut appender = bifrost.create_appender(LOG_ID)?; - let tail = bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(LOG_ID).await?; // no records have been written assert!(!tail.is_sealed()); assert_eq!(Lsn::OLDEST, tail.offset()); @@ -881,9 +864,7 @@ mod tests { ) .await?; - let tail = bifrost - .find_tail(LOG_ID, FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(LOG_ID).await?; assert!(!tail.is_sealed()); assert_eq!(Lsn::from(11), tail.offset()); diff --git a/crates/bifrost/src/types.rs b/crates/bifrost/src/types.rs index 616b38e1f..9a4568be0 100644 --- a/crates/bifrost/src/types.rs +++ b/crates/bifrost/src/types.rs @@ -71,13 +71,6 @@ where impl LsnExt for Lsn {} -#[derive(Debug, Clone, Default)] -pub struct FindTailAttributes { - // Ensure that we are reading the most recent metadata. This should be used when - // linearizable metadata reads are required. - // TODO: consistent_read: bool, -} - /// A future that resolves to the Lsn of the last Lsn in a committed batch. /// /// Note: dropping this future doesn't cancel or stop the underlying enqueued append. diff --git a/crates/types/src/replicated_loglet/params.rs b/crates/types/src/replicated_loglet/params.rs index d52ffb92d..9445c8b6d 100644 --- a/crates/types/src/replicated_loglet/params.rs +++ b/crates/types/src/replicated_loglet/params.rs @@ -33,13 +33,6 @@ pub struct ReplicatedLogletParams { /// Replication properties of this loglet pub replication: ReplicationProperty, pub nodeset: NodeSet, - /// The set of nodes the sequencer has been considering for writes after the last - /// known_global_tail advance. - /// - /// If unset, the entire nodeset is considered as part of the write set - /// If set, tail repair will attempt reading only from this set. - #[serde(skip_serializing_if = "Option::is_none")] - pub write_set: Option, } impl ReplicatedLogletParams { diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 289d56f67..a72e729f3 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -21,7 +21,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span}; -use restate_bifrost::{Bifrost, FindTailAttributes}; +use restate_bifrost::Bifrost; use restate_core::network::{HasConnection, Incoming, Outgoing}; use restate_core::{cancellation_watcher, metadata, TaskCenter, TaskHandle, TaskKind}; use restate_partition_store::{PartitionStore, PartitionStoreTransaction}; @@ -295,10 +295,7 @@ where // propagate errors and let the PPM handle error retries let current_tail = self .bifrost - .find_tail( - LogId::from(self.partition_id), - FindTailAttributes::default(), - ) + .find_tail(LogId::from(self.partition_id)) .await?; debug!( diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index c1aece5f2..d58d168d1 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -5,7 +5,7 @@ use enumset::{enum_set, EnumSet}; use googletest::internal::test_outcome::TestAssertionFailure; use googletest::IntoTestResult; -use restate_bifrost::{loglet::Loglet, Bifrost, BifrostAdmin, FindTailAttributes}; +use restate_bifrost::{loglet::Loglet, Bifrost, BifrostAdmin}; use restate_core::metadata_store::Precondition; use restate_core::{metadata_store::MetadataStoreClient, MetadataWriter, TaskCenterBuilder}; use restate_local_cluster_runner::{ @@ -68,9 +68,7 @@ async fn replicated_loglet_client( node.start().await.into_test_result()?; - let loglet = bifrost - .find_tail_loglet(LogId::MIN, FindTailAttributes::default()) - .await?; + let loglet = bifrost.find_tail_loglet(LogId::MIN).await?; Ok((bifrost, loglet, metadata_writer, metadata_store_client)) } @@ -141,7 +139,6 @@ where replication, // node 1 is the metadata, 2..=count+1 are logservers nodeset: (2..=log_server_count + 1).collect(), - write_set: None, }; let loglet_params = loglet_params.serialize()?; diff --git a/tools/restatectl/src/commands/log/dump_log.rs b/tools/restatectl/src/commands/log/dump_log.rs index 90c78ca7a..61feee102 100644 --- a/tools/restatectl/src/commands/log/dump_log.rs +++ b/tools/restatectl/src/commands/log/dump_log.rs @@ -16,7 +16,7 @@ use cling::prelude::*; use futures_util::StreamExt; use tracing::{debug, info}; -use restate_bifrost::{BifrostService, FindTailAttributes}; +use restate_bifrost::BifrostService; use restate_core::network::MessageRouterBuilder; use restate_core::{MetadataBuilder, MetadataManager, TaskKind}; use restate_rocksdb::RocksDbManager; @@ -107,9 +107,7 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { let log_id = LogId::from(opts.log_id); debug!("Finding log tail"); - let tail = bifrost - .find_tail(log_id, FindTailAttributes::default()) - .await?; + let tail = bifrost.find_tail(log_id).await?; debug!("Log tail is {:?}", tail); let trim_point = bifrost.get_trim_point(log_id).await?; debug!("Trim point is {:?}", trim_point); diff --git a/tools/restatectl/src/commands/log/gen_metadata.rs b/tools/restatectl/src/commands/log/gen_metadata.rs index 8d363be5b..6d5a11443 100644 --- a/tools/restatectl/src/commands/log/gen_metadata.rs +++ b/tools/restatectl/src/commands/log/gen_metadata.rs @@ -53,7 +53,6 @@ async fn generate_log_metadata(opts: &GenerateLogMetadataOpts) -> anyhow::Result sequencer: opts.sequencer, replication: ReplicationProperty::new(opts.replication_factor), nodeset: NodeSet::from_iter(opts.nodeset.clone()), - write_set: None, }; let params = LogletParams::from(loglet_params.serialize()?); diff --git a/tools/restatectl/src/commands/log/reconfigure.rs b/tools/restatectl/src/commands/log/reconfigure.rs index f4d186439..883398655 100644 --- a/tools/restatectl/src/commands/log/reconfigure.rs +++ b/tools/restatectl/src/commands/log/reconfigure.rs @@ -170,7 +170,6 @@ async fn replicated_loglet_params( .map(ReplicationProperty::new) .unwrap_or(last_params.replication.clone()), sequencer: opts.sequencer.unwrap_or(last_params.sequencer), - write_set: None, } } else { ReplicatedLogletParams { @@ -184,7 +183,6 @@ async fn replicated_loglet_params( opts.replication_factor_nodes.context("Missing replication-factor. Replication factor is required if last segment is not of replicated type")?, ), sequencer: opts.sequencer.context("Missing sequencer. Sequencer is required if last segment is not of replicated type")?, - write_set: None, } };