Skip to content

Commit 1e6032e

Browse files
authored
connection pool for limit maximum number of concurrent connections
1 parent 24af94b commit 1e6032e

File tree

1 file changed

+173
-55
lines changed

1 file changed

+173
-55
lines changed

memcache/memcache.go

Lines changed: 173 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ const (
7070
// DefaultMaxIdleConns is the default maximum number of idle connections
7171
// kept for any single address.
7272
DefaultMaxIdleConns = 2
73+
74+
// default of check conn alive interval
75+
DefaultKeepAliveInterval = time.Second * 10
7376
)
7477

7578
const buffered = 8 // arbitrary buffered channel size, for readability
@@ -126,7 +129,11 @@ func New(server ...string) *Client {
126129

127130
// NewFromSelector returns a new Client using the provided ServerSelector.
128131
func NewFromSelector(ss ServerSelector) *Client {
129-
return &Client{selector: ss}
132+
return &Client{
133+
selector: ss,
134+
closeGoroutine: make(chan struct{}),
135+
keepAliveExited: make(chan struct{}),
136+
}
130137
}
131138

132139
// Client is a memcache client.
@@ -154,8 +161,10 @@ type Client struct {
154161

155162
selector ServerSelector
156163

157-
lk sync.Mutex
158-
freeconn map[string][]*conn
164+
freeconn map[string]chan *conn
165+
166+
closeGoroutine chan struct{}
167+
keepAliveExited chan struct{}
159168
}
160169

161170
// Item is an item to be got or stored in a memcached server.
@@ -207,37 +216,145 @@ func (cn *conn) condRelease(err *error) {
207216
if *err == nil || resumableError(*err) {
208217
cn.release()
209218
} else {
219+
// this conn is not work well, return a new conn
220+
go func() {
221+
cnNew := cn.c.createNewConn(cn.addr)
222+
cn.c.putFreeConn(cn.addr, cnNew)
223+
}()
224+
// release error conn
210225
cn.nc.Close()
211226
}
212227
}
213228

214-
func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
215-
c.lk.Lock()
216-
defer c.lk.Unlock()
229+
// create a new conn, must success or cancel by caller
230+
func (c *Client) createNewConn(addr net.Addr) *conn {
231+
for {
232+
233+
select {
234+
case <-c.closeGoroutine:
235+
return nil
236+
case <-time.After(time.Second):
237+
nc, err := c.dial(addr)
238+
if err == nil {
239+
return &conn{
240+
nc: nc,
241+
addr: addr,
242+
rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
243+
c: c,
244+
}
245+
}
246+
}
247+
}
248+
}
249+
250+
// prepare a conn pool and put in channel
251+
func (c *Client) PreparePool() {
252+
// todo: remove old
253+
217254
if c.freeconn == nil {
218-
c.freeconn = make(map[string][]*conn)
255+
c.freeconn = make(map[string]chan *conn)
256+
}
257+
258+
c.selector.Each(func(a net.Addr) error {
259+
c.freeconn[a.String()] = make(chan *conn, c.maxIdleConns())
260+
for i := 0; i < c.maxIdleConns(); i++ {
261+
262+
go func() {
263+
// c.freeconn[a.String()] <- c.createNewConn(a)
264+
c.putFreeConn(a, c.createNewConn(a))
265+
}()
266+
}
267+
return nil
268+
})
269+
270+
c.keepAlive()
271+
}
272+
273+
// check and keep alive of all conn
274+
func (c *Client) keepAlive() {
275+
keepAliveItem := func(cn *conn, start <-chan struct{}) {
276+
277+
addr := cn.addr
278+
go func() {
279+
280+
<-start
281+
// TODO: cache panic then create new ?
282+
if cn.nc.SetDeadline(time.Now().Add(time.Second*1)) == nil {
283+
if subPing(cn.rw) == nil {
284+
// alive, give conn back
285+
c.putFreeConn(addr, cn)
286+
return
287+
}
288+
}
289+
290+
// disconnected, create new
291+
cn.nc.Close()
292+
cnNew := c.createNewConn(addr)
293+
c.putFreeConn(addr, cnNew)
294+
}()
295+
296+
}
297+
298+
keepAliveList := func(freelist <-chan *conn) {
299+
start := make(chan struct{})
300+
defer close(start)
301+
for {
302+
select {
303+
case cn, ok := <-freelist:
304+
if !ok {
305+
return
306+
}
307+
keepAliveItem(cn, start)
308+
default:
309+
return
310+
}
311+
}
219312
}
313+
314+
// check conn alive
315+
go func() {
316+
defer close(c.keepAliveExited)
317+
for {
318+
select {
319+
case <-c.closeGoroutine:
320+
return
321+
case <-time.After(DefaultKeepAliveInterval):
322+
323+
for _, freelist := range c.freeconn {
324+
keepAliveList(freelist)
325+
}
326+
}
327+
328+
}
329+
}()
330+
}
331+
332+
func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
333+
if cn == nil {
334+
return
335+
}
336+
337+
// fmt.Println("memcache: put")
338+
// defer fmt.Println("memcache: put ok")
339+
220340
freelist := c.freeconn[addr.String()]
221-
if len(freelist) >= c.maxIdleConns() {
341+
342+
select {
343+
case freelist <- cn:
344+
default:
222345
cn.nc.Close()
223-
return
224346
}
225-
c.freeconn[addr.String()] = append(freelist, cn)
347+
226348
}
227349

228350
func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
229-
c.lk.Lock()
230-
defer c.lk.Unlock()
231-
if c.freeconn == nil {
232-
return nil, false
233-
}
234-
freelist, ok := c.freeconn[addr.String()]
235-
if !ok || len(freelist) == 0 {
351+
352+
select {
353+
case cn = <-c.freeconn[addr.String()]:
354+
return cn, true
355+
case <-time.After(time.Second * 5):
236356
return nil, false
237357
}
238-
cn = freelist[len(freelist)-1]
239-
c.freeconn[addr.String()] = freelist[:len(freelist)-1]
240-
return cn, true
241358
}
242359

243360
func (c *Client) netTimeout() time.Duration {
@@ -294,19 +411,15 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) {
294411
if ok {
295412
cn.extendDeadline()
296413
return cn, nil
414+
} else {
415+
return nil, errors.New("no free connection available")
297416
}
298-
nc, err := c.dial(addr)
299-
if err != nil {
300-
return nil, err
301-
}
302-
cn = &conn{
303-
nc: nc,
304-
addr: addr,
305-
rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
306-
c: c,
307-
}
308-
cn.extendDeadline()
309-
return cn, nil
417+
// cn, err := c.createNewConn(addr)
418+
// if err != nil {
419+
// return nil, err
420+
// }
421+
// cn.extendDeadline()
422+
// return cn, nil
310423
}
311424

312425
func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error {
@@ -416,28 +529,30 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error {
416529
})
417530
}
418531

532+
func subPing(rw *bufio.ReadWriter) error {
533+
if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil {
534+
return err
535+
}
536+
if err := rw.Flush(); err != nil {
537+
return err
538+
}
539+
line, err := rw.ReadSlice('\n')
540+
if err != nil {
541+
return err
542+
}
543+
544+
switch {
545+
case bytes.HasPrefix(line, versionPrefix):
546+
break
547+
default:
548+
return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line))
549+
}
550+
return nil
551+
}
552+
419553
// ping sends the version command to the given addr
420554
func (c *Client) ping(addr net.Addr) error {
421-
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
422-
if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil {
423-
return err
424-
}
425-
if err := rw.Flush(); err != nil {
426-
return err
427-
}
428-
line, err := rw.ReadSlice('\n')
429-
if err != nil {
430-
return err
431-
}
432-
433-
switch {
434-
case bytes.HasPrefix(line, versionPrefix):
435-
break
436-
default:
437-
return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line))
438-
}
439-
return nil
440-
})
555+
return c.withAddrRw(addr, subPing)
441556
}
442557

443558
func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error {
@@ -761,11 +876,14 @@ func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) {
761876
//
762877
// After Close, the Client may still be used.
763878
func (c *Client) Close() error {
764-
c.lk.Lock()
765-
defer c.lk.Unlock()
766879
var ret error
880+
881+
close(c.closeGoroutine)
882+
883+
<-c.keepAliveExited
884+
767885
for _, conns := range c.freeconn {
768-
for _, c := range conns {
886+
for c := range conns {
769887
if err := c.nc.Close(); err != nil && ret == nil {
770888
ret = err
771889
}

0 commit comments

Comments
 (0)