diff --git a/crates/fluvio/src/consumer/mod.rs b/crates/fluvio/src/consumer/mod.rs index 336f7557b5..7c392ed039 100644 --- a/crates/fluvio/src/consumer/mod.rs +++ b/crates/fluvio/src/consumer/mod.rs @@ -34,9 +34,12 @@ use crate::spu::{SpuDirectory, SpuSocketPool}; pub use config::{ConsumerConfig, ConsumerConfigBuilder}; pub use config::{ConsumerConfigExt, ConsumerConfigExtBuilder, OffsetManagementStrategy, RetryMode}; -pub use stream::{ConsumerStream, MultiplePartitionConsumerStream, SinglePartitionConsumerStream}; +pub use stream::{ + ConsumerStream, MultiplePartitionConsumerStream, SinglePartitionConsumerStream, + ConsumerBoxFuture, +}; pub use offset::ConsumerOffset; -pub use retry::ConsumerWithRetry; +pub use retry::ConsumerRetryStream; pub use fluvio_protocol::record::ConsumerRecord as Record; pub use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation; diff --git a/crates/fluvio/src/consumer/retry.rs b/crates/fluvio/src/consumer/retry.rs index 3d54ef2ae1..96357419b1 100644 --- a/crates/fluvio/src/consumer/retry.rs +++ b/crates/fluvio/src/consumer/retry.rs @@ -8,7 +8,6 @@ use anyhow::Result; use adaptive_backoff::prelude::{ Backoff, BackoffBuilder, ExponentialBackoff, ExponentialBackoffBuilder, }; -use futures_util::future::BoxFuture; use futures_util::Stream; use futures_util::StreamExt; use tokio::sync::Notify; @@ -20,14 +19,14 @@ use fluvio_sc_schema::errors::ErrorCode; use crate::consumer::RetryMode; use crate::{Fluvio, FluvioConfig, Offset}; -use super::{ConsumerConfigExt, ConsumerStream}; +use super::{ConsumerConfigExt, ConsumerStream, ConsumerBoxFuture}; pub const SPAN_RETRY: &str = "fluvio::retry"; pub const BACKOFF_MIN_DURATION: Duration = Duration::from_secs(1); pub const BACKOFF_MAX_DURATION: Duration = Duration::from_secs(30); pub const BACKOFF_FACTOR: f64 = 1.1; -/// Type alias for a consumer stream with conditional `Send` support. +/// Type alias for the consumer record stream. #[cfg(target_arch = "wasm32")] type BoxConsumerStream = Pin> + 'static>>; @@ -35,125 +34,138 @@ type BoxConsumerStream = type BoxConsumerStream = Pin> + Send + 'static>>; -/// Type alias for a pending future with conditional `Send` support. +/// Type alias for the future returned by our retry logic. #[cfg(target_arch = "wasm32")] -type BoxPendingFuture = Pin< +type BoxConsumerFuture = Pin< Box< dyn Future< - Output = Option< - Result<(ConsumerRecord, BoxConsumerStream, Option), ErrorCode>, - >, + Output = ( + BoxConsumerStream, + Option), ErrorCode>>, + ), > + 'static, >, >; #[cfg(not(target_arch = "wasm32"))] -type BoxPendingFuture = Pin< +type BoxConsumerFuture = Pin< Box< dyn Future< - Output = Option< - Result<(ConsumerRecord, BoxConsumerStream, Option), ErrorCode>, - >, + Output = ( + BoxConsumerStream, + Option), ErrorCode>>, + ), > + Send + 'static, >, >; #[derive(Clone)] -pub struct ConsumerWithRetryInner { - fluvio_client: Arc, +pub struct ConsumerRetryInner { + fluvio_config: FluvioConfig, next_offset_to_read: Option, consumer_config: ConsumerConfigExt, } /// The internal state of our consumer. -enum ConsumerState { - /// An active stream is available. - Current(BoxConsumerStream), - /// A pending future is running (e.g. while reconnecting). - Pending(BoxPendingFuture), +enum ConsumerRetryState { + /// The stream is idle and ready to consume. + Idle, + /// The stream is currently processing a task. + Task(BoxConsumerFuture), /// The stream has terminated. Terminated, } /// A consumer stream that automatically retries on failure. /// -/// This version uses a state machine plus a notification primitive to signal -/// when the state changes. (See offset_flush and offset_commit below.) -pub struct ConsumerWithRetry { - inner: ConsumerWithRetryInner, - state: ConsumerState, +/// In this refactored version we remove the mutex by taking ownership of +/// the consumer stream whenever we start a new retry task. When the task finishes, +/// the stream is returned. +pub struct ConsumerRetryStream { + inner: ConsumerRetryInner, + state: ConsumerRetryState, + /// The consumer stream is stored directly (inside an Option for ownership transfer). + stream: Option, notify: Arc, } -impl ConsumerWithRetry { - fn change_state(&mut self, new_state: ConsumerState) { +impl ConsumerRetryStream { + fn change_state(&mut self, new_state: ConsumerRetryState) { self.state = new_state; self.notify.notify_one(); } + + fn set_idle(&mut self) { + self.change_state(ConsumerRetryState::Idle); + self.notify.notify_one(); + } + + fn set_terminated(&mut self) { + self.change_state(ConsumerRetryState::Terminated); + self.notify.notify_one(); + } + + fn set_task(&mut self, task: BoxConsumerFuture) { + self.change_state(ConsumerRetryState::Task(task)); + } } -impl Stream for ConsumerWithRetry { +impl Stream for ConsumerRetryStream { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // We use a loop so that after a state transition we immediately poll again. - let this = self.get_mut(); + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match &mut this.state { - ConsumerState::Terminated => return Poll::Ready(None), - ConsumerState::Current(_) => { - // Transition into Pending so we can take ownership of the stream. - let stream = match std::mem::replace(&mut this.state, ConsumerState::Terminated) - { - ConsumerState::Current(s) => s, - _ => unreachable!(), - }; - - let client = Arc::clone(&this.inner.fluvio_client); - let new_state = ConsumerState::Pending(Box::pin(Self::consumer_with_retry( - this.inner.clone(), - client, - stream, - ))); - - this.change_state(new_state); - // Loop to poll the new pending future. + match &mut self.state { + ConsumerRetryState::Terminated => return Poll::Ready(None), + ConsumerRetryState::Idle => { + // Take ownership of the stream and start a new retry task. + if let Some(stream) = self.stream.take() { + let future = Self::consumer_with_retry(self.inner.clone(), stream); + self.set_task(Box::pin(future)); + } else { + // If the stream is missing, treat as terminated. + self.set_terminated(); + return Poll::Ready(None); + } } - ConsumerState::Pending(pending) => { - match pending.as_mut().poll(cx) { - Poll::Ready(Some(Ok((record, new_stream, new_offset)))) => { - this.inner.next_offset_to_read = new_offset; - this.change_state(ConsumerState::Current(new_stream)); - return Poll::Ready(Some(Ok(record))); - } - Poll::Ready(Some(Err(e))) => { - // Transition back to Idle with a dummy stream. - this.notify.notify_one(); - return Poll::Ready(Some(Err(e))); - } - Poll::Ready(None) => { - this.change_state(ConsumerState::Terminated); - return Poll::Ready(None); + ConsumerRetryState::Task(fut) => match fut.as_mut().poll(cx) { + Poll::Ready((new_stream, opt_result)) => { + self.stream = Some(new_stream); + self.set_idle(); + match opt_result { + Some(Ok((record, new_offset))) => { + self.inner.next_offset_to_read = new_offset; + return Poll::Ready(Some(Ok(record))); + } + Some(Err(e)) => { + self.notify.notify_one(); + return Poll::Ready(Some(Err(e))); + } + None => { + self.set_terminated(); + return Poll::Ready(None); + } } - Poll::Pending => return Poll::Pending, } - } + Poll::Pending => return Poll::Pending, + }, } } } } -impl ConsumerStream for ConsumerWithRetry { - #[cfg(not(target_arch = "wasm32"))] - fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { +impl ConsumerStream for ConsumerRetryStream { + fn offset_commit(&mut self) -> ConsumerBoxFuture { let notify = self.notify.clone(); Box::pin(async move { loop { match self.state { - ConsumerState::Current(ref mut stream) => return stream.offset_commit().await, - ConsumerState::Terminated => { - warn!("offset_commit called but stream is terminated"); - return Ok(()); + ConsumerRetryState::Idle | ConsumerRetryState::Terminated => { + if let Some(ref mut stream) = self.stream { + return stream.offset_commit().await; + } else { + return Err(ErrorCode::Other("Stream not available".to_string())); + } } _ => { notify.notified().await; @@ -163,35 +175,17 @@ impl ConsumerStream for ConsumerWithRetry { }) } - #[cfg(target_arch = "wasm32")] - fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { - loop { - match self.state { - ConsumerState::Current(ref mut stream) => return stream.offset_commit(), - ConsumerState::Terminated => { - warn!("offset_commit called but stream is terminated"); - return Box::pin(async { Ok(()) }); - } - _ => { - let notify = self.notify.clone(); - fluvio_future::task::run_block_on(async move { - notify.notified().await; - }); - } - } - } - } - - #[cfg(not(target_arch = "wasm32"))] - fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { + fn offset_flush(&mut self) -> ConsumerBoxFuture { let notify = self.notify.clone(); Box::pin(async move { loop { match self.state { - ConsumerState::Current(ref mut stream) => return stream.offset_flush().await, - ConsumerState::Terminated => { - warn!("offset_flush called but stream is terminated"); - return Ok(()); + ConsumerRetryState::Idle | ConsumerRetryState::Terminated => { + if let Some(ref mut stream) = self.stream { + return stream.offset_flush().await; + } else { + return Err(ErrorCode::Other("Stream not available".to_string())); + } } _ => { notify.notified().await; @@ -200,41 +194,23 @@ impl ConsumerStream for ConsumerWithRetry { } }) } - - #[cfg(target_arch = "wasm32")] - fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { - loop { - match self.state { - ConsumerState::Current(ref mut stream) => return stream.offset_flush(), - ConsumerState::Terminated => { - warn!("offset_flush called but stream is terminated"); - return Box::pin(async { Ok(()) }); - } - _ => { - let notify = self.notify.clone(); - fluvio_future::task::run_block_on(async move { - notify.notified().await; - }); - } - } - } - } } -impl ConsumerWithRetry { - /// Creates a new `ConsumerWithRetry` instance. +impl ConsumerRetryStream { + /// Creates a new `ConsumerRetryStream` with the given configuration. pub async fn new(fluvio_config: FluvioConfig, config: ConsumerConfigExt) -> Result { let fluvio = Fluvio::connect_with_config(&fluvio_config).await?; let stream = fluvio.consumer_with_config_inner(config.clone()).await?; let boxed_stream: BoxConsumerStream = Box::pin(stream); Ok(Self { - inner: ConsumerWithRetryInner { - fluvio_client: Arc::new(fluvio), + inner: ConsumerRetryInner { + fluvio_config, next_offset_to_read: None, consumer_config: config, }, - state: ConsumerState::Current(boxed_stream), + state: ConsumerRetryState::Idle, + stream: Some(boxed_stream), notify: Arc::new(Notify::new()), }) } @@ -245,33 +221,38 @@ impl ConsumerWithRetry { /// reconnects. When a record is successfully produced, the new stream and /// updated offset are returned. async fn consumer_with_retry( - inner: ConsumerWithRetryInner, - fluvio_client: Arc, - mut current_stream: BoxConsumerStream, - ) -> Option), ErrorCode>> { + inner: ConsumerRetryInner, + mut stream: BoxConsumerStream, + ) -> ( + BoxConsumerStream, + Option), ErrorCode>>, + ) { let mut attempts: u32 = 0; let mut backoff = match create_backoff() { Ok(b) => b, Err(_) => { - return Some(Err(ErrorCode::Other("Error creating backoff".to_string()))); + return ( + stream, + Some(Err(ErrorCode::Other("Error creating backoff".to_string()))), + ); } }; loop { // Try to retrieve the next record. - if let Some(record_result) = current_stream.as_mut().next().await { + if let Some(record_result) = stream.as_mut().next().await { match record_result { Ok(record) => { let new_offset = Some(record.offset + 1); if attempts > 0 { debug!(target: SPAN_RETRY, "Record produced successfully after reconnect"); } - return Some(Ok((record, current_stream, new_offset))); + return (stream, Some(Ok((record, new_offset)))); } Err(e) => { warn!(target: SPAN_RETRY, "Error consuming record: {}", e); if let RetryMode::Disabled = inner.consumer_config.retry_mode { - return Some(Err(e)); + return (stream, Some(Err(e))); } } } @@ -279,7 +260,7 @@ impl ConsumerWithRetry { // If continuous consumption is disabled, end the stream. if inner.consumer_config.disable_continuous { - return None; + return (stream, None); } // Wait before retrying. @@ -292,7 +273,7 @@ impl ConsumerWithRetry { Ok(off) => off, Err(e) => { warn!(target: SPAN_RETRY, "Error creating offset: {}", e); - return Some(Err(ErrorCode::OffsetOutOfRange)); + return (stream, Some(Err(ErrorCode::OffsetOutOfRange))); } } } else { @@ -305,15 +286,9 @@ impl ConsumerWithRetry { // Reconnect loop: keep trying until a new stream is created. loop { - info!(target: SPAN_RETRY, "Reconnecting to stream"); - match fluvio_client - .consumer_with_config_inner(new_config.clone()) - .await - { + match Self::reconnect_stream(&inner, new_config.clone(), backoff.clone()).await { Ok(new_stream) => { - backoff.reset(); - current_stream = Box::pin(new_stream) as BoxConsumerStream; - info!(target: SPAN_RETRY, "Created new consumer stream with offset: {:?}", inner.next_offset_to_read); + stream = new_stream; break; } Err(e) => { @@ -323,10 +298,10 @@ impl ConsumerWithRetry { match inner.consumer_config.retry_mode { RetryMode::TryUntil(max) if attempts >= max => { - return Some(Err(ErrorCode::MaxRetryReached)); + return (stream, Some(Err(ErrorCode::MaxRetryReached))); } RetryMode::Disabled => { - return Some(Err(ErrorCode::Other(format!("{}", e)))); + return (stream, Some(Err(ErrorCode::Other(format!("{}", e))))); } _ => {} // Continue retrying. } @@ -335,6 +310,23 @@ impl ConsumerWithRetry { } } } + + async fn reconnect_stream( + inner: &ConsumerRetryInner, + new_config: ConsumerConfigExt, + mut backoff: ExponentialBackoff, + ) -> Result { + info!(target: SPAN_RETRY, "Reconnecting to stream"); + let fluvio_client = Fluvio::connect_with_config(&inner.fluvio_config).await?; + + let new_stream = fluvio_client + .consumer_with_config_inner(new_config.clone()) + .await?; + + backoff.reset(); + info!(target: SPAN_RETRY, "Created new consumer stream with offset: {:?}", inner.next_offset_to_read); + Ok(Box::pin(new_stream)) + } } /// Creates an exponential backoff configuration. @@ -353,3 +345,121 @@ async fn backoff_and_wait(backoff: &mut ExponentialBackoff) { let _ = sleep(wait_duration).await; debug!(target: SPAN_RETRY, "Resuming after backoff"); } + +#[cfg(test)] +mod tests { + use std::vec::IntoIter; + + use fluvio_protocol::record::Batch; + use fluvio_smartmodule::RecordData; + use fluvio_types::PartitionId; + use futures_util::{stream::Iter, StreamExt}; + + use crate::consumer::{ + MultiplePartitionConsumerStream, OffsetManagementStrategy, SinglePartitionConsumerStream, + StreamToServer, + }; + + use super::*; + + #[fluvio_future::test] + async fn test_retry_stream() { + //given + let (tx1, rx1) = async_channel::unbounded(); + let partition_stream1 = SinglePartitionConsumerStream::new( + records_stream(0, ["1", "3", "5"]), + OffsetManagementStrategy::Manual, + Default::default(), + tx1, + ); + let (tx2, rx2) = async_channel::unbounded(); + let partition_stream2 = SinglePartitionConsumerStream::new( + records_stream(1, ["2", "4", "6"]), + OffsetManagementStrategy::Manual, + Default::default(), + tx2, + ); + let multi_stream = + MultiplePartitionConsumerStream::new([partition_stream1, partition_stream2]); + + let mut retry_stream = ConsumerRetryStream { + inner: ConsumerRetryInner { + fluvio_config: FluvioConfig::new("localhost:9003".to_string()), + next_offset_to_read: None, + consumer_config: ConsumerConfigExt::builder() + .topic("test_topic".to_string()) + .offset_start(Offset::beginning()) + .disable_continuous(true) + .offset_strategy(OffsetManagementStrategy::Manual) + .offset_consumer("test_consumer".to_string()) + .build() + .expect("no error"), + }, + state: ConsumerRetryState::Idle, + stream: Some(Box::pin(multi_stream)), + notify: Arc::new(Notify::new()), + }; + + //when + let mut result = vec![]; + assert!(matches!(retry_stream.state, ConsumerRetryState::Idle)); + let next = retry_stream.next().await.unwrap().unwrap(); + result.push(next); + + //then + assert!(matches!(retry_stream.state, ConsumerRetryState::Idle)); + while let Some(r) = retry_stream.next().await { + result.push(r.unwrap()); + } + + assert_eq!( + result + .iter() + .map(|r| String::from_utf8_lossy(r.as_ref()).to_string()) + .collect::>(), + ["1", "2", "3", "4", "5", "6"] + ); + assert!(matches!(retry_stream.state, ConsumerRetryState::Terminated)); + + retry_stream.offset_commit().await.unwrap(); + fluvio_future::task::spawn(async move { + let message = rx1.recv().await; + if let Ok(StreamToServer::FlushManagedOffset { + offset: _, + callback, + }) = message + { + callback.send(ErrorCode::None).await; + } + }); + fluvio_future::task::spawn(async move { + let message = rx2.recv().await; + if let Ok(StreamToServer::FlushManagedOffset { + callback, + offset: _, + }) = message + { + callback.send(ErrorCode::None).await; + } + }); + + assert!(retry_stream.offset_flush().await.is_ok()) + } + + fn records_stream( + partition: PartitionId, + input: impl IntoIterator, + ) -> Iter>> { + let mut records: Vec<_> = input + .into_iter() + .map(|item| fluvio_protocol::record::Record::new(RecordData::from(item.as_bytes()))) + .collect(); + let mut batch = Batch::default(); + batch.add_records(&mut records); + let consumer_records: Vec<_> = batch + .into_consumer_records_iter(partition) + .map(Ok) + .collect(); + futures_util::stream::iter(consumer_records) + } +} diff --git a/crates/fluvio/src/consumer/stream.rs b/crates/fluvio/src/consumer/stream.rs index 1eda80128c..b95e6fcf92 100644 --- a/crates/fluvio/src/consumer/stream.rs +++ b/crates/fluvio/src/consumer/stream.rs @@ -4,7 +4,6 @@ use std::time::{Duration, SystemTime}; use async_channel::Sender; use fluvio_protocol::{link::ErrorCode, record::ConsumerRecord as Record}; -use futures_util::future::BoxFuture; use futures_util::stream::select_all; use futures_util::{future::try_join_all, ready, FutureExt}; use futures_util::Stream; @@ -13,14 +12,20 @@ use tracing::{info, warn}; use super::config::OffsetManagementStrategy; use super::{offset::OffsetLocalStore, StreamToServer}; +#[cfg(not(target_arch = "wasm32"))] +pub type ConsumerBoxFuture<'a> = futures_util::future::BoxFuture<'a, Result<(), ErrorCode>>; + +#[cfg(target_arch = "wasm32")] +pub type ConsumerBoxFuture<'a> = futures_util::future::LocalBoxFuture<'a, Result<(), ErrorCode>>; + /// Extension of [`Stream`] trait with offset management capabilities. pub trait ConsumerStream: Stream> + Unpin { /// Mark the offset of the last yelded record as committed. Depending on [`OffsetManagementStrategy`] /// it may require a subsequent `offset_flush()` call to take any effect. - fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>>; + fn offset_commit(&mut self) -> ConsumerBoxFuture<'_>; /// Send the committed offset to the server. The method waits for the server's acknowledgment before it finishes. - fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>>; + fn offset_flush(&mut self) -> ConsumerBoxFuture<'_>; } pub struct MultiplePartitionConsumerStream { @@ -114,11 +119,11 @@ impl ConsumerStream for futures_util::stream::TakeUntil BoxFuture<'_, Result<(), ErrorCode>> { + fn offset_commit(&mut self) -> ConsumerBoxFuture { self.get_mut().offset_commit() } - fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { + fn offset_flush(&mut self) -> ConsumerBoxFuture { self.get_mut().offset_flush() } } @@ -126,11 +131,11 @@ where impl> + Unpin> ConsumerStream for SinglePartitionConsumerStream { - fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { + fn offset_commit(&mut self) -> ConsumerBoxFuture { Box::pin(async { self.offset_mngt.commit() }) } - fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { + fn offset_flush(&mut self) -> ConsumerBoxFuture { Box::pin(self.offset_mngt.flush()) } } @@ -138,7 +143,7 @@ impl> + Unpin> ConsumerStream impl> + Unpin> ConsumerStream for MultiplePartitionConsumerStream { - fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { + fn offset_commit(&mut self) -> ConsumerBoxFuture { for partition in &self.offset_mgnts { if let Err(err) = partition.commit() { return Box::pin(async { Err(err) }); @@ -148,7 +153,7 @@ impl> + Unpin> ConsumerStream Box::pin(async { Ok(()) }) } - fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { + fn offset_flush(&mut self) -> ConsumerBoxFuture { let futures: Vec<_> = self.offset_mgnts.iter().map(|p| p.flush()).collect(); Box::pin(try_join_all(futures).map(|r| r.map(|_| ()))) } diff --git a/crates/fluvio/src/fluvio.rs b/crates/fluvio/src/fluvio.rs index eb650e821e..3e220aa147 100644 --- a/crates/fluvio/src/fluvio.rs +++ b/crates/fluvio/src/fluvio.rs @@ -18,7 +18,7 @@ use fluvio_socket::{ use crate::admin::FluvioAdmin; use crate::error::anyhow_version_error; use crate::consumer::{ - ConsumerConfigExt, ConsumerOffset, ConsumerStream, ConsumerWithRetry, + ConsumerConfigExt, ConsumerOffset, ConsumerStream, ConsumerRetryStream, MultiplePartitionConsumer, MultiplePartitionConsumerStream, PartitionSelectionStrategy, Record, }; use crate::metrics::ClientMetrics; @@ -339,7 +339,7 @@ impl Fluvio { ) -> Result< impl ConsumerStream>, > { - ConsumerWithRetry::new(self.fluvio_config.clone(), config).await + ConsumerRetryStream::new(self.fluvio_config.clone(), config).await } /// Creates a new [ConsumerStream] instance without retry logic.