Skip to content

Commit

Permalink
improve debug logging around dynamoevents cursor handling (#51944)
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall authored Feb 7, 2025
1 parent a33bbb0 commit 0c4a8b5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
14 changes: 11 additions & 3 deletions integrations/event-handler/legacy_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,14 @@ func (t *LegacyEventsWatcher) GetCursorValues() LegacyCursorValues {
}

// flipPage flips the current page
func (t *LegacyEventsWatcher) flipPage() bool {
func (t *LegacyEventsWatcher) flipPage(ctx context.Context) bool {
if t.nextCursor == "" {
t.log.DebugContext(ctx, "not flipping page (no next cursor)")
return false
}

t.log.DebugContext(ctx, "flipping page", "cursor", t.cursor, "next", t.nextCursor)

t.cursor = t.nextCursor
t.pos = -1
t.batch = make([]*LegacyTeleportEvent, 0)
Expand Down Expand Up @@ -174,7 +177,11 @@ func (t *LegacyEventsWatcher) fetch(ctx context.Context) error {
// Set the position of the last known event
t.pos = pos

t.log.DebugContext(ctx, "Skipped last known event", "id", t.id, "pos", t.pos)
if pos == 0 {
t.log.DebugContext(ctx, "starting from first event in fetch", "id", t.id, "pos", pos)
} else {
t.log.DebugContext(ctx, "advancing past last known event in fetch", "id", t.id, "pos", pos)
}

return nil
}
Expand Down Expand Up @@ -317,6 +324,7 @@ func (t *LegacyEventsWatcher) ExportEvents(ctx context.Context) error {
for {
// If there is nothing in the batch, request
if len(t.batch) == 0 {
t.log.DebugContext(ctx, "fetching due to empty batch...")
err := t.fetch(ctx)
if err != nil {
e <- trace.Wrap(err)
Expand All @@ -343,7 +351,7 @@ func (t *LegacyEventsWatcher) ExportEvents(ctx context.Context) error {
// If we processed the last event on a page
if t.pos >= len(t.batch) {
// If there is next page, flip page
if t.flipPage() {
if t.flipPage(ctx) {
continue
}

Expand Down
13 changes: 12 additions & 1 deletion lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,21 +838,28 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam
return nil, "", trace.Wrap(err)
}

l.logger.DebugContext(ctx, "search events", "from", fromUTC, "to", toUTC, "filter", filter, "limit", limit, "start_key", startKey, "order", order, "checkpoint", checkpoint)

if startKey != "" {
if createdAt, err := GetCreatedAtFromStartKey(startKey); err == nil {
createdAt, err := GetCreatedAtFromStartKey(startKey)
if err == nil {
// we compare the cursor unix time to the from unix in order to drop the nanoseconds
// that are not present in the cursor.
if fromUTC.Unix() > createdAt.Unix() {
l.logger.WarnContext(ctx, "cursor is from before window start time, resetting cursor", "created_at", createdAt, "from", fromUTC)
// if fromUTC is after than the cursor, we changed the window and need to reset the cursor.
// This is a guard check when iterating over the events using sliding window
// and the previous cursor no longer fits the new window.
checkpoint = checkpointKey{}
}
if createdAt.After(toUTC) {
l.logger.DebugContext(ctx, "cursor is after the end of the window, skipping search", "created_at", createdAt, "to", toUTC)
// if the cursor is after the end of the window, we can return early since we
// won't find any events.
return nil, "", nil
}
} else {
l.logger.WarnContext(ctx, "failed to get creation time from start key", "start_key", startKey, "error", err)
}
}

Expand Down Expand Up @@ -1388,6 +1395,7 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
if err != nil {
return nil, false, err
}
l.log.DebugContext(context.Background(), "updating iterator for events fetcher", "iterator", string(iter))
l.checkpoint.Iterator = string(iter)
}

Expand Down Expand Up @@ -1419,6 +1427,7 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
if err != nil {
return nil, false, trace.Wrap(err)
}
l.log.DebugContext(context.Background(), "breaking up sub-page due to event size", "key", key)
l.checkpoint.EventKey = key

// We need to reset the iterator so we get the previous page again.
Expand All @@ -1441,6 +1450,7 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
}
l.hasLeft = hf || l.checkpoint.Iterator != ""
l.checkpoint.EventKey = ""
l.log.DebugContext(context.Background(), "resetting checkpoint event-key due to full page", "has_left", l.hasLeft, "checkpoint", l.checkpoint)
return out, true, nil
}
}
Expand Down Expand Up @@ -1497,6 +1507,7 @@ dateLoop:
if err != nil {
return nil, trace.Wrap(err)
}

l.log.DebugContext(ctx, "Query completed.",
"duration", time.Since(start),
"items", len(out.Items),
Expand Down
5 changes: 4 additions & 1 deletion lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"math/rand/v2"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -126,7 +127,9 @@ func TestSearchSessionEvensBySessionID(t *testing.T) {
// TestCheckpointOutsideOfWindow tests if [Log] doesn't panic
// if checkpoint date is outside of the window [fromUTC,toUTC].
func TestCheckpointOutsideOfWindow(t *testing.T) {
tt := &Log{}
tt := &Log{
logger: slog.With(teleport.ComponentKey, teleport.ComponentDynamoDB),
}

key := checkpointKey{
Date: "2022-10-02",
Expand Down

0 comments on commit 0c4a8b5

Please sign in to comment.