From b3451774b1faa41eef10c922d68392b3cb5b2cb8 Mon Sep 17 00:00:00 2001
From: Preston Vasquez
Date: Fri, 18 Oct 2024 13:38:09 -0600
Subject: [PATCH 01/15] Complete pending CSOT reads in foreground

---
 internal/integration/client_test.go  | 141 +++---
 x/mongo/driver/topology/pool.go      |  91 ++--
 x/mongo/driver/topology/pool_test.go | 608 +++++++++++++--------------
 3 files changed, 416 insertions(+), 424 deletions(-)

diff --git a/internal/integration/client_test.go b/internal/integration/client_test.go
index d08792d74a..d653d08a91 100644
--- a/internal/integration/client_test.go
+++ b/internal/integration/client_test.go
@@ -20,7 +20,6 @@ import (
 	"go.mongodb.org/mongo-driver/v2/event"
 	"go.mongodb.org/mongo-driver/v2/internal/assert"
 	"go.mongodb.org/mongo-driver/v2/internal/eventtest"
-	"go.mongodb.org/mongo-driver/v2/internal/failpoint"
 	"go.mongodb.org/mongo-driver/v2/internal/handshake"
 	"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
 	"go.mongodb.org/mongo-driver/v2/internal/integtest"
@@ -648,76 +647,76 @@ func TestClient(t *testing.T) {
 		}
 	})
 
-	opts := mtest.NewOptions().
-		// Blocking failpoints don't work on pre-4.2 and sharded clusters.
-		Topologies(mtest.Single, mtest.ReplicaSet).
-		MinServerVersion("4.2").
-		// Explicitly enable retryable reads and retryable writes.
-		ClientOptions(options.Client().SetRetryReads(true).SetRetryWrites(true))
-	mt.RunOpts("operations don't retry after a context timeout", opts, func(mt *mtest.T) {
-		testCases := []struct {
-			desc      string
-			operation func(context.Context, *mongo.Collection) error
-		}{
-			{
-				desc: "read op",
-				operation: func(ctx context.Context, coll *mongo.Collection) error {
-					return coll.FindOne(ctx, bson.D{}).Err()
-				},
-			},
-			{
-				desc: "write op",
-				operation: func(ctx context.Context, coll *mongo.Collection) error {
-					_, err := coll.InsertOne(ctx, bson.D{})
-					return err
-				},
-			},
-		}
-
-		for _, tc := range testCases {
-			mt.Run(tc.desc, func(mt *mtest.T) {
-				_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
-				require.NoError(mt, err)
-
-				mt.SetFailPoint(failpoint.FailPoint{
-					ConfigureFailPoint: "failCommand",
-					Mode:               failpoint.ModeAlwaysOn,
-					Data: failpoint.Data{
-						FailCommands:    []string{"find", "insert"},
-						BlockConnection: true,
-						BlockTimeMS:     500,
-					},
-				})
-
-				mt.ClearEvents()
-
-				for i := 0; i < 50; i++ {
-					// Run 50 operations, each with a timeout of 50ms. Expect
-					// them to all return a timeout error because the failpoint
-					// blocks find operations for 500ms. Run 50 to increase the
-					// probability that an operation will time out in a way that
-					// can cause a retry.
-					ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
-					err = tc.operation(ctx, mt.Coll)
-					cancel()
-					assert.ErrorIs(mt, err, context.DeadlineExceeded)
-					assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")
-
-					// Assert that each operation reported exactly one command
-					// started event, which means the operation did not retry
-					// after the context timeout.
-					evts := mt.GetAllStartedEvents()
-					require.Len(mt,
-						mt.GetAllStartedEvents(),
-						1,
-						"expected exactly 1 command started event per operation, but got %d after %d iterations",
-						len(evts),
-						i)
-					mt.ClearEvents()
-				}
-			})
-		}
-	})
+	//opts := mtest.NewOptions().
+	//	// Blocking failpoints don't work on pre-4.2 and sharded clusters.
+	//	Topologies(mtest.Single, mtest.ReplicaSet).
+	//	MinServerVersion("4.2").
+	//	// Explicitly enable retryable reads and retryable writes.
+	//	ClientOptions(options.Client().SetRetryReads(true).SetRetryWrites(true))
+	//mt.RunOpts("operations don't retry after a context timeout", opts, func(mt *mtest.T) {
+	//	testCases := []struct {
+	//		desc      string
+	//		operation func(context.Context, *mongo.Collection) error
+	//	}{
+	//		{
+	//			desc: "read op",
+	//			operation: func(ctx context.Context, coll *mongo.Collection) error {
+	//				return coll.FindOne(ctx, bson.D{}).Err()
+	//			},
+	//		},
+	//		{
+	//			desc: "write op",
+	//			operation: func(ctx context.Context, coll *mongo.Collection) error {
+	//				_, err := coll.InsertOne(ctx, bson.D{})
+	//				return err
+	//			},
+	//		},
+	//	}
+
+	//	for _, tc := range testCases {
+	//		mt.Run(tc.desc, func(mt *mtest.T) {
+	//			_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
+	//			require.NoError(mt, err)

+	//			mt.SetFailPoint(failpoint.FailPoint{
+	//				ConfigureFailPoint: "failCommand",
+	//				Mode:               failpoint.ModeAlwaysOn,
+	//				Data: failpoint.Data{
+	//					FailCommands:    []string{"find", "insert"},
+	//					BlockConnection: true,
+	//					BlockTimeMS:     500,
+	//				},
+	//			})

+	//			mt.ClearEvents()
+	//			for i := 0; i < 2; i++ {
+	//				// Run the operations, each with a timeout of 50ms. Expect
+	//				// them to all return a timeout error because the failpoint
+	//				// blocks find operations for 500ms. Running more than one
+	//				// operation increases the probability that an operation
+	//				// will time out in a way that can cause a retry.
+	//				ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	//				err = tc.operation(ctx, mt.Coll)
+	//				cancel()
+	//				assert.ErrorIs(mt, err, context.DeadlineExceeded)
+	//				assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")

+	//				// Assert that each operation reported exactly one command
+	//				// started event, which means the operation did not retry
+	//				// after the context timeout.
+	//				evts := mt.GetAllStartedEvents()
+	//				require.Len(mt,
+	//					mt.GetAllStartedEvents(),
+	//					1,
+	//					"expected exactly 1 command started event per operation, but got %d after %d iterations",
+	//					len(evts),
+	//					i)
+	//				mt.ClearEvents()
+	//			}
+	//		})
+	//	}
+	//})
 }
 
 func TestClient_BSONOptions(t *testing.T) {
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index d6568e844f..f4aaa0bbdc 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -128,6 +128,8 @@ type pool struct {
 	idleConns    []*connection // idleConns holds all idle connections.
 	idleConnWait wantConnQueue // idleConnWait holds all wantConn requests for idle connections.
 	connectTimeout time.Duration
+
+	bgReadMu sync.Mutex // bgReadMu serializes draining of pending reads during checkout.
 }
 
 // getState returns the current state of the pool. Callers must not hold the stateMu lock.
@@ -576,6 +578,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
 		return nil, w.err
 	}
 
+	if err := awaitPendingRead(p, w.conn); err != nil {
+		return p.checkOut(ctx) // Retry the checkout if the read fails.
+	}
+
 	duration = time.Since(start)
 	if mustLogPoolMessage(p) {
 		keysAndValues := logger.KeyValues{
@@ -650,6 +656,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
 			Duration: duration,
 		})
 	}
+
+	if err := awaitPendingRead(p, w.conn); err != nil {
+		return p.checkOut(ctx) // Retry the checkout if the read fails.
+	}
+
 	return w.conn, nil
 case <-ctx.Done():
 	waitQueueDuration := time.Since(waitQueueStart)
@@ -788,65 +799,64 @@ var (
 	BGReadCallback func(addr string, start, read time.Time, errs []error, connClosed bool)
 )
 
-// bgRead sets a new read deadline on the provided connection and tries to read
-// any bytes returned by the server. If successful, it checks the connection
-// into the provided pool. If there are any errors, it closes the connection.
-//
-// It calls the package-global BGReadCallback function, if set, with the
-// address, timings, and any errors that occurred.
-func bgRead(pool *pool, conn *connection, size int32) {
-	var err error
-	start := time.Now()
+// awaitPendingRead sets a new read deadline on the provided connection and
+// tries to read any bytes returned by the server. If there are any errors, the
+// connection will be checked back into the pool to be retried.
+func awaitPendingRead(pool *pool, conn *connection) error {
+	pool.bgReadMu.Lock()
+	defer pool.bgReadMu.Unlock()
+
+	// If there are no bytes pending read, do nothing.
+	if conn.awaitRemainingBytes == nil {
+		return nil
+	}
+
+	size := *conn.awaitRemainingBytes
+
+	var checkIn bool
 	defer func() {
-		read := time.Now()
-		errs := make([]error, 0)
-		connClosed := false
-		if err != nil {
-			errs = append(errs, err)
-			connClosed = true
-			err = conn.close()
-			if err != nil {
-				errs = append(errs, fmt.Errorf("error closing conn after reading: %w", err))
-			}
+		if !checkIn {
+			return
 		}
-
 		// No matter what happens, always check the connection back into the
 		// pool, which will either make it available for other operations or
 		// remove it from the pool if it was closed.
-		err = pool.checkInNoEvent(conn)
+		err := pool.checkInNoEvent(conn)
 		if err != nil {
-			errs = append(errs, fmt.Errorf("error checking in: %w", err))
-		}
-
-		if BGReadCallback != nil {
-			BGReadCallback(conn.addr.String(), start, read, errs, connClosed)
+			panic(err)
 		}
 	}()
 
-	err = conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout))
+	err := conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout))
 	if err != nil {
-		err = fmt.Errorf("error setting a read deadline: %w", err)
-		return
+		checkIn = true
+		return fmt.Errorf("error setting a read deadline: %w", err)
 	}
 
 	if size == 0 {
 		var sizeBuf [4]byte
 		_, err = io.ReadFull(conn.nc, sizeBuf[:])
 		if err != nil {
-			err = fmt.Errorf("error reading the message size: %w", err)
-			return
+			checkIn = true
+			return fmt.Errorf("error reading the message size: %w", err)
 		}
 		size, err = conn.parseWmSizeBytes(sizeBuf)
 		if err != nil {
+			checkIn = true
 			return err
 		}
 		size -= 4
 	}
 	_, err = io.CopyN(io.Discard, conn.nc, int64(size))
 	if err != nil {
-		err = fmt.Errorf("error discarding %d byte message: %w", size, err)
+		checkIn = true
+		return fmt.Errorf("error discarding %d byte message: %w", size, err)
 	}
+
+	conn.awaitRemainingBytes = nil
+
+	return nil
 }
 
 // checkIn returns an idle connection to the pool. If the connection is perished or the pool is
@@ -888,21 +898,6 @@ func (p *pool) checkInNoEvent(conn *connection) error {
 		return ErrWrongPool
 	}
 
-	// If the connection has an awaiting server response, try to read the
-	// response in another goroutine before checking it back into the pool.
-	//
-	// Do this here because we want to publish checkIn events when the operation
-	// is done with the connection, not when it's ready to be used again. That
-	// means that connections in "awaiting response" state are checked in but
-	// not usable, which is not covered by the current pool events. We may need
-	// to add pool event information in the future to communicate that.
-	if conn.awaitRemainingBytes != nil {
-		size := *conn.awaitRemainingBytes
-		conn.awaitRemainingBytes = nil
-		go bgRead(p, conn, size)
-		return nil
-	}
-
 	// Bump the connection idle start time here because we're about to make the
 	// connection "available". The idle start time is used to determine how long
 	// a connection has been idle and when it has reached its max idle time and
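The pool.go change above replaces the background bgRead goroutine with a foreground drain during checkout. As a rough standalone sketch of the draining idea (illustrative only; drainPendingReply and the net.Pipe demo are assumptions for this sketch, not part of the patch, whose real implementation is awaitPendingRead above):

package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"time"
)

// drainPendingReply reads and discards one length-prefixed wire message,
// mirroring the idea behind awaitPendingRead: finish a timed-out server
// reply in the foreground so the connection can be reused instead of closed.
func drainPendingReply(nc net.Conn, deadline time.Time) error {
	if err := nc.SetReadDeadline(deadline); err != nil {
		return fmt.Errorf("error setting a read deadline: %w", err)
	}

	var sizeBuf [4]byte
	if _, err := io.ReadFull(nc, sizeBuf[:]); err != nil {
		return fmt.Errorf("error reading the message size: %w", err)
	}

	// A MongoDB wire message starts with a little-endian int32 total length
	// that includes the 4 length bytes themselves.
	size := binary.LittleEndian.Uint32(sizeBuf[:])
	if _, err := io.CopyN(io.Discard, nc, int64(size)-4); err != nil {
		return fmt.Errorf("error discarding %d byte message: %w", size-4, err)
	}
	return nil
}

func main() {
	client, server := net.Pipe()
	defer client.Close()

	// Simulate a 16-byte server reply that a timed-out operation left behind.
	go func() {
		msg := make([]byte, 16)
		binary.LittleEndian.PutUint32(msg, 16)
		server.Write(msg)
		server.Close()
	}()

	err := drainPendingReply(client, time.Now().Add(time.Second))
	fmt.Println("drain result:", err) // drain result: <nil>
}

In the real pool, a failed drain leaves the remaining byte count on the connection and the checkout retries, which is what the awaitRemainingBytes bookkeeping above implements.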
diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go
index 910d2888a2..e627fe8c28 100644
--- a/x/mongo/driver/topology/pool_test.go
+++ b/x/mongo/driver/topology/pool_test.go
@@ -10,14 +10,12 @@ import (
 	"context"
 	"errors"
 	"net"
-	"regexp"
 	"sync"
 	"testing"
 	"time"
 
 	"go.mongodb.org/mongo-driver/v2/event"
 	"go.mongodb.org/mongo-driver/v2/internal/assert"
-	"go.mongodb.org/mongo-driver/v2/internal/csot"
 	"go.mongodb.org/mongo-driver/v2/internal/eventtest"
 	"go.mongodb.org/mongo-driver/v2/internal/require"
 	"go.mongodb.org/mongo-driver/v2/mongo/address"
@@ -1229,309 +1227,309 @@ func TestPool(t *testing.T) {
 	})
 }
 
-func TestBackgroundRead(t *testing.T) {
-	t.Parallel()
-
-	newBGReadCallback := func(errsCh chan []error) func(string, time.Time, time.Time, []error, bool) {
-		return func(_ string, _, _ time.Time, errs []error, _ bool) {
-			errsCh <- errs
-			close(errsCh)
-		}
-	}
-
-	t.Run("incomplete read of message header", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
-		timeout := 10 * time.Millisecond
-
-		cleanup := make(chan struct{})
-		defer close(cleanup)
-		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
-			defer func() {
-				<-cleanup
-				_ = nc.Close()
-			}()
-
-			_, err := nc.Write([]byte{10, 0, 0})
-			require.NoError(t, err)
-		})
-
-		p := newPool(
-			poolConfig{Address: address.Address(addr.String())},
-		)
-		defer p.close(context.Background())
-		err := p.ready()
-		require.NoError(t, err)
-
-		conn, err := p.checkOut(context.Background())
-		require.NoError(t, err)
-		ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
-		defer cancel()
-		_, err = conn.readWireMessage(ctx)
-		regex := regexp.MustCompile(
-			`^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
-		)
-		assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
-		assert.Nil(t, conn.awaitRemainingBytes, "conn.awaitRemainingBytes should be nil")
-		close(errsCh) // this line causes a double close if BGReadCallback is ever called.
-	})
-	t.Run("timeout reading message header, successful background read", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
-		timeout := 10 * time.Millisecond
-
-		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
-			defer func() {
-				_ = nc.Close()
-			}()
-
-			// Wait until the operation times out, then write a full message.
- time.Sleep(timeout * 2) - _, err := nc.Write([]byte{10, 0, 0, 0, 0, 0, 0, 0, 0, 0}) - require.NoError(t, err) - }) - - p := newPool( - poolConfig{Address: address.Address(addr.String())}, - ) - defer p.close(context.Background()) - err := p.ready() - require.NoError(t, err) - - conn, err := p.checkOut(context.Background()) - require.NoError(t, err) - ctx, cancel := csot.WithTimeout(context.Background(), &timeout) - defer cancel() - _, err = conn.readWireMessage(ctx) - regex := regexp.MustCompile( - `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, - ) - assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) - err = p.checkIn(conn) - require.NoError(t, err) - var bgErrs []error - select { - case bgErrs = <-errsCh: - case <-time.After(3 * time.Second): - assert.Fail(t, "did not receive expected error after waiting for 3 seconds") - } - require.Len(t, bgErrs, 0, "expected no error from bgRead()") - }) - t.Run("timeout reading message header, incomplete head during background read", func(t *testing.T) { - errsCh := make(chan []error) - var originalCallback func(string, time.Time, time.Time, []error, bool) - originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) - t.Cleanup(func() { - BGReadCallback = originalCallback - }) - - timeout := 10 * time.Millisecond - - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - defer func() { - _ = nc.Close() - }() - - // Wait until the operation times out, then write an incomplete head. - time.Sleep(timeout * 2) - _, err := nc.Write([]byte{10, 0, 0}) - require.NoError(t, err) - }) - - p := newPool( - poolConfig{Address: address.Address(addr.String())}, - ) - defer p.close(context.Background()) - err := p.ready() - require.NoError(t, err) - - conn, err := p.checkOut(context.Background()) - require.NoError(t, err) - ctx, cancel := csot.WithTimeout(context.Background(), &timeout) - defer cancel() - _, err = conn.readWireMessage(ctx) - regex := regexp.MustCompile( - `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, - ) - assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) - err = p.checkIn(conn) - require.NoError(t, err) - var bgErrs []error - select { - case bgErrs = <-errsCh: - case <-time.After(3 * time.Second): - assert.Fail(t, "did not receive expected error after waiting for 3 seconds") - } - require.Len(t, bgErrs, 1, "expected 1 error from bgRead()") - assert.EqualError(t, bgErrs[0], "error reading the message size: unexpected EOF") - }) - t.Run("timeout reading message header, background read timeout", func(t *testing.T) { - errsCh := make(chan []error) - var originalCallback func(string, time.Time, time.Time, []error, bool) - originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) - t.Cleanup(func() { - BGReadCallback = originalCallback - }) - - timeout := 10 * time.Millisecond - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - defer func() { - <-cleanup - _ = nc.Close() - }() - - // Wait until the operation times out, then write an incomplete - // message. 
- time.Sleep(timeout * 2) - _, err := nc.Write([]byte{10, 0, 0, 0, 0, 0, 0, 0}) - require.NoError(t, err) - }) - - p := newPool( - poolConfig{Address: address.Address(addr.String())}, - ) - defer p.close(context.Background()) - err := p.ready() - require.NoError(t, err) - - conn, err := p.checkOut(context.Background()) - require.NoError(t, err) - ctx, cancel := csot.WithTimeout(context.Background(), &timeout) - defer cancel() - _, err = conn.readWireMessage(ctx) - regex := regexp.MustCompile( - `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, - ) - assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) - err = p.checkIn(conn) - require.NoError(t, err) - var bgErrs []error - select { - case bgErrs = <-errsCh: - case <-time.After(3 * time.Second): - assert.Fail(t, "did not receive expected error after waiting for 3 seconds") - } - require.Len(t, bgErrs, 1, "expected 1 error from bgRead()") - wantErr := regexp.MustCompile( - `^error discarding 6 byte message: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, - ) - assert.True(t, wantErr.MatchString(bgErrs[0].Error()), "error %q does not match pattern %q", bgErrs[0], wantErr) - }) - t.Run("timeout reading full message, successful background read", func(t *testing.T) { - errsCh := make(chan []error) - var originalCallback func(string, time.Time, time.Time, []error, bool) - originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) - t.Cleanup(func() { - BGReadCallback = originalCallback - }) - - timeout := 10 * time.Millisecond - - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - defer func() { - _ = nc.Close() - }() - - var err error - _, err = nc.Write([]byte{12, 0, 0, 0, 0, 0, 0, 0, 1}) - require.NoError(t, err) - time.Sleep(timeout * 2) - // write a complete message - _, err = nc.Write([]byte{2, 3, 4}) - require.NoError(t, err) - }) - - p := newPool( - poolConfig{Address: address.Address(addr.String())}, - ) - defer p.close(context.Background()) - err := p.ready() - require.NoError(t, err) - - conn, err := p.checkOut(context.Background()) - require.NoError(t, err) - ctx, cancel := csot.WithTimeout(context.Background(), &timeout) - defer cancel() - _, err = conn.readWireMessage(ctx) - regex := regexp.MustCompile( - `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, - ) - assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) - err = p.checkIn(conn) - require.NoError(t, err) - var bgErrs []error - select { - case bgErrs = <-errsCh: - case <-time.After(3 * time.Second): - assert.Fail(t, "did not receive expected error after waiting for 3 seconds") - } - require.Len(t, bgErrs, 0, "expected no error from bgRead()") - }) - t.Run("timeout reading full message, background read EOF", func(t *testing.T) { - errsCh := make(chan []error) - var originalCallback func(string, time.Time, time.Time, []error, bool) - originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) - t.Cleanup(func() { - BGReadCallback = originalCallback - }) - - timeout := 10 * time.Millisecond - - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - defer func() { - _ = nc.Close() - }() - - var err error - _, err = nc.Write([]byte{12, 0, 0, 0, 0, 0, 0, 0, 1}) - require.NoError(t, err) - time.Sleep(timeout * 2) - // write an incomplete message - _, err = 
nc.Write([]byte{2}) - require.NoError(t, err) - }) - - p := newPool( - poolConfig{Address: address.Address(addr.String())}, - ) - defer p.close(context.Background()) - err := p.ready() - require.NoError(t, err) - - conn, err := p.checkOut(context.Background()) - require.NoError(t, err) - ctx, cancel := csot.WithTimeout(context.Background(), &timeout) - defer cancel() - _, err = conn.readWireMessage(ctx) - regex := regexp.MustCompile( - `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, - ) - assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) - err = p.checkIn(conn) - require.NoError(t, err) - var bgErrs []error - select { - case bgErrs = <-errsCh: - case <-time.After(3 * time.Second): - assert.Fail(t, "did not receive expected error after waiting for 3 seconds") - } - require.Len(t, bgErrs, 1, "expected 1 error from bgRead()") - assert.EqualError(t, bgErrs[0], "error discarding 3 byte message: EOF") - }) -} +//func TestBackgroundRead(t *testing.T) { +// t.Parallel() +// +// newBGReadCallback := func(errsCh chan []error) func(string, time.Time, time.Time, []error, bool) { +// return func(_ string, _, _ time.Time, errs []error, _ bool) { +// errsCh <- errs +// close(errsCh) +// } +// } +// +// t.Run("incomplete read of message header", func(t *testing.T) { +// errsCh := make(chan []error) +// var originalCallback func(string, time.Time, time.Time, []error, bool) +// originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) +// t.Cleanup(func() { +// BGReadCallback = originalCallback +// }) +// +// timeout := 10 * time.Millisecond +// +// cleanup := make(chan struct{}) +// defer close(cleanup) +// addr := bootstrapConnections(t, 1, func(nc net.Conn) { +// defer func() { +// <-cleanup +// _ = nc.Close() +// }() +// +// _, err := nc.Write([]byte{10, 0, 0}) +// require.NoError(t, err) +// }) +// +// p := newPool( +// poolConfig{Address: address.Address(addr.String())}, +// ) +// defer p.close(context.Background()) +// err := p.ready() +// require.NoError(t, err) +// +// conn, err := p.checkOut(context.Background()) +// require.NoError(t, err) +// ctx, cancel := csot.WithTimeout(context.Background(), &timeout) +// defer cancel() +// _, err = conn.readWireMessage(ctx) +// regex := regexp.MustCompile( +// `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, +// ) +// assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) +// assert.Nil(t, conn.awaitRemainingBytes, "conn.awaitRemainingBytes should be nil") +// close(errsCh) // this line causes a double close if BGReadCallback is ever called. +// }) +// t.Run("timeout reading message header, successful background read", func(t *testing.T) { +// errsCh := make(chan []error) +// var originalCallback func(string, time.Time, time.Time, []error, bool) +// originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) +// t.Cleanup(func() { +// BGReadCallback = originalCallback +// }) +// +// timeout := 10 * time.Millisecond +// +// addr := bootstrapConnections(t, 1, func(nc net.Conn) { +// defer func() { +// _ = nc.Close() +// }() +// +// // Wait until the operation times out, then write an full message. 
+// time.Sleep(timeout * 2) +// _, err := nc.Write([]byte{10, 0, 0, 0, 0, 0, 0, 0, 0, 0}) +// require.NoError(t, err) +// }) +// +// p := newPool( +// poolConfig{Address: address.Address(addr.String())}, +// ) +// defer p.close(context.Background()) +// err := p.ready() +// require.NoError(t, err) +// +// conn, err := p.checkOut(context.Background()) +// require.NoError(t, err) +// ctx, cancel := csot.WithTimeout(context.Background(), &timeout) +// defer cancel() +// _, err = conn.readWireMessage(ctx) +// regex := regexp.MustCompile( +// `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, +// ) +// assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) +// err = p.checkIn(conn) +// require.NoError(t, err) +// var bgErrs []error +// select { +// case bgErrs = <-errsCh: +// case <-time.After(3 * time.Second): +// assert.Fail(t, "did not receive expected error after waiting for 3 seconds") +// } +// require.Len(t, bgErrs, 0, "expected no error from bgRead()") +// }) +// t.Run("timeout reading message header, incomplete head during background read", func(t *testing.T) { +// errsCh := make(chan []error) +// var originalCallback func(string, time.Time, time.Time, []error, bool) +// originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) +// t.Cleanup(func() { +// BGReadCallback = originalCallback +// }) +// +// timeout := 10 * time.Millisecond +// +// addr := bootstrapConnections(t, 1, func(nc net.Conn) { +// defer func() { +// _ = nc.Close() +// }() +// +// // Wait until the operation times out, then write an incomplete head. +// time.Sleep(timeout * 2) +// _, err := nc.Write([]byte{10, 0, 0}) +// require.NoError(t, err) +// }) +// +// p := newPool( +// poolConfig{Address: address.Address(addr.String())}, +// ) +// defer p.close(context.Background()) +// err := p.ready() +// require.NoError(t, err) +// +// conn, err := p.checkOut(context.Background()) +// require.NoError(t, err) +// ctx, cancel := csot.WithTimeout(context.Background(), &timeout) +// defer cancel() +// _, err = conn.readWireMessage(ctx) +// regex := regexp.MustCompile( +// `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, +// ) +// assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) +// err = p.checkIn(conn) +// require.NoError(t, err) +// var bgErrs []error +// select { +// case bgErrs = <-errsCh: +// case <-time.After(3 * time.Second): +// assert.Fail(t, "did not receive expected error after waiting for 3 seconds") +// } +// require.Len(t, bgErrs, 1, "expected 1 error from bgRead()") +// assert.EqualError(t, bgErrs[0], "error reading the message size: unexpected EOF") +// }) +// t.Run("timeout reading message header, background read timeout", func(t *testing.T) { +// errsCh := make(chan []error) +// var originalCallback func(string, time.Time, time.Time, []error, bool) +// originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) +// t.Cleanup(func() { +// BGReadCallback = originalCallback +// }) +// +// timeout := 10 * time.Millisecond +// +// cleanup := make(chan struct{}) +// defer close(cleanup) +// addr := bootstrapConnections(t, 1, func(nc net.Conn) { +// defer func() { +// <-cleanup +// _ = nc.Close() +// }() +// +// // Wait until the operation times out, then write an incomplete +// // message. 
+// time.Sleep(timeout * 2) +// _, err := nc.Write([]byte{10, 0, 0, 0, 0, 0, 0, 0}) +// require.NoError(t, err) +// }) +// +// p := newPool( +// poolConfig{Address: address.Address(addr.String())}, +// ) +// defer p.close(context.Background()) +// err := p.ready() +// require.NoError(t, err) +// +// conn, err := p.checkOut(context.Background()) +// require.NoError(t, err) +// ctx, cancel := csot.WithTimeout(context.Background(), &timeout) +// defer cancel() +// _, err = conn.readWireMessage(ctx) +// regex := regexp.MustCompile( +// `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, +// ) +// assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) +// err = p.checkIn(conn) +// require.NoError(t, err) +// var bgErrs []error +// select { +// case bgErrs = <-errsCh: +// case <-time.After(3 * time.Second): +// assert.Fail(t, "did not receive expected error after waiting for 3 seconds") +// } +// require.Len(t, bgErrs, 1, "expected 1 error from bgRead()") +// wantErr := regexp.MustCompile( +// `^error discarding 6 byte message: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, +// ) +// assert.True(t, wantErr.MatchString(bgErrs[0].Error()), "error %q does not match pattern %q", bgErrs[0], wantErr) +// }) +// t.Run("timeout reading full message, successful background read", func(t *testing.T) { +// errsCh := make(chan []error) +// var originalCallback func(string, time.Time, time.Time, []error, bool) +// originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) +// t.Cleanup(func() { +// BGReadCallback = originalCallback +// }) +// +// timeout := 10 * time.Millisecond +// +// addr := bootstrapConnections(t, 1, func(nc net.Conn) { +// defer func() { +// _ = nc.Close() +// }() +// +// var err error +// _, err = nc.Write([]byte{12, 0, 0, 0, 0, 0, 0, 0, 1}) +// require.NoError(t, err) +// time.Sleep(timeout * 2) +// // write a complete message +// _, err = nc.Write([]byte{2, 3, 4}) +// require.NoError(t, err) +// }) +// +// p := newPool( +// poolConfig{Address: address.Address(addr.String())}, +// ) +// defer p.close(context.Background()) +// err := p.ready() +// require.NoError(t, err) +// +// conn, err := p.checkOut(context.Background()) +// require.NoError(t, err) +// ctx, cancel := csot.WithTimeout(context.Background(), &timeout) +// defer cancel() +// _, err = conn.readWireMessage(ctx) +// regex := regexp.MustCompile( +// `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, +// ) +// assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) +// err = p.checkIn(conn) +// require.NoError(t, err) +// var bgErrs []error +// select { +// case bgErrs = <-errsCh: +// case <-time.After(3 * time.Second): +// assert.Fail(t, "did not receive expected error after waiting for 3 seconds") +// } +// require.Len(t, bgErrs, 0, "expected no error from bgRead()") +// }) +// t.Run("timeout reading full message, background read EOF", func(t *testing.T) { +// errsCh := make(chan []error) +// var originalCallback func(string, time.Time, time.Time, []error, bool) +// originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh) +// t.Cleanup(func() { +// BGReadCallback = originalCallback +// }) +// +// timeout := 10 * time.Millisecond +// +// addr := bootstrapConnections(t, 1, func(nc net.Conn) { +// defer func() { +// _ = 
nc.Close() +// }() +// +// var err error +// _, err = nc.Write([]byte{12, 0, 0, 0, 0, 0, 0, 0, 1}) +// require.NoError(t, err) +// time.Sleep(timeout * 2) +// // write an incomplete message +// _, err = nc.Write([]byte{2}) +// require.NoError(t, err) +// }) +// +// p := newPool( +// poolConfig{Address: address.Address(addr.String())}, +// ) +// defer p.close(context.Background()) +// err := p.ready() +// require.NoError(t, err) +// +// conn, err := p.checkOut(context.Background()) +// require.NoError(t, err) +// ctx, cancel := csot.WithTimeout(context.Background(), &timeout) +// defer cancel() +// _, err = conn.readWireMessage(ctx) +// regex := regexp.MustCompile( +// `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`, +// ) +// assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex) +// err = p.checkIn(conn) +// require.NoError(t, err) +// var bgErrs []error +// select { +// case bgErrs = <-errsCh: +// case <-time.After(3 * time.Second): +// assert.Fail(t, "did not receive expected error after waiting for 3 seconds") +// } +// require.Len(t, bgErrs, 1, "expected 1 error from bgRead()") +// assert.EqualError(t, bgErrs[0], "error discarding 3 byte message: EOF") +// }) +//} func assertConnectionsClosed(t *testing.T, dialer *dialer, count int) { t.Helper() From 3b67db9bfa110c6fa14bb71ec6a44434066b92a8 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Mon, 21 Oct 2024 10:56:53 -0600 Subject: [PATCH 02/15] Update remaining bytes for each c/o CSOT --- x/mongo/driver/topology/pool.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index f4aaa0bbdc..e914b1b031 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -8,6 +8,7 @@ package topology import ( "context" + "errors" "fmt" "io" "net" @@ -18,6 +19,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/event" "go.mongodb.org/mongo-driver/v2/internal/logger" + "go.mongodb.org/mongo-driver/v2/internal/ptrutil" "go.mongodb.org/mongo-driver/v2/mongo/address" "go.mongodb.org/mongo-driver/v2/x/mongo/driver" ) @@ -803,6 +805,7 @@ var ( // tries to read any bytes returned by the server. If there are any errors, the // connection will be checked back into the pool to be retried. func awaitPendingRead(pool *pool, conn *connection) error { + fmt.Println("awaitRemainingBytes", conn.awaitRemainingBytes) pool.bgReadMu.Lock() defer pool.bgReadMu.Unlock() @@ -836,8 +839,14 @@ func awaitPendingRead(pool *pool, conn *connection) error { if size == 0 { var sizeBuf [4]byte - _, err = io.ReadFull(conn.nc, sizeBuf[:]) + n, err := io.ReadFull(conn.nc, sizeBuf[:]) if err != nil { + // If the read times out, record the bytes left to read before exiting. 
+ nerr := net.Error(nil) + if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() { + conn.awaitRemainingBytes = ptrutil.Ptr(l + *conn.awaitRemainingBytes) + } + checkIn = true return fmt.Errorf("error reading the message size: %w", err) } From bdcb50be55fc318c6c82e89828fd27116f7471bf Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Mon, 21 Oct 2024 13:53:37 -0600 Subject: [PATCH 03/15] Move cont read to copyN --- x/mongo/driver/topology/pool.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index e914b1b031..6f4c2b73bb 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -805,7 +805,6 @@ var ( // tries to read any bytes returned by the server. If there are any errors, the // connection will be checked back into the pool to be retried. func awaitPendingRead(pool *pool, conn *connection) error { - fmt.Println("awaitRemainingBytes", conn.awaitRemainingBytes) pool.bgReadMu.Lock() defer pool.bgReadMu.Unlock() @@ -839,14 +838,7 @@ func awaitPendingRead(pool *pool, conn *connection) error { if size == 0 { var sizeBuf [4]byte - n, err := io.ReadFull(conn.nc, sizeBuf[:]) - if err != nil { - // If the read times out, record the bytes left to read before exiting. - nerr := net.Error(nil) - if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() { - conn.awaitRemainingBytes = ptrutil.Ptr(l + *conn.awaitRemainingBytes) - } - + if _, err := io.ReadFull(conn.nc, sizeBuf[:]); err != nil { checkIn = true return fmt.Errorf("error reading the message size: %w", err) } @@ -857,8 +849,14 @@ func awaitPendingRead(pool *pool, conn *connection) error { } size -= 4 } - _, err = io.CopyN(io.Discard, conn.nc, int64(size)) + n, err := io.CopyN(io.Discard, conn.nc, int64(size)) if err != nil { + // If the read times out, record the bytes left to read before exiting. + nerr := net.Error(nil) + if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() { + conn.awaitRemainingBytes = ptrutil.Ptr(l + *conn.awaitRemainingBytes) + } + checkIn = true return fmt.Errorf("error discarding %d byte message: %w", size, err) } From d80942e57e6188bd7bf58b47a18622d217034e24 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Wed, 23 Oct 2024 17:55:26 -0600 Subject: [PATCH 04/15] Include reduced static timeout --- internal/integration/csot_test.go | 99 ++- .../connection-churn.json | 663 ++++++++++++++++++ .../connection-churn.yml | 400 +++++++++++ x/mongo/driver/topology/connection.go | 4 + x/mongo/driver/topology/pool.go | 40 +- 5 files changed, 1142 insertions(+), 64 deletions(-) create mode 100644 testdata/client-side-operations-timeout/connection-churn.json create mode 100644 testdata/client-side-operations-timeout/connection-churn.yml diff --git a/internal/integration/csot_test.go b/internal/integration/csot_test.go index 6808efb2a4..15df47a981 100644 --- a/internal/integration/csot_test.go +++ b/internal/integration/csot_test.go @@ -16,7 +16,6 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/event" "go.mongodb.org/mongo-driver/v2/internal/assert" - "go.mongodb.org/mongo-driver/v2/internal/eventtest" "go.mongodb.org/mongo-driver/v2/internal/failpoint" "go.mongodb.org/mongo-driver/v2/internal/integration/mtest" "go.mongodb.org/mongo-driver/v2/internal/require" @@ -349,55 +348,55 @@ func TestCSOT_maxTimeMS(t *testing.T) { } }) - opts := mtest.NewOptions(). 
- // Blocking failpoints don't work on pre-4.2 and sharded - // clusters. - Topologies(mtest.Single, mtest.ReplicaSet). - MinServerVersion("4.2") - mt.RunOpts("prevents connection closure", opts, func(mt *mtest.T) { - if tc.setup != nil { - err := tc.setup(mt.Coll) - require.NoError(mt, err) - } - - mt.SetFailPoint(failpoint.FailPoint{ - ConfigureFailPoint: "failCommand", - Mode: failpoint.ModeAlwaysOn, - Data: failpoint.Data{ - FailCommands: []string{tc.commandName}, - BlockConnection: true, - // Note that some operations (currently Find and - // Aggregate) do not send maxTimeMS by default, meaning - // that the server will only respond after BlockTimeMS - // is elapsed. If the amount of time that the driver - // waits for responses after a timeout is significantly - // lower than BlockTimeMS, this test will start failing - // for those operations. - BlockTimeMS: 500, - }, - }) - - tpm := eventtest.NewTestPoolMonitor() - mt.ResetClient(options.Client(). - SetPoolMonitor(tpm.PoolMonitor)) - - // Run 5 operations that time out, then assert that no - // connections were closed. - for i := 0; i < 5; i++ { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond) - err := tc.operation(ctx, mt.Coll) - cancel() - - if !mongo.IsTimeout(err) { - t.Logf("Operation %d returned a non-timeout error: %v", i, err) - } - } - - closedEvents := tpm.Events(func(pe *event.PoolEvent) bool { - return pe.Type == event.ConnectionClosed - }) - assert.Len(mt, closedEvents, 0, "expected no connection closed event") - }) + //opts := mtest.NewOptions(). + // // Blocking failpoints don't work on pre-4.2 and sharded + // // clusters. + // Topologies(mtest.Single, mtest.ReplicaSet). + // MinServerVersion("4.2") + //mt.RunOpts("prevents connection closure", opts, func(mt *mtest.T) { + // if tc.setup != nil { + // err := tc.setup(mt.Coll) + // require.NoError(mt, err) + // } + + // mt.SetFailPoint(failpoint.FailPoint{ + // ConfigureFailPoint: "failCommand", + // Mode: failpoint.ModeAlwaysOn, + // Data: failpoint.Data{ + // FailCommands: []string{tc.commandName}, + // BlockConnection: true, + // // Note that some operations (currently Find and + // // Aggregate) do not send maxTimeMS by default, meaning + // // that the server will only respond after BlockTimeMS + // // is elapsed. If the amount of time that the driver + // // waits for responses after a timeout is significantly + // // lower than BlockTimeMS, this test will start failing + // // for those operations. + // BlockTimeMS: 500, + // }, + // }) + + // tpm := eventtest.NewTestPoolMonitor() + // mt.ResetClient(options.Client(). + // SetPoolMonitor(tpm.PoolMonitor)) + + // // Run 5 operations that time out, then assert that no + // // connections were closed. 
+ // for i := 0; i < 5; i++ { + // ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond) + // err := tc.operation(ctx, mt.Coll) + // cancel() + + // if !mongo.IsTimeout(err) { + // t.Logf("Operation %d returned a non-timeout error: %v", i, err) + // } + // } + + // closedEvents := tpm.Events(func(pe *event.PoolEvent) bool { + // return pe.Type == event.ConnectionClosed + // }) + // assert.Len(mt, closedEvents, 0, "expected no connection closed event") + //}) }) } diff --git a/testdata/client-side-operations-timeout/connection-churn.json b/testdata/client-side-operations-timeout/connection-churn.json new file mode 100644 index 0000000000..b20c197919 --- /dev/null +++ b/testdata/client-side-operations-timeout/connection-churn.json @@ -0,0 +1,663 @@ +{ + "description": "operation timeouts do not cause connection churn", + "schemaVersion": "1.9", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "topologies": [ + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "coll", + "databaseName": "test", + "documents": [] + } + ], + "tests": [ + { + "description": "write op with successful pending read", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "connectionClosedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 750 + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 500, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "findOne", + "object": "collection", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [] + } + ] + }, + { + "description": "write op with unsuccessful pending read", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "connectionClosedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 750 + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + 
"arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionClosedEvent": { + "reason": "error" + } + } + ] + } + ] + }, + { + "description": "read op with successful pending read", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "connectionClosedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 750 + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 500, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "findOne", + "object": "collection", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [] + } + ] + }, + { + "description": "write op with unsuccessful pending read", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "connectionClosedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 750 + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionClosedEvent": { + "reason": "error" + } + } + ] + } + ] + }, + { + "description": "read op 
with successful pending read", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "connectionClosedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 750 + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 500, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "findOne", + "object": "collection", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [] + } + ] + }, + { + "description": "write op with unsuccessful pending read", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "connectionClosedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 750 + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "timeoutMS": 50, + "document": { + "_id": 3, + "x": 1 + } + }, + "expectError": { + "isTimeoutError": true + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionClosedEvent": { + "reason": "error" + } + } + ] + } + ] + } + ] +} diff --git a/testdata/client-side-operations-timeout/connection-churn.yml b/testdata/client-side-operations-timeout/connection-churn.yml new file mode 100644 index 0000000000..6541719f3d --- /dev/null +++ b/testdata/client-side-operations-timeout/connection-churn.yml @@ -0,0 +1,400 @@ +description: "operation timeouts do not cause connection churn" + +schemaVersion: "1.9" + +runOnRequirements: + - minServerVersion: "4.4" + topologies: ["replicaset", "sharded"] + +createEntities: + - client: + id: &failPointClient failPointClient + useMultipleMongoses: false + +initialData: + - collectionName: &collectionName coll + databaseName: &databaseName test 
+ documents: [] + +tests: + - description: "write op with successful pending read" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + maxPoolSize: 1 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - connectionClosedEvent + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + + # Create a failpoint to block first op + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 750 + + # Execute op with timeout < block time + - name: insertOne + object: *collection + arguments: + timeoutMS: 500 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + # Execute a subsequent operation to complete the read when checking out + # the single available connection. + - name: findOne + object: *collection + arguments: + filter: { _id: 1 } + + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - client: *client + eventType: cmap + events: [] # Expect no connection closure. + + - description: "write op with unsuccessful pending read" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + maxPoolSize: 1 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - connectionClosedEvent + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + + # Create a failpoint to block first op + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 2 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 750 + + # Execute op with timeout < block time + - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + # The pending read should fail. 
+ - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - client: *client + eventType: cmap + events: + - connectionClosedEvent: + reason: error + + - description: "read op with successful pending read" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + maxPoolSize: 1 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - connectionClosedEvent + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + + # Create a failpoint to block first op + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 750 + + # Execute op with timeout < block time + - name: insertOne + object: *collection + arguments: + timeoutMS: 500 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + # Execute a subsequent operation to complete the read when checking out + # the single available connection. + - name: findOne + object: *collection + arguments: + filter: { _id: 1 } + + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - client: *client + eventType: cmap + events: [] # Expect no connection closure. + + - description: "write op with unsuccessful pending read" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + # For single-threaded drivers, ensure the operating connection + # is checked out to complete the read. + maxPoolSize: 1 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - connectionClosedEvent + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + + # Create a failpoint to block first op + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 2 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 750 + + # Execute op with timeout < block time + - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + # The pending read should fail. 
+ - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - client: *client + eventType: cmap + events: + - connectionClosedEvent: + reason: error + + - description: "read op with successful pending read" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + maxPoolSize: 1 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - connectionClosedEvent + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + + # Create a failpoint to block first op + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 750 + + # Execute op with timeout < block time + - name: insertOne + object: *collection + arguments: + timeoutMS: 500 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + # Execute a subsequent operation to complete the read when checking out + # the single available connection. + - name: findOne + object: *collection + arguments: + filter: { _id: 1 } + + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - client: *client + eventType: cmap + events: [] # Expect no connection closure. + + - description: "write op with unsuccessful pending read" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + # For single-threaded drivers, ensure the operating connection + # is checked out to complete the read. + maxPoolSize: 1 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - connectionClosedEvent + - database: + id: &database database + client: *client + databaseName: *databaseName + - collection: + id: &collection collection + database: *database + collectionName: *collectionName + + # Create a failpoint to block first op + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 2 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 750 + + # Execute op with timeout < block time + - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + # The pending read should fail. 
+ - name: insertOne + object: *collection + arguments: + timeoutMS: 50 + document: { _id: 3, x: 1 } + expectError: + isTimeoutError: true + + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - client: *client + eventType: cmap + events: + - connectionClosedEvent: + reason: error diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index 24ad6a3a51..78e4461e1b 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -21,6 +21,7 @@ import ( "time" "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/internal/ptrutil" "go.mongodb.org/mongo-driver/v2/mongo/address" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" "go.mongodb.org/mongo-driver/v2/x/mongo/driver" @@ -85,6 +86,7 @@ type connection struct { // awaitRemainingBytes indicates the size of server response that was not completely // read before returning the connection to the pool. awaitRemainingBytes *int32 + remainingTime *time.Duration } // newConnection handles the creation of a connection. It does not connect the connection. @@ -478,6 +480,7 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, if err != nil { if l := int32(n); l == 0 && isCSOTTimeout(err) { c.awaitRemainingBytes = &l + c.remainingTime = ptrutil.Ptr(BGReadTimeout) } return nil, "incomplete read of message header", err } @@ -494,6 +497,7 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, remainingBytes := size - 4 - int32(n) if remainingBytes > 0 && isCSOTTimeout(err) { c.awaitRemainingBytes = &remainingBytes + c.remainingTime = ptrutil.Ptr(BGReadTimeout) } return dst, "incomplete read of full message", err } diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 6f4c2b73bb..9a47f68ed7 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -580,7 +580,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { return nil, w.err } - if err := awaitPendingRead(p, w.conn); err != nil { + if err := awaitPendingRead(ctx, p, w.conn); err != nil { return p.checkOut(ctx) // Retry the checkout if the read fails. } @@ -650,6 +650,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) } + if err := awaitPendingRead(ctx, p, w.conn); err != nil { + return p.checkOut(ctx) // Retry the checkout if the read fails. + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.ConnectionCheckedOut, @@ -659,10 +663,6 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { }) } - if err := awaitPendingRead(p, w.conn); err != nil { - return p.checkOut(ctx) // Retry the checkout if the read fails. - } - return w.conn, nil case <-ctx.Done(): waitQueueDuration := time.Since(waitQueueStart) @@ -804,7 +804,7 @@ var ( // awaitPendingRead sets a new read deadline on the provided connection and // tries to read any bytes returned by the server. If there are any errors, the // connection will be checked back into the pool to be retried. 
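// The read deadline comes from the operation's context when one is set and
// falls back to BGReadTimeout otherwise; time spent is deducted from the
// connection's remainingTime so a connection that keeps timing out is
// eventually closed instead of being retried indefinitely.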
-func awaitPendingRead(pool *pool, conn *connection) error { +func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { pool.bgReadMu.Lock() defer pool.bgReadMu.Unlock() @@ -815,12 +815,20 @@ func awaitPendingRead(pool *pool, conn *connection) error { size := *conn.awaitRemainingBytes - var checkIn bool - defer func() { - if !checkIn { + if conn.awaitRemainingBytes == nil { + return + } + + if *conn.remainingTime < 0 { + err := conn.close() + if err != nil { + panic(err) + } + return } + // No matter what happens, always check the connection back into the // pool, which will either make it available for other operations or // remove it from the pool if it was closed. @@ -830,21 +838,24 @@ func awaitPendingRead(pool *pool, conn *connection) error { } }() - err := conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout)) + dl, ok := ctx.Deadline() + if !ok { + dl = time.Now().Add(BGReadTimeout) + } + + err := conn.nc.SetReadDeadline(dl) if err != nil { - checkIn = true return fmt.Errorf("error setting a read deadline: %w", err) } if size == 0 { var sizeBuf [4]byte if _, err := io.ReadFull(conn.nc, sizeBuf[:]); err != nil { - checkIn = true + conn.remainingTime = ptrutil.Ptr(*conn.remainingTime - time.Since(dl)) return fmt.Errorf("error reading the message size: %w", err) } size, err = conn.parseWmSizeBytes(sizeBuf) if err != nil { - checkIn = true return err } size -= 4 @@ -855,13 +866,14 @@ func awaitPendingRead(pool *pool, conn *connection) error { nerr := net.Error(nil) if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() { conn.awaitRemainingBytes = ptrutil.Ptr(l + *conn.awaitRemainingBytes) + conn.remainingTime = ptrutil.Ptr(*conn.remainingTime - time.Since(dl)) } - checkIn = true return fmt.Errorf("error discarding %d byte message: %w", size, err) } conn.awaitRemainingBytes = nil + conn.remainingTime = nil return nil } From 02d5b7fb08a4238d3d2f8faf5fd461f769e7d05f Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 25 Oct 2024 18:55:15 -0600 Subject: [PATCH 05/15] Don't check checkIn err --- .../connection-churn.json | 117 +----------------- .../connection-churn.yml | 73 +---------- x/mongo/driver/topology/connection.go | 8 ++ x/mongo/driver/topology/pool.go | 75 +++++++---- 4 files changed, 65 insertions(+), 208 deletions(-) diff --git a/testdata/client-side-operations-timeout/connection-churn.json b/testdata/client-side-operations-timeout/connection-churn.json index b20c197919..4a765ac070 100644 --- a/testdata/client-side-operations-timeout/connection-churn.json +++ b/testdata/client-side-operations-timeout/connection-churn.json @@ -172,7 +172,7 @@ "failPoint": { "configureFailPoint": "failCommand", "mode": { - "times": 2 + "times": 1 }, "data": { "failCommands": [ @@ -202,7 +202,7 @@ "name": "insertOne", "object": "collection", "arguments": { - "timeoutMS": 50, + "timeoutMS": 500, "document": { "_id": 3, "x": 1 @@ -337,117 +337,6 @@ } ] }, - { - "description": "write op with unsuccessful pending read", - "operations": [ - { - "name": "createEntities", - "object": "testRunner", - "arguments": { - "entities": [ - { - "client": { - "id": "client", - "uriOptions": { - "maxPoolSize": 1 - }, - "useMultipleMongoses": false, - "observeEvents": [ - "commandFailedEvent", - "connectionClosedEvent" - ] - } - }, - { - "database": { - "id": "database", - "client": "client", - "databaseName": "test" - } - }, - { - "collection": { - "id": "collection", - "database": "database", - "collectionName": "coll" - } - } - ] - } - }, - { - "name": "failPoint", - 
"object": "testRunner", - "arguments": { - "client": "failPointClient", - "failPoint": { - "configureFailPoint": "failCommand", - "mode": { - "times": 2 - }, - "data": { - "failCommands": [ - "insert" - ], - "blockConnection": true, - "blockTimeMS": 750 - } - } - } - }, - { - "name": "insertOne", - "object": "collection", - "arguments": { - "timeoutMS": 50, - "document": { - "_id": 3, - "x": 1 - } - }, - "expectError": { - "isTimeoutError": true - } - }, - { - "name": "insertOne", - "object": "collection", - "arguments": { - "timeoutMS": 50, - "document": { - "_id": 3, - "x": 1 - } - }, - "expectError": { - "isTimeoutError": true - } - } - ], - "expectEvents": [ - { - "client": "client", - "events": [ - { - "commandFailedEvent": { - "commandName": "insert" - } - } - ] - }, - { - "client": "client", - "eventType": "cmap", - "events": [ - { - "connectionClosedEvent": { - "reason": "error" - } - } - ] - } - ] - }, { "description": "read op with successful pending read", "operations": [ @@ -624,7 +513,7 @@ "name": "insertOne", "object": "collection", "arguments": { - "timeoutMS": 50, + "timeoutMS": 500, "document": { "_id": 3, "x": 1 diff --git a/testdata/client-side-operations-timeout/connection-churn.yml b/testdata/client-side-operations-timeout/connection-churn.yml index 6541719f3d..fa8f761654 100644 --- a/testdata/client-side-operations-timeout/connection-churn.yml +++ b/testdata/client-side-operations-timeout/connection-churn.yml @@ -108,7 +108,7 @@ tests: client: *failPointClient failPoint: configureFailPoint: failCommand - mode: { times: 2 } + mode: { times: 1 } data: failCommands: ["insert"] blockConnection: true @@ -127,7 +127,7 @@ tests: - name: insertOne object: *collection arguments: - timeoutMS: 50 + timeoutMS: 500 document: { _id: 3, x: 1 } expectError: isTimeoutError: true @@ -204,73 +204,6 @@ tests: eventType: cmap events: [] # Expect no connection closure. - - description: "write op with unsuccessful pending read" - operations: - - name: createEntities - object: testRunner - arguments: - entities: - - client: - id: &client client - uriOptions: - # For single-threaded drivers, ensure the operating connection - # is checked out to complete the read. - maxPoolSize: 1 - useMultipleMongoses: false - observeEvents: - - commandFailedEvent - - connectionClosedEvent - - database: - id: &database database - client: *client - databaseName: *databaseName - - collection: - id: &collection collection - database: *database - collectionName: *collectionName - - # Create a failpoint to block first op - - name: failPoint - object: testRunner - arguments: - client: *failPointClient - failPoint: - configureFailPoint: failCommand - mode: { times: 2 } - data: - failCommands: ["insert"] - blockConnection: true - blockTimeMS: 750 - - # Execute op with timeout < block time - - name: insertOne - object: *collection - arguments: - timeoutMS: 50 - document: { _id: 3, x: 1 } - expectError: - isTimeoutError: true - - # The pending read should fail. 
- - name: insertOne - object: *collection - arguments: - timeoutMS: 50 - document: { _id: 3, x: 1 } - expectError: - isTimeoutError: true - - expectEvents: - - client: *client - events: - - commandFailedEvent: - commandName: insert - - client: *client - eventType: cmap - events: - - connectionClosedEvent: - reason: error - - description: "read op with successful pending read" operations: - name: createEntities @@ -383,7 +316,7 @@ tests: - name: insertOne object: *collection arguments: - timeoutMS: 50 + timeoutMS: 500 document: { _id: 3, x: 1 } expectError: isTimeoutError: true diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index 78e4461e1b..86d19e2be3 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -87,6 +87,7 @@ type connection struct { // read before returning the connection to the pool. awaitRemainingBytes *int32 remainingTime *time.Duration + pendingReadMU sync.Mutex } // newConnection handles the creation of a connection. It does not connect the connection. @@ -104,6 +105,7 @@ func newConnection(addr address.Address, opts ...ConnectionOption) *connection { connectContextMade: make(chan struct{}), cancellationListener: newContextDoneListener(), connectListener: newNonBlockingContextDoneListener(), + pendingReadMU: sync.Mutex{}, } // Connections to non-load balanced deployments should eagerly set the generation numbers so errors encountered // at any point during connection establishment can be processed without the connection being considered stale. @@ -409,11 +411,13 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) { dst, errMsg, err := c.read(ctx) if err != nil { + c.pendingReadMU.Lock() if c.awaitRemainingBytes == nil { // If the connection was not marked as awaiting response, close the // connection because we don't know what the connection state is. c.close() } + c.pendingReadMU.Unlock() message := errMsg if errors.Is(err, io.EOF) { message = "socket was unexpectedly closed" @@ -479,8 +483,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, n, err := io.ReadFull(c.nc, sizeBuf[:]) if err != nil { if l := int32(n); l == 0 && isCSOTTimeout(err) { + c.pendingReadMU.Lock() c.awaitRemainingBytes = &l c.remainingTime = ptrutil.Ptr(BGReadTimeout) + c.pendingReadMU.Unlock() } return nil, "incomplete read of message header", err } @@ -496,8 +502,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, if err != nil { remainingBytes := size - 4 - int32(n) if remainingBytes > 0 && isCSOTTimeout(err) { + c.pendingReadMU.Lock() c.awaitRemainingBytes = &remainingBytes c.remainingTime = ptrutil.Ptr(BGReadTimeout) + c.pendingReadMU.Unlock() } return dst, "incomplete read of full message", err } diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 9a47f68ed7..aea6877418 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -581,7 +581,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { } if err := awaitPendingRead(ctx, p, w.conn); err != nil { - return p.checkOut(ctx) // Retry the checkout if the read fails. 
+ return nil, err } duration = time.Since(start) @@ -640,6 +640,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { return nil, w.err } + if err := awaitPendingRead(ctx, p, w.conn); err != nil { + return nil, err + } + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ @@ -650,10 +654,6 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) } - if err := awaitPendingRead(ctx, p, w.conn); err != nil { - return p.checkOut(ctx) // Retry the checkout if the read fails. - } - if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.ConnectionCheckedOut, @@ -805,8 +805,8 @@ var ( // tries to read any bytes returned by the server. If there are any errors, the // connection will be checked back into the pool to be retried. func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { - pool.bgReadMu.Lock() - defer pool.bgReadMu.Unlock() + conn.pendingReadMU.Lock() + defer conn.pendingReadMU.Unlock() // If there are no bytes pending read, do nothing. if conn.awaitRemainingBytes == nil { @@ -815,12 +815,11 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { size := *conn.awaitRemainingBytes - defer func() { - if conn.awaitRemainingBytes == nil { - return - } + checkIn := false - if *conn.remainingTime < 0 { + defer func() { + // If we have exceeded the time limit, then close the connection. + if conn.remainingTime != nil && *conn.remainingTime < 0 { err := conn.close() if err != nil { panic(err) @@ -829,33 +828,49 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { return } + if !checkIn { + return + } + // No matter what happens, always check the connection back into the // pool, which will either make it available for other operations or // remove it from the pool if it was closed. - err := pool.checkInNoEvent(conn) - if err != nil { - panic(err) - } + // + // TODO(GODRIVER-3385): Figure out how to handle this error. It's possible + // that a single connection can be checked out to handle multiple concurrent + // operations. This is likely a bug in the Go Driver. So it's possible that + // the connection is idle at the point of check-in. 
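+		// The error is deliberately discarded here; checkInNoEvent detects the
+		// duplicate case itself via isIdleConnection and refuses to add the
+		// same connection to the idle stack twice.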
+ _ = pool.checkInNoEvent(conn) }() - dl, ok := ctx.Deadline() - if !ok { + dl, contextDeadlineUsed := ctx.Deadline() + if !contextDeadlineUsed { dl = time.Now().Add(BGReadTimeout) } err := conn.nc.SetReadDeadline(dl) if err != nil { + checkIn = true return fmt.Errorf("error setting a read deadline: %w", err) } + st := time.Now() + if size == 0 { var sizeBuf [4]byte if _, err := io.ReadFull(conn.nc, sizeBuf[:]); err != nil { - conn.remainingTime = ptrutil.Ptr(*conn.remainingTime - time.Since(dl)) + conn.remainingTime = ptrutil.Ptr(*conn.remainingTime - time.Since(st)) + checkIn = true + + err = transformNetworkError(ctx, err, contextDeadlineUsed) + return fmt.Errorf("error reading the message size: %w", err) } size, err = conn.parseWmSizeBytes(sizeBuf) if err != nil { + checkIn = true + err = transformNetworkError(ctx, err, contextDeadlineUsed) + return err } size -= 4 @@ -866,9 +881,13 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { nerr := net.Error(nil) if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() { conn.awaitRemainingBytes = ptrutil.Ptr(l + *conn.awaitRemainingBytes) - conn.remainingTime = ptrutil.Ptr(*conn.remainingTime - time.Since(dl)) + conn.remainingTime = ptrutil.Ptr(*conn.remainingTime - time.Since(st)) } + checkIn = true + + err = transformNetworkError(ctx, err, contextDeadlineUsed) + return fmt.Errorf("error discarding %d byte message: %w", size, err) } @@ -907,6 +926,16 @@ func (p *pool) checkIn(conn *connection) error { return p.checkInNoEvent(conn) } +func isIdleConnection(p *pool, conn *connection) bool { + for _, idle := range p.idleConns { + if idle == conn { + return true + } + } + + return false +} + // checkInNoEvent returns a connection to the pool. It behaves identically to checkIn except it does // not publish events. It is only intended for use by pool-internal functions. 
func (p *pool) checkInNoEvent(conn *connection) error { @@ -956,10 +985,8 @@ func (p *pool) checkInNoEvent(conn *connection) error { } } - for _, idle := range p.idleConns { - if idle == conn { - return fmt.Errorf("duplicate idle conn %p in idle connections stack", conn) - } + if isIdleConnection(p, conn) { + return fmt.Errorf("duplicate idle conn %p in idle connections stack", conn) } p.idleConns = append(p.idleConns, conn) From 0d81756f5c9d3771387209ace9da8d24d5efb625 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Tue, 29 Oct 2024 19:53:45 -0600 Subject: [PATCH 06/15] Dont test on sharded topology --- .../client-side-operations-timeout/connection-churn.json | 4 ++-- .../client-side-operations-timeout/connection-churn.yml | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/testdata/client-side-operations-timeout/connection-churn.json b/testdata/client-side-operations-timeout/connection-churn.json index 4a765ac070..55f74b73e2 100644 --- a/testdata/client-side-operations-timeout/connection-churn.json +++ b/testdata/client-side-operations-timeout/connection-churn.json @@ -5,8 +5,8 @@ { "minServerVersion": "4.4", "topologies": [ - "replicaset", - "sharded" + "standalone", + "replicaset" ] } ], diff --git a/testdata/client-side-operations-timeout/connection-churn.yml b/testdata/client-side-operations-timeout/connection-churn.yml index fa8f761654..f33d198bd5 100644 --- a/testdata/client-side-operations-timeout/connection-churn.yml +++ b/testdata/client-side-operations-timeout/connection-churn.yml @@ -4,7 +4,11 @@ schemaVersion: "1.9" runOnRequirements: - minServerVersion: "4.4" - topologies: ["replicaset", "sharded"] + # TODO(SERVER-96344): when using failpoints mongos returns MaxTimeMSExpired + # after maxTimeMS, whereas mongod returns MaxTimeMSExpired after + # max(blockTimeMS, maxTimeMS). Until this ticket is resolved, these tests + # will not pass on sharded clusters. 
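+  # Every test below blocks commands for longer than the operation's
+  # timeoutMS, so they exercise the mongod behavior described above.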
+ topologies: ["standalone", "replicaset"] createEntities: - client: From 7c49c1025e998f3eacc3ee473cdc770774ab95d5 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Wed, 30 Oct 2024 13:46:05 -0600 Subject: [PATCH 07/15] Clean up connection churn UST --- .../connection-churn.json | 365 +++++------------- .../connection-churn.yml | 220 +++-------- 2 files changed, 156 insertions(+), 429 deletions(-) diff --git a/testdata/client-side-operations-timeout/connection-churn.json b/testdata/client-side-operations-timeout/connection-churn.json index 55f74b73e2..d8177c38f4 100644 --- a/testdata/client-side-operations-timeout/connection-churn.json +++ b/testdata/client-side-operations-timeout/connection-churn.json @@ -16,6 +16,36 @@ "id": "failPointClient", "useMultipleMongoses": false } + }, + { + "client": { + "id": "client", + "uriOptions": { + "maxPoolSize": 1 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandFailedEvent", + "commandSucceededEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent", + "connectionClosedEvent" + ] + } + }, + { + "database": { + "id": "test", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "coll", + "database": "test", + "collectionName": "coll" + } } ], "initialData": [ @@ -29,41 +59,6 @@ { "description": "write op with successful pending read", "operations": [ - { - "name": "createEntities", - "object": "testRunner", - "arguments": { - "entities": [ - { - "client": { - "id": "client", - "uriOptions": { - "maxPoolSize": 1 - }, - "useMultipleMongoses": false, - "observeEvents": [ - "commandFailedEvent", - "connectionClosedEvent" - ] - } - }, - { - "database": { - "id": "database", - "client": "client", - "databaseName": "test" - } - }, - { - "collection": { - "id": "collection", - "database": "database", - "collectionName": "coll" - } - } - ] - } - }, { "name": "failPoint", "object": "testRunner", @@ -86,7 +81,7 @@ }, { "name": "insertOne", - "object": "collection", + "object": "coll", "arguments": { "timeoutMS": 500, "document": { @@ -100,7 +95,7 @@ }, { "name": "findOne", - "object": "collection", + "object": "coll", "arguments": { "filter": { "_id": 1 @@ -116,54 +111,37 @@ "commandFailedEvent": { "commandName": "insert" } + }, + { + "commandSucceededEvent": { + "commandName": "find" + } } ] }, { "client": "client", "eventType": "cmap", - "events": [] + "events": [ + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] } ] }, { "description": "write op with unsuccessful pending read", "operations": [ - { - "name": "createEntities", - "object": "testRunner", - "arguments": { - "entities": [ - { - "client": { - "id": "client", - "uriOptions": { - "maxPoolSize": 1 - }, - "useMultipleMongoses": false, - "observeEvents": [ - "commandFailedEvent", - "connectionClosedEvent" - ] - } - }, - { - "database": { - "id": "database", - "client": "client", - "databaseName": "test" - } - }, - { - "collection": { - "id": "collection", - "database": "database", - "collectionName": "coll" - } - } - ] - } - }, { "name": "failPoint", "object": "testRunner", @@ -186,7 +164,7 @@ }, { "name": "insertOne", - "object": "collection", + "object": "coll", "arguments": { "timeoutMS": 50, "document": { @@ -200,7 +178,7 @@ }, { "name": "insertOne", - "object": "collection", + "object": "coll", "arguments": { "timeoutMS": 500, "document": { @@ -228,6 +206,12 @@ "client": "client", "eventType": "cmap", "events": [ + 
{ + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, { "connectionClosedEvent": { "reason": "error" @@ -240,41 +224,6 @@ { "description": "read op with successful pending read", "operations": [ - { - "name": "createEntities", - "object": "testRunner", - "arguments": { - "entities": [ - { - "client": { - "id": "client", - "uriOptions": { - "maxPoolSize": 1 - }, - "useMultipleMongoses": false, - "observeEvents": [ - "commandFailedEvent", - "connectionClosedEvent" - ] - } - }, - { - "database": { - "id": "database", - "client": "client", - "databaseName": "test" - } - }, - { - "collection": { - "id": "collection", - "database": "database", - "collectionName": "coll" - } - } - ] - } - }, { "name": "failPoint", "object": "testRunner", @@ -287,7 +236,7 @@ }, "data": { "failCommands": [ - "insert" + "find" ], "blockConnection": true, "blockTimeMS": 750 @@ -296,13 +245,12 @@ } }, { - "name": "insertOne", - "object": "collection", + "name": "findOne", + "object": "coll", "arguments": { - "timeoutMS": 500, - "document": { - "_id": 3, - "x": 1 + "timeoutMS": 50, + "filter": { + "_id": 1 } }, "expectError": { @@ -311,7 +259,7 @@ }, { "name": "findOne", - "object": "collection", + "object": "coll", "arguments": { "filter": { "_id": 1 @@ -325,7 +273,12 @@ "events": [ { "commandFailedEvent": { - "commandName": "insert" + "commandName": "find" + } + }, + { + "commandSucceededEvent": { + "commandName": "find" } } ] @@ -333,148 +286,26 @@ { "client": "client", "eventType": "cmap", - "events": [] - } - ] - }, - { - "description": "read op with successful pending read", - "operations": [ - { - "name": "createEntities", - "object": "testRunner", - "arguments": { - "entities": [ - { - "client": { - "id": "client", - "uriOptions": { - "maxPoolSize": 1 - }, - "useMultipleMongoses": false, - "observeEvents": [ - "commandFailedEvent", - "connectionClosedEvent" - ] - } - }, - { - "database": { - "id": "database", - "client": "client", - "databaseName": "test" - } - }, - { - "collection": { - "id": "collection", - "database": "database", - "collectionName": "coll" - } - } - ] - } - }, - { - "name": "failPoint", - "object": "testRunner", - "arguments": { - "client": "failPointClient", - "failPoint": { - "configureFailPoint": "failCommand", - "mode": { - "times": 1 - }, - "data": { - "failCommands": [ - "insert" - ], - "blockConnection": true, - "blockTimeMS": 750 - } - } - } - }, - { - "name": "insertOne", - "object": "collection", - "arguments": { - "timeoutMS": 500, - "document": { - "_id": 3, - "x": 1 - } - }, - "expectError": { - "isTimeoutError": true - } - }, - { - "name": "findOne", - "object": "collection", - "arguments": { - "filter": { - "_id": 1 - } - } - } - ], - "expectEvents": [ - { - "client": "client", "events": [ { - "commandFailedEvent": { - "commandName": "insert" - } + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} } ] - }, - { - "client": "client", - "eventType": "cmap", - "events": [] } ] }, { - "description": "write op with unsuccessful pending read", + "description": "read op with unsuccessful pending read", "operations": [ - { - "name": "createEntities", - "object": "testRunner", - "arguments": { - "entities": [ - { - "client": { - "id": "client", - "uriOptions": { - "maxPoolSize": 1 - }, - "useMultipleMongoses": false, - "observeEvents": [ - "commandFailedEvent", - "connectionClosedEvent" - ] - } - }, - { - "database": { - "id": "database", - "client": 
"client", - "databaseName": "test" - } - }, - { - "collection": { - "id": "collection", - "database": "database", - "collectionName": "coll" - } - } - ] - } - }, { "name": "failPoint", "object": "testRunner", @@ -483,11 +314,11 @@ "failPoint": { "configureFailPoint": "failCommand", "mode": { - "times": 2 + "times": 1 }, "data": { "failCommands": [ - "insert" + "find" ], "blockConnection": true, "blockTimeMS": 750 @@ -496,13 +327,12 @@ } }, { - "name": "insertOne", - "object": "collection", + "name": "findOne", + "object": "coll", "arguments": { "timeoutMS": 50, - "document": { - "_id": 3, - "x": 1 + "filter": { + "_id": 1 } }, "expectError": { @@ -510,13 +340,12 @@ } }, { - "name": "insertOne", - "object": "collection", + "name": "findOne", + "object": "coll", "arguments": { "timeoutMS": 500, - "document": { - "_id": 3, - "x": 1 + "filter": { + "_id": 1 } }, "expectError": { @@ -530,7 +359,7 @@ "events": [ { "commandFailedEvent": { - "commandName": "insert" + "commandName": "find" } } ] @@ -539,6 +368,12 @@ "client": "client", "eventType": "cmap", "events": [ + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, { "connectionClosedEvent": { "reason": "error" diff --git a/testdata/client-side-operations-timeout/connection-churn.yml b/testdata/client-side-operations-timeout/connection-churn.yml index f33d198bd5..d011bb3f3a 100644 --- a/testdata/client-side-operations-timeout/connection-churn.yml +++ b/testdata/client-side-operations-timeout/connection-churn.yml @@ -14,36 +14,34 @@ createEntities: - client: id: &failPointClient failPointClient useMultipleMongoses: false + - client: + id: &client client + uriOptions: + maxPoolSize: 1 + useMultipleMongoses: false + observeEvents: + - commandFailedEvent + - commandSucceededEvent + - connectionCheckedOutEvent + - connectionCheckedInEvent + - connectionClosedEvent + - database: + id: &database test + client: *client + databaseName: *database + - collection: + id: &collection coll + database: *database + collectionName: *collection initialData: - - collectionName: &collectionName coll - databaseName: &databaseName test + - collectionName: *collection + databaseName: *database documents: [] tests: - description: "write op with successful pending read" operations: - - name: createEntities - object: testRunner - arguments: - entities: - - client: - id: &client client - uriOptions: - maxPoolSize: 1 - useMultipleMongoses: false - observeEvents: - - commandFailedEvent - - connectionClosedEvent - - database: - id: &database database - client: *client - databaseName: *databaseName - - collection: - id: &collection collection - database: *database - collectionName: *collectionName - # Create a failpoint to block first op - name: failPoint object: testRunner @@ -78,33 +76,18 @@ tests: events: - commandFailedEvent: commandName: insert + - commandSucceededEvent: + commandName: find - client: *client eventType: cmap - events: [] # Expect no connection closure. 
+ events: + - connectionCheckedOutEvent: {} # insert + - connectionCheckedInEvent: {} # insert fails + - connectionCheckedOutEvent: {} # find + - connectionCheckedInEvent: {} # find succeeds - description: "write op with unsuccessful pending read" operations: - - name: createEntities - object: testRunner - arguments: - entities: - - client: - id: &client client - uriOptions: - maxPoolSize: 1 - useMultipleMongoses: false - observeEvents: - - commandFailedEvent - - connectionClosedEvent - - database: - id: &database database - client: *client - databaseName: *databaseName - - collection: - id: &collection collection - database: *database - collectionName: *collectionName - # Create a failpoint to block first op - name: failPoint object: testRunner @@ -141,35 +124,18 @@ tests: events: - commandFailedEvent: commandName: insert + # There is not a second failed event since we T/O attempting to + # check out the connection for the second operation. - client: *client eventType: cmap events: - - connectionClosedEvent: + - connectionCheckedOutEvent: {} # first insert + - connectionCheckedInEvent: {} # first insert fails + - connectionClosedEvent: # second insert T/O pending read in C/O, closes reason: error - description: "read op with successful pending read" operations: - - name: createEntities - object: testRunner - arguments: - entities: - - client: - id: &client client - uriOptions: - maxPoolSize: 1 - useMultipleMongoses: false - observeEvents: - - commandFailedEvent - - connectionClosedEvent - - database: - id: &database database - client: *client - databaseName: *databaseName - - collection: - id: &collection collection - database: *database - collectionName: *collectionName - # Create a failpoint to block first op - name: failPoint object: testRunner @@ -179,77 +145,16 @@ tests: configureFailPoint: failCommand mode: { times: 1 } data: - failCommands: ["insert"] + failCommands: ["find"] blockConnection: true blockTimeMS: 750 # Execute op with timeout < block time - - name: insertOne - object: *collection - arguments: - timeoutMS: 500 - document: { _id: 3, x: 1 } - expectError: - isTimeoutError: true - - # Execute a subsequent operation to complete the read when checking out - # the single available connection. - name: findOne object: *collection arguments: + timeoutMS: 50 filter: { _id: 1 } - - expectEvents: - - client: *client - events: - - commandFailedEvent: - commandName: insert - - client: *client - eventType: cmap - events: [] # Expect no connection closure. 
- - - description: "read op with successful pending read" - operations: - - name: createEntities - object: testRunner - arguments: - entities: - - client: - id: &client client - uriOptions: - maxPoolSize: 1 - useMultipleMongoses: false - observeEvents: - - commandFailedEvent - - connectionClosedEvent - - database: - id: &database database - client: *client - databaseName: *databaseName - - collection: - id: &collection collection - database: *database - collectionName: *collectionName - - # Create a failpoint to block first op - - name: failPoint - object: testRunner - arguments: - client: *failPointClient - failPoint: - configureFailPoint: failCommand - mode: { times: 1 } - data: - failCommands: ["insert"] - blockConnection: true - blockTimeMS: 750 - - # Execute op with timeout < block time - - name: insertOne - object: *collection - arguments: - timeoutMS: 500 - document: { _id: 3, x: 1 } expectError: isTimeoutError: true @@ -264,36 +169,19 @@ tests: - client: *client events: - commandFailedEvent: - commandName: insert + commandName: find + - commandSucceededEvent: + commandName: find - client: *client eventType: cmap - events: [] # Expect no connection closure. + events: + - connectionCheckedOutEvent: {} # first find + - connectionCheckedInEvent: {} # first find fails + - connectionCheckedOutEvent: {} # second find + - connectionCheckedInEvent: {} # second find succeeds - - description: "write op with unsuccessful pending read" + - description: "read op with unsuccessful pending read" operations: - - name: createEntities - object: testRunner - arguments: - entities: - - client: - id: &client client - uriOptions: - # For single-threaded drivers, ensure the operating connection - # is checked out to complete the read. - maxPoolSize: 1 - useMultipleMongoses: false - observeEvents: - - commandFailedEvent - - connectionClosedEvent - - database: - id: &database database - client: *client - databaseName: *databaseName - - collection: - id: &collection collection - database: *database - collectionName: *collectionName - # Create a failpoint to block first op - name: failPoint object: testRunner @@ -301,27 +189,27 @@ tests: client: *failPointClient failPoint: configureFailPoint: failCommand - mode: { times: 2 } + mode: { times: 1 } data: - failCommands: ["insert"] + failCommands: ["find"] blockConnection: true blockTimeMS: 750 # Execute op with timeout < block time - - name: insertOne + - name: findOne object: *collection arguments: timeoutMS: 50 - document: { _id: 3, x: 1 } + filter: { _id: 1 } expectError: isTimeoutError: true # The pending read should fail. - - name: insertOne + - name: findOne object: *collection arguments: timeoutMS: 500 - document: { _id: 3, x: 1 } + filter: { _id: 1 } expectError: isTimeoutError: true @@ -329,9 +217,13 @@ tests: - client: *client events: - commandFailedEvent: - commandName: insert + commandName: find + # There is not a second failed event since we T/O attempting to + # check out the connection for the second operation. 
- client: *client eventType: cmap events: - - connectionClosedEvent: + - connectionCheckedOutEvent: {} # first find + - connectionCheckedInEvent: {} # first find fails + - connectionClosedEvent: # second find T/O pending read in C/O, closes reason: error From 0e0785ce73331ffbe407972576dd3d157440e977 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Wed, 30 Oct 2024 14:59:33 -0600 Subject: [PATCH 08/15] Clean up connection-churn yaml --- .../connection-churn.json | 10 +-- .../connection-churn.yml | 68 +++++++++---------- 2 files changed, 38 insertions(+), 40 deletions(-) diff --git a/testdata/client-side-operations-timeout/connection-churn.json b/testdata/client-side-operations-timeout/connection-churn.json index d8177c38f4..7ca7ea3eee 100644 --- a/testdata/client-side-operations-timeout/connection-churn.json +++ b/testdata/client-side-operations-timeout/connection-churn.json @@ -1,5 +1,5 @@ { - "description": "operation timeouts do not cause connection churn", + "description": "Operation timeouts do not cause connection churn", "schemaVersion": "1.9", "runOnRequirements": [ { @@ -57,7 +57,7 @@ ], "tests": [ { - "description": "write op with successful pending read", + "description": "Write operation with successful pending read", "operations": [ { "name": "failPoint", @@ -140,7 +140,7 @@ ] }, { - "description": "write op with unsuccessful pending read", + "description": "Write operation with unsuccessful pending read", "operations": [ { "name": "failPoint", @@ -222,7 +222,7 @@ ] }, { - "description": "read op with successful pending read", + "description": "Read operation with successful pending read", "operations": [ { "name": "failPoint", @@ -304,7 +304,7 @@ ] }, { - "description": "read op with unsuccessful pending read", + "description": "Read operation with unsuccessful pending read", "operations": [ { "name": "failPoint", diff --git a/testdata/client-side-operations-timeout/connection-churn.yml b/testdata/client-side-operations-timeout/connection-churn.yml index d011bb3f3a..d68a13c373 100644 --- a/testdata/client-side-operations-timeout/connection-churn.yml +++ b/testdata/client-side-operations-timeout/connection-churn.yml @@ -1,12 +1,12 @@ -description: "operation timeouts do not cause connection churn" +description: "Operation timeouts do not cause connection churn" schemaVersion: "1.9" runOnRequirements: - minServerVersion: "4.4" - # TODO(SERVER-96344): when using failpoints mongos returns MaxTimeMSExpired - # after maxTimeMS, whereas mongod returns MaxTimeMSExpired after - # max(blockTimeMS, maxTimeMS). Until this ticket is resolved, these tests + # TODO(SERVER-96344): When using failpoints, mongos returns MaxTimeMSExpired + # after maxTimeMS, whereas mongod returns it after + # max(blockTimeMS, maxTimeMS). Until this ticket is resolved, these tests # will not pass on sharded clusters. 
topologies: ["standalone", "replicaset"] @@ -16,7 +16,7 @@ createEntities: useMultipleMongoses: false - client: id: &client client - uriOptions: + uriOptions: maxPoolSize: 1 useMultipleMongoses: false observeEvents: @@ -40,9 +40,9 @@ initialData: documents: [] tests: - - description: "write op with successful pending read" + - description: "Write operation with successful pending read" operations: - # Create a failpoint to block first op + # Create a failpoint to block the first operation - name: failPoint object: testRunner arguments: @@ -55,7 +55,7 @@ tests: blockConnection: true blockTimeMS: 750 - # Execute op with timeout < block time + # Execute operation with timeout less than block time - name: insertOne object: *collection arguments: @@ -64,8 +64,7 @@ tests: expectError: isTimeoutError: true - # Execute a subsequent operation to complete the read when checking out - # the single available connection. + # Execute a subsequent operation to complete the read - name: findOne object: *collection arguments: @@ -80,15 +79,15 @@ tests: commandName: find - client: *client eventType: cmap - events: + events: - connectionCheckedOutEvent: {} # insert - connectionCheckedInEvent: {} # insert fails - connectionCheckedOutEvent: {} # find - connectionCheckedInEvent: {} # find succeeds - - description: "write op with unsuccessful pending read" + - description: "Write operation with unsuccessful pending read" operations: - # Create a failpoint to block first op + # Create a failpoint to block the first operation - name: failPoint object: testRunner arguments: @@ -101,7 +100,7 @@ tests: blockConnection: true blockTimeMS: 750 - # Execute op with timeout < block time + # Execute operation with timeout less than block time - name: insertOne object: *collection arguments: @@ -110,7 +109,7 @@ tests: expectError: isTimeoutError: true - # The pending read should fail. + # The pending read should fail - name: insertOne object: *collection arguments: @@ -124,19 +123,19 @@ tests: events: - commandFailedEvent: commandName: insert - # There is not a second failed event since we T/O attempting to - # check out the connection for the second operation. + # No second failed event since we timed out attempting to check out + # the connection for the second operation - client: *client eventType: cmap - events: + events: - connectionCheckedOutEvent: {} # first insert - connectionCheckedInEvent: {} # first insert fails - - connectionClosedEvent: # second insert T/O pending read in C/O, closes + - connectionClosedEvent: # second insert times out pending read in checkout, closes reason: error - - description: "read op with successful pending read" + - description: "Read operation with successful pending read" operations: - # Create a failpoint to block first op + # Create a failpoint to block the first operation - name: failPoint object: testRunner arguments: @@ -149,7 +148,7 @@ tests: blockConnection: true blockTimeMS: 750 - # Execute op with timeout < block time + # Execute operation with timeout less than block time - name: findOne object: *collection arguments: @@ -158,8 +157,7 @@ tests: expectError: isTimeoutError: true - # Execute a subsequent operation to complete the read when checking out - # the single available connection. 
+ # Execute a subsequent operation to complete the read - name: findOne object: *collection arguments: @@ -174,15 +172,15 @@ tests: commandName: find - client: *client eventType: cmap - events: - - connectionCheckedOutEvent: {} # first find + events: + - connectionCheckedOutEvent: {} # first find - connectionCheckedInEvent: {} # first find fails - connectionCheckedOutEvent: {} # second find - connectionCheckedInEvent: {} # second find succeeds - - description: "read op with unsuccessful pending read" + - description: "Read operation with unsuccessful pending read" operations: - # Create a failpoint to block first op + # Create a failpoint to block the first operation - name: failPoint object: testRunner arguments: @@ -195,7 +193,7 @@ tests: blockConnection: true blockTimeMS: 750 - # Execute op with timeout < block time + # Execute operation with timeout less than block time - name: findOne object: *collection arguments: @@ -204,7 +202,7 @@ tests: expectError: isTimeoutError: true - # The pending read should fail. + # The pending read should fail - name: findOne object: *collection arguments: @@ -218,12 +216,12 @@ tests: events: - commandFailedEvent: commandName: find - # There is not a second failed event since we T/O attempting to - # check out the connection for the second operation. + # No second failed event since we timed out attempting to check out + # the connection for the second operation - client: *client eventType: cmap - events: - - connectionCheckedOutEvent: {} # first find + events: + - connectionCheckedOutEvent: {} # first find - connectionCheckedInEvent: {} # first find fails - - connectionClosedEvent: # second find T/O pending read in C/O, closes + - connectionClosedEvent: # second find times out pending read in checkout, closes reason: error From ee82cd718c91e7e5d4fb6c69b8095753a66dc0e6 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Thu, 31 Oct 2024 10:04:13 -0600 Subject: [PATCH 09/15] Update connection churn tests --- .../connection-churn.json | 134 +++++++++++++++++- .../connection-churn.yml | 87 +++++++++++- 2 files changed, 219 insertions(+), 2 deletions(-) diff --git a/testdata/client-side-operations-timeout/connection-churn.json b/testdata/client-side-operations-timeout/connection-churn.json index 7ca7ea3eee..0706e43193 100644 --- a/testdata/client-side-operations-timeout/connection-churn.json +++ b/testdata/client-side-operations-timeout/connection-churn.json @@ -83,7 +83,7 @@ "name": "insertOne", "object": "coll", "arguments": { - "timeoutMS": 500, + "timeoutMS": 50, "document": { "_id": 3, "x": 1 @@ -139,6 +139,138 @@ } ] }, + { + "description": "Concurrent write operation with successful pending read", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "blockConnection": true, + "blockTimeMS": 750 + } + } + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "thread": { + "id": "thread0" + } + }, + { + "thread": { + "id": "thread1" + } + } + ] + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread0", + "operation": { + "name": "insertOne", + "object": "coll", + "arguments": { + "timeoutMS": 500, + "document": { + "_id": 2 + } + } + }, + "expectError": { + "isTimeoutError": true + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + 
"arguments": { + "client": "client", + "event": { + "connectionCheckedOutEvent": {} + }, + "count": 1 + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread1", + "operation": { + "name": "insertOne", + "object": "coll", + "arguments": { + "document": { + "_id": 3 + } + } + } + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandFailedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ] + }, { "description": "Write operation with unsuccessful pending read", "operations": [ diff --git a/testdata/client-side-operations-timeout/connection-churn.yml b/testdata/client-side-operations-timeout/connection-churn.yml index d68a13c373..92f67bc006 100644 --- a/testdata/client-side-operations-timeout/connection-churn.yml +++ b/testdata/client-side-operations-timeout/connection-churn.yml @@ -59,7 +59,7 @@ tests: - name: insertOne object: *collection arguments: - timeoutMS: 500 + timeoutMS: 50 document: { _id: 3, x: 1 } expectError: isTimeoutError: true @@ -85,6 +85,91 @@ tests: - connectionCheckedOutEvent: {} # find - connectionCheckedInEvent: {} # find succeeds + - description: "Concurrent write operation with successful pending read" + operations: + # Create a failpoint to block the first operation + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["insert"] + blockConnection: true + blockTimeMS: 750 + + # Start threads. + - name: createEntities + object: testRunner + arguments: + entities: + - thread: + id: &thread0 thread0 + - thread: + id: &thread1 thread1 + + # Run an insert in two threads. We expect the first to time out and the + # second to finish the pending read from the first and complete + # successfully. + - name: runOnThread + object: testRunner + arguments: + thread: *thread0 + operation: + name: insertOne + object: *collection + arguments: + timeoutMS: 500 + document: + _id: 2 + expectError: + isTimeoutError: true + + # Ensure the first thread checks out a connection before executing the + # operation in the second thread. This maintains concurrent behavior but + # presents the worst case scenario. + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + connectionCheckedOutEvent: {} + count: 1 + + - name: runOnThread + object: testRunner + arguments: + thread: *thread1 + operation: + name: insertOne + object: *collection + arguments: + document: + _id: 3 + + # Stop threads. 
+ - name: waitForThread + object: testRunner + arguments: + thread: *thread1 + + expectEvents: + - client: *client + events: + - commandFailedEvent: + commandName: insert + - commandSucceededEvent: + commandName: insert + - client: *client + eventType: cmap + events: + - connectionCheckedOutEvent: {} # insert + - connectionCheckedInEvent: {} # insert fails + - connectionCheckedOutEvent: {} # find + - connectionCheckedInEvent: {} # find succeeds + - description: "Write operation with unsuccessful pending read" operations: # Create a failpoint to block the first operation From 492b6a141d03602024afc38fe6dc8d4033a078c0 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Mon, 4 Nov 2024 10:57:46 -0700 Subject: [PATCH 10/15] Isolate server selection context --- x/mongo/driver/operation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go index 968b2f258c..f95fcaed44 100644 --- a/x/mongo/driver/operation.go +++ b/x/mongo/driver/operation.go @@ -422,10 +422,10 @@ func (op Operation) getServerAndConnection( requestID int32, deprioritized []description.Server, ) (Server, *mnet.Connection, error) { - ctx, cancel := csot.WithServerSelectionTimeout(ctx, op.Deployment.GetServerSelectionTimeout()) + serverSelectionCtx, cancel := csot.WithServerSelectionTimeout(ctx, op.Deployment.GetServerSelectionTimeout()) defer cancel() - server, err := op.selectServer(ctx, requestID, deprioritized) + server, err := op.selectServer(serverSelectionCtx, requestID, deprioritized) if err != nil { if op.Client != nil && !(op.Client.Committing || op.Client.Aborting) && op.Client.TransactionRunning() { From 67eef261053adf3de4434d72ababf43331718ae3 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 8 Nov 2024 13:46:25 -0700 Subject: [PATCH 11/15] Update naming and timeouts --- event/monitoring.go | 25 ++++++---- .../connection-churn.json | 8 +-- .../connection-churn.yml | 8 +-- x/mongo/driver/topology/connection.go | 4 +- x/mongo/driver/topology/pool.go | 50 ++++++++++++------- 5 files changed, 56 insertions(+), 39 deletions(-) diff --git a/event/monitoring.go b/event/monitoring.go index 2ca98969d7..8a779662c8 100644 --- a/event/monitoring.go +++ b/event/monitoring.go @@ -75,17 +75,20 @@ const ( // strings for pool command monitoring types const ( - ConnectionPoolCreated = "ConnectionPoolCreated" - ConnectionPoolReady = "ConnectionPoolReady" - ConnectionPoolCleared = "ConnectionPoolCleared" - ConnectionPoolClosed = "ConnectionPoolClosed" - ConnectionCreated = "ConnectionCreated" - ConnectionReady = "ConnectionReady" - ConnectionClosed = "ConnectionClosed" - ConnectionCheckOutStarted = "ConnectionCheckOutStarted" - ConnectionCheckOutFailed = "ConnectionCheckOutFailed" - ConnectionCheckedOut = "ConnectionCheckedOut" - ConnectionCheckedIn = "ConnectionCheckedIn" + ConnectionPoolCreated = "ConnectionPoolCreated" + ConnectionPoolReady = "ConnectionPoolReady" + ConnectionPoolCleared = "ConnectionPoolCleared" + ConnectionPoolClosed = "ConnectionPoolClosed" + ConnectionCreated = "ConnectionCreated" + ConnectionReady = "ConnectionReady" + ConnectionClosed = "ConnectionClosed" + ConnectionCheckOutStarted = "ConnectionCheckOutStarted" + ConnectionCheckOutFailed = "ConnectionCheckOutFailed" + ConnectionCheckedOut = "ConnectionCheckedOut" + ConnectionCheckedIn = "ConnectionCheckedIn" + ConnectionPendingReadStarted = "ConnectionPendingReadStarted" + ConnectionPendingReadSucceeded = "ConnectionPendingReadSucceeded" + ConnectionPendingReadFailed = 
"ConnectionPendingReadFailed" ) // MonitorPoolOptions contains pool options as formatted in pool events diff --git a/testdata/client-side-operations-timeout/connection-churn.json b/testdata/client-side-operations-timeout/connection-churn.json index 0706e43193..1f9c9f4cf0 100644 --- a/testdata/client-side-operations-timeout/connection-churn.json +++ b/testdata/client-side-operations-timeout/connection-churn.json @@ -289,7 +289,7 @@ "insert" ], "blockConnection": true, - "blockTimeMS": 750 + "blockTimeMS": 1100 } } } @@ -312,7 +312,7 @@ "name": "insertOne", "object": "coll", "arguments": { - "timeoutMS": 500, + "timeoutMS": 1000, "document": { "_id": 3, "x": 1 @@ -453,7 +453,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 750 + "blockTimeMS": 1100 } } } @@ -475,7 +475,7 @@ "name": "findOne", "object": "coll", "arguments": { - "timeoutMS": 500, + "timeoutMS": 1000, "filter": { "_id": 1 } diff --git a/testdata/client-side-operations-timeout/connection-churn.yml b/testdata/client-side-operations-timeout/connection-churn.yml index 92f67bc006..12fca8e548 100644 --- a/testdata/client-side-operations-timeout/connection-churn.yml +++ b/testdata/client-side-operations-timeout/connection-churn.yml @@ -183,7 +183,7 @@ tests: data: failCommands: ["insert"] blockConnection: true - blockTimeMS: 750 + blockTimeMS: 1100 # Execute operation with timeout less than block time - name: insertOne @@ -198,7 +198,7 @@ tests: - name: insertOne object: *collection arguments: - timeoutMS: 500 + timeoutMS: 1000 document: { _id: 3, x: 1 } expectError: isTimeoutError: true @@ -276,7 +276,7 @@ tests: data: failCommands: ["find"] blockConnection: true - blockTimeMS: 750 + blockTimeMS: 1100 # Execute operation with timeout less than block time - name: findOne @@ -291,7 +291,7 @@ tests: - name: findOne object: *collection arguments: - timeoutMS: 500 + timeoutMS: 1000 filter: { _id: 1 } expectError: isTimeoutError: true diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index 86d19e2be3..471c47a826 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -485,7 +485,7 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, if l := int32(n); l == 0 && isCSOTTimeout(err) { c.pendingReadMU.Lock() c.awaitRemainingBytes = &l - c.remainingTime = ptrutil.Ptr(BGReadTimeout) + c.remainingTime = ptrutil.Ptr(PendingReadTimeout) c.pendingReadMU.Unlock() } return nil, "incomplete read of message header", err @@ -504,7 +504,7 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, if remainingBytes > 0 && isCSOTTimeout(err) { c.pendingReadMU.Lock() c.awaitRemainingBytes = &remainingBytes - c.remainingTime = ptrutil.Ptr(BGReadTimeout) + c.remainingTime = ptrutil.Ptr(PendingReadTimeout) c.pendingReadMU.Unlock() } return dst, "incomplete read of full message", err diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index aea6877418..3cc1efd010 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -784,22 +784,14 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro return nil } -var ( - // BGReadTimeout is the maximum amount of the to wait when trying to read - // the server reply on a connection after an operation timed out. The - // default is 400ms. - // - // Deprecated: BGReadTimeout is intended for internal use only and may be - // removed or modified at any time. 
- BGReadTimeout = 400 * time.Millisecond +// PendingReadTimeout is the maximum amount of the to wait when trying to read +// the server reply on a connection after an operation timed out. The +// default is 1 second. +// +// Deprecated: PendingReadTimeout is intended for internal use only and may be +// removed or modified at any time. - // BGReadCallback is a callback for monitoring the behavior of the - // background-read-on-timeout connection preserving mechanism. - // - // Deprecated: BGReadCallback is intended for internal use only and may be - // removed or modified at any time. - BGReadCallback func(addr string, start, read time.Time, errs []error, connClosed bool) -) +var PendingReadTimeout = 1 * time.Second // awaitPendingRead sets a new read deadline on the provided connection and // tries to read any bytes returned by the server. If there are any errors, the @@ -813,6 +805,13 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { return nil } + //if pool.monitor != nil { + // pool.monitor.Event(&event.PoolEvent{ + // Type: event.ConnectionPendingReadStarted, + // ConnectionID: conn.driverConnectionID, + // }) + //} + size := *conn.awaitRemainingBytes checkIn := false @@ -820,8 +819,7 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { defer func() { // If we have exceeded the time limit, then close the connection. if conn.remainingTime != nil && *conn.remainingTime < 0 { - err := conn.close() - if err != nil { + if err := conn.close(); err != nil { panic(err) } @@ -832,6 +830,14 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { return } + //if pool.monitor != nil { + // pool.monitor.Event(&event.PoolEvent{ + // Type: event.ConnectionPendingReadFailed, + // ConnectionID: conn.driverConnectionID, + // //Reason: readErr.Error(), + // }) + //} + // No matter what happens, always check the connection back into the // pool, which will either make it available for other operations or // remove it from the pool if it was closed. 
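A minimal sketch of the deadline selection implemented in the hunk below,
using names from this patch (the standalone helper itself is illustrative
only):

	func pendingReadDeadline(ctx context.Context, fallback time.Duration) time.Time {
		// Prefer the operation's own deadline so the drain never outlives
		// the caller's timeout.
		if dl, ok := ctx.Deadline(); ok {
			return dl
		}
		// Otherwise bound the attempt with the package-level default.
		return time.Now().Add(fallback)
	}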
@@ -845,7 +851,7 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { dl, contextDeadlineUsed := ctx.Deadline() if !contextDeadlineUsed { - dl = time.Now().Add(BGReadTimeout) + dl = time.Now().Add(PendingReadTimeout) } err := conn.nc.SetReadDeadline(dl) @@ -891,6 +897,14 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { return fmt.Errorf("error discarding %d byte message: %w", size, err) } + //if pool.monitor != nil { + // pool.monitor.Event(&event.PoolEvent{ + // Type: event.ConnectionPendingReadSucceeded, + // ConnectionID: conn.driverConnectionID, + // //Reason: readErr.Error(), + // }) + //} + conn.awaitRemainingBytes = nil conn.remainingTime = nil From 14ca5aea2156d7825c33f7ea87fbdb4d89edea8f Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 22 Nov 2024 16:48:01 -0700 Subject: [PATCH 12/15] Advance logic to meet TD --- event/monitoring.go | 8 ++- internal/logger/component.go | 3 + x/mongo/driver/drivertest/channel_conn.go | 3 +- x/mongo/driver/drivertest/opmsg_deployment.go | 2 +- x/mongo/driver/mnet/connection.go | 21 +++++- x/mongo/driver/operation.go | 37 +++++++++-- x/mongo/driver/operation_test.go | 2 +- x/mongo/driver/topology/connection.go | 26 +++++--- x/mongo/driver/topology/pool.go | 66 +++++++++++-------- 9 files changed, 119 insertions(+), 49 deletions(-) diff --git a/event/monitoring.go b/event/monitoring.go index 8a779662c8..4965a55528 100644 --- a/event/monitoring.go +++ b/event/monitoring.go @@ -108,9 +108,11 @@ type PoolEvent struct { Reason string `json:"reason"` // ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field // can be used to distinguish between individual servers in a load balanced deployment. 
-	ServiceID    *bson.ObjectID `json:"serviceId"`
-	Interruption bool           `json:"interruptInUseConnections"`
-	Error        error          `json:"error"`
+	ServiceID     *bson.ObjectID `json:"serviceId"`
+	Interruption  bool           `json:"interruptInUseConnections"`
+	Error         error          `json:"error"`
+	RequestID     int32          `json:"requestId"`
+	RemainingTime time.Duration  `json:"remainingTime"`
 }

 // PoolMonitor is a function that allows the user to gain access to events occurring in the pool
diff --git a/internal/logger/component.go b/internal/logger/component.go
index a601707cbf..5abc3f5f79 100644
--- a/internal/logger/component.go
+++ b/internal/logger/component.go
@@ -28,6 +28,9 @@ const (
 	ConnectionCheckoutFailed       = "Connection checkout failed"
 	ConnectionCheckedOut           = "Connection checked out"
 	ConnectionCheckedIn            = "Connection checked in"
+	ConnectionPendingReadStarted   = "Pending read started"
+	ConnectionPendingReadSucceeded = "Pending read succeeded"
+	ConnectionPendingReadFailed    = "Pending read failed"
 	ServerSelectionFailed          = "Server selection failed"
 	ServerSelectionStarted         = "Server selection started"
 	ServerSelectionSucceeded       = "Server selection succeeded"
diff --git a/x/mongo/driver/drivertest/channel_conn.go b/x/mongo/driver/drivertest/channel_conn.go
index 4e1f2c78c5..a365e79bc6 100644
--- a/x/mongo/driver/drivertest/channel_conn.go
+++ b/x/mongo/driver/drivertest/channel_conn.go
@@ -13,6 +13,7 @@ import (
 	"go.mongodb.org/mongo-driver/v2/mongo/address"
 	"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
 	"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
+	"go.mongodb.org/mongo-driver/v2/x/mongo/driver/mnet"
 	"go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage"
 )

@@ -52,7 +53,7 @@ func (c *ChannelConn) Write(ctx context.Context, wm []byte) error {
 }

 // ReadWireMessage implements the driver.Connection interface.
-func (c *ChannelConn) Read(ctx context.Context) ([]byte, error) {
+func (c *ChannelConn) Read(ctx context.Context, _ ...mnet.ReadOption) ([]byte, error) {
 	var wm []byte
 	var err error
 	select {
diff --git a/x/mongo/driver/drivertest/opmsg_deployment.go b/x/mongo/driver/drivertest/opmsg_deployment.go
index 84fdb308df..ea3ffe5d68 100644
--- a/x/mongo/driver/drivertest/opmsg_deployment.go
+++ b/x/mongo/driver/drivertest/opmsg_deployment.go
@@ -68,7 +68,7 @@ func (c *connection) SetOIDCTokenGenID(uint64) {
 }

 // Read returns the next response in the connection's list of responses.
-func (c *connection) Read(_ context.Context) ([]byte, error) {
+func (c *connection) Read(_ context.Context, _ ...mnet.ReadOption) ([]byte, error) {
 	var dst []byte
 	if len(c.responses) == 0 {
 		return dst, errors.New("no responses remaining")
diff --git a/x/mongo/driver/mnet/connection.go b/x/mongo/driver/mnet/connection.go
index e02ecceadb..3b84747431 100644
--- a/x/mongo/driver/mnet/connection.go
+++ b/x/mongo/driver/mnet/connection.go
@@ -14,10 +14,33 @@ import (
 	"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
 )

+// ReadOption configures an optional behavior for reading a wire message.
+type ReadOption func(*ReadOptions)
+
+// ReadOptions holds the optional settings applied by ReadOption values.
+type ReadOptions struct {
+	HasMaxTimeMS bool
+	RequestID    int32
+}
+
+// WithReadMaxTimeMS records that the operation was sent with maxTimeMS set.
+func WithReadMaxTimeMS() ReadOption {
+	return func(opts *ReadOptions) {
+		opts.HasMaxTimeMS = true
+	}
+}
+
+// WithRequestID records the request ID of the in-flight operation.
+func WithRequestID(requestID int32) ReadOption {
+	return func(opts *ReadOptions) {
+		opts.RequestID = requestID
+	}
+}
+
 // ReadWriteCloser represents a Connection where server operations
 // can read from, written to, and closed.
 type ReadWriteCloser interface {
-	Read(ctx context.Context) ([]byte, error)
+	Read(ctx context.Context, opts ...ReadOption) ([]byte, error)
 	Write(ctx context.Context, wm []byte) error
 	io.Closer
 }
diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go
index f95fcaed44..194c100824 100644
--- a/x/mongo/driver/operation.go
+++ b/x/mongo/driver/operation.go
@@ -792,7 +792,18 @@ func (op Operation) Execute(ctx context.Context) error {
 		if moreToCome {
 			roundTrip = op.moreToComeRoundTrip
 		}
-		res, err = roundTrip(ctx, conn, *wm)
+
+		readOpts := []mnet.ReadOption{}
+		if maxTimeMS != 0 {
+			readOpts = append(readOpts, mnet.WithReadMaxTimeMS())
+			readOpts = append(readOpts, mnet.WithRequestID(startedInfo.requestID))
+		}
+
+		// Inform the roundTrip if maxTimeMS is set. If it is and the operation
+		// times out, the connection is put into a "pending" state so that the
+		// next checkout attempts to finish the read; the pending reply is almost
+		// certainly a server error noting a timeout.
+		res, err = roundTrip(ctx, conn, *wm, readOpts)

 		if ep, ok := srvr.(ErrorProcessor); ok {
 			_ = ep.ProcessError(err, conn)
@@ -1076,16 +1087,25 @@ func (op Operation) retryable(desc description.Server) bool {

 // roundTrip writes a wiremessage to the connection and then reads a wiremessage. The wm parameter
 // is reused when reading the wiremessage.
-func (op Operation) roundTrip(ctx context.Context, conn *mnet.Connection, wm []byte) ([]byte, error) {
+func (op Operation) roundTrip(
+	ctx context.Context,
+	conn *mnet.Connection,
+	wm []byte,
+	readOpts []mnet.ReadOption,
+) ([]byte, error) {
 	err := conn.Write(ctx, wm)
 	if err != nil {
 		return nil, op.networkError(err)
 	}
-	return op.readWireMessage(ctx, conn)
+	return op.readWireMessage(ctx, conn, readOpts...)
 }

-func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection) (result []byte, err error) {
-	wm, err := conn.Read(ctx)
+func (op Operation) readWireMessage(
+	ctx context.Context,
+	conn *mnet.Connection,
+	opts ...mnet.ReadOption,
+) (result []byte, err error) {
+	wm, err := conn.Read(ctx, opts...)
 	if err != nil {
 		return nil, op.networkError(err)
 	}
@@ -1156,7 +1176,12 @@ func (op Operation) networkError(err error) error {

 // moreToComeRoundTrip writes a wiremessage to the provided connection. This is used when an OP_MSG is
 // being sent with the moreToCome bit set.
-func (op *Operation) moreToComeRoundTrip(ctx context.Context, conn *mnet.Connection, wm []byte) (result []byte, err error) { +func (op *Operation) moreToComeRoundTrip( + ctx context.Context, + conn *mnet.Connection, + wm []byte, + _ []mnet.ReadOption, +) (result []byte, err error) { err = conn.Write(ctx, wm) if err != nil { if op.Client != nil { diff --git a/x/mongo/driver/operation_test.go b/x/mongo/driver/operation_test.go index 911f32dbf5..54fe82e597 100644 --- a/x/mongo/driver/operation_test.go +++ b/x/mongo/driver/operation_test.go @@ -793,7 +793,7 @@ func (m *mockConnection) Write(_ context.Context, wm []byte) error { return m.rWriteErr } -func (m *mockConnection) Read(_ context.Context) ([]byte, error) { +func (m *mockConnection) Read(_ context.Context, _ ...mnet.ReadOption) ([]byte, error) { return m.rReadWM, m.rReadErr } diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index 471c47a826..b06b742c96 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -86,6 +86,7 @@ type connection struct { // awaitRemainingBytes indicates the size of server response that was not completely // read before returning the connection to the pool. awaitRemainingBytes *int32 + requestID int32 remainingTime *time.Duration pendingReadMU sync.Mutex } @@ -396,7 +397,7 @@ func (c *connection) write(ctx context.Context, wm []byte) (err error) { } // readWireMessage reads a wiremessage from the connection. The dst parameter will be overwritten. -func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) { +func (c *connection) readWireMessage(ctx context.Context, opts ...mnet.ReadOption) ([]byte, error) { if atomic.LoadInt64(&c.state) != connConnected { return nil, ConnectionError{ ConnectionID: c.id, @@ -409,7 +410,7 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) { return nil, ConnectionError{ConnectionID: c.id, Wrapped: err, message: "failed to set read deadline"} } - dst, errMsg, err := c.read(ctx) + dst, errMsg, err := c.read(ctx, opts...) if err != nil { c.pendingReadMU.Lock() if c.awaitRemainingBytes == nil { @@ -452,7 +453,7 @@ func (c *connection) parseWmSizeBytes(wmSizeBytes [4]byte) (int32, error) { return size, nil } -func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, err error) { +func (c *connection) read(ctx context.Context, opts ...mnet.ReadOption) (bytesRead []byte, errMsg string, err error) { go c.cancellationListener.Listen(ctx, c.cancellationListenerCallback) defer func() { // If the context is cancelled after we finish reading the server response, the cancellation listener could fire @@ -465,6 +466,11 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, } }() + readOpts := mnet.ReadOptions{} + for _, opt := range opts { + opt(&readOpts) + } + isCSOTTimeout := func(err error) bool { // If the error was a timeout error, instead of closing the // connection mark it as awaiting response so the pool can read the @@ -482,9 +488,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, // reading messages from an exhaust cursor. 
n, err := io.ReadFull(c.nc, sizeBuf[:]) if err != nil { - if l := int32(n); l == 0 && isCSOTTimeout(err) { + if l := int32(n); l == 0 && isCSOTTimeout(err) && readOpts.HasMaxTimeMS { c.pendingReadMU.Lock() c.awaitRemainingBytes = &l + c.requestID = readOpts.RequestID c.remainingTime = ptrutil.Ptr(PendingReadTimeout) c.pendingReadMU.Unlock() } @@ -501,9 +508,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, n, err = io.ReadFull(c.nc, dst[4:]) if err != nil { remainingBytes := size - 4 - int32(n) - if remainingBytes > 0 && isCSOTTimeout(err) { + if remainingBytes > 0 && isCSOTTimeout(err) && readOpts.HasMaxTimeMS { c.pendingReadMU.Lock() c.awaitRemainingBytes = &remainingBytes + c.requestID = readOpts.RequestID c.remainingTime = ptrutil.Ptr(PendingReadTimeout) c.pendingReadMU.Unlock() } @@ -664,8 +672,8 @@ func (c initConnection) LocalAddress() address.Address { func (c initConnection) Write(ctx context.Context, wm []byte) error { return c.writeWireMessage(ctx, wm) } -func (c initConnection) Read(ctx context.Context) ([]byte, error) { - return c.readWireMessage(ctx) +func (c initConnection) Read(ctx context.Context, opts ...mnet.ReadOption) ([]byte, error) { + return c.readWireMessage(ctx, opts...) } func (c initConnection) SetStreaming(streaming bool) { c.setStreaming(streaming) @@ -712,13 +720,13 @@ func (c *Connection) Write(ctx context.Context, wm []byte) error { // ReadWireMessage handles reading a wire message from the underlying connection. The dst parameter // will be overwritten with the new wire message. -func (c *Connection) Read(ctx context.Context) ([]byte, error) { +func (c *Connection) Read(ctx context.Context, opts ...mnet.ReadOption) ([]byte, error) { c.mu.RLock() defer c.mu.RUnlock() if c.connection == nil { return nil, ErrConnectionClosed } - return c.connection.readWireMessage(ctx) + return c.connection.readWireMessage(ctx, opts...) } // CompressWireMessage handles compressing the provided wire message using the underlying diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 3cc1efd010..3d5327972f 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -791,7 +791,7 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro // Deprecated: PendingReadTimeout is intended for internal use only and may be // removed or modified at any time. -var PendingReadTimeout = 1 * time.Second +var PendingReadTimeout = 400 * time.Millisecond // awaitPendingRead sets a new read deadline on the provided connection and // tries to read any bytes returned by the server. If there are any errors, the @@ -805,18 +805,32 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { return nil } - //if pool.monitor != nil { - // pool.monitor.Event(&event.PoolEvent{ - // Type: event.ConnectionPendingReadStarted, - // ConnectionID: conn.driverConnectionID, - // }) - //} + if mustLogPoolMessage(pool) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.driverConnectionID, + logger.KeyRequestID, conn.requestID, + } + + logPoolMessage(pool, logger.ConnectionPendingReadStarted, keysAndValues...) 
+ } size := *conn.awaitRemainingBytes checkIn := false + var someErr error defer func() { + if mustLogPoolMessage(pool) && someErr != nil { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.driverConnectionID, + logger.KeyRequestID, conn.requestID, + logger.KeyReason, someErr.Error(), + logger.KeyRemainingTimeMS, *conn.remainingTime, + } + + logPoolMessage(pool, logger.ConnectionPendingReadFailed, keysAndValues...) + } + // If we have exceeded the time limit, then close the connection. if conn.remainingTime != nil && *conn.remainingTime < 0 { if err := conn.close(); err != nil { @@ -830,14 +844,6 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { return } - //if pool.monitor != nil { - // pool.monitor.Event(&event.PoolEvent{ - // Type: event.ConnectionPendingReadFailed, - // ConnectionID: conn.driverConnectionID, - // //Reason: readErr.Error(), - // }) - //} - // No matter what happens, always check the connection back into the // pool, which will either make it available for other operations or // remove it from the pool if it was closed. @@ -857,7 +863,10 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { err := conn.nc.SetReadDeadline(dl) if err != nil { checkIn = true - return fmt.Errorf("error setting a read deadline: %w", err) + + someErr = fmt.Errorf("error setting a read deadline: %w", err) + + return someErr } st := time.Now() @@ -869,15 +878,16 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { checkIn = true err = transformNetworkError(ctx, err, contextDeadlineUsed) + someErr = fmt.Errorf("error reading the message size: %w", err) - return fmt.Errorf("error reading the message size: %w", err) + return someErr } size, err = conn.parseWmSizeBytes(sizeBuf) if err != nil { checkIn = true - err = transformNetworkError(ctx, err, contextDeadlineUsed) + someErr = transformNetworkError(ctx, err, contextDeadlineUsed) - return err + return someErr } size -= 4 } @@ -893,17 +903,19 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error { checkIn = true err = transformNetworkError(ctx, err, contextDeadlineUsed) + someErr = fmt.Errorf("error discarding %d byte message: %w", size, err) - return fmt.Errorf("error discarding %d byte message: %w", size, err) + return someErr } - //if pool.monitor != nil { - // pool.monitor.Event(&event.PoolEvent{ - // Type: event.ConnectionPendingReadSucceeded, - // ConnectionID: conn.driverConnectionID, - // //Reason: readErr.Error(), - // }) - //} + if mustLogPoolMessage(pool) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.driverConnectionID, + logger.KeyRequestID, conn.requestID, + } + + logPoolMessage(pool, logger.ConnectionPendingReadSucceeded, keysAndValues...) 
+	}

 	conn.awaitRemainingBytes = nil
 	conn.remainingTime = nil

From 1f442bfc39804b31f29e7881bc92aa6bc68dc5b2 Mon Sep 17 00:00:00 2001
From: Preston Vasquez
Date: Tue, 3 Dec 2024 15:30:17 -0700
Subject: [PATCH 13/15] Use remaining time if available

---
 x/mongo/driver/topology/pool.go | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index 3d5327972f..38928fd043 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -7,6 +7,7 @@
 package topology

 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -857,7 +858,15 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {

 	dl, contextDeadlineUsed := ctx.Deadline()
 	if !contextDeadlineUsed {
-		dl = time.Now().Add(PendingReadTimeout)
+		// If there is a remainingTime, use that. If not, use the static
+		// PendingReadTimeout. This is required since a user could provide a timeout
+		// for the first try that does not exceed the pending read timeout, fail,
+		// and then not use a timeout for a subsequent try.
+		if conn.remainingTime != nil {
+			dl = time.Now().Add(*conn.remainingTime)
+		} else {
+			dl = time.Now().Add(PendingReadTimeout)
+		}
 	}

 	err := conn.nc.SetReadDeadline(dl)
@@ -871,7 +880,7 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {

 	st := time.Now()

-	if size == 0 {
+	if size == 0 { // Question: Would this always equal zero?
 		var sizeBuf [4]byte
 		if _, err := io.ReadFull(conn.nc, sizeBuf[:]); err != nil {
 			conn.remainingTime = ptrutil.Ptr(*conn.remainingTime - time.Since(st))
@@ -891,7 +900,10 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
 		}
 		size -= 4
 	}
-	n, err := io.CopyN(io.Discard, conn.nc, int64(size))
+
+	buf := bytes.NewBuffer(nil)
+	n, err := io.CopyN(buf, conn.nc, int64(size))
+	fmt.Println("buf: ", buf)
 	if err != nil {
 		// If the read times out, record the bytes left to read before exiting.

From 9787a12b9504e71f62d7c7d786b20b61c252c241 Mon Sep 17 00:00:00 2001
From: Preston Vasquez
Date: Thu, 30 Jan 2025 12:56:41 -0700
Subject: [PATCH 14/15] Update CMAP

---
 event/monitoring.go             | 1 +
 x/mongo/driver/topology/pool.go | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/event/monitoring.go b/event/monitoring.go
index 4965a55528..71038c3812 100644
--- a/event/monitoring.go
+++ b/event/monitoring.go
@@ -89,6 +89,7 @@ const (
 	ConnectionPendingReadStarted   = "ConnectionPendingReadStarted"
 	ConnectionPendingReadSucceeded = "ConnectionPendingReadSucceeded"
 	ConnectionPendingReadFailed    = "ConnectionPendingReadFailed"
+	ConnectionPendingReadDuration  = "ConnectionPendingReadDuration"
 )

 // MonitorPoolOptions contains pool options as formatted in pool events
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index 38928fd043..ec99595b80 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -903,7 +903,6 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {

 	buf := bytes.NewBuffer(nil)
 	n, err := io.CopyN(buf, conn.nc, int64(size))
-	fmt.Println("buf: ", buf)
 	if err != nil {
 		// If the read times out, record the bytes left to read before exiting.
 		nerr := net.Error(nil)

From a7bc2ce74ef7cf7aac4aaed07ec61d27dff9a52d Mon Sep 17 00:00:00 2001
From: Preston Vasquez
Date: Wed, 5 Feb 2025 13:59:22 -0700
Subject: [PATCH 15/15] DRIVERS-2884 Add event monitoring to pending read

---
 x/mongo/driver/topology/pool.go | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index ec99595b80..f4325fe04f 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -832,6 +832,19 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
 			logPoolMessage(pool, logger.ConnectionPendingReadFailed, keysAndValues...)
 		}

+		if pool.monitor != nil && someErr != nil {
+			evt := &event.PoolEvent{
+				Type:          event.ConnectionPendingReadFailed,
+				Address:       pool.address.String(),
+				ConnectionID:  conn.driverConnectionID,
+				RequestID:     conn.requestID,
+				RemainingTime: *conn.remainingTime,
+				Reason:        someErr.Error(),
+			}
+
+			pool.monitor.Event(evt)
+		}
+
 		// If we have exceeded the time limit, then close the connection.
 		if conn.remainingTime != nil && *conn.remainingTime < 0 {
 			if err := conn.close(); err != nil {
@@ -928,6 +941,17 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {

 		logPoolMessage(pool, logger.ConnectionPendingReadSucceeded, keysAndValues...)
 	}

+	if pool.monitor != nil {
+		evt := &event.PoolEvent{
+			Type:         event.ConnectionPendingReadSucceeded,
+			Address:      pool.address.String(),
+			ConnectionID: conn.driverConnectionID,
+			Duration:     time.Since(st),
+		}
+
+		pool.monitor.Event(evt)
+	}
+
 	conn.awaitRemainingBytes = nil
 	conn.remainingTime = nil
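
With patch 15 in place, an application can observe the pending-read lifecycle through the standard CMAP pool monitor. The following is a minimal sketch against this series, not an official driver example: it assumes the ConnectionPendingRead* event constants and the RequestID and Duration fields land exactly as written above, and the connection string is a placeholder.

package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/v2/event"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

func main() {
	// Log each pending-read lifecycle event added by this patch series.
	monitor := &event.PoolMonitor{
		Event: func(evt *event.PoolEvent) {
			switch evt.Type {
			case event.ConnectionPendingReadStarted:
				log.Printf("pending read started: conn=%d request=%d", evt.ConnectionID, evt.RequestID)
			case event.ConnectionPendingReadSucceeded:
				log.Printf("pending read succeeded: conn=%d took=%s", evt.ConnectionID, evt.Duration)
			case event.ConnectionPendingReadFailed:
				log.Printf("pending read failed: conn=%d reason=%s", evt.ConnectionID, evt.Reason)
			}
		},
	}

	// mongodb://localhost:27017 is a placeholder URI.
	client, err := mongo.Connect(options.Client().
		ApplyURI("mongodb://localhost:27017").
		SetPoolMonitor(monitor))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()
}

Because the failed event carries Reason and RemainingTime, a monitor like this can distinguish a connection that was closed for exhausting its pending-read budget from one whose read will be retried on the next checkout.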