@@ -116,8 +116,6 @@ func connect(ctx context.Context, scheme, addr string, opts Options) (conn *Conn
116
116
// remove deadline
117
117
conn .tcpConn .SetDeadline (time.Time {})
118
118
119
- go conn .worker ()
120
-
121
119
return
122
120
}
123
121
@@ -490,65 +488,29 @@ func (conn *Connection) setError(err error) {
490
488
}
491
489
}
492
490
493
- func (conn * Connection ) worker () {
494
- var wg sync.WaitGroup
495
-
496
- wg .Add (2 )
497
-
498
- go func () {
499
- err := conn .writer ()
500
- conn .setError (err )
501
- conn .stop ()
502
- wg .Done ()
503
- }()
504
-
505
- go func () {
506
- err := conn .reader ()
507
- conn .setError (err )
508
- conn .stop ()
509
- wg .Done ()
510
- }()
511
-
512
- wg .Wait ()
513
-
514
- // release all pending packets
515
- writeChan := conn .writeChan
516
-
517
- CLEANUP_LOOP:
518
- for {
519
- select {
520
- case req := <- writeChan :
521
- pp := req .packet
522
- if pp != nil {
523
- req .packet = nil
524
- conn .releasePacket (pp )
525
- }
526
- default :
527
- break CLEANUP_LOOP
528
- }
529
- }
530
-
531
- // send error reply to all pending requests
532
- conn .requests .CleanUp (func (req * request ) {
533
- select {
534
- case req .replyChan <- & AsyncResult {
535
- Error : ConnectionClosedError (conn ),
536
- ErrorCode : ErrNoConnection ,
537
- Opaque : req .opaque ,
538
- }:
539
- default :
540
- }
541
- requestPool .Put (req )
542
- })
543
-
544
- close (conn .closed )
545
- }
546
-
547
491
func (conn * Connection ) writer () (err error ) {
548
492
writeChan := conn .writeChan
549
493
stopChan := conn .exit
550
494
w := bufio .NewWriterSize (conn .ccw , DefaultWriterBufSize )
551
495
496
+ defer close (conn .closed )
497
+
498
+ defer func () {
499
+ CLEANUP_LOOP:
500
+ for {
501
+ select {
502
+ case req := <- writeChan :
503
+ pp := req .packet
504
+ if pp != nil {
505
+ req .packet = nil
506
+ conn .releasePacket (pp )
507
+ }
508
+ default :
509
+ break CLEANUP_LOOP
510
+ }
511
+ }
512
+ }()
513
+
552
514
wr := func (w io.Writer , req * request ) error {
553
515
packet := req .packet
554
516
@@ -604,6 +566,25 @@ func (conn *Connection) reader() (err error) {
604
566
var pp * BinaryPacket
605
567
var requestID uint64
606
568
569
+ defer func () {
570
+ <- conn .closed
571
+ }()
572
+
573
+ defer func () {
574
+ // send error reply to all pending requests
575
+ conn .requests .CleanUp (func (req * request ) {
576
+ select {
577
+ case req .replyChan <- & AsyncResult {
578
+ Error : ConnectionClosedError (conn ),
579
+ ErrorCode : ErrNoConnection ,
580
+ Opaque : req .opaque ,
581
+ }:
582
+ default :
583
+ }
584
+ requestPool .Put (req )
585
+ })
586
+ }()
587
+
607
588
r := bufio .NewReaderSize (conn .ccr , DefaultReaderBufSize )
608
589
609
590
READER_LOOP:
0 commit comments