Skip to content

Commit 6f92177

Browse files
authored
Merge pull request dart-archive/sse#4 from dart-lang/close-cleanup
Close cleanup
2 parents a54125b + c0a5bc4 commit 6f92177

File tree

6 files changed

+79
-36
lines changed

6 files changed

+79
-36
lines changed

pkgs/sse/.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ language: dart
44
sudo: required
55
dist: trusty
66
addons:
7+
chrome: stable
78
apt:
89
sources:
910
- google-chrome
@@ -29,7 +30,7 @@ before_install:
2930
- sh -e /etc/init.d/xvfb start
3031

3132
before_script:
32-
- wget http://chromedriver.storage.googleapis.com/2.35/chromedriver_linux64.zip
33+
- wget http://chromedriver.storage.googleapis.com/2.46/chromedriver_linux64.zip
3334
- unzip chromedriver_linux64.zip
3435
- export PATH=$PATH:$PWD
3536
- ./tool/travis-setup.sh

pkgs/sse/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 2.0.0
2+
3+
- No longer expose `close` and `onClose` on an `SseConnection`. This is simply
4+
handled by the underlying `stream` / `sink`.
5+
- Fix a bug where resources of the `SseConnection` were not properly closed.
6+
17
## 1.0.0
28

39
- Internal cleanup.

pkgs/sse/lib/server/sse_handler.dart

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,28 @@ String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
2121

2222
/// A bi-directional SSE connection between server and browser.
2323
class SseConnection extends StreamChannelMixin<String> {
24+
/// Incoming messages from the Browser client.
2425
final _incomingController = StreamController<String>();
26+
27+
/// Outgoing messages to the Browser client.
2528
final _outgoingController = StreamController<String>();
26-
final _closeCompleter = Completer<Null>();
29+
2730
final Sink _sink;
28-
final String _clientId;
2931

30-
SseConnection(this._sink, this._clientId) {
32+
final _closedCompleter = Completer<void>();
33+
34+
SseConnection(this._sink) {
3135
_outgoingController.stream.listen((data) {
32-
if (!_closeCompleter.isCompleted) {
36+
if (!_closedCompleter.isCompleted) {
3337
// JSON encode the message to escape new lines.
3438
_sink.add('data: ${json.encode(data)}\n');
3539
_sink.add('\n');
3640
}
3741
});
42+
_outgoingController.onCancel = _close;
43+
_incomingController.onCancel = _close;
3844
}
3945

40-
Future get onClose => _closeCompleter.future;
41-
4246
/// The message added to the sink has to be JSON encodable.
4347
@override
4448
StreamSink<String> get sink => _outgoingController.sink;
@@ -50,8 +54,13 @@ class SseConnection extends StreamChannelMixin<String> {
5054
@override
5155
Stream<String> get stream => _incomingController.stream;
5256

53-
void close() {
54-
if (!_closeCompleter.isCompleted) _closeCompleter.complete();
57+
void _close() {
58+
if (!_closedCompleter.isCompleted) {
59+
_closedCompleter.complete();
60+
_sink.close();
61+
if (!_outgoingController.isClosed) _outgoingController.close();
62+
if (!_incomingController.isClosed) _incomingController.close();
63+
}
5564
}
5665
}
5766

@@ -63,15 +72,15 @@ class SseConnection extends StreamChannelMixin<String> {
6372
class SseHandler {
6473
final _logger = Logger('SseHandler');
6574
final Uri _uri;
66-
67-
final Set<SseConnection> _connections = Set<SseConnection>();
68-
75+
final _connections = <String, SseConnection>{};
6976
final _connectionController = StreamController<SseConnection>();
7077

78+
StreamQueue<SseConnection> _connectionsStream;
79+
7180
SseHandler(this._uri);
7281

7382
StreamQueue<SseConnection> get connections =>
74-
StreamQueue(_connectionController.stream);
83+
_connectionsStream ??= StreamQueue(_connectionController.stream);
7584

7685
shelf.Handler get handler => _handle;
7786

@@ -82,19 +91,22 @@ class SseHandler {
8291
var sink = utf8.encoder.startChunkedConversion(channel.sink);
8392
sink.add(_sseHeaders(req.headers['origin']));
8493
var clientId = req.url.queryParameters['sseClientId'];
85-
var connection = SseConnection(sink, clientId);
86-
_connections.add(connection);
87-
unawaited(connection.onClose.then((_) {
88-
_connections.remove(connection);
94+
var connection = SseConnection(sink);
95+
_connections[clientId] = connection;
96+
unawaited(connection._closedCompleter.future.then((_) {
97+
_connections.remove(clientId);
8998
}));
99+
// Remove connection when it is remotely closed or the stream is
100+
// cancelled.
90101
channel.stream.listen((_) {
91102
// SSE is unidirectional. Responses are handled through POST requests.
92103
}, onDone: () {
93-
connection.close();
104+
connection._close();
94105
});
106+
95107
_connectionController.add(connection);
96108
});
97-
return null;
109+
return shelf.Response.notFound('');
98110
}
99111

100112
String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
@@ -122,11 +134,7 @@ class SseHandler {
122134
var clientId = req.url.queryParameters['sseClientId'];
123135
var message = await req.readAsString();
124136
var jsonObject = json.decode(message) as String;
125-
for (var connection in _connections) {
126-
if (connection._clientId == clientId) {
127-
connection._incomingController.add(jsonObject);
128-
}
129-
}
137+
_connections[clientId]?._incomingController?.add(jsonObject);
130138
} catch (e, st) {
131139
_logger.fine('Failed to handle incoming message. $e $st');
132140
}

pkgs/sse/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: sse
2-
version: 1.0.0
2+
version: 2.0.0
33
author: Dart Team <[email protected]>
44
homepage: https://github.com/dart-lang/sse
55
description: >-

pkgs/sse/test/sse_test.dart

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,21 @@ void main() {
1717
HttpServer server;
1818
WebDriver webdriver;
1919
SseHandler handler;
20+
Process chromeDriver;
21+
22+
setUpAll(() async {
23+
try {
24+
chromeDriver = await Process.start(
25+
'chromedriver', ['--port=4444', '--url-base=wd/hub']);
26+
} catch (e) {
27+
throw StateError(
28+
'Could not start ChromeDriver. Is it installed?\nError: $e');
29+
}
30+
});
31+
32+
tearDownAll(() {
33+
chromeDriver.kill();
34+
});
2035

2136
setUp(() async {
2237
handler = SseHandler(Uri.parse('/test'));
@@ -28,7 +43,11 @@ void main() {
2843
listDirectories: true, defaultDocument: 'index.html'));
2944

3045
server = await io.serve(cascade.handler, 'localhost', 0);
31-
webdriver = await createDriver();
46+
webdriver = await createDriver(desired: {
47+
'chromeOptions': {
48+
'args': ['--headless']
49+
}
50+
});
3251
});
3352

3453
tearDown(() async {
@@ -55,12 +74,12 @@ void main() {
5574
var connections = handler.connections;
5675
await webdriver.get('http://localhost:${server.port}');
5776
var connectionA = await connections.next;
77+
connectionA.sink.add('foo');
78+
expect(await connectionA.stream.first, 'foo');
79+
5880
await webdriver.get('http://localhost:${server.port}');
5981
var connectionB = await connections.next;
60-
61-
connectionA.sink.add('foo');
6282
connectionB.sink.add('bar');
63-
await connectionA.onClose;
6483
expect(await connectionB.stream.first, 'bar');
6584
});
6685

@@ -69,8 +88,8 @@ void main() {
6988
await webdriver.get('http://localhost:${server.port}');
7089
var connection = await handler.connections.next;
7190
expect(handler.numberOfClients, 1);
72-
connection.close();
73-
await connection.onClose;
91+
await connection.sink.close();
92+
await pumpEventQueue();
7493
expect(handler.numberOfClients, 0);
7594
});
7695

@@ -83,7 +102,20 @@ void main() {
83102
var closeButton = await webdriver.findElement(const By.tagName('button'));
84103
await closeButton.click();
85104

86-
await connection.onClose;
105+
// Should complete since the connection is closed.
106+
await connection.stream.toList();
107+
expect(handler.numberOfClients, 0);
108+
});
109+
110+
test('Cancelling the listener closes the connection', () async {
111+
expect(handler.numberOfClients, 0);
112+
await webdriver.get('http://localhost:${server.port}');
113+
var connection = await handler.connections.next;
114+
expect(handler.numberOfClients, 1);
115+
116+
var sub = connection.stream.listen((_) {});
117+
await sub.cancel();
118+
await pumpEventQueue();
87119
expect(handler.numberOfClients, 0);
88120
});
89121

pkgs/sse/tool/travis.sh

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@ if [[ $ANALYSIS_STATUS -ne 0 ]]; then
2323
STATUS=$ANALYSIS_STATUS
2424
fi
2525

26-
# Start chromedriver.
27-
chromedriver --port=4444 --url-base=wd/hub &
28-
PIDC=$!
29-
3026
# Run tests.
3127
pub run test -r expanded -p vm -j 1
3228
TEST_STATUS=$?

0 commit comments

Comments
 (0)