Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ melos_overrides.yaml
# FVM Version Cache
.fvm/
.fvmrc

# Local history
.history
1 change: 1 addition & 0 deletions plugins/http2_adapter/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ See the [Migration Guide][] for the complete breaking changes list.**
## Unreleased

- Add `handshakeTimeout` (defaults to 15 seconds) to the `ConnectionManager` to prevent long waiting if there's something wrong with the handshake procedure.
- Fix `StateError: Bad state: Cannot add event after closing` caused by race condition e.g. when the server closed the connection before receiving the request body.

## 2.6.0

Expand Down
55 changes: 43 additions & 12 deletions plugins/http2_adapter/lib/src/http2_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -126,39 +126,70 @@ class Http2Adapter implements HttpClientAdapter {
final streamWR = WeakReference<ClientTransportStream>(stream);

final hasRequestData = requestStream != null;
if (hasRequestData && cancelFuture != null) {
cancelFuture.whenComplete(() {
streamWR.target?.outgoingMessages.close();
});
}

List<Uint8List>? list;
if (!excludeMethods.contains(options.method) && hasRequestData) {
list = await requestStream.toList();
requestStream = Stream.fromIterable(list);
}

if (hasRequestData) {
Future<dynamic> requestStreamFuture = requestStream!.listen((data) {
//TODO(EVERYONE): Investigate why this statement can cause "StateError: Bad state: Cannot add event after closing"
stream.outgoingMessages.add(DataStreamMessage(data));
}).asFuture();
StreamSubscription<Uint8List>? requestSub;
final requestCompleter = Completer<void>();

requestSub = requestStream!.listen(
(Uint8List data) {
try {
stream.outgoingMessages.add(DataStreamMessage(data));
} on StateError {
requestSub?.cancel();
if (!requestCompleter.isCompleted) {
requestCompleter.complete();
}
}
},
onError: (Object e, StackTrace st) {
if (!requestCompleter.isCompleted) {
requestCompleter.completeError(e, st);
}
},
onDone: () {
if (!requestCompleter.isCompleted) {
requestCompleter.complete();
}
},
cancelOnError: true,
);

if (cancelFuture != null) {
cancelFuture.whenComplete(() {
requestSub?.cancel().catchError((_) {}).whenComplete(() {
streamWR.target?.outgoingMessages.close().catchError((_) {});
});
});
}

Future<dynamic> requestStreamFuture = requestCompleter.future;
final sendTimeout = options.sendTimeout ?? Duration.zero;
if (sendTimeout > Duration.zero) {
requestStreamFuture = requestStreamFuture.timeout(
sendTimeout,
onTimeout: () {
stream.outgoingMessages.close().catchError((_) {});
requestSub?.cancel().catchError((_) {});
streamWR.target?.outgoingMessages.close().catchError((_) {});
throw DioException.sendTimeout(
timeout: sendTimeout,
requestOptions: options,
);
},
);
}

await requestStreamFuture;
}
await stream.outgoingMessages.close();

try {
await stream.outgoingMessages.close();
} catch (_) {}

final responseSink = StreamController<Uint8List>();
final responseHeaders = Headers();
Expand Down
60 changes: 60 additions & 0 deletions plugins/http2_adapter/test/http2_test.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';

import 'package:dio/dio.dart';
import 'package:dio_http2_adapter/dio_http2_adapter.dart';
import 'package:dio_test/util.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';

void main() {
Expand Down Expand Up @@ -99,6 +102,63 @@ void main() {
expect(res.data.toString(), contains('TEST'));
});

test(
'request does not fail with StateError when the server closes the stream before client sends body',
() async {
final serverSocket =
await ServerSocket.bind(InternetAddress.loopbackIPv4, 0);

serverSocket.listen((rawSocket) {
final serverConn = ServerTransportConnection.viaSocket(rawSocket);

serverConn.incomingStreams.listen((ServerTransportStream stream) async {
await for (final msg in stream.incomingMessages) {
if (msg is HeadersStreamMessage) {
stream.terminate();
break;
}
}
});
});

final dio = Dio();
final adapter = Http2Adapter(null);
dio.httpClientAdapter = adapter;

final Stream<Uint8List> requestStream = (() async* {
for (int i = 0; i < 20; i++) {
await Future.delayed(const Duration(milliseconds: 5));
yield Uint8List.fromList(List.filled(1024, i));
}
})();

final completer = Completer<Object?>();

runZonedGuarded(() async {
await adapter.fetch(
RequestOptions(
path: '/test',
method: 'POST',
baseUrl: 'http://127.0.0.1:${serverSocket.port}',
headers: {},
),
requestStream,
null,
);
completer.complete(null);
}, (e, _) {
if (!completer.isCompleted) {
completer.complete(e);
}
});

final result = await completer.future;
expect(result, isNot(isA<StateError>()));

adapter.close(force: true);
await serverSocket.close();
});

group(ConnectionManager, () {
test('returns correct connection', () async {
final manager = ConnectionManager();
Expand Down