Skip to content

Commit

Permalink
rhythm: set ingestion slack for partition consumption (#4611)
Browse files Browse the repository at this point in the history
* rhythm: set ingestion slack for partition consumption

* remove test

* Update WalBlock interface to explicitly compute the ingestionSlack

* add cycle duration to slack ingestion range calculation

* distinguish between ingesters and blockbuilder for dataquality warnings
  • Loading branch information
javiermolinar authored Jan 29, 2025
1 parent 649b77f commit 4ac0715
Show file tree
Hide file tree
Showing 26 changed files with 372 additions and 120 deletions.
2 changes: 1 addition & 1 deletion modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ outer:
if !init {
end = rec.Timestamp.Add(dur) // When block will be cut
metricPartitionLagSeconds.WithLabelValues(partLabel).Set(time.Since(rec.Timestamp).Seconds())
writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), rec.Timestamp, dur, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
nextCut = rec.Timestamp.Add(cutTime)
init = true
}
Expand Down
5 changes: 4 additions & 1 deletion modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,10 @@ func countFlushedTraces(store storage.Store) int {
func sendReq(t testing.TB, ctx context.Context, client *kgo.Client) []*kgo.Record {
traceID := generateTraceID(t)

req := test.MakePushBytesRequest(t, 10, traceID)
now := time.Now()
startTime := uint64(now.UnixNano())
endTime := uint64(now.Add(time.Second).UnixNano())
req := test.MakePushBytesRequest(t, 10, traceID, startTime, endTime)
records, err := ingest.Encode(0, util.FakeTenantID, req, 1_000_000)
require.NoError(t, err)

Expand Down
32 changes: 18 additions & 14 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ type partitionSectionWriter interface {
type writer struct {
logger log.Logger

blockCfg BlockConfig
partition, cycleEndTs uint64
blockCfg BlockConfig
partition, firstOffset uint64
startSectionTime time.Time
cycleDuration time.Duration

overrides Overrides
wal *wal.WAL
Expand All @@ -34,17 +36,19 @@ type writer struct {
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition, firstOffset uint64, startSectionTime time.Time, cycleDuration time.Duration, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
cycleEndTs: cycleEndTs,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
logger: logger,
partition: partition,
firstOffset: firstOffset,
startSectionTime: startSectionTime,
cycleDuration: cycleDuration,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
}
}

Expand Down Expand Up @@ -72,7 +76,7 @@ func (p *writer) pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRe

func (p *writer) cutidle(since time.Time, immediate bool) error {
for _, i := range p.m {
if err := i.CutIdle(since, immediate); err != nil {
if err := i.CutIdle(p.startSectionTime, p.cycleDuration, since, immediate); err != nil {
return err
}
}
Expand All @@ -98,7 +102,7 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {
return i, nil
}

i, err := newTenantStore(tenant, p.partition, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
i, err := newTenantStore(tenant, p.partition, p.firstOffset, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
if err != nil {
return nil, err
}
Expand Down
44 changes: 44 additions & 0 deletions modules/blockbuilder/partition_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package blockbuilder

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/require"
)

// getPartitionWriter returns a partition section writer suitable for unit
// tests. The writer is backed by a WAL created in a per-test temporary
// directory and configured with a 3 minute ingestion slack; the section starts
// "now" and uses a 1 minute cycle duration. Partition and first offset are
// both fixed to 1.
//
// The test is failed immediately if the WAL cannot be created.
func getPartitionWriter(t *testing.T) *writer {
	// Mark as helper so a failing require below is reported at the caller's line.
	t.Helper()

	startTime := time.Now()
	cycleDuration := 1 * time.Minute

	w, err := wal.New(&wal.Config{
		Filepath:       t.TempDir(),
		Encoding:       backend.EncNone,
		IngestionSlack: 3 * time.Minute,
		Version:        encoding.DefaultEncoding().Version(),
	})
	require.NoError(t, err)

	return newPartitionSectionWriter(log.NewNopLogger(), 1, 1, startTime, cycleDuration, BlockConfig{}, &mockOverrides{}, w, encoding.DefaultEncoding())
}

// TestPushBytes checks that the partition writer accepts a push request whose
// start/end timestamps (in nanoseconds) span one second beginning at the
// current time, without returning an error.
func TestPushBytes(t *testing.T) {
	const tenantID = "test-tenant"

	var (
		w       = getPartitionWriter(t)
		id      = generateTraceID(t)
		ts      = time.Now()
		startNs = uint64(ts.UnixNano())
		endNs   = uint64(ts.Add(time.Second).UnixNano())
	)

	pushReq := test.MakePushBytesRequest(t, 1, id, startNs, endNs)

	require.NoError(t, w.pushBytes(ts, tenantID, pushReq))
}
27 changes: 25 additions & 2 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/tempo/modules/blockbuilder/util"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/dataquality"
"github.com/grafana/tempo/pkg/livetraces"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -148,7 +149,7 @@ func (s *tenantStore) AppendTrace(traceID []byte, tr []byte, ts time.Time) error
return nil
}

func (s *tenantStore) CutIdle(since time.Time, immediate bool) error {
func (s *tenantStore) CutIdle(startSectionTime time.Time, cycleDuration time.Duration, since time.Time, immediate bool) error {
idle := s.liveTraces.CutIdle(since, immediate)

slices.SortFunc(idle, func(a, b *livetraces.LiveTrace[[]byte]) int {
Expand Down Expand Up @@ -211,7 +212,8 @@ func (s *tenantStore) CutIdle(since time.Time, immediate bool) error {
}

for i, tr := range unmarshaled {
if err := s.headBlock.AppendTrace(idle[i].ID, tr, starts[i], ends[i]); err != nil {
start, end := s.adjustTimeRangeForSlack(startSectionTime, cycleDuration, starts[i], ends[i])
if err := s.headBlock.AppendTrace(idle[i].ID, tr, start, end, false); err != nil {
return err
}
}
Expand All @@ -230,6 +232,27 @@ func (s *tenantStore) CutIdle(since time.Time, immediate bool) error {
return s.cutHeadBlock(false)
}

// adjustTimeRangeForSlack sanitises a trace's start and end timestamps (Unix
// seconds) against the window accepted for the partition section being
// consumed:
//
//	[startSectionTime - slack, startSectionTime + slack + cycleDuration]
//
// where slack is s.headBlock.IngestionSlack().
//
// A start outside the window is replaced by the section start time. An end
// that is past the window, or earlier than the (possibly adjusted) start, is
// replaced by the section start time, or by start itself when start is later
// than the section start. The returned pair therefore always satisfies
// start <= end, and both values lie inside the window.
//
// Whenever either value is adjusted, a block-builder data-quality warning is
// recorded for the head block's tenant.
func (s *tenantStore) adjustTimeRangeForSlack(startSectionTime time.Time, cycleDuration time.Duration, start, end uint32) (uint32, uint32) {
	slack := s.headBlock.IngestionSlack()
	sectionStart := uint32(startSectionTime.Unix())
	startOfRange := uint32(startSectionTime.Add(-slack).Unix())
	endOfRange := uint32(startSectionTime.Add(slack + cycleDuration).Unix())

	warn := false

	// A start beyond the end of the window is just as invalid as one before
	// its beginning; left untouched it would yield an inverted range once the
	// end is clamped below.
	if start < startOfRange || start > endOfRange {
		warn = true
		start = sectionStart
	}

	if end > endOfRange || end < start {
		warn = true
		end = sectionStart
		// start may legitimately sit after the section start (but inside the
		// window); never hand back an end that precedes it.
		if end < start {
			end = start
		}
	}

	if warn {
		dataquality.WarnBlockBuilderOutsideIngestionSlack(s.headBlock.BlockMeta().TenantID)
	}

	return start, end
}

func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
// TODO - Advance some of this work if possible

Expand Down
80 changes: 80 additions & 0 deletions modules/blockbuilder/tenant_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package blockbuilder

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// getTenantStore returns a tenant store for "test-tenant" suitable for unit
// tests. It is backed by a WAL created in a per-test temporary directory and
// configured with a 3 minute ingestion slack.
//
// The test is failed immediately if the WAL cannot be created; an error from
// newTenantStore is returned to the caller unchanged.
func getTenantStore(t *testing.T) (*tenantStore, error) {
	// Mark as helper so a failing require below is reported at the caller's line.
	t.Helper()

	w, err := wal.New(&wal.Config{
		Filepath:       t.TempDir(),
		Encoding:       backend.EncNone,
		IngestionSlack: 3 * time.Minute,
		Version:        encoding.DefaultEncoding().Version(),
	})
	require.NoError(t, err)

	return newTenantStore("test-tenant", 1, 1, BlockConfig{}, log.NewNopLogger(), w, encoding.DefaultEncoding(), &mockOverrides{})
}

// TestAdjustTimeRangeForSlack exercises tenantStore.adjustTimeRangeForSlack
// with a 1 minute cycle. The store comes from getTenantStore, whose WAL is
// configured with a 3 minute ingestion slack, so timestamps more than 3
// minutes before the section start, or more than 4 minutes after it, are
// expected to be replaced by the section start time.
func TestAdjustTimeRangeForSlack(t *testing.T) {
	store, err := getTenantStore(t)
	require.NoError(t, err)

	sectionStart := time.Now()
	const cycle = 1 * time.Minute

	// at converts an offset relative to the section start into Unix seconds.
	at := func(offset time.Duration) uint32 {
		return uint32(sectionStart.Add(offset).Unix())
	}

	type testCase struct {
		name          string
		start         uint32
		end           uint32
		expectedStart uint32
		expectedEnd   uint32
	}

	cases := []testCase{
		{
			name:          "within slack range",
			start:         at(-2 * time.Minute),
			end:           at(2 * time.Minute),
			expectedStart: at(-2 * time.Minute),
			expectedEnd:   at(2 * time.Minute),
		},
		{
			name:          "start before slack range",
			start:         at(-10 * time.Minute),
			end:           at(2 * time.Minute),
			expectedStart: at(0),
			expectedEnd:   at(2 * time.Minute),
		},
		{
			name:          "end after slack range",
			start:         at(-2 * time.Minute),
			end:           at(20 * time.Minute),
			expectedStart: at(-2 * time.Minute),
			expectedEnd:   at(0),
		},
		{
			name:          "end before start",
			start:         at(-2 * time.Minute),
			end:           at(-3 * time.Minute),
			expectedStart: at(-2 * time.Minute),
			expectedEnd:   at(0),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			gotStart, gotEnd := store.adjustTimeRangeForSlack(sectionStart, cycle, tc.start, tc.end)
			assert.Equal(t, tc.expectedStart, gotStart)
			assert.Equal(t, tc.expectedEnd, gotEnd)
		})
	}
}
2 changes: 1 addition & 1 deletion modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))

err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds)
err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, start, end uint
defer i.headBlockMtx.Unlock()

i.tracesCreatedTotal.Inc()
err := i.headBlock.Append(id, b, start, end)
err := i.headBlock.Append(id, b, start, end, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ func makeBatchWithMaxBytes(maxBytes int, traceID []byte) *v1_trace.ResourceSpans
batch := test.MakeBatch(1, traceID)

for batch.Size() < maxBytes {
batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpanWithAttributeCount(traceID, 0))
batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpan(traceID))
}

return batch
Expand Down
11 changes: 8 additions & 3 deletions pkg/dataquality/warnings.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
)

const (
reasonOutsideIngestionSlack = "outside_ingestion_time_slack"
reasonDisconnectedTrace = "disconnected_trace"
reasonRootlessTrace = "rootless_trace"
reasonOutsideIngestionSlack = "outside_ingestion_time_slack"
reasonBlockBuilderOutsideIngestionSlack = "blockbuilder_outside_ingestion_time_slack"
reasonDisconnectedTrace = "disconnected_trace"
reasonRootlessTrace = "rootless_trace"

PhaseTraceFlushedToWal = "_flushed_to_wal"
PhaseTraceWalToComplete = "_wal_to_complete"
Expand All @@ -25,6 +26,10 @@ func WarnOutsideIngestionSlack(tenant string) {
metric.WithLabelValues(tenant, reasonOutsideIngestionSlack).Inc()
}

// WarnBlockBuilderOutsideIngestionSlack increments the data-quality warning
// metric for tenant under the reason "blockbuilder_outside_ingestion_time_slack".
// The reason is distinct from "outside_ingestion_time_slack" so that warnings
// raised by the block-builder can be told apart from the others.
func WarnBlockBuilderOutsideIngestionSlack(tenant string) {
metric.WithLabelValues(tenant, reasonBlockBuilderOutsideIngestionSlack).Inc()
}

// WarnDisconnectedTrace increments the data-quality warning metric for tenant.
// The reason label is "disconnected_trace" with phase appended verbatim;
// phase is presumably one of the Phase* constants declared in this package
// (e.g. PhaseTraceFlushedToWal) — confirm against callers.
func WarnDisconnectedTrace(tenant string, phase string) {
metric.WithLabelValues(tenant, reasonDisconnectedTrace+phase).Inc()
}
Expand Down
Loading

0 comments on commit 4ac0715

Please sign in to comment.