diff --git a/collector/nodes.go b/collector/nodes.go index 7d648f86..23c30b7c 100644 --- a/collector/nodes.go +++ b/collector/nodes.go @@ -171,6 +171,13 @@ type filesystemIODeviceMetric struct { Labels func(cluster string, node NodeStatsNodeResponse, device string) []string } +type indexingPressureMetric struct { + Type prometheus.ValueType + Desc *prometheus.Desc + Value func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 + Labels func(cluster string, node NodeStatsNodeResponse) []string +} + // Nodes information struct type Nodes struct { logger log.Logger @@ -188,6 +195,7 @@ type Nodes struct { threadPoolMetrics []*threadPoolMetric filesystemDataMetrics []*filesystemDataMetric filesystemIODeviceMetrics []*filesystemIODeviceMetric + indexingPressureMetrics []*indexingPressureMetric } // NewNodes defines Nodes Prometheus metrics @@ -1781,6 +1789,176 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no Labels: defaultFilesystemIODeviceLabelValues, }, }, + indexingPressureMetrics: []*indexingPressureMetric{ + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "current_combined_coordinating_and_primary_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the coordinating or primary stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Current.CombinedCoordinatingAndPrimaryInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "current_coordinating_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the coordinating stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Current.CoordinatingInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "current_primary_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the primary stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Current.PrimaryInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "current_replica_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the replica stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Current.ReplicaInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "current_all_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Current.ReplicaInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_combined_coordinating_and_primary_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the coordinating or primary stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.CombinedCoordinatingAndPrimaryInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_coordinating_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the coordinating stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.CoordinatingInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_primary_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the primary stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.PrimaryInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_replica_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the replica stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.ReplicaInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_all_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.AllInBytes) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_coordinating_rejections"), + "Number of indexing requests rejected in the coordinating stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.CoordinatingRejections) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_primary_rejections"), + "Number of indexing requests rejected in the primary stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.PrimaryRejections) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "total_replica_rejections"), + "Number of indexing requests rejected in the replica stage.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.Total.ReplicaRejections) + }, + Labels: defaultNodeLabelValues, + }, + { + Type: prometheus.CounterValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "limit_in_bytes"), + "Configured memory limit, in bytes, for the indexing requests. Replica requests have an automatic limit that is 1.5x this value.", + defaultNodeLabels, nil, + ), + Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 { + return float64(indexingPressureMem.LimitInBytes) + }, + Labels: defaultNodeLabelValues, + }, + }, } } @@ -1801,6 +1979,9 @@ func (c *Nodes) Describe(ch chan<- *prometheus.Desc) { for _, metric := range c.filesystemIODeviceMetrics { ch <- metric.Desc } + for _, metric := range c.indexingPressureMetrics { + ch <- metric.Desc + } ch <- c.up.Desc() ch <- c.totalScrapes.Desc() ch <- c.jsonParseFailures.Desc() @@ -1955,5 +2136,18 @@ func (c *Nodes) Collect(ch chan<- prometheus.Metric) { } } + // indexing_pressure Stats https://github.com/prometheus-community/elasticsearch_exporter/issues/638 + // https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-indexing-pressure.html + for _, indexingPressureMem := range node.IndexingPressure { + for _, metric := range c.indexingPressureMetrics { + ch <- prometheus.MustNewConstMetric( + metric.Desc, + metric.Type, + metric.Value(indexingPressureMem), + metric.Labels(nodeStatsResp.ClusterName, node)..., + ) + } + } + } } diff --git a/collector/nodes_response.go b/collector/nodes_response.go index 4985add0..7ac9b39f 100644 --- a/collector/nodes_response.go +++ b/collector/nodes_response.go @@ -23,23 +23,24 @@ type nodeStatsResponse struct { // NodeStatsNodeResponse defines node stats information structure for nodes type NodeStatsNodeResponse struct { - Name string `json:"name"` - Host string `json:"host"` - Timestamp int64 `json:"timestamp"` - TransportAddress string `json:"transport_address"` - Hostname string `json:"hostname"` - Roles []string `json:"roles"` - Attributes map[string]string `json:"attributes"` - Indices NodeStatsIndicesResponse `json:"indices"` - OS NodeStatsOSResponse `json:"os"` - Network NodeStatsNetworkResponse `json:"network"` - FS NodeStatsFSResponse `json:"fs"` - ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"` - JVM NodeStatsJVMResponse `json:"jvm"` - Breakers map[string]NodeStatsBreakersResponse `json:"breakers"` - HTTP map[string]interface{} `json:"http"` - Transport NodeStatsTransportResponse `json:"transport"` - Process NodeStatsProcessResponse `json:"process"` + Name string `json:"name"` + Host string `json:"host"` + Timestamp int64 `json:"timestamp"` + TransportAddress string `json:"transport_address"` + Hostname string `json:"hostname"` + Roles []string `json:"roles"` + Attributes map[string]string `json:"attributes"` + Indices NodeStatsIndicesResponse `json:"indices"` + OS NodeStatsOSResponse `json:"os"` + Network NodeStatsNetworkResponse `json:"network"` + FS NodeStatsFSResponse `json:"fs"` + ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"` + JVM NodeStatsJVMResponse `json:"jvm"` + Breakers map[string]NodeStatsBreakersResponse `json:"breakers"` + HTTP map[string]interface{} `json:"http"` + Transport NodeStatsTransportResponse `json:"transport"` + Process NodeStatsProcessResponse `json:"process"` + IndexingPressure map[string]NodestatsIndexingPressureMemoryResponse `json:"indexing_pressure"` } // NodeStatsBreakersResponse is a representation of a statistics about the field data circuit breaker @@ -317,6 +318,32 @@ type NodeStatsProcessResponse struct { Memory NodeStatsProcessMemResponse `json:"mem"` } +// https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html#cluster-nodes-stats-api-response-body-indexing-pressure +type NodestatsIndexingPressureMemoryResponse struct { + Current NodestatsIndexingPressureMemoryCurrentResponse `json:"current"` + Total NodestatsIndexingPressureMemoryTotalResponse `json:"total"` + LimitInBytes int64 `json:"limit_in_bytes"` +} + +type NodestatsIndexingPressureMemoryCurrentResponse struct { + CombinedCoordinatingAndPrimaryInBytes int64 `json:"combined_coordinating_and_primary_in_bytes"` + CoordinatingInBytes int64 `json:"coordinating_in_bytes"` + PrimaryInBytes int64 `json:"primary_in_bytes"` + ReplicaInBytes int64 `json:"replica_in_bytes"` + AllInBytes int64 `json:"all_in_bytes"` +} + +type NodestatsIndexingPressureMemoryTotalResponse struct { + CombinedCoordinatingAndPrimaryInBytes int64 `json:"combined_coordinating_and_primary_in_bytes"` + CoordinatingInBytes int64 `json:"coordinating_in_bytes"` + PrimaryInBytes int64 `json:"primary_in_bytes"` + ReplicaInBytes int64 `json:"replica_in_bytes"` + AllInBytes int64 `json:"all_in_bytes"` + CoordinatingRejections int64 `json:"coordinating_rejections"` + PrimaryRejections int64 `json:"primary_rejections"` + ReplicaRejections int64 `json:"replica_rejections"` +} + // NodeStatsProcessMemResponse defines node stats process memory usage structure type NodeStatsProcessMemResponse struct { Resident int64 `json:"resident_in_bytes"`