Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,7 @@ func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
var err error
defer func() {
if err != nil {
if cn := w.cancel(); cn != nil {
p.putIdleConn(ctx, cn)
if cn := w.cancel(); cn != nil && p.putIdleConn(ctx, cn) {
p.freeTurn()
}
}
Expand All @@ -593,14 +592,15 @@ func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {

dialCtx := w.getCtxForDial()
cn, cnErr := p.newConn(dialCtx, true)
delivered := w.tryDeliver(cn, cnErr)
if cnErr == nil && delivered {
return
} else if cnErr == nil && !delivered {
p.putIdleConn(dialCtx, cn)
if cnErr != nil {
w.tryDeliver(nil, cnErr) // deliver error to caller, notify connection creation failed
p.freeTurn()
freeTurnCalled = true
} else {
return
}

delivered := w.tryDeliver(cn, cnErr)
if !delivered && p.putIdleConn(dialCtx, cn) {
p.freeTurn()
freeTurnCalled = true
}
Expand All @@ -616,14 +616,20 @@ func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
}
}

func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {
// putIdleConn puts a connection back to the pool or passes it to the next waiting request.
//
// It returns true if the connection was put back to the pool,
// which means the turn needs to be freed directly by the caller,
// or false if the connection was passed to the next waiting request,
// which means the turn will be freed by the waiting goroutine after it returns.
func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) bool {
for {
w, ok := p.dialsQueue.dequeue()
if !ok {
break
}
if w.tryDeliver(cn, nil) {
return
return false
}
}

Expand All @@ -632,12 +638,14 @@ func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {

if p.closed() {
_ = cn.Close()
return
return true
}

// poolSize is increased in newConn
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen.Add(1)

return true
}

func (p *ConnPool) waitTurn(ctx context.Context) error {
Expand Down
28 changes: 14 additions & 14 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,22 +1020,22 @@ var _ = Describe("queuedNewConn", func() {
Expect(reqBErr).NotTo(HaveOccurred(), "Request B should receive Request A's connection")
Expect(reqBConn).NotTo(BeNil())

// CRITICAL CHECK: Turn leak detection
// FIRST CRITICAL CHECK: Turn state after connection delivery
// After Request B receives connection from putIdleConn:
// - Request A's turn SHOULD be released (via freeTurn)
// - Request B's turn is still held (will release on Put)
// Expected QueueLen: 1 (only Request B)
// If Bug exists (missing freeTurn): QueueLen: 2 (Request A's turn leaked)
time.Sleep(100 * time.Millisecond) // Allow time for turn release
currentQueueLen := testPool.QueueLen()

Expect(currentQueueLen).To(Equal(1),
"QueueLen should be 1 (only Request B holding turn). "+
"If it's 2, Request A's turn leaked due to missing freeTurn()")

// Cleanup
// - Request A's turn is held by Request B (connection delivered)
// - Request B's turn is still held by Request B's dial to complete the connection
// Expected QueueLen: 2 (Request B holding turn for connection usage)
time.Sleep(100 * time.Millisecond) // ~300ms total
Expect(testPool.QueueLen()).To(Equal(2))

// SECOND CRITICAL CHECK: Turn release after dial completion
// Wait for Request B's dial result to complete
time.Sleep(300 * time.Millisecond) // ~600ms total
Expect(testPool.QueueLen()).To(Equal(1))

// Cleanup and verify turn is released
testPool.Put(ctx, reqBConn)
Eventually(func() int { return testPool.QueueLen() }, "500ms").Should(Equal(0))
Eventually(func() int { return testPool.QueueLen() }, "600ms").Should(Equal(0))
})
})

Expand Down
Loading