Skip to content

Commit

Permalink
storage: track latest reported status and report on reconnect
Browse files Browse the repository at this point in the history
Before, we would only pump what the dataflows reported back to the
controller. This meant that when a controller re-connects, it doesn't get
status messages for the latest state.

Now, we track the latest status update, per object, and also what has
been reported, per object. And on re-connect we clear out what has been
reported.
  • Loading branch information
aljoscha committed Nov 20, 2024
1 parent ba3085f commit 3947a06
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/storage/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ pub fn build_ingestion_dataflow<A: Allocate>(
health_configs,
crate::healthcheck::DefaultWriter {
command_tx: Rc::clone(&storage_state.internal_cmd_tx),
updates: Rc::clone(&storage_state.object_status_updates),
updates: Rc::clone(&storage_state.shared_status_updates),
},
storage_state
.storage_configuration
Expand Down Expand Up @@ -474,7 +474,7 @@ pub fn build_export_dataflow<A: Allocate>(
health_configs,
crate::healthcheck::DefaultWriter {
command_tx: Rc::clone(&storage_state.internal_cmd_tx),
updates: Rc::clone(&storage_state.object_status_updates),
updates: Rc::clone(&storage_state.shared_status_updates),
},
storage_state
.storage_configuration
Expand Down
72 changes: 62 additions & 10 deletions src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ impl<'w, A: Allocate> Worker<'w, A> {
timely_worker.index(),
timely_worker.peers(),
),
object_status_updates: Default::default(),
shared_status_updates: Default::default(),
latest_status_updates: Default::default(),
reported_status_updates: Default::default(),
internal_cmd_tx: command_sequencer,
read_only_tx,
read_only_rx,
Expand Down Expand Up @@ -302,11 +304,20 @@ pub struct StorageState {
/// Statistics for sources and sinks.
pub aggregated_statistics: AggregatedStatistics,

/// Status updates reported by health operators.
/// A place shared with running dataflows, so that health operators can
/// report status updates back to us.
///
/// **NOTE**: Operators that append to this collection should take care to only add new
/// status updates if the status of the ingestion/export in question has _changed_.
pub object_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,
pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,

/// The latest status update for each object.
pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,

/// The latest status update that has been _reported_ back to the
/// controller. This will be reset when a new client connects, so that we
/// can determine what updates we have to report again.
pub reported_status_updates: BTreeMap<GlobalId, StatusUpdate>,

/// Sender for cluster-internal storage commands. These can be sent from
/// within workers/operators and will be distributed to all workers. For
Expand Down Expand Up @@ -450,13 +461,7 @@ impl<'w, A: Allocate> Worker<'w, A> {

self.report_frontier_progress(&response_tx);

// Report status updates if any are present
if self.storage_state.object_status_updates.borrow().len() > 0 {
self.send_storage_response(
&response_tx,
StorageResponse::StatusUpdates(self.storage_state.object_status_updates.take()),
);
}
self.report_status_updates(&response_tx);

if last_stats_time.elapsed() >= stats_interval {
self.report_storage_statistics(&response_tx);
Expand Down Expand Up @@ -811,6 +816,48 @@ impl<'w, A: Allocate> Worker<'w, A> {
}
}

/// Pumps latest status updates from the buffer shared with operators and
/// reports any updates that need reporting.
/// Pumps the latest status updates from the buffer shared with dataflow
/// operators into `latest_status_updates`, then sends a
/// [`StorageResponse::StatusUpdates`] for every object whose latest update
/// has not yet been reported to the controller.
///
/// Because `reported_status_updates` is cleared when a new client
/// connects, the next call after a reconnect re-reports the latest update
/// for every object.
pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
    // First, pump updates from the shared buffer into our own state.
    // Later updates for the same object win, so we retain only the
    // latest one per object. (Taking from an empty buffer is free, so no
    // emptiness check is needed.)
    for shared_update in self.storage_state.shared_status_updates.take() {
        let id = shared_update.id;
        self.storage_state
            .latest_status_updates
            .insert(id, shared_update);
    }

    let mut to_report = Vec::new();

    // Collect every latest update that differs from (or was never part
    // of) what we last reported.
    for (id, latest_update) in self.storage_state.latest_status_updates.iter() {
        if self.storage_state.reported_status_updates.get(id) == Some(latest_update) {
            // Controller already has this exact update; nothing to do.
            continue;
        }

        // We either never reported an update for this object, or the
        // latest update differs from the one we reported last.
        to_report.push(latest_update.clone());
    }

    // Remember what we're about to report, so we don't report the same
    // update again on the next tick.
    for reported_update in &to_report {
        self.storage_state
            .reported_status_updates
            .insert(reported_update.id.clone(), reported_update.clone());
    }

    if !to_report.is_empty() {
        self.send_storage_response(response_tx, StorageResponse::StatusUpdates(to_report));
    }
}

/// Report source statistics back to the controller.
pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
Expand Down Expand Up @@ -1076,6 +1123,9 @@ impl<'w, A: Allocate> Worker<'w, A> {
*frontier = Antichain::from_elem(<_>::minimum());
}

// Reset the reported status updates for the remaining objects.
self.storage_state.reported_status_updates.clear();

// Execute the modified commands.
for command in commands {
self.storage_state.handle_storage_command(command);
Expand Down Expand Up @@ -1216,6 +1266,8 @@ impl StorageState {
self.ingestions.remove(&id);
self.exports.remove(&id);

let _ = self.reported_status_updates.remove(&id);

// This will stop reporting of frontiers.
//
// If this object still has its frontiers reported, we will notify the
Expand Down

0 comments on commit 3947a06

Please sign in to comment.