Skip to content

Commit 371ef16

Browse files
Bryan C. Millsgopherbot
Bryan C. Mills
authored andcommitted
internal/jsonrpc2_v2: rework concurrency in idleListener
This eliminates a race between a successful Accept call and a concurrent Close call, which previously could have shut down the 'run' goroutine before Accept sent to the newConns channel, causing Accept to deadlock. In fact, it eliminates the long-running 'run' goroutine entirely (replacing it with a time.Timer), and in the process avoids leaking O(N) closed connections when only the very first one is long-lived. It also eliminates a potential double-Close bug: if the run method had called l.wrapped.Close due to an idle timeout, a subsequent call to Close would invoke l.wrapped.Close again. The io.Closer method explicitly documents doubled Close calls as undefined behavior, and many Closer implementations (especially test fakes) panic or deadlock in that case. It also eliminates a timer leak if the Listener rapidly oscillates between active and idle: previously the implementation used time.After, but it now uses an explicit time.Timer which can be stopped (and garbage-collected) when the listener becomes active. Idleness is now tracked based on the connection's Close method rather than Read: we have no guarantee in general that a caller will ever actually invoke Read (if, for example, they Close the connection as soon as it is dialed), but we can reasonably expect a caller to at least try to ensure that Close is always called. We now also verify, using a finalizer on a best-effort basis, that the Close method on each connection is called. We use the finalizer to verify the Close call — rather than to close the connection implicitly — because closing the connection in a finalizer would delay the start of the idle timer by an arbitrary and unbounded duration after the last connection is actually no longer in use. Fixes golang/go#46047. Fixes golang/go#51435. For golang/go#46520. For golang/go#49387. Change-Id: If173a3ed7a44aff14734b72c8340122e8d020f26 Reviewed-on: https://go-review.googlesource.com/c/tools/+/388597 Run-TryBot: Bryan Mills <[email protected]> TryBot-Result: Gopher Robot <[email protected]> Reviewed-by: Ian Cottrell <[email protected]> Auto-Submit: Bryan Mills <[email protected]> gopls-CI: kokoro <[email protected]>
1 parent 5935531 commit 371ef16

File tree

3 files changed

+334
-112
lines changed

3 files changed

+334
-112
lines changed

internal/jsonrpc2_v2/serve.go

+171-77
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package jsonrpc2
77
import (
88
"context"
99
"errors"
10+
"fmt"
1011
"io"
1112
"runtime"
1213
"strings"
@@ -171,112 +172,205 @@ func isClosingError(err error) bool {
171172
}
172173

173174
// NewIdleListener wraps a listener with an idle timeout.
174-
// When there are no active connections for at least the timeout duration a
175-
// call to accept will fail with ErrIdleTimeout.
175+
//
176+
// When there are no active connections for at least the timeout duration,
177+
// calls to Accept will fail with ErrIdleTimeout.
178+
//
179+
// A connection is considered inactive as soon as its Close method is called.
176180
func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
177181
l := &idleListener{
178-
timeout: timeout,
179-
wrapped: wrap,
180-
newConns: make(chan *idleCloser),
181-
closed: make(chan struct{}),
182-
wasTimeout: make(chan struct{}),
182+
wrapped: wrap,
183+
timeout: timeout,
184+
active: make(chan int, 1),
185+
timedOut: make(chan struct{}),
186+
idleTimer: make(chan *time.Timer, 1),
183187
}
184-
go l.run()
188+
l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
185189
return l
186190
}
187191

188192
type idleListener struct {
189-
wrapped Listener
190-
timeout time.Duration
191-
newConns chan *idleCloser
192-
closed chan struct{}
193-
wasTimeout chan struct{}
194-
closeOnce sync.Once
195-
}
193+
wrapped Listener
194+
timeout time.Duration
196195

197-
type idleCloser struct {
198-
wrapped io.ReadWriteCloser
199-
closed chan struct{}
200-
closeOnce sync.Once
196+
// Only one of these channels is receivable at any given time.
197+
active chan int // count of active connections; closed when Close is called if not timed out
198+
timedOut chan struct{} // closed when the idle timer expires
199+
idleTimer chan *time.Timer // holds the timer only when idle
201200
}
202201

203-
func (c *idleCloser) Read(p []byte) (int, error) {
204-
n, err := c.wrapped.Read(p)
205-
if err != nil && isClosingError(err) {
206-
c.closeOnce.Do(func() { close(c.closed) })
202+
// Accept accepts an incoming connection.
203+
//
204+
// If an incoming connection is accepted concurrent to the listener being closed
205+
// due to idleness, the new connection is immediately closed.
206+
func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
207+
rwc, err := l.wrapped.Accept(ctx)
208+
209+
if err != nil && !isClosingError(err) {
210+
return nil, err
207211
}
208-
return n, err
209-
}
210212

211-
func (c *idleCloser) Write(p []byte) (int, error) {
212-
// we do not close on write failure, we rely on the wrapped writer to do that
213-
// if it is appropriate, which we will detect in the next read.
214-
return c.wrapped.Write(p)
215-
}
213+
select {
214+
case n, ok := <-l.active:
215+
if err != nil {
216+
if ok {
217+
l.active <- n
218+
}
219+
return nil, err
220+
}
221+
if ok {
222+
l.active <- n + 1
223+
} else {
224+
// l.wrapped.Close Close has been called, but Accept returned a
225+
// connection. This race can occur with concurrent Accept and Close calls
226+
// with any net.Listener, and it is benign: since the listener was closed
227+
// explicitly, it can't have also timed out.
228+
}
229+
return l.newConn(rwc), nil
216230

217-
func (c *idleCloser) Close() error {
218-
// we rely on closing the wrapped stream to signal to the next read that we
219-
// are closed, rather than triggering the closed signal directly
220-
return c.wrapped.Close()
221-
}
231+
case <-l.timedOut:
232+
if err == nil {
233+
// Keeping the connection open would leave the listener simultaneously
234+
// active and closed due to idleness, which would be contradictory and
235+
// confusing. Close the connection and pretend that it never happened.
236+
rwc.Close()
237+
}
238+
return nil, ErrIdleTimeout
222239

223-
func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
224-
rwc, err := l.wrapped.Accept(ctx)
225-
if err != nil {
226-
if isClosingError(err) {
227-
// underlying listener was closed
228-
l.closeOnce.Do(func() { close(l.closed) })
229-
// was it closed because of the idle timeout?
230-
select {
231-
case <-l.wasTimeout:
232-
err = ErrIdleTimeout
233-
default:
234-
}
240+
case timer := <-l.idleTimer:
241+
if err != nil {
242+
// The idle timer hasn't run yet, so err can't be ErrIdleTimeout.
243+
// Leave the idle timer as it was and return whatever error we got.
244+
l.idleTimer <- timer
245+
return nil, err
235246
}
236-
return nil, err
237-
}
238-
conn := &idleCloser{
239-
wrapped: rwc,
240-
closed: make(chan struct{}),
247+
248+
if !timer.Stop() {
249+
// Failed to stop the timer — the timer goroutine is in the process of
250+
// firing. Send the timer back to the timer goroutine so that it can
251+
// safely close the timedOut channel, and then wait for the listener to
252+
// actually be closed before we return ErrIdleTimeout.
253+
l.idleTimer <- timer
254+
rwc.Close()
255+
<-l.timedOut
256+
return nil, ErrIdleTimeout
257+
}
258+
259+
l.active <- 1
260+
return l.newConn(rwc), nil
241261
}
242-
l.newConns <- conn
243-
return conn, err
244262
}
245263

246264
func (l *idleListener) Close() error {
247-
defer l.closeOnce.Do(func() { close(l.closed) })
265+
select {
266+
case _, ok := <-l.active:
267+
if ok {
268+
close(l.active)
269+
}
270+
271+
case <-l.timedOut:
272+
// Already closed by the timer; take care not to double-close if the caller
273+
// only explicitly invokes this Close method once, since the io.Closer
274+
// interface explicitly leaves doubled Close calls undefined.
275+
return ErrIdleTimeout
276+
277+
case timer := <-l.idleTimer:
278+
if !timer.Stop() {
279+
// Couldn't stop the timer. It shouldn't take long to run, so just wait
280+
// (so that the Listener is guaranteed to be closed before we return)
281+
// and pretend that this call happened afterward.
282+
// That way we won't leak any timers or goroutines when Close returns.
283+
l.idleTimer <- timer
284+
<-l.timedOut
285+
return ErrIdleTimeout
286+
}
287+
close(l.active)
288+
}
289+
248290
return l.wrapped.Close()
249291
}
250292

251293
func (l *idleListener) Dialer() Dialer {
252294
return l.wrapped.Dialer()
253295
}
254296

255-
func (l *idleListener) run() {
256-
var conns []*idleCloser
257-
for {
258-
var firstClosed chan struct{} // left at nil if there are no active conns
259-
var timeout <-chan time.Time // left at nil if there are active conns
260-
if len(conns) > 0 {
261-
firstClosed = conns[0].closed
297+
func (l *idleListener) timerExpired() {
298+
select {
299+
case n, ok := <-l.active:
300+
if ok {
301+
panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active", n))
262302
} else {
263-
timeout = time.After(l.timeout)
303+
panic("jsonrpc2: Close finished with idle timer still running")
264304
}
265-
select {
266-
case <-l.closed:
267-
// the main listener closed, no need to keep going
305+
306+
case <-l.timedOut:
307+
panic("jsonrpc2: idleListener idle timer fired more than once")
308+
309+
case <-l.idleTimer:
310+
// The timer for this very call!
311+
}
312+
313+
// Close the Listener with all channels still blocked to ensure that this call
314+
// to l.wrapped.Close doesn't race with the one in l.Close.
315+
defer close(l.timedOut)
316+
l.wrapped.Close()
317+
}
318+
319+
func (l *idleListener) connClosed() {
320+
select {
321+
case n, ok := <-l.active:
322+
if !ok {
323+
// l is already closed, so it can't close due to idleness,
324+
// and we don't need to track the number of active connections any more.
268325
return
269-
case conn := <-l.newConns:
270-
// a new conn arrived, add it to the list
271-
conns = append(conns, conn)
272-
case <-timeout:
273-
// we timed out, only happens when there are no active conns
274-
// close the underlying listener, and allow the normal closing process to happen
275-
close(l.wasTimeout)
276-
l.wrapped.Close()
277-
case <-firstClosed:
278-
// a conn closed, remove it from the active list
279-
conns = conns[:copy(conns, conns[1:])]
280326
}
327+
n--
328+
if n == 0 {
329+
l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
330+
} else {
331+
l.active <- n
332+
}
333+
334+
case <-l.timedOut:
335+
panic("jsonrpc2: idleListener idle timer fired before last active connection was closed")
336+
337+
case <-l.idleTimer:
338+
panic("jsonrpc2: idleListener idle timer active before last active connection was closed")
281339
}
282340
}
341+
342+
type idleListenerConn struct {
343+
wrapped io.ReadWriteCloser
344+
l *idleListener
345+
closeOnce sync.Once
346+
}
347+
348+
func (l *idleListener) newConn(rwc io.ReadWriteCloser) *idleListenerConn {
349+
c := &idleListenerConn{
350+
wrapped: rwc,
351+
l: l,
352+
}
353+
354+
// A caller that forgets to call Close may disrupt the idleListener's
355+
// accounting, even though the file descriptor for the underlying connection
356+
// may eventually be garbage-collected anyway.
357+
//
358+
// Set a (best-effort) finalizer to verify that a Close call always occurs.
359+
// (We will clear the finalizer explicitly in Close.)
360+
runtime.SetFinalizer(c, func(c *idleListenerConn) {
361+
panic("jsonrpc2: IdleListener connection became unreachable without a call to Close")
362+
})
363+
364+
return c
365+
}
366+
367+
func (c *idleListenerConn) Read(p []byte) (int, error) { return c.wrapped.Read(p) }
368+
func (c *idleListenerConn) Write(p []byte) (int, error) { return c.wrapped.Write(p) }
369+
370+
func (c *idleListenerConn) Close() error {
371+
defer c.closeOnce.Do(func() {
372+
c.l.connClosed()
373+
runtime.SetFinalizer(c, nil)
374+
})
375+
return c.wrapped.Close()
376+
}

0 commit comments

Comments
 (0)