Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic to fetch index search stats for groups #505

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 218 additions & 7 deletions collector/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ type indexMetric struct {
Labels labels
}

type indexSearchGroupMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64
Labels labels
}

type shardMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Expand All @@ -53,19 +60,21 @@ type Indices struct {
client *http.Client
url *url.URL
shards bool
groups bool
clusterInfoCh chan *clusterinfo.Response
lastClusterInfo *clusterinfo.Response

up prometheus.Gauge
totalScrapes prometheus.Counter
jsonParseFailures prometheus.Counter

indexMetrics []*indexMetric
shardMetrics []*shardMetric
indexMetrics []*indexMetric
indexSearchMetrics []*indexSearchGroupMetric
shardMetrics []*shardMetric
}

// NewIndices defines Indices Prometheus metrics
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool) *Indices {
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool, groups bool) *Indices {

indexLabels := labels{
keys: func(...string) []string {
Expand All @@ -81,6 +90,20 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
},
}

indexSearchGroupLabels := labels{
keys: func(...string) []string {
return []string{"index", "group", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
if lastClusterinfo != nil {
return append(s, lastClusterinfo.ClusterName)
}
// this shouldn't happen, as the clusterinfo Retriever has a blocking
// Run method. It blocks until the first clusterinfo call has succeeded
return append(s, "unknown_cluster")
},
}

shardLabels := labels{
keys: func(...string) []string {
return []string{"index", "shard", "node", "primary", "cluster"}
Expand All @@ -100,6 +123,7 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
client: client,
url: url,
shards: shards,
groups: groups,
clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
Expand All @@ -118,6 +142,165 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
Help: "Number of errors while parsing JSON.",
}),

indexSearchMetrics: []*indexSearchGroupMetric{
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_open_contexts"),
"Open contexts",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.OpenContexts)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_query_total"),
"Total number of queries",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.QueryTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_query_time_seconds_total"),
"Total search query time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.QueryTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_query_current"),
"The number of currently active queries",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.QueryCurrent)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_fetch_total"),
"Total search fetch count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.FetchTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_fetch_time_seconds_total"),
"Total search fetch time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.FetchTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_fetch_current"),
"Current search fetch count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.FetchCurrent)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_scroll_total"),
"Total search scroll count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.ScrollTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_scroll_time_seconds_total"),
"Total search scroll time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.ScrollTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_scroll_current"),
"Current search scroll count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.ScrollCurrent)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_suggest_total"),
"Total search suggest count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.SuggestTotal)
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_suggest_time_seconds_total"),
"Total search suggest time in seconds",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.SuggestTimeInMillis) / 1000
},
Labels: indexSearchGroupLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_group_suggest_current"),
"Current search suggest count",
indexSearchGroupLabels.keys(), nil,
),
Value: func(indexSearchGroupStats IndexStatsIndexSearchGroupResponse) float64 {
return float64(indexSearchGroupStats.SuggestCurrent)
},
Labels: indexSearchGroupLabels,
},
},

indexMetrics: []*indexMetric{
{
Type: prometheus.GaugeValue,
Expand Down Expand Up @@ -479,6 +662,18 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
},
Labels: indexLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "index_stats", "search_open_contexts"),
"Open contexts",
indexLabels.keys(), nil,
),
Value: func(indexStats IndexStatsIndexResponse) float64 {
return float64(indexStats.Total.Search.OpenContexts)
},
Labels: indexLabels,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
Expand Down Expand Up @@ -1064,11 +1259,16 @@ func (i *Indices) fetchAndDecodeIndexStats() (indexStatsResponse, error) {

u := *i.url
u.Path = path.Join(u.Path, "/_all/_stats")

q := u.Query()
q.Add("ignore_unavailable", "true")
if i.shards {
u.RawQuery = "ignore_unavailable=true&level=shards"
} else {
u.RawQuery = "ignore_unavailable=true"
q.Add("level", "shards")
}
if i.groups {
q.Add("groups", "*")
}
u.RawQuery = q.Encode()

res, err := i.client.Get(u.String())
if err != nil {
Expand Down Expand Up @@ -1134,7 +1334,18 @@ func (i *Indices) Collect(ch chan<- prometheus.Metric) {
metric.Value(indexStats),
metric.Labels.values(i.lastClusterInfo, indexName)...,
)

}
if i.groups {
for groupName, indexSearchStats := range indexStats.Total.Search.Groups {
for _, metric := range i.indexSearchMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(indexSearchStats),
metric.Labels.values(i.lastClusterInfo, indexName, groupName)...,
)
}
}
}
if i.shards {
for _, metric := range i.shardMetrics {
Expand Down
18 changes: 18 additions & 0 deletions collector/indices_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ type IndexStatsIndexGetResponse struct {

// IndexStatsIndexSearchResponse defines index stats index search information structure
type IndexStatsIndexSearchResponse struct {
OpenContexts int64 `json:"open_contexts"`
QueryTotal int64 `json:"query_total"`
QueryTimeInMillis int64 `json:"query_time_in_millis"`
QueryCurrent int64 `json:"query_current"`
FetchTotal int64 `json:"fetch_total"`
FetchTimeInMillis int64 `json:"fetch_time_in_millis"`
FetchCurrent int64 `json:"fetch_current"`
ScrollTotal int64 `json:"scroll_total"`
ScrollTimeInMillis int64 `json:"scroll_time_in_millis"`
ScrollCurrent int64 `json:"scroll_current"`
SuggestTotal int64 `json:"suggest_total"`
SuggestTimeInMillis int64 `json:"suggest_time_in_millis"`
SuggestCurrent int64 `json:"suggest_current"`
Groups map[string]IndexStatsIndexSearchGroupResponse `json:"groups"`
}

// IndexStatsIndexSearchResponse defines index stats index search information structure
type IndexStatsIndexSearchGroupResponse struct {
OpenContexts int64 `json:"open_contexts"`
QueryTotal int64 `json:"query_total"`
QueryTimeInMillis int64 `json:"query_time_in_millis"`
Expand Down
2 changes: 1 addition & 1 deletion collector/indices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestIndices(t *testing.T) {
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
i := NewIndices(log.NewNopLogger(), http.DefaultClient, u, false)
i := NewIndices(log.NewNopLogger(), http.DefaultClient, u, false, false)
stats, err := i.fetchAndDecodeIndexStats()
if err != nil {
t.Fatalf("Failed to fetch or decode indices stats: %s", err)
Expand Down
7 changes: 5 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func main() {
esExportIndicesSettings = kingpin.Flag("es.indices_settings",
"Export stats for settings of all indices of the cluster.").
Default("false").Envar("ES_INDICES_SETTINGS").Bool()
esExportIndicesSearchGroups = kingpin.Flag("es.indices_search_groups",
"Export search stats of all indices of the cluster by group.").
Default("false").Envar("ES_INDICES_TAGS").Bool()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've stopped using ENV vars here.

Suggested change
Default("false").Envar("ES_INDICES_TAGS").Bool()
Default("false").Bool()

esExportIndicesMappings = kingpin.Flag("es.indices_mappings",
"Export stats for mappings of all indices of the cluster.").
Default("false").Envar("ES_INDICES_MAPPINGS").Bool()
Expand Down Expand Up @@ -166,8 +169,8 @@ func main() {
prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))

if *esExportIndices || *esExportShards {
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards)
if *esExportIndices || *esExportShards || *esExportIndicesSearchGroups {
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndicesSearchGroups)
prometheus.MustRegister(iC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
_ = level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
Expand Down