Skip to content

Commit

Permalink
add total events per input, clean up and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Feb 11, 2025
1 parent d5120b9 commit 3f24545
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 226 deletions.
23 changes: 15 additions & 8 deletions filebeat/tests/integration/input_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@ logging.level: debug
defer resp.Body.Close()

var inputMetrics []struct {
EventsDroppedTotal int `json:"events_dropped_total"`
EventsFilteredTotal int `json:"events_filtered_total"`
EventsProcessedTotal int `json:"events_processed_total"`
EventsPublishedTotal int `json:"events_published_total"`
ID string `json:"id"`
Input string `json:"input"`
EventsPipelineTotal int `json:"events_pipeline_total"`
EventsPipelineDroppedTotal int `json:"events_pipeline_dropped_total"`
EventsPipelineFilteredTotal int `json:"events_pipeline_filtered_total"`
EventsPipelinePublishedTotal int `json:"events_pipeline_published_total"`
EventsProcessedTotal int `json:"events_processed_total"`
ID string `json:"id"`
Input string `json:"input"`
}

body, err := io.ReadAll(resp.Body)
Expand All @@ -117,9 +118,15 @@ logging.level: debug
require.Len(t, inputMetrics, 1)
assert.Equal(t, "a-filestream-id", inputMetrics[0].ID)
assert.Equal(t, "filestream", inputMetrics[0].Input)
assert.Equal(t,
inputMetrics[0].EventsPipelineTotal,
inputMetrics[0].EventsPipelinePublishedTotal+
inputMetrics[0].EventsPipelineFilteredTotal+
inputMetrics[0].EventsPipelineDroppedTotal)
assert.Equal(t, inputMetrics[0].EventsProcessedTotal, inputMetrics[0].EventsPipelineTotal)
assert.Equal(t, 10, inputMetrics[0].EventsProcessedTotal)
assert.Equal(t, 9, inputMetrics[0].EventsPublishedTotal)
assert.Equal(t, 1, inputMetrics[0].EventsFilteredTotal)
assert.Equal(t, 9, inputMetrics[0].EventsPipelinePublishedTotal)
assert.Equal(t, 1, inputMetrics[0].EventsPipelineFilteredTotal)

assert.Falsef(t, t.Failed(), "test faild: input metrics response used for the assertions: %s", body)
}
3 changes: 0 additions & 3 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ const (
TimestampFieldKey = "@timestamp"
MetadataFieldKey = "@metadata"
ErrorFieldKey = "error"

MetadataKeyStreamID = "stream_id"

metadataKeyPrefix = MetadataFieldKey + "."
metadataKeyOffset = len(metadataKeyPrefix)
)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type Client interface {
// ClientConfig defines common configuration options one can pass to
// Pipeline.ConnectWith to control the clients behavior and provide ACK support.
type ClientConfig struct {
// InputID is the inout ID of the input using the client. The InputID is
// used to aggregate pipeline metrics per input. See TODO: add github issue
// InputID is the input ID of the input using the client. The InputID is
// used to aggregate pipeline metrics per input.
InputID string

PublishMode PublishMode
Expand Down
34 changes: 19 additions & 15 deletions libbeat/monitoring/inputmon/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,29 @@ const (
)

type handler struct {
registryDataset *monitoring.Registry
registryInternal *monitoring.Registry
registryDataset *monitoring.Registry
registryInternalInputs *monitoring.Registry
}

// AttachHandler attaches an HTTP handler to the given mux.Router to handle
// requests to /inputs.
func AttachHandler(beatInfo beat.Info, r *mux.Router) error {
internalReg := beatInfo.Monitoring.Namespace.GetRegistry().
intInputsReg := beatInfo.Monitoring.Namespace.GetRegistry().
GetRegistry(libbeatmonitoring.RegistryNameInternalInputs)
if intInputsReg == nil {
intInputsReg = beatInfo.Monitoring.Namespace.GetRegistry().
NewRegistry(libbeatmonitoring.RegistryNameInternalInputs)
}

return attachHandler(r, globalRegistry(), internalReg)
return attachHandler(r, globalRegistry(), intInputsReg)
}

func attachHandler(r *mux.Router, datasetReg, internalReg *monitoring.Registry) error {
func attachHandler(r *mux.Router, datasetReg, intInputsReg *monitoring.Registry) error {
r = r.PathPrefix(route).Subrouter()

h := &handler{
registryDataset: datasetReg,
registryInternal: internalReg,
registryDataset: datasetReg,
registryInternalInputs: intInputsReg,
}
return r.StrictSlash(true).Handle("/", validationHandler(http.MethodGet, []string{"pretty", "type"}, h.allInputs)).GetError()
}
Expand All @@ -75,13 +79,13 @@ func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) {
}

filtered := filteredSnapshot(
h.registryDataset, h.registryInternal, requestedType)
h.registryDataset, h.registryInternalInputs, requestedType)

w.Header().Set(contentType, applicationJSON)
serveJSON(w, filtered, requestedPretty)
}

func filteredSnapshot(dataset, internal *monitoring.Registry, requestedType string) []map[string]any {
func filteredSnapshot(dataset, intInputs *monitoring.Registry, requestedType string) []map[string]any {
metrics := monitoring.CollectStructSnapshot(dataset, monitoring.Full, false)

filtered := make([]map[string]any, 0, len(metrics))
Expand All @@ -102,18 +106,18 @@ func filteredSnapshot(dataset, internal *monitoring.Registry, requestedType stri
continue
}

// merge the internal namespace if found
mergeInternalMetrics(internal, id, m)
// merge metrics stored in the internal namespace if any is found
mergeInternalMetrics(intInputs, id, m)

filtered = append(filtered, m)
}
return filtered
}

// mergeInternalMetrics looks for a registry 'id' in the 'internal' registry. If
// found, all the metrics are merged into m, if not, m is not changed. If there
// already is a
// The internal registry should be globalInternalRegistry()
// mergeInternalMetrics looks for a registry identified by in the internal
// registry. If found, all the metrics are merged into m, if not, m is not
// changed.
// TODO: add tests
func mergeInternalMetrics(internal *monitoring.Registry, id string, m map[string]any) {
reg := internal.GetRegistry(id)
if reg == nil {
Expand Down
5 changes: 3 additions & 2 deletions libbeat/monitoring/inputmon/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func globalRegistry() *monitoring.Registry {
}

// MetricSnapshotJSON returns a snapshot of the input metric values from the
// global 'dataset' and beat 'internal' monitoring namespace merged and encoded as a
// JSON array (pretty formatted).
// global 'dataset' and libbeatmonitoring.RegistryNameInternalInputs monitoring
// namespace from the beatInfo instance. It returns a pretty formated JSON array
// as a byte slice.
func MetricSnapshotJSON(beatInfo beat.Info) ([]byte, error) {
intReg := beatInfo.Monitoring.Namespace.GetRegistry().
GetRegistry(libbeatmonitoring.RegistryNameInternalInputs)
Expand Down
28 changes: 14 additions & 14 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type client struct {
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.

// inputID of the input this client belongs to. It's used to aggregate
// metrics by input
// metrics by input.
inputID string
observer observer
eventListener beat.EventListener
Expand Down Expand Up @@ -82,11 +82,11 @@ func (c *client) publish(e beat.Event) {
publish = true
)

c.onNewEvent(e)
c.onNewEvent()

if !c.isOpen.Load() {
// client is closing down -> report event as dropped and return
c.onDroppedOnPublish(e, "")
c.onDroppedOnPublish(e)
return
}

Expand All @@ -108,7 +108,7 @@ func (c *client) publish(e beat.Event) {

c.eventListener.AddEvent(e, publish)
if !publish {
c.onFilteredOut(e, c.inputID)
c.onFilteredOut()
return
}

Expand All @@ -126,9 +126,9 @@ func (c *client) publish(e beat.Event) {
}

if published {
c.onPublished(e, c.inputID)
c.onPublished()
} else {
c.onDroppedOnPublish(e, c.inputID)
c.onDroppedOnPublish(e)
}
}

Expand Down Expand Up @@ -174,23 +174,23 @@ func (c *client) onClosed() {
}
}

func (c *client) onNewEvent(e beat.Event) {
c.observer.newEvent(e)
func (c *client) onNewEvent() {
c.observer.newEvent(c.inputID)
}

func (c *client) onPublished(e beat.Event, inputID string) {
c.observer.publishedEvent(e, inputID)
func (c *client) onPublished() {
c.observer.publishedEvent(c.inputID)
if c.clientListener != nil {
c.clientListener.Published()
}
}

func (c *client) onFilteredOut(e beat.Event, inputID string) {
c.observer.filteredEvent(e, inputID)
func (c *client) onFilteredOut() {
c.observer.filteredEvent(c.inputID)
}

func (c *client) onDroppedOnPublish(e beat.Event, inputID string) {
c.observer.failedPublishEvent(e, inputID)
func (c *client) onDroppedOnPublish(e beat.Event) {
c.observer.failedPublishEvent(c.inputID)
if c.clientListener != nil {
c.clientListener.DroppedOnPublish(e)
}
Expand Down
Loading

0 comments on commit 3f24545

Please sign in to comment.