Skip to content

Commit

Permalink
Even better log statement in Kafka (#2293)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Nov 14, 2024
1 parent 484a0df commit 61fe56d
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,6 @@ impl ConsumerTask {
let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect();
consumer.subscribe(&topics)?;

debug!(
restate.subscription.id = %self.sender.subscription.id(),
messaging.consumer.group.name = consumer_group_id,
"Assigned topic/partitions/offset: {:?}",
consumer.assignment()?
);

let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default();

let result = loop {
Expand All @@ -269,6 +262,12 @@ impl ConsumerTask {
None => break Err(Error::TopicPartitionSplit(topic.clone(), partition))
};

debug!(
restate.subscription.id = %self.sender.subscription.id(),
messaging.consumer.group.name = consumer_group_id,
"Starting topic '{topic}' partition '{partition}' consumption loop from offset '{offset}'"
);

let task = topic_partition_queue_consumption_loop(
self.sender.clone(),
topic.clone(), partition,
Expand Down

0 comments on commit 61fe56d

Please sign in to comment.