Skip to content

Commit

Permalink
try to decode once via all
Browse files Browse the repository at this point in the history
  • Loading branch information
jh-bate committed Dec 21, 2023
1 parent 84e90b3 commit 47d16a7
Showing 1 changed file with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,9 @@ func (m *Migration) fetchAndUpdateBatch() bool {

if dataC := m.getDataCollection(); dataC != nil {
fetchStart := time.Now()
dataSet := []bson.M{}
dDataCursor, err := dataC.Find(m.ctx, selector,
&options.FindOptions{
Limit: &m.config.readBatchSize,
Sort: bson.M{"_id": 1},
BatchSize: &size,
},
Expand All @@ -398,27 +398,37 @@ func (m *Migration) fetchAndUpdateBatch() bool {
return false
}

err = dDataCursor.All(m.ctx, &dataSet)

if err != nil {
log.Printf("decoding data: %s", err)
return false
}

defer dDataCursor.Close(m.ctx)

log.Printf("fetch took %s", time.Since(fetchStart))
log.Printf("fetch took %s for %d items", time.Since(fetchStart), len(dataSet))
updateStart := time.Now()
for dDataCursor.Next(m.ctx) {
var dDataResult bson.M
if err = dDataCursor.Decode(&dDataResult); err != nil {
log.Printf("failed decoding data: %s", err)
return false
}
datumID, datumUpdates, err := utils.GetDatumUpdates(dDataResult)
for _, item := range dataSet {

//for dDataCursor.Next(m.ctx) {
// var dDataResult bson.M
// if err = dDataCursor.Decode(&dDataResult); err != nil {
// log.Printf("failed decoding data: %s", err)
// return false
// }
datumID, datumUpdates, err := utils.GetDatumUpdates(item)
if err != nil {
m.onError(err, datumID, "failed getting updates")
continue
}
updateOp := mongo.NewUpdateOneModel()
updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": dDataResult["modifiedTime"]})
updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": item["modifiedTime"]})
updateOp.SetUpdate(datumUpdates)
m.updates = append(m.updates, updateOp)
m.lastUpdatedId = datumID
}
//}

log.Printf("batch iteration took %s", time.Since(updateStart))
log.Printf("fetch and update took %s", time.Since(fetchAndUpdateStart))
Expand Down

0 comments on commit 47d16a7

Please sign in to comment.