Skip to content

Commit

Permalink
Add logic to fetch index search stats for groups
Browse files Browse the repository at this point in the history
Signed-off-by: Pieter Van Isacker <[email protected]>
  • Loading branch information
ls-pieter-vanisacker committed Nov 27, 2021
1 parent d64ec09 commit eba38b7
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 10 deletions.
225 changes: 218 additions & 7 deletions collector/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ type indexMetric struct {
Labels labels
}

type indexSearchGroupMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64
Labels labels
}

type shardMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Expand All @@ -53,19 +60,21 @@ type Indices struct {
client *http.Client
url *url.URL
shards bool
groups bool
clusterInfoCh chan *clusterinfo.Response
lastClusterInfo *clusterinfo.Response

up prometheus.Gauge
totalScrapes prometheus.Counter
jsonParseFailures prometheus.Counter

indexMetrics []*indexMetric
shardMetrics []*shardMetric
indexMetrics []*indexMetric
indexSearchMetrics []*indexSearchGroupMetric
shardMetrics []*shardMetric
}

// NewIndices defines Indices Prometheus metrics
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool) *Indices {
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool, groups bool) *Indices {

indexLabels := labels{
keys: func(...string) []string {
Expand All @@ -81,6 +90,20 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
},
}

indexSearchGroupLabels := labels{
keys: func(...string) []string {
return []string{"index", "group", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
if lastClusterinfo != nil {
return append(s, lastClusterinfo.ClusterName)
}
// this shouldn't happen, as the clusterinfo Retriever has a blocking
// Run method. It blocks until the first clusterinfo call has succeeded
return append(s, "unknown_cluster")
},
}

shardLabels := labels{
keys: func(...string) []string {
return []string{"index", "shard", "node", "primary", "cluster"}
Expand All @@ -100,6 +123,7 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
client: client,
url: url,
shards: shards,
groups: groups,
clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
Expand All @@ -118,6 +142,165 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
Help: "Number of errors while parsing JSON.",
}),

indexSearchMetrics: []*indexSearchGroupMetric{
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_open_contexts"),
"Open contexts",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.OpenContexts)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_query_total"),
"Total number of queries",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.QueryTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_query_time_seconds_total"),
"Total search query time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.QueryTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_query_current"),
"The number of currently active queries",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.QueryCurrent)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_fetch_total"),
"Total search fetch count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.FetchTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_fetch_time_seconds_total"),
"Total search fetch time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.FetchTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_fetch_current"),
"Current search fetch count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.FetchCurrent)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_scroll_total"),
"Total search scroll count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.ScrollTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_scroll_time_seconds_total"),
"Total search scroll time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.ScrollTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_scroll_current"),
"Current search scroll count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.ScrollCurrent)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_suggest_total"),
"Total search suggest count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.SuggestTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_suggest_time_seconds_total"),
"Total search suggest time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.SuggestTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_suggest_current"),
"Current search suggest count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.SuggestCurrent)
},
Labels: indexSearchGroupLabels,
},
},

indexMetrics: []*indexMetric{
{
Type: prometheus.GaugeValue,
Expand Down Expand Up @@ -479,6 +662,18 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
},
Labels: indexLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_open_contexts"),
"Open contexts",
indexLabels.keys(), nil,
),
Value: func(indexStats IndexStatsIndexResponse) float64 {
return float64(indexStats.Total.Search.OpenContexts)
},
Labels: indexLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
Expand Down Expand Up @@ -1064,11 +1259,16 @@ func (i *Indices) fetchAndDecodeIndexStats() (indexStatsResponse, error) {

u := *i.url
u.Path = path.Join(u.Path, "/_all/_stats")

q := u.Query()
q.Add("ignore_unavailable", "true")
if i.shards {
u.RawQuery = "ignore_unavailable=true&level=shards"
} else {
u.RawQuery = "ignore_unavailable=true"
q.Add("level", "shards")
}
if i.groups {
q.Add("groups", "*")
}
u.RawQuery = q.Encode()

res, err := i.client.Get(u.String())
if err != nil {
Expand Down Expand Up @@ -1134,7 +1334,18 @@ func (i *Indices) Collect(ch chan<- prometheus.Metric) {
metric.Value(indexStats),
metric.Labels.values(i.lastClusterInfo, indexName)...,
)

}
if i.groups {
for groupName, indexSearchStats := range indexStats.Total.Search.Groups {
for _, metric := range i.indexSearchMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(indexSearchStats),
metric.Labels.values(i.lastClusterInfo, indexName, groupName)...,
)
}
}
}
if i.shards {
for _, metric := range i.shardMetrics {
Expand Down
18 changes: 18 additions & 0 deletions collector/indices_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ type IndexStatsIndexGetResponse struct {

// IndexStatsIndexSearchResponse defines index stats index search information structure
type IndexStatsIndexSearchResponse struct {
OpenContexts int64 `json:"open_contexts"`
QueryTotal int64 `json:"query_total"`
QueryTimeInMillis int64 `json:"query_time_in_millis"`
QueryCurrent int64 `json:"query_current"`
FetchTotal int64 `json:"fetch_total"`
FetchTimeInMillis int64 `json:"fetch_time_in_millis"`
FetchCurrent int64 `json:"fetch_current"`
ScrollTotal int64 `json:"scroll_total"`
ScrollTimeInMillis int64 `json:"scroll_time_in_millis"`
ScrollCurrent int64 `json:"scroll_current"`
SuggestTotal int64 `json:"suggest_total"`
SuggestTimeInMillis int64 `json:"suggest_time_in_millis"`
SuggestCurrent int64 `json:"suggest_current"`
Groups map[string]IndexStatsIndexSearchGroupResponse `json:"groups"`
}

// IndexStatsIndexSearchResponse defines index stats index search information structure
type IndexStatsIndexSearchGroupResponse struct {
OpenContexts int64 `json:"open_contexts"`
QueryTotal int64 `json:"query_total"`
QueryTimeInMillis int64 `json:"query_time_in_millis"`
Expand Down
2 changes: 1 addition & 1 deletion collector/indices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestIndices(t *testing.T) {
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
i := NewIndices(log.NewNopLogger(), http.DefaultClient, u, false)
i := NewIndices(log.NewNopLogger(), http.DefaultClient, u, false, false)
stats, err := i.fetchAndDecodeIndexStats()
if err != nil {
t.Fatalf("Failed to fetch or decode indices stats: %s", err)
Expand Down
7 changes: 5 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func main() {
esExportIndicesSettings = kingpin.Flag("es.indices_settings",
"Export stats for settings of all indices of the cluster.").
Default("false").Envar("ES_INDICES_SETTINGS").Bool()
esExportIndicesSearchGroups = kingpin.Flag("es.indices_search_groups",
"Export search stats of all indices of the cluster by group.").
Default("false").Envar("ES_INDICES_TAGS").Bool()
esExportIndicesMappings = kingpin.Flag("es.indices_mappings",
"Export stats for mappings of all indices of the cluster.").
Default("false").Envar("ES_INDICES_MAPPINGS").Bool()
Expand Down Expand Up @@ -166,8 +169,8 @@ func main() {
prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))

if *esExportIndices || *esExportShards {
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards)
if *esExportIndices || *esExportShards || *esExportIndicesSearchGroups {
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndicesSearchGroups)
prometheus.MustRegister(iC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
_ = level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
Expand Down

0 comments on commit eba38b7

Please sign in to comment.