Skip to content

Commit

Permalink
fix(storage): fix adding multiple range on stream with same read id (g…
Browse files Browse the repository at this point in the history
…oogleapis#11584)

Issue:
1. If we get a read add call we put it in map `mr.mp[curentID] = spec`
2. And if at that same point we receive an error from server we go into retry loop and close the stream manager go-routine.
3. During retry we see this new entry in map which is also retried.
4. Basically this creates two instances of curentID which we send to server. One due to add call one due to retry.

Fix:
1. Remove adding the current id in map in add call. As that can create duplicates as seen above.
2. Ignore the values from server in case entry is not found in map, given user will be notified if an entry is not in map by the help of callback.
  • Loading branch information
shubham-diwakar authored Feb 24, 2025
1 parent d14e91c commit 0bb3434
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
delete(rr.mp, key)
}
}
rr.activeTask = 0
rr.mu.Unlock()
return
case currentSpec = <-rr.data:
Expand Down Expand Up @@ -1287,6 +1288,11 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
for _, val := range arr {
id := val.GetReadRange().GetReadId()
rr.mu.Lock()
_, ok := rr.mp[id]
if !ok {
// it's ok to ignore responses for read_id not in map as user would have been notified by callback.
continue
}
_, err = rr.mp[id].writer.Write(val.GetChecksummedData().GetContent())
if err != nil {
rr.mp[id].callback(rr.mp[id].offset, rr.mp[id].limit, err)
Expand Down Expand Up @@ -1337,6 +1343,8 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].limit, err)
delete(rr.mp, key)
}
// In case we hit an permanent error, delete entries from map and remove active tasks.
rr.activeTask = 0
rr.mu.Unlock()
rr.close()
} else {
Expand Down Expand Up @@ -1443,11 +1451,10 @@ func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback fu
return
}
mr.mu.Lock()
curentID := (*mr).readID
currentID := (*mr).readID
(*mr).readID++
if !mr.done {
spec := rangeSpec{readID: curentID, writer: output, offset: offset, limit: limit, bytesWritten: 0, callback: callback}
mr.mp[curentID] = spec
spec := rangeSpec{readID: currentID, writer: output, offset: offset, limit: limit, bytesWritten: 0, callback: callback}
mr.activeTask++
mr.data <- []rangeSpec{spec}
} else {
Expand All @@ -1458,12 +1465,15 @@ func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback fu

func (mr *gRPCBidiReader) wait() {
mr.mu.Lock()
keepWaiting := len(mr.mp) != 0 && mr.activeTask != 0
// we should wait until there is active task or an entry in the map.
// there can be a scenario we have nothing in map for a moment or too but still have active task.
// hence in case we have permanent errors we reduce active task to 0 so that this does not block wait.
keepWaiting := len(mr.mp) != 0 || mr.activeTask != 0
mr.mu.Unlock()

for keepWaiting {
mr.mu.Lock()
keepWaiting = len(mr.mp) != 0 && mr.activeTask != 0
keepWaiting = len(mr.mp) != 0 || mr.activeTask != 0
mr.mu.Unlock()
}
}
Expand Down

0 comments on commit 0bb3434

Please sign in to comment.