Skip to content

Commit

Permalink
Let PartitionProcessors failures propagate to the PartitionProcessorM…
Browse files Browse the repository at this point in the history
…anager

Summary:
When PP crashes (returns an error) we should update it's status instead
of panicing

Fixes #2147
  • Loading branch information
muhamadazmy committed Nov 7, 2024
1 parent 384fd8d commit 67bae96
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ impl MultiplexedInvokerStatusReader {
fn push(&mut self, key_range: RangeInclusive<PartitionKey>, reader: ChannelStatusReader) {
self.readers.write().push((key_range, reader));
}

fn remove(&mut self, key_range: &RangeInclusive<PartitionKey>) {
self.readers.write().retain(|elem| &elem.0 != key_range);
}
}

impl StatusHandle for MultiplexedInvokerStatusReader {
Expand Down Expand Up @@ -648,6 +652,17 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
error!(%partition_id, error=%err, "Starting partition processor failed");
self.running_partition_processors.remove(&partition_id);
}
ProcessorEvent::Stopped(err) => {
if let Some(err) = err {
warn!(%partition_id, error=%err, "Partition processor exited unexpectedly");
}

if let Some(ProcessorStatus::Started(status)) =
self.running_partition_processors.remove(&partition_id)
{
self.invokers_status_reader.remove(&status.key_range);
}
}
}
}

Expand Down Expand Up @@ -921,6 +936,7 @@ struct ManagerEvent {
enum ProcessorEvent {
Started(StartedProcessorStatus),
StartFailed(anyhow::Error),
Stopped(Option<anyhow::Error>),
}

enum ProcessorStatus {
Expand Down Expand Up @@ -992,6 +1008,7 @@ impl SpawnPartitionProcessorTask {
metadata,
bifrost,
partition_store_manager,
events.clone(),
)
.await
{
Expand Down Expand Up @@ -1030,6 +1047,7 @@ impl SpawnPartitionProcessorTask {
metadata: Metadata,
bifrost: Bifrost,
partition_store_manager: PartitionStoreManager,
events: EventSender,
) -> anyhow::Result<StartedProcessorStatus> {
let config = configuration.pinned();
let schema = metadata.updateable_schema();
Expand Down Expand Up @@ -1092,11 +1110,21 @@ impl SpawnPartitionProcessorTask {
invoker.run(invoker_config),
)?;

pp_builder
let err = pp_builder
.build::<ProtobufRawEntryCodec>(tc, bifrost, partition_store, configuration)
.await?
.run()
.await
.err();

let _ = events
.send(ManagerEvent {
partition_id,
event: ProcessorEvent::Stopped(err),
})
.await;

Ok(())
}
},
);
Expand Down

0 comments on commit 67bae96

Please sign in to comment.