@@ -22,7 +22,7 @@ public class AdaptedPipeline : IDuplexPipe, IDisposable
2222 public AdaptedPipeline ( Pipe inputPipe ,
2323 Pipe outputPipe ,
2424 ILogger log ,
25- IDuplexPipe transport = null )
25+ IDuplexPipe transport )
2626 {
2727 _transport = transport ;
2828 Input = inputPipe ;
@@ -65,6 +65,11 @@ private async Task WriteOutputAsync(Stream stream)
6565
6666 try
6767 {
68+ if ( result . IsCanceled )
69+ {
70+ break ;
71+ }
72+
6873 if ( buffer . IsEmpty )
6974 {
7075 if ( result . IsCompleted )
@@ -113,7 +118,10 @@ private async Task WriteOutputAsync(Stream stream)
113118 {
114119 Output . Reader . Complete ( ) ;
115120
116- _transport ? . Output . Complete ( ) ;
121+ _transport . Output . Complete ( ) ;
122+
123+ // Cancel any pending flushes due to back-pressure
124+ Input . Writer . CancelPendingFlush ( ) ;
117125 }
118126 }
119127
@@ -151,7 +159,7 @@ private async Task ReadInputAsync(Stream stream)
151159
152160 var result = await Input . Writer . FlushAsync ( ) ;
153161
154- if ( result . IsCompleted )
162+ if ( result . IsCompleted || result . IsCanceled )
155163 {
156164 break ;
157165 }
@@ -168,7 +176,10 @@ private async Task ReadInputAsync(Stream stream)
168176
169177 // The application could have ended the input pipe so complete
170178 // the transport pipe as well
171- _transport ? . Input . Complete ( ) ;
179+ _transport . Input . Complete ( ) ;
180+
181+ // Cancel any pending reads from the application
182+ Output . Reader . CancelPendingRead ( ) ;
172183 }
173184 }
174185
0 commit comments