From ef7d1fef79d87ae012953bf09b47f6cbbfc66a41 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?=
Date: Tue, 21 May 2024 19:10:21 +0200
Subject: [PATCH] dst: bug and test fixes (#874)

* dst: wait for spawned goroutines to finish on restart command

Otherwise, some goroutines from the previous store (like block
persistence) could race with the new store. This is an artifact of
restarting the DB in-process. The approach used is to simply scan the
running goroutine stacks, ignoring any goroutines that existed at the
start of the test.

* wal: sleep for defaultTickTime if no progress is made

Previously, runtime.Gosched was called. However, this could be an issue
in deterministic simulation tests: if all other goroutines are
sleeping, the yielding goroutine will reschedule itself before faketime
is advanced. Sleeping is a quick fix to avoid this infinite loop,
although we should probably come up with a long-term fix for these
types of cases.

* lsm: fix nil pointer in merge

* dst: close most recent ColumnStore instead of first on defer

We were closing the incorrect column store at the end of the test (in
some cases double-closing it).

* db: correctly discard table block contained in snapshot if block was
  persisted

Previously, the code attempted to do this by resetting the active block
index. However, the ULID was left untouched. This would cause a
previously persisted block to lose data when the new empty block's
rotation happened. This commit updates the full block when a
NewTableBlock entry is found and the table exists.

* table: enhance rotation log messages

These log messages were useful when debugging DST failures.
---
 db.go           | 38 +++++++++++++-------------------
 dst/dst_test.go | 25 ++++++++++++++++++++++++-
 index/lsm.go    |  2 +-
 table.go        | 15 +++++++++------
 wal/wal.go      |  6 +++---
 5 files changed, 50 insertions(+), 36 deletions(-)

diff --git a/db.go b/db.go
index 3365fc6fa..c4a02bf6c 100644
--- a/db.go
+++ b/db.go
@@ -746,26 +746,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
 		switch e := record.Entry.EntryType.(type) {
 		case *walpb.Entry_TableBlockPersisted_:
 			persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
-			if e.TableBlockPersisted.NextTx > snapshotTx {
-				// The loaded snapshot has data in a table that has been
-				// persisted. Delete all data in this table, since it has
-				// already been persisted.
-				db.mtx.Lock()
-				if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok {
-					table.ActiveBlock().index, err = index.NewLSM(
-						filepath.Join(table.db.indexDir(), table.name, table.ActiveBlock().ulid.String()), // Any index files are found at <indexDir>/<tableName>/<ulid>
-						table.schema,
-						table.IndexConfig(),
-						db.HighWatermark,
-						index.LSMWithMetrics(table.metrics.indexMetrics),
-						index.LSMWithLogger(table.logger),
-					)
-					if err != nil {
-						return fmt.Errorf("create new lsm index: %w", err)
-					}
-				}
-				db.mtx.Unlock()
-			}
+			// The loaded snapshot might have persisted data, this is handled in
+			// the replay loop below.
 			return nil
 		default:
 			return nil
@@ -802,7 +784,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
 				return err
 			}
 
-			if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn {
+			nextNonPersistedTxn, wasPersisted := persistedTables[entry.TableName]
+			if wasPersisted && tx < nextNonPersistedTxn {
 				// This block has already been successfully persisted, so we can
 				// skip it. Note that if this new table block is the active
 				// block after persistence tx == nextNonPersistedTxn.
@@ -849,15 +832,20 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
 				return fmt.Errorf("get table: %w", err)
 			}
 
-			// If we get to this point it means a block was finished but did
-			// not get persisted.
 			level.Info(db.logger).Log(
 				"msg", "writing unfinished block in recovery",
 				"table", tableName,
 				"tx", tx,
 			)
-			table.pendingBlocks[table.active] = struct{}{}
-			go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
+			if snapshotTx == 0 || tx != nextNonPersistedTxn {
+				// If we get to this point it means a block was finished but did
+				// not get persisted. If a snapshot was loaded, then the table
+				// already exists but the active block is outdated. If
+				// tx == nextNonPersistedTxn, we should not persist the active
+				// block, but just create a new block.
+				table.pendingBlocks[table.active] = struct{}{}
+				go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
+			}
 
 			protoEqual := false
 			switch schema.(type) {
diff --git a/dst/dst_test.go b/dst/dst_test.go
index 9a4bda79c..c56902eaa 100644
--- a/dst/dst_test.go
+++ b/dst/dst_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/polarsignals/wal/types"
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/objstore"
+	"go.uber.org/goleak"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/polarsignals/frostdb"
@@ -337,7 +338,6 @@ func TestDST(t *testing.T) {
 		}, walTicker,
 	)
 	require.NoError(t, err)
-	defer c.Close()
 
 	ctx := context.Background()
 	var db atomic.Pointer[frostdb.DB]
@@ -371,6 +371,8 @@ func TestDST(t *testing.T) {
 	errg := &errgroup.Group{}
 	errg.SetLimit(32)
 	commandDistribution := make(map[command]int)
+
+	ignoreGoroutinesAtStartOfTest := goleak.IgnoreCurrent()
 	for i := 0; i < numCommands; i++ {
 		cmd := genCommand()
 		commandDistribution[cmd]++
@@ -417,6 +419,23 @@ func TestDST(t *testing.T) {
 				time.Sleep(1 * time.Millisecond)
 				// Graceful shutdown.
 				require.NoError(t, c.Close())
+				_ = errg.Wait()
+
+				// Unfortunately frostdb doesn't have goroutine lifecycle management
+				// and adding it could lead to subtle issues (e.g. on Close with
+				// many DBs). Instead, this test simply verifies all goroutines
+				// spawned up until this restart eventually exit after n retries.
+				const maxRetries = 10
+				for i := 0; i < maxRetries; i++ {
+					if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
+						break
+					} else if i == maxRetries-1 {
+						t.Fatalf("leaked goroutines found on Close: %v", err)
+					} else {
+						time.Sleep(1 * time.Millisecond)
+					}
+				}
+
 				storeID++
 				c, err = newStore(
 					storageDir,
@@ -462,6 +481,10 @@ func TestDST(t *testing.T) {
 	t.Log("snapshot files:", listFiles("snapshots"))
 	t.Log("WAL files:", listFiles("wal"))
 
+	// Defer a close here. This is not done at the start of the test because
+	// the test run itself may close the store.
+	defer c.Close()
+
 	timestampSum := &int64checksum{}
 	readTimestamps := make(map[int64]int)
 	expectedTimestamps := make(map[int64]struct{})
diff --git a/index/lsm.go b/index/lsm.go
index a99b48988..033922d2d 100644
--- a/index/lsm.go
+++ b/index/lsm.go
@@ -551,7 +551,7 @@ func (l *LSM) merge(level SentinelType) error {
 	// Find the first part that is <= the watermark and reset the compact list to that part.
 	wm := l.watermark()
 	compact.Iterate(func(node *Node) bool {
-		if node.part != nil && node.sentinel != L0 {
+		if node.part == nil && node.sentinel != L0 {
 			return false
 		}
 		if node.part.TX() <= wm {
diff --git a/table.go b/table.go
index ebefc28ff..f806a607d 100644
--- a/table.go
+++ b/table.go
@@ -497,12 +497,12 @@ func (t *Table) dropPendingBlock(block *TableBlock) {
 }
 
 func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snapshotDB bool) {
-	level.Debug(t.logger).Log("msg", "syncing block")
+	level.Debug(t.logger).Log("msg", "syncing block", "ulid", block.ulid, "size", block.index.Size())
 	block.pendingWritersWg.Wait()
 
 	// from now on, the block will no longer be modified, we can persist it to disk
-	level.Debug(t.logger).Log("msg", "done syncing block")
+	level.Debug(t.logger).Log("msg", "done syncing block", "ulid", block.ulid, "size", block.index.Size())
 
 	// Persist the block
 	var err error
@@ -614,10 +614,13 @@ func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bo
 		return nil
 	}
 
-	level.Debug(t.logger).Log("msg", "rotating block", "blockSize", block.Size(), "skipPersist", skipPersist)
-	defer func() {
-		level.Debug(t.logger).Log("msg", "done rotating block")
-	}()
+	level.Debug(t.logger).Log(
+		"msg", "rotating block",
+		"ulid", block.ulid,
+		"size", block.Size(),
+		"skip_persist", skipPersist,
+	)
+	defer level.Debug(t.logger).Log("msg", "done rotating block", "ulid", block.ulid)
 
 	tx, _, commit := t.db.begin()
 	defer commit()
diff --git a/wal/wal.go b/wal/wal.go
index 2d373fe1b..3c7b357f9 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"math"
 	"os"
-	"runtime"
 	"sync"
 	"time"
 
@@ -261,8 +260,9 @@ func Open(
 }
 
 func (w *FileWAL) run(ctx context.Context) {
+	const defaultTickTime = 50 * time.Millisecond
 	if w.ticker == nil {
-		w.ticker = realTicker{Ticker: time.NewTicker(50 * time.Millisecond)}
+		w.ticker = realTicker{Ticker: time.NewTicker(defaultTickTime)}
 	}
 	defer w.ticker.Stop()
 	// lastQueueSize is only used on shutdown to reduce debug logging verbosity.
@@ -291,7 +291,7 @@ func (w *FileWAL) run(ctx context.Context) {
 			if n == lastQueueSize {
 				// No progress made.
-				runtime.Gosched()
+				time.Sleep(defaultTickTime)
 				continue
 			}
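
Note on the wal change: the failure mode is subtle. Under Go's faketime
mode (the virtual clock used for deterministic simulation, e.g. in the
Go playground), time only advances once every goroutine is blocked. A
goroutine that busy-waits with runtime.Gosched stays runnable forever,
so the virtual clock never moves and pending timers never fire. The
following is a minimal, hypothetical sketch of that shape, not frostdb
code; under a faketime runtime the Gosched variant livelocks, while the
commented-out Sleep blocks the goroutine and lets the clock advance.

package main

import (
	"runtime"
	"time"
)

func main() {
	progress := make(chan struct{})

	// Producer: makes progress only after a timer fires. Under faketime,
	// the timer fires only once all other goroutines are blocked.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(progress)
	}()

	// Consumer: polls for progress.
	for {
		select {
		case <-progress:
			return
		default:
			// Buggy under faketime: Gosched keeps this goroutine
			// runnable, so the clock never advances and the timer
			// above never fires.
			runtime.Gosched()
			// Fix (mirrors wal.go above): block instead of yielding.
			// time.Sleep(50 * time.Millisecond)
		}
	}
}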
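
The restart leak check added to dst_test.go boils down to a reusable
pattern: snapshot the goroutines alive at a known-good point with
goleak.IgnoreCurrent, then retry goleak.Find until everything spawned
since then has exited. Below is a self-contained sketch of that pattern;
waitForBackgroundGoroutines is a hypothetical helper name, while
goleak.IgnoreCurrent and goleak.Find are the real go.uber.org/goleak
APIs the test uses.

package main

import (
	"fmt"
	"time"

	"go.uber.org/goleak"
)

// waitForBackgroundGoroutines (hypothetical helper) retries goleak.Find
// until every goroutine spawned after the ignore snapshot was taken has
// exited, or gives up after maxRetries attempts.
func waitForBackgroundGoroutines(ignore goleak.Option, maxRetries int, backoff time.Duration) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = goleak.Find(ignore); err == nil {
			return nil
		}
		time.Sleep(backoff)
	}
	return fmt.Errorf("goroutines still running after %d retries: %w", maxRetries, err)
}

func main() {
	// Snapshot the goroutines alive right now; goleak ignores these when
	// scanning stacks later, so only goroutines spawned after this point
	// (e.g. block persistence) are reported.
	ignore := goleak.IgnoreCurrent()

	done := make(chan struct{})
	go func() { <-done }() // stand-in for background work like writeBlock
	close(done)

	if err := waitForBackgroundGoroutines(ignore, 10, time.Millisecond); err != nil {
		panic(err)
	}
}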