diff --git a/internal/integration/unified/client_entity.go b/internal/integration/unified/client_entity.go index 8f9df3d1c6..cd66f0a43e 100644 --- a/internal/integration/unified/client_entity.go +++ b/internal/integration/unified/client_entity.go @@ -24,6 +24,7 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/mongo/readconcern" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" ) // There are no automated tests for truncation. Given that, setting the @@ -59,6 +60,8 @@ type clientEntity struct { ignoredCommands map[string]struct{} observeSensitiveCommands *bool numConnsCheckedOut int32 + latestDesc event.TopologyDescription + connsPerServer map[string]int // These should not be changed after the clientEntity is initialized observedEvents map[monitoringEventType]struct{} @@ -75,29 +78,6 @@ type clientEntity struct { logQueue chan orderedLogMessage } -// awaitMinimumPoolSize waits for the client's connection pool to reach the -// specified minimum size. This is a best effort operation that times out after -// some predefined amount of time to avoid blocking tests indefinitely. -func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error { - // Don't spend longer than 500ms awaiting minPoolSize. - awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) - defer cancel() - - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-awaitCtx.Done(): - return fmt.Errorf("timed out waiting for client to reach minPoolSize") - case <-ticker.C: - if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize { - return nil - } - } - } -} - func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) { // The "configureFailPoint" command should always be ignored. ignoredCommands := map[string]struct{}{ @@ -118,6 +98,7 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32), entityMap: em, observeSensitiveCommands: entityOptions.ObserveSensitiveCommands, + connsPerServer: make(map[string]int), } entity.setRecordEvents(true) @@ -226,8 +207,17 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp return nil, fmt.Errorf("error creating mongo.Client: %w", err) } - if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 { - if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil { + if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 && + clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 { + + if err := func() error { + awaitDur := time.Duration(*entityOptions.AwaitMinPoolSizeMS) * time.Millisecond + + awaitCtx, cancel := context.WithTimeout(ctx, awaitDur) + defer cancel() + + return awaitMinimumPoolSize(awaitCtx, entity, *clientOpts.MinPoolSize) + }(); err != nil { return nil, err } } @@ -476,6 +466,27 @@ func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandF } } +func (c *clientEntity) resetEventHistory() { + c.eventProcessMu.Lock() + defer c.eventProcessMu.Unlock() + + c.pooled = nil + c.serverDescriptionChanged = nil + c.serverHeartbeatStartedEvent = nil + c.serverHeartbeatSucceeded = nil + c.serverHeartbeatFailedEvent = nil + c.topologyDescriptionChanged = nil + c.topologyOpening = nil + c.topologyClosed = nil +} + +func (c *clientEntity) latestTopology() event.TopologyDescription { + c.eventProcessMu.RLock() + defer c.eventProcessMu.RUnlock() + + return c.latestDesc +} + func getPoolEventDocument(evt *event.PoolEvent, eventType monitoringEventType) bson.Raw { bsonBuilder := bsoncore.NewDocumentBuilder(). AppendString("name", string(eventType)). @@ -506,12 +517,21 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) { return } - // Update the connection counter. This happens even if we're not storing any events. + // Update the connection counter. This happens even if we're not storing any + // events. switch evt.Type { case event.ConnectionCheckedOut: atomic.AddInt32(&c.numConnsCheckedOut, 1) case event.ConnectionCheckedIn: atomic.AddInt32(&c.numConnsCheckedOut, -1) + case event.ConnectionReady: + c.eventProcessMu.Lock() + c.connsPerServer[evt.Address]++ + c.eventProcessMu.Unlock() + case event.ConnectionClosed: + c.eventProcessMu.Lock() + c.connsPerServer[evt.Address]-- + c.eventProcessMu.Unlock() } eventType := monitoringEventTypeFromPoolEvent(evt) @@ -529,6 +549,20 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) { } } +// connsReady returns the number of ready connections for the given server +// address. If the server is not data-bearing, this method will return -1. +func (c *clientEntity) connsReady(server event.ServerDescription) int { + c.eventProcessMu.RLock() + defer c.eventProcessMu.RUnlock() + + if server.Kind == description.ServerKindRSArbiter.String() || + server.Kind == description.ServerKindRSGhost.String() { + return -1 + } + + return c.connsPerServer[server.Addr.String()] +} + func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) { c.eventProcessMu.Lock() defer c.eventProcessMu.Unlock() @@ -601,6 +635,8 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog return } + c.latestDesc = evt.NewDescription + if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok { c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt) } @@ -724,3 +760,35 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM } return nil } + +// awaitMinimumPoolSize waits for the client's connection pool to reach the +// specified minimum size. This is a best effort operation that times out after +// some predefined amount of time to avoid blocking tests indefinitely. +func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timed out waiting for client to reach minPoolSize") + case <-ticker.C: + ready := true + for _, server := range entity.latestTopology().Servers { + if r := entity.connsReady(server); r >= 0 && r < int(minPoolSize) { + ready = false + + // If any server has less than minPoolSize connections, continue + // waiting. + break + } + } + + if ready { + entity.resetEventHistory() + + return nil + } + } + } +} diff --git a/internal/integration/unified/entity.go b/internal/integration/unified/entity.go index b1b827a124..f8828965fc 100644 --- a/internal/integration/unified/entity.go +++ b/internal/integration/unified/entity.go @@ -23,10 +23,8 @@ import ( "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" ) -var ( - // ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open - ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open") -) +// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open +var ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open") var ( tlsCAFile = os.Getenv("CSFLE_TLS_CA_FILE") @@ -83,11 +81,10 @@ type entityOptions struct { ClientEncryptionOpts *clientEncryptionOpts `bson:"clientEncryptionOpts"` - // If true, the unified spec runner must wait for the connection pool to be - // populated for all servers according to the minPoolSize option. If false, - // not specified, or if minPoolSize equals 0, there is no need to wait for any - // specific pool state. - AwaitMinPoolSize bool `bson:"awaitMinPoolSize"` + // Maximum duration (in milliseconds) that the test runner MUST wait for each + // connection pool to be populated with minPoolSize. Any CMAP and SDAM events + // that occur before the pool is populated will be ignored. + AwaitMinPoolSizeMS *int `bson:"awaitMinPoolSizeMS"` } func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) { @@ -106,7 +103,8 @@ func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) { // newCollectionEntityOptions constructs an entity options object for a // collection. func newCollectionEntityOptions(id string, databaseID string, collectionName string, - opts *dbOrCollectionOptions) *entityOptions { + opts *dbOrCollectionOptions, +) *entityOptions { options := &entityOptions{ ID: id, DatabaseID: databaseID, @@ -598,7 +596,6 @@ func getKmsCredential(kmsDocument bson.Raw, credentialName string, envVar string return "", fmt.Errorf("unable to get environment value for %v. Please set the CSFLE environment variable: %v", credentialName, envVar) } return os.Getenv(envVar), nil - } func (em *EntityMap) addClientEncryptionEntity(entityOptions *entityOptions) error {