@@ -192,7 +192,7 @@ func (c *ClientConn) mainLoop() {
192
192
break
193
193
}
194
194
// TODO(dfc) A note on blocking channel use.
195
- // The msg, win, data and dataExt channels of a clientChan can
195
+ // The msg, data and dataExt channels of a clientChan can
196
196
// cause this loop to block indefinately if the consumer does
197
197
// not service them.
198
198
switch packet [0 ] {
@@ -233,7 +233,6 @@ func (c *ClientConn) mainLoop() {
233
233
case * channelCloseMsg :
234
234
ch := c .getChan (msg .PeersId )
235
235
ch .theyClosed = true
236
- close (ch .stdin .win )
237
236
ch .stdout .eof ()
238
237
ch .stderr .eof ()
239
238
close (ch .msg )
@@ -255,7 +254,10 @@ func (c *ClientConn) mainLoop() {
255
254
case * channelRequestMsg :
256
255
c .getChan (msg .PeersId ).msg <- msg
257
256
case * windowAdjustMsg :
258
- c .getChan (msg .PeersId ).stdin .win <- int (msg .AdditionalBytes )
257
+ if ! c .getChan (msg .PeersId ).stdin .win .add (msg .AdditionalBytes ) {
258
+ // invalid window update
259
+ break
260
+ }
259
261
case * disconnectMsg :
260
262
break
261
263
default :
@@ -324,7 +326,7 @@ func newClientChan(t *transport, id uint32) *clientChan {
324
326
msg : make (chan interface {}, 16 ),
325
327
}
326
328
c .stdin = & chanWriter {
327
- win : make ( chan int , 16 ) ,
329
+ win : & window { Cond : sync . NewCond ( new (sync. Mutex ))} ,
328
330
clientChan : c ,
329
331
}
330
332
c .stdout = & chanReader {
@@ -345,7 +347,7 @@ func (c *clientChan) waitForChannelOpenResponse() error {
345
347
case * channelOpenConfirmMsg :
346
348
// fixup peersId field
347
349
c .peersId = msg .MyId
348
- c .stdin .win <- int (msg .MyWindow )
350
+ c .stdin .win . add (msg .MyWindow )
349
351
return nil
350
352
case * channelOpenFailureMsg :
351
353
return errors .New (safeString (msg .Message ))
@@ -417,22 +419,16 @@ func (c *chanlist) remove(id uint32) {
417
419
418
420
// A chanWriter represents the stdin of a remote process.
419
421
type chanWriter struct {
420
- win chan int // receives window adjustments
421
- rwin int // current rwin size
422
+ win * window
422
423
clientChan * clientChan // the channel backing this writer
423
424
}
424
425
425
426
// Write writes data to the remote process's standard input.
426
427
func (w * chanWriter ) Write (data []byte ) (written int , err error ) {
427
428
for len (data ) > 0 {
428
- for w .rwin < 1 {
429
- win , ok := <- w .win
430
- if ! ok {
431
- return 0 , io .EOF
432
- }
433
- w .rwin += win
434
- }
435
- n := min (len (data ), w .rwin )
429
+ // n cannot be larger than 2^31 as len(data) cannot
430
+ // be larger than 2^31
431
+ n := int (w .win .reserve (uint32 (len (data ))))
436
432
peersId := w .clientChan .peersId
437
433
packet := []byte {
438
434
msgChannelData ,
@@ -443,7 +439,6 @@ func (w *chanWriter) Write(data []byte) (written int, err error) {
443
439
break
444
440
}
445
441
data = data [n :]
446
- w .rwin -= n
447
442
written += n
448
443
}
449
444
return
@@ -507,3 +502,46 @@ func (r *chanReader) Read(data []byte) (int, error) {
507
502
}
508
503
panic ("unreachable" )
509
504
}
505
+
506
+ // window represents the buffer available to clients
507
+ // wishing to write to a channel.
508
+ type window struct {
509
+ * sync.Cond
510
+ win uint32 // RFC 4254 5.2 says the window size can grow to 2^32-1
511
+ }
512
+
513
+ // add adds win to the amount of window available
514
+ // for consumers.
515
+ func (w * window ) add (win uint32 ) bool {
516
+ if win == 0 {
517
+ return false
518
+ }
519
+ w .L .Lock ()
520
+ if w .win + win < win {
521
+ w .L .Unlock ()
522
+ return false
523
+ }
524
+ w .win += win
525
+ // It is unusual that multiple goroutines would be attempting to reserve
526
+ // window space, but not guaranteed. Use broadcast to notify all waiters
527
+ // that additional window is available.
528
+ w .Broadcast ()
529
+ w .L .Unlock ()
530
+ return true
531
+ }
532
+
533
+ // reserve reserves win from the available window capacity.
534
+ // If no capacity remains, reserve will block. reserve may
535
+ // return less than requested.
536
+ func (w * window ) reserve (win uint32 ) uint32 {
537
+ w .L .Lock ()
538
+ for w .win == 0 {
539
+ w .Wait ()
540
+ }
541
+ if w .win < win {
542
+ win = w .win
543
+ }
544
+ w .win -= win
545
+ w .L .Unlock ()
546
+ return win
547
+ }
0 commit comments