Skip to content

Commit

Permalink
Add latency metrics for logstash async output (#42565) (#42585)
Browse files Browse the repository at this point in the history
* Add latency metrics for logstash async output

* Properly handle per-batch latency

(cherry picked from commit accc5e1)

Co-authored-by: William Easton <[email protected]>
  • Loading branch information
mergify[bot] and strawgate authored Feb 19, 2025
1 parent ae90ef0 commit 44a6547
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error {
window[i] = &events[i].Content
}
ref.count.Add(1)
return client.Send(ref.callback, window)

return client.Send(ref.customizedCallback(), window)
}

func (c *asyncClient) getClient() *v2.AsyncClient {
Expand All @@ -231,7 +232,15 @@ func (c *asyncClient) getClient() *v2.AsyncClient {
return client
}

func (r *msgRef) callback(n uint32, err error) {
func (r *msgRef) customizedCallback() func(uint32, error) {
start := time.Now()

return func(n uint32, err error) {
r.callback(start, n, err)
}
}

func (r *msgRef) callback(start time.Time, n uint32, err error) {
r.client.observer.AckedEvents(int(n))
r.slice = r.slice[n:]
r.deadlockListener.ack(int(n))
Expand All @@ -246,6 +255,11 @@ func (r *msgRef) callback(n uint32, err error) {
r.win.tryGrowWindow(r.batchSize)
}
}

// Report the latency for the batch of events
duration := time.Since(start)
r.client.observer.ReportLatency(duration)

r.dec()
}

Expand Down

0 comments on commit 44a6547

Please sign in to comment.