Skip to content

Commit 26b5dbb

Browse files
author
Steve Browne
committed
Implemented a basic transport based on the fetch api to replace xhr.
1 parent 94b71a3 commit 26b5dbb

File tree

2 files changed

+347
-2
lines changed

2 files changed

+347
-2
lines changed
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
// Copyright (c) 2022, the gRPC project authors. Please see the AUTHORS file
2+
// for details. All rights reserved.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
import 'dart:async';
17+
import 'dart:html';
18+
import 'dart:js_util' as js_util;
19+
import 'dart:typed_data';
20+
21+
import 'package:async/async.dart';
22+
import 'package:meta/meta.dart';
23+
24+
import '../../client/call.dart';
25+
import '../../shared/message.dart';
26+
import '../../shared/status.dart';
27+
import '../connection.dart';
28+
import 'cors.dart' as cors;
29+
import 'transport.dart';
30+
import 'web_streams.dart';
31+
32+
const _contentTypeKey = 'Content-Type';
33+
34+
/// Implementation of Fetch API simulating @HttpRequest for minimal changes
35+
class FetchHttpRequest {
36+
// Request parameters
37+
var method = 'GET';
38+
var uri = '';
39+
var referrerPolicy = 'origin';
40+
var mode = 'cors';
41+
var credentials = 'omit';
42+
var cache = 'default';
43+
var redirect = 'follow';
44+
var integrity = '';
45+
var keepAlive = true;
46+
var headers = <String, String>{};
47+
var readyState = HttpRequest.UNSENT;
48+
set withCredentials(bool value) => credentials = value ? 'include' : 'omit';
49+
set responseType(String unused) {}
50+
51+
// Streams and controllers
52+
final onReadyStateChangeController = StreamController<int>.broadcast();
53+
Stream<int> get onReadyStateChange => onReadyStateChangeController.stream;
54+
final onProgressController = StreamController<Uint8List>.broadcast();
55+
Stream<Uint8List> get onProgress => onProgressController.stream;
56+
final onErrorController = StreamController<int>.broadcast();
57+
Stream<int> get onError => onErrorController.stream;
58+
59+
// Response information
60+
CancelableOperation<dynamic>? _cancelable;
61+
dynamic _response;
62+
dynamic get response => _response;
63+
int get status =>
64+
response != null ? js_util.getProperty(response, 'status') : 0;
65+
Map<String, String> get responseHeaders => response != null
66+
? toDartMap(js_util.getProperty(response, 'headers'))
67+
: <String, String>{};
68+
String get responseText =>
69+
response != null ? js_util.getProperty(response, 'statusText') : '';
70+
dynamic get body =>
71+
response != null ? js_util.getProperty(response, 'body') : null;
72+
73+
static Map<String, String> toDartMap(Headers obj) =>
74+
Map.fromIterable(getObjectKeys(obj),
75+
value: (key) => js_util.callMethod(obj, 'get', [key]).toString());
76+
77+
static List<String> getObjectKeys(Headers obj) =>
78+
List<String>.from(js_util.callMethod(obj, 'keys', []));
79+
80+
Future send([List<int>? data]) async {
81+
final wgs = WorkerGlobalScope.instance;
82+
_setReadyState(HttpRequest.LOADING);
83+
final init = <String, dynamic>{
84+
'method': method,
85+
'referrerPolicy': referrerPolicy,
86+
'mode': mode,
87+
'credentials': credentials,
88+
'cache': cache,
89+
'redirect': redirect,
90+
'integrity': integrity,
91+
'keepalive': keepAlive,
92+
if (headers.isNotEmpty) 'headers': headers,
93+
if (data != null) 'body': String.fromCharCodes(data),
94+
};
95+
final operation =
96+
_cancelable = CancelableOperation.fromFuture(wgs.fetch(uri, init));
97+
98+
_response = await operation.value;
99+
_setReadyState(HttpRequest.HEADERS_RECEIVED);
100+
if (status < 200 || status >= 300) {
101+
onErrorController.add(status);
102+
}
103+
104+
final reader = body?.getReader();
105+
if (reader == null) {
106+
onErrorController.add(0);
107+
return;
108+
}
109+
110+
while (true) {
111+
final result = await js_util.promiseToFuture(reader.read());
112+
final value = js_util.getProperty(result, 'value');
113+
if (value != null) {
114+
onProgressController.add(value as Uint8List);
115+
}
116+
if (js_util.getProperty(result, 'done')) {
117+
_setReadyState(HttpRequest.DONE);
118+
break;
119+
}
120+
}
121+
}
122+
123+
void _setReadyState(int state) {
124+
readyState = state;
125+
onReadyStateChangeController.add(state);
126+
if (state == HttpRequest.DONE) {}
127+
}
128+
129+
void open(String method, String uri) {
130+
this.method = method;
131+
this.uri = uri;
132+
_setReadyState(HttpRequest.OPENED);
133+
}
134+
135+
void abort() async {
136+
await _cancelable?.cancel();
137+
close();
138+
}
139+
140+
void close() {
141+
onReadyStateChangeController.close();
142+
onProgressController.close();
143+
onErrorController.close();
144+
}
145+
146+
void setRequestHeader(String name, String value) {
147+
headers[name] = value;
148+
}
149+
150+
void overrideMimeType(String mimeType) {}
151+
}
152+
153+
class FetchTransportStream implements GrpcTransportStream {
154+
final FetchHttpRequest _request;
155+
final ErrorHandler _onError;
156+
final Function(FetchTransportStream stream) _onDone;
157+
bool _headersReceived = false;
158+
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
159+
final StreamController<GrpcMessage> _incomingMessages = StreamController();
160+
final StreamController<List<int>> _outgoingMessages = StreamController();
161+
162+
@override
163+
Stream<GrpcMessage> get incomingMessages => _incomingMessages.stream;
164+
165+
@override
166+
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
167+
168+
FetchTransportStream(this._request,
169+
{required ErrorHandler onError, required onDone})
170+
: _onError = onError,
171+
_onDone = onDone {
172+
_outgoingMessages.stream
173+
.map(frame)
174+
.listen((data) => _request.send(data), cancelOnError: true);
175+
176+
_request.onReadyStateChange.listen((data) {
177+
if (_incomingProcessor.isClosed) {
178+
return;
179+
}
180+
switch (_request.readyState) {
181+
case HttpRequest.HEADERS_RECEIVED:
182+
_onHeadersReceived();
183+
break;
184+
case HttpRequest.DONE:
185+
_onRequestDone();
186+
_close();
187+
break;
188+
}
189+
});
190+
191+
_request.onError.listen((_) {
192+
if (_incomingProcessor.isClosed) {
193+
return;
194+
}
195+
_onError(GrpcError.unavailable('FetchTransportStream connection-error'),
196+
StackTrace.current);
197+
terminate();
198+
});
199+
200+
_request.onProgress.listen((bytes) {
201+
if (_incomingProcessor.isClosed) {
202+
return;
203+
}
204+
_incomingProcessor.add(bytes.buffer);
205+
});
206+
207+
_incomingProcessor.stream
208+
.transform(GrpcWebDecoder())
209+
.transform(grpcDecompressor())
210+
.listen(_incomingMessages.add,
211+
onError: _onError, onDone: _incomingMessages.close);
212+
}
213+
214+
bool _validateResponseState() {
215+
try {
216+
validateHttpStatusAndContentType(
217+
_request.status, _request.responseHeaders,
218+
rawResponse: _request.responseText);
219+
return true;
220+
} catch (e, st) {
221+
_onError(e, st);
222+
return false;
223+
}
224+
}
225+
226+
void _onHeadersReceived() {
227+
_headersReceived = true;
228+
if (!_validateResponseState()) {
229+
return;
230+
}
231+
_incomingMessages.add(GrpcMetadata(_request.responseHeaders));
232+
}
233+
234+
void _onRequestDone() {
235+
if (!_headersReceived && !_validateResponseState()) {
236+
return;
237+
}
238+
if (_request.response == null) {
239+
_onError(
240+
GrpcError.unavailable('FetchTransportStream request null response',
241+
null, _request.responseText),
242+
StackTrace.current);
243+
return;
244+
}
245+
}
246+
247+
void _close() {
248+
_incomingProcessor.close();
249+
_outgoingMessages.close();
250+
_onDone(this);
251+
}
252+
253+
@override
254+
Future<void> terminate() async {
255+
_close();
256+
_request.abort();
257+
}
258+
}
259+
260+
class FetchClientConnection extends ClientConnection {
261+
final Uri uri;
262+
263+
final _requests = <FetchTransportStream>{};
264+
265+
FetchClientConnection(this.uri);
266+
267+
@override
268+
String get authority => uri.authority;
269+
@override
270+
String get scheme => uri.scheme;
271+
272+
void _initializeRequest(
273+
FetchHttpRequest request, Map<String, String> metadata) {
274+
for (final header in metadata.keys) {
275+
request.setRequestHeader(header, metadata[header]!);
276+
}
277+
// Overriding the mimetype allows us to stream and parse the data
278+
request.overrideMimeType('text/plain; charset=x-user-defined');
279+
request.responseType = 'text';
280+
}
281+
282+
@visibleForTesting
283+
FetchHttpRequest createRequest() => FetchHttpRequest();
284+
285+
@override
286+
GrpcTransportStream makeRequest(String path, Duration? timeout,
287+
Map<String, String> metadata, ErrorHandler onError,
288+
{CallOptions? callOptions}) {
289+
// gRPC-web headers.
290+
if (_getContentTypeHeader(metadata) == null) {
291+
metadata['Content-Type'] = 'application/grpc-web+proto';
292+
metadata['X-User-Agent'] = 'grpc-web-dart/0.1';
293+
metadata['X-Grpc-Web'] = '1';
294+
}
295+
296+
var requestUri = uri.resolve(path);
297+
if (callOptions is WebCallOptions &&
298+
callOptions.bypassCorsPreflight == true) {
299+
requestUri = cors.moveHttpHeadersToQueryParam(metadata, requestUri);
300+
}
301+
302+
final request = createRequest();
303+
request.open('POST', requestUri.toString());
304+
if (callOptions is WebCallOptions && callOptions.withCredentials == true) {
305+
request.withCredentials = true;
306+
}
307+
// Must set headers after calling open().
308+
_initializeRequest(request, metadata);
309+
310+
final transportStream =
311+
FetchTransportStream(request, onError: onError, onDone: _removeStream);
312+
_requests.add(transportStream);
313+
return transportStream;
314+
}
315+
316+
void _removeStream(FetchTransportStream stream) {
317+
_requests.remove(stream);
318+
}
319+
320+
@override
321+
Future<void> terminate() async {
322+
for (var request in List.of(_requests)) {
323+
request.terminate();
324+
}
325+
}
326+
327+
@override
328+
void dispatchCall(ClientCall call) {
329+
call.onConnectionReady(this);
330+
}
331+
332+
@override
333+
Future<void> shutdown() async {}
334+
}
335+
336+
MapEntry<String, String>? _getContentTypeHeader(Map<String, String> metadata) {
337+
for (var entry in metadata.entries) {
338+
if (entry.key.toLowerCase() == _contentTypeKey.toLowerCase()) {
339+
return entry;
340+
}
341+
}
342+
return null;
343+
}

lib/src/client/web_channel.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515

1616
import 'channel.dart';
1717
import 'connection.dart';
18-
import 'transport/xhr_transport.dart';
18+
import 'transport/fetch_transport.dart';
19+
//import 'transport/xhr_transport.dart';
1920

2021
/// A channel to a grpc-web endpoint.
2122
class GrpcWebClientChannel extends ClientChannelBase {
@@ -25,6 +26,7 @@ class GrpcWebClientChannel extends ClientChannelBase {
2526

2627
@override
2728
ClientConnection createConnection() {
28-
return XhrClientConnection(uri);
29+
//return XhrClientConnection(uri);
30+
return FetchClientConnection(uri);
2931
}
3032
}

0 commit comments

Comments
 (0)