Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for change in temporality for counter metrics #6919

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2950,6 +2950,58 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
return metricNameToTemporality, nil
}

func (r *ClickHouseReader) GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error) {
// Initialize slice to store temporality switch points
var temporalitySwitches []v3.TemporalityChangePoint

query := fmt.Sprintf(`
SELECT
temporality,
unix_milli,
lag_temporality
FROM (
SELECT
metric_name,
temporality,
unix_milli,
lagInFrame(temporality, 1, '') OVER (
PARTITION BY metric_name ORDER BY unix_milli
) AS lag_temporality
FROM %s.%s
WHERE unix_milli >= %d
AND unix_milli <= %d
AND metric_name = '%s'
) AS subquery
WHERE lag_temporality != temporality
AND lag_temporality != ''
ORDER BY unix_milli ASC;
`, signozMetricDBName, signozTSLocalTableNameV4, startTime, endTime, metricName)

rows, err := r.db.Query(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
var temporality string
var timestamp int64
var lagTemporality string
err := rows.Scan(&temporality, &timestamp, &lagTemporality)
if err != nil {
return nil, err
}
// Store each temporality switch point with both temporalities
temporalitySwitches = append(temporalitySwitches, v3.TemporalityChangePoint{
Timestamp: timestamp,
FromTemporality: v3.Temporality(lagTemporality),
ToTemporality: v3.Temporality(temporality),
})
}

return temporalitySwitches, nil
}

func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {

queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)
Expand Down
6 changes: 6 additions & 0 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
} else {
query.Temporality = v3.Unspecified
}
if len(aH.temporalityMap[query.AggregateAttribute.Key]) > 1 {
query.MultipleTemporalities = true
}
}
// we don't have temporality for this metric
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
Expand Down Expand Up @@ -682,6 +685,9 @@ func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRange
} else {
query.Temporality = v3.Unspecified
}
if len(nameToTemporality[query.AggregateAttribute.Key]) > 1 {
query.MultipleTemporalities = true
}
aH.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
}
}
Expand Down
70 changes: 69 additions & 1 deletion pkg/query-service/app/querier/v2/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,23 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
if queryName == builderQuery.Expression {
wg.Add(1)
go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
if builderQuery.MultipleTemporalities == true {
go func() {

temporalitySwitches, err := q.reader.GetTemporalitySwitchPoints(ctx, builderQuery.AggregateAttribute.Key, params.Start, params.End)
if err != nil {
ch <- channelResult{Err: err, Name: queryName}
return
}
if len(temporalitySwitches) == 0 {
q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
} else {
q.handleTemporalitySwitches(ctx, temporalitySwitches, &wg, builderQuery, params, cacheKeys, ch, queryName)
}
}()
} else {
go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
}
}
}

Expand Down Expand Up @@ -209,6 +225,58 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
return results, errQueriesByName, err
}

func (q *querier) handleTemporalitySwitches(ctx context.Context, temporalitySwitches []v3.TemporalityChangePoint, wg *sync.WaitGroup, builderQuery *v3.BuilderQuery, params *v3.QueryRangeParamsV3, cacheKeys map[string]string, ch chan channelResult, queryName string) {
defer wg.Done()

tempCh := make(chan channelResult, len(temporalitySwitches)+1)

var tempWg sync.WaitGroup
// Handle each segment between switch points
for i := 0; i <= len(temporalitySwitches); i++ {
tempWg.Add(1)
go func(idx int) {
queryWithTemporality := *builderQuery
queryParams := *params
if i == 0 {
queryParams.End = temporalitySwitches[idx].Timestamp
queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
} else if idx < len(temporalitySwitches) {
queryParams.Start = temporalitySwitches[idx-1].Timestamp
queryParams.End = temporalitySwitches[idx].Timestamp
queryWithTemporality.Temporality = temporalitySwitches[idx].FromTemporality
queryWithTemporality.ShiftBy = 0
} else if idx == len(temporalitySwitches) {
queryParams.Start = temporalitySwitches[idx-1].Timestamp
queryParams.End = params.End
queryWithTemporality.Temporality = temporalitySwitches[idx-1].ToTemporality
}

q.runBuilderQuery(ctx, &queryWithTemporality, &queryParams, cacheKeys, tempCh, &tempWg)
}(i)
}
// Wait for all temporal queries to complete
tempWg.Wait()
close(tempCh)

// Combine results from all temporal queries
var combinedSeries []*v3.Series
var lastErr error

for result := range tempCh {
if result.Err != nil {
lastErr = result.Err
continue
}
combinedSeries = append(combinedSeries, result.Series...)
}

ch <- channelResult{
Series: combinedSeries,
Err: lastErr,
Name: queryName,
}
}

func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
var wg sync.WaitGroup
Expand Down
2 changes: 2 additions & 0 deletions pkg/query-service/interfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type Reader interface {
//trace
GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError

GetTemporalitySwitchPoints(ctx context.Context, metricName string, startTime int64, endTime int64) ([]v3.TemporalityChangePoint, error)
}

type Querier interface {
Expand Down
36 changes: 36 additions & 0 deletions pkg/query-service/model/v3/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@
}

type BuilderQuery struct {
<<<<<<< HEAD

Check failure on line 810 in pkg/query-service/model/v3/v3.go

View workflow job for this annotation

GitHub Actions / build-query-service

syntax error: unexpected <<, expected field name or embedded type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unresolved merge conflict marker found. Please resolve the conflict before merging.

QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
Expand Down Expand Up @@ -834,6 +835,35 @@
QueriesUsedInFormula []string
MetricTableHints *MetricTableHints `json:"-"`
MetricValueFilter *MetricValueFilter `json:"-"`
=======

Check failure on line 838 in pkg/query-service/model/v3/v3.go

View workflow job for this annotation

GitHub Actions / build-query-service

syntax error: unexpected ==, expected field name or embedded type
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
IsAnomaly bool
QueriesUsedInFormula []string
MetricTableHints *MetricTableHints `json:"-"`
MetricValueFilter *MetricValueFilter `json:"-"`
MultipleTemporalities bool
>>>>>>> 762e61b44 (feat: added simultaneous temporality changes | 6175)

Check failure on line 866 in pkg/query-service/model/v3/v3.go

View workflow job for this annotation

GitHub Actions / build-query-service

syntax error: unexpected >>, expected field name or embedded type
}

func (b *BuilderQuery) SetShiftByFromFunc() {
Expand Down Expand Up @@ -1406,3 +1436,9 @@
IsLivetailQuery bool
PreferRPM bool
}

type TemporalityChangePoint struct {
Timestamp int64 `json:"timestamp"`
FromTemporality Temporality `json:"from_temporality"`
ToTemporality Temporality `json:"to_temporality"`
}
Loading