Allow stream custom maxsize per batch
simon28082 authored and mangalaman93 committed Oct 14, 2024
1 parent 16b63df commit acba512
Showing 2 changed files with 72 additions and 1 deletion.
11 changes: 10 additions & 1 deletion stream.go
@@ -62,6 +62,14 @@ type Stream struct {
// Note: Calls to ChooseKey are concurrent.
ChooseKey func(item *Item) bool

// MaxSize is the maximum allowed size of a stream batch. This is a soft limit,
// as a single list that is still over the limit will have to be sent as-is, since
// it cannot be split further. This limit prevents the framework from creating
// batches so big that sending them causes issues (e.g., running into the maximum
// gRPC message size). If necessary, set it before the Stream starts orchestration;
// modifying it concurrently is not safe.
MaxSize uint64

// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
// is expected that the user would advance the iterator to go through the versions of the
@@ -315,7 +323,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
// Send the batch immediately if it already exceeds the maximum allowed size.
// If the size of the batch exceeds st.MaxSize, break from the loop to
// avoid creating a batch that is so big that certain limits are reached.
- if batch.LenNoPadding() > int(maxStreamSize) {
+ if batch.LenNoPadding() > int(st.MaxSize) {

Check failure on line 326 in stream.go (GitHub Actions / lint): G115: integer overflow conversion uint64 -> int (gosec)
break loop
}
select {
@@ -452,6 +460,7 @@ func (db *DB) newStream() *Stream {
db: db,
NumGo: db.opt.NumGoroutines,
LogPrefix: "Badger.Stream",
MaxSize: maxStreamSize,
}
}
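
For context, a caller sets the new field after creating a stream and before starting it. Below is a minimal sketch of that usage, assuming badger v4 import paths and the v4 Send signature (func(*z.Buffer) error); the 16 MB figure, the Backup.Stream prefix, and the streamWithCustomBatchSize helper are illustrative, not part of this commit:

package main

import (
	"context"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/ristretto/z"
)

// streamWithCustomBatchSize streams the whole DB while capping batch sizes.
func streamWithCustomBatchSize(db *badger.DB, send func(*z.Buffer) error) error {
	stream := db.NewStream()
	stream.LogPrefix = "Backup.Stream"

	// Set the soft limit before Orchestrate starts; a single list that
	// cannot be split further may still exceed it.
	stream.MaxSize = 16 << 20 // e.g. to stay under a gRPC message cap

	stream.Send = send
	return stream.Orchestrate(context.Background())
}

Note that the ristretto z package path has moved between releases, so the import may need adjusting for the badger version in use.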

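The G115 failure above comes from narrowing the uint64 MaxSize to an int. A possible follow-up, sketched here under the assumption that LenNoPadding returns a non-negative int byte count (this fix is not part of this commit), is to widen the int side of the comparison instead:

package main

import "fmt"

// exceedsMax reports whether a batch of n bytes exceeds max without the
// uint64 -> int narrowing that gosec flags as G115. The non-negative int n
// converts to uint64 losslessly.
func exceedsMax(n int, max uint64) bool {
	return uint64(n) > max
}

func main() {
	fmt.Println(exceedsMax(2<<20, 1<<20)) // true: a 2 MB batch over a 1 MB cap
}

In streamKVs the check would then read if exceedsMax(batch.LenNoPadding(), st.MaxSize) { break loop }, leaving no conversion for the linter to reject.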
62 changes: 62 additions & 0 deletions stream_test.go
@@ -171,6 +171,68 @@ func TestStream(t *testing.T) {
require.NoError(t, db.Close())
}


func TestStreamMaxSize(t *testing.T) {
if !*manual {
t.Skip("Skipping test meant to be run manually.")
return
}
// Set the maxStreamSize to 1 MB for the duration of the test so that it can use a smaller
// dataset than it would otherwise need.
originalMaxStreamSize := maxStreamSize
maxStreamSize = 1 << 20
defer func() {
maxStreamSize = originalMaxStreamSize
}()

testSize := int(1e6)
dir, err := os.MkdirTemp("", "badger-big-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := OpenManaged(DefaultOptions(dir))
require.NoError(t, err)

var count int
wb := db.NewWriteBatchAt(5)
for _, prefix := range []string{"p0", "p1", "p2"} {
for i := 1; i <= testSize; i++ {
require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
count++
}
}
require.NoError(t, wb.Flush())

stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Testing"
c := &collector{}
stream.Send = c.Send

// The stream should pick up the package-level default.
require.Equal(t, maxStreamSize, stream.MaxSize)

// Override the default with a custom 50 MB limit.
stream.MaxSize = 1024 * 1024 * 50

// Retrieve everything.
err = stream.Orchestrate(ctxb)
require.NoError(t, err)
require.Equal(t, 3*testSize, len(c.kv), "Expected %d. Got: %d", 3*testSize, len(c.kv))

m := make(map[string]int)
for _, kv := range c.kv {
prefix, ki := keyToInt(kv.Key)
expected := value(ki)
require.Equal(t, expected, kv.Value)
m[prefix]++
}
require.Equal(t, 3, len(m))
for pred, count := range m {
require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred)
}
require.NoError(t, db.Close())
}
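
Because of the manual guard at the top, this test only runs when the flag is set explicitly, e.g. with something like go test -run TestStreamMaxSize -manual=true (the exact invocation depends on how the package registers its test flags).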

func TestStreamWithThreadId(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
