diff --git a/ci/licenses_golden/licenses_flutter b/ci/licenses_golden/licenses_flutter index 5d04beaa2c0fe..455aedecbd64d 100644 --- a/ci/licenses_golden/licenses_flutter +++ b/ci/licenses_golden/licenses_flutter @@ -230,6 +230,7 @@ FILE: ../../../flutter/lib/io/dart_io.cc FILE: ../../../flutter/lib/io/dart_io.h FILE: ../../../flutter/lib/snapshot/libraries.json FILE: ../../../flutter/lib/snapshot/snapshot.h +FILE: ../../../flutter/lib/ui/channel_buffers.dart FILE: ../../../flutter/lib/ui/compositing.dart FILE: ../../../flutter/lib/ui/compositing/scene.cc FILE: ../../../flutter/lib/ui/compositing/scene.h @@ -427,6 +428,7 @@ FILE: ../../../flutter/lib/web_ui/lib/src/engine/validators.dart FILE: ../../../flutter/lib/web_ui/lib/src/engine/vector_math.dart FILE: ../../../flutter/lib/web_ui/lib/src/engine/window.dart FILE: ../../../flutter/lib/web_ui/lib/src/ui/canvas.dart +FILE: ../../../flutter/lib/web_ui/lib/src/ui/channel_buffers.dart FILE: ../../../flutter/lib/web_ui/lib/src/ui/compositing.dart FILE: ../../../flutter/lib/web_ui/lib/src/ui/geometry.dart FILE: ../../../flutter/lib/web_ui/lib/src/ui/hash_codes.dart diff --git a/lib/ui/channel_buffers.dart b/lib/ui/channel_buffers.dart new file mode 100644 index 0000000000000..26ba96d1ebb6d --- /dev/null +++ b/lib/ui/channel_buffers.dart @@ -0,0 +1,196 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +part of dart.ui; + +/// A saved platform message for a channel with its callback. +class _StoredMessage { + /// Default constructor, takes in a [ByteData] that represents the + /// payload of the message and a [PlatformMessageResponseCallback] + /// that represents the callback that will be called when the message + /// is handled. + _StoredMessage(this._data, this._callback); + + /// Representation of the message's payload. + final ByteData _data; + ByteData get data => _data; + + /// Callback to be called when the message is received. + final PlatformMessageResponseCallback _callback; + PlatformMessageResponseCallback get callback => _callback; +} + +/// A fixed-size circular queue. +class _RingBuffer { + /// The underlying data for the RingBuffer. ListQueue's dynamically resize, + /// [_RingBuffer]s do not. + final collection.ListQueue _queue; + + _RingBuffer(this._capacity) + : _queue = collection.ListQueue(_capacity); + + /// Returns the number of items in the [_RingBuffer]. + int get length => _queue.length; + + /// The number of items that can be stored in the [_RingBuffer]. + int _capacity; + int get capacity => _capacity; + + /// Returns true if there are no items in the [_RingBuffer]. + bool get isEmpty => _queue.isEmpty; + + /// A callback that get's called when items are ejected from the [_RingBuffer] + /// by way of an overflow or a resizing. + Function(T) _dropItemCallback; + set dropItemCallback(Function(T) callback) { + _dropItemCallback = callback; + } + + /// Returns true on overflow. + bool push(T val) { + if (_capacity <= 0) { + return true; + } else { + final int overflowCount = _dropOverflowItems(_capacity - 1); + _queue.addLast(val); + return overflowCount > 0; + } + } + + /// Returns null when empty. + T pop() { + return _queue.isEmpty ? null : _queue.removeFirst(); + } + + /// Removes items until then length reaches [lengthLimit] and returns + /// the number of items removed. + int _dropOverflowItems(int lengthLimit) { + int result = 0; + while (_queue.length > lengthLimit) { + final T item = _queue.removeFirst(); + if (_dropItemCallback != null) { + _dropItemCallback(item); + } + result += 1; + } + return result; + } + + /// Returns the number of discarded items resulting from resize. + int resize(int newSize) { + _capacity = newSize; + return _dropOverflowItems(newSize); + } +} + +/// Signature for [ChannelBuffers.drain]. +typedef DrainChannelCallback = Future Function(ByteData, PlatformMessageResponseCallback); + +/// Storage of channel messages until the channels are completely routed, +/// i.e. when a message handler is attached to the channel on the framework side. +/// +/// Each channel has a finite buffer capacity and in a FIFO manner messages will +/// be deleted if the capacity is exceeded. The intention is that these buffers +/// will be drained once a callback is setup on the BinaryMessenger in the +/// Flutter framework. +/// +/// Clients of Flutter shouldn't need to allocate their own ChannelBuffers +/// and should only access this package's [channelBuffers] if they are writing +/// their own custom [BinaryMessenger]. +class ChannelBuffers { + /// By default we store one message per channel. There are tradeoffs associated + /// with any size. The correct size should be chosen for the semantics of your + /// channel. + /// + /// Size 0 implies you want to ignore any message that gets sent before the engine + /// is ready (keeping in mind there is no way to know when the engine is ready). + /// + /// Size 1 implies that you only care about the most recent value. + /// + /// Size >1 means you want to process every single message and want to chose a + /// buffer size that will avoid any overflows. + static const int kDefaultBufferSize = 1; + + /// A mapping between a channel name and its associated [_RingBuffer]. + final Map> _messages = + >{}; + + _RingBuffer<_StoredMessage> _makeRingBuffer(int size) { + final _RingBuffer<_StoredMessage> result = _RingBuffer<_StoredMessage>(size); + result.dropItemCallback = _onDropItem; + return result; + } + + void _onDropItem(_StoredMessage message) { + message.callback(null); + } + + /// Returns true on overflow. + bool push(String channel, ByteData data, PlatformMessageResponseCallback callback) { + _RingBuffer<_StoredMessage> queue = _messages[channel]; + if (queue == null) { + queue = _makeRingBuffer(kDefaultBufferSize); + _messages[channel] = queue; + } + final bool didOverflow = queue.push(_StoredMessage(data, callback)); + if (didOverflow) { + // TODO(aaclarke): Update this message to include instructions on how to resize + // the buffer once that is available to users and print in all engine builds + // after we verify that dropping messages isn't part of normal execution. + _printDebug('Overflow on channel: $channel. ' + 'Messages on this channel are being discarded in FIFO fashion. ' + 'The engine may not be running or you need to adjust ' + 'the buffer size if of the channel.'); + } + return didOverflow; + } + + /// Returns null on underflow. + _StoredMessage _pop(String channel) { + final _RingBuffer<_StoredMessage> queue = _messages[channel]; + final _StoredMessage result = queue?.pop(); + return result; + } + + bool _isEmpty(String channel) { + final _RingBuffer<_StoredMessage> queue = _messages[channel]; + return (queue == null) ? true : queue.isEmpty; + } + + /// Changes the capacity of the queue associated with the given channel. + /// + /// This could result in the dropping of messages if newSize is less + /// than the current length of the queue. + void resize(String channel, int newSize) { + _RingBuffer<_StoredMessage> queue = _messages[channel]; + if (queue == null) { + queue = _makeRingBuffer(newSize); + _messages[channel] = queue; + } else { + final int numberOfDroppedMessages = queue.resize(newSize); + if (numberOfDroppedMessages > 0) { + _Logger._printString('Dropping messages on channel "$channel" as a result of shrinking the buffer size.'); + } + } + } + + /// Remove and process all stored messages for a given channel. + /// + /// This should be called once a channel is prepared to handle messages + /// (i.e. when a message handler is setup in the framework). + Future drain(String channel, DrainChannelCallback callback) async { + while (!_isEmpty(channel)) { + final _StoredMessage message = _pop(channel); + await callback(message.data, message.callback); + } + } +} + +/// [ChannelBuffer]s that allow the storage of messages between the +/// Engine and the Framework. Typically messages that can't be delivered +/// are stored here until the Framework is able to process them. +/// +/// See also: +/// * [BinaryMessenger] - The place where ChannelBuffers are typically read. +final ChannelBuffers channelBuffers = ChannelBuffers(); diff --git a/lib/ui/dart_runtime_hooks.cc b/lib/ui/dart_runtime_hooks.cc index 1b08181313543..000c4c43726a1 100644 --- a/lib/ui/dart_runtime_hooks.cc +++ b/lib/ui/dart_runtime_hooks.cc @@ -47,11 +47,12 @@ namespace flutter { #define DECLARE_FUNCTION(name, count) \ extern void name(Dart_NativeArguments args); -#define BUILTIN_NATIVE_LIST(V) \ - V(Logger_PrintString, 1) \ - V(SaveCompilationTrace, 0) \ - V(ScheduleMicrotask, 1) \ - V(GetCallbackHandle, 1) \ +#define BUILTIN_NATIVE_LIST(V) \ + V(Logger_PrintString, 1) \ + V(Logger_PrintDebugString, 1) \ + V(SaveCompilationTrace, 0) \ + V(ScheduleMicrotask, 1) \ + V(GetCallbackHandle, 1) \ V(GetCallbackFromHandle, 1) BUILTIN_NATIVE_LIST(DECLARE_FUNCTION); @@ -152,6 +153,12 @@ void DartRuntimeHooks::Install(bool is_ui_isolate, InitDartIO(builtin, script_uri); } +void Logger_PrintDebugString(Dart_NativeArguments args) { +#if FLUTTER_RUNTIME_MODE == FLUTTER_RUNTIME_MODE_DEBUG + Logger_PrintString(args); +#endif +} + // Implementation of native functions which are used for some // test/debug functionality in standalone dart mode. void Logger_PrintString(Dart_NativeArguments args) { diff --git a/lib/ui/dart_ui.gni b/lib/ui/dart_ui.gni index c52a9f025f231..3a07fbd526048 100644 --- a/lib/ui/dart_ui.gni +++ b/lib/ui/dart_ui.gni @@ -3,6 +3,7 @@ # found in the LICENSE file. dart_ui_files = [ + "$flutter_root/lib/ui/channel_buffers.dart", "$flutter_root/lib/ui/compositing.dart", "$flutter_root/lib/ui/geometry.dart", "$flutter_root/lib/ui/hash_codes.dart", diff --git a/lib/ui/hooks.dart b/lib/ui/hooks.dart index 9c596c2e26008..6fb6b3a1556e1 100644 --- a/lib/ui/hooks.dart +++ b/lib/ui/hooks.dart @@ -161,7 +161,9 @@ void _dispatchPlatformMessage(String name, ByteData data, int responseId) { }, ); } else { - window._respondToPlatformMessage(responseId, null); + channelBuffers.push(name, data, (ByteData responseData) { + window._respondToPlatformMessage(responseId, responseData); + }); } } diff --git a/lib/ui/natives.dart b/lib/ui/natives.dart index b48d8d440541a..841c925993d62 100644 --- a/lib/ui/natives.dart +++ b/lib/ui/natives.dart @@ -11,8 +11,13 @@ void _print(dynamic arg) { _Logger._printString(arg.toString()); } +void _printDebug(dynamic arg) { + _Logger._printDebugString(arg.toString()); +} + class _Logger { static void _printString(String s) native 'Logger_PrintString'; + static void _printDebugString(String s) native 'Logger_PrintDebugString'; } // If we actually run on big endian machines, we'll need to do something smarter diff --git a/lib/ui/ui.dart b/lib/ui/ui.dart index cef4db81cee0e..5c04721e7fec0 100644 --- a/lib/ui/ui.dart +++ b/lib/ui/ui.dart @@ -22,6 +22,7 @@ import 'dart:math' as math; import 'dart:nativewrappers'; import 'dart:typed_data'; +part 'channel_buffers.dart'; part 'compositing.dart'; part 'geometry.dart'; part 'hash_codes.dart'; diff --git a/lib/web_ui/lib/src/ui/channel_buffers.dart b/lib/web_ui/lib/src/ui/channel_buffers.dart new file mode 100644 index 0000000000000..e872d1b9d2691 --- /dev/null +++ b/lib/web_ui/lib/src/ui/channel_buffers.dart @@ -0,0 +1,29 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +part of ui; + +/// A callback for [ChannelBuffers.drain], called as it pops stored messages. +typedef DrainChannelCallback = Future Function(ByteData, PlatformMessageResponseCallback); + +/// Web implementation of [ChannelBuffers]. Currently it just drops all messages +/// to match legacy behavior and acts as if all caches are size zero. +class ChannelBuffers { + /// Always returns true to denote an overflow. + bool push(String channel, ByteData data, PlatformMessageResponseCallback callback) { + callback(null); + return true; + } + + /// Noop in web_ui, caches are always size zero. + void resize(String channel, int newSize) {} + + /// Remove and process all stored messages for a given channel. + /// + /// A noop in web_ui since all caches are size zero. + Future drain(String channel, DrainChannelCallback callback) async { + } +} + +final ChannelBuffers channelBuffers = ChannelBuffers(); diff --git a/lib/web_ui/lib/ui.dart b/lib/web_ui/lib/ui.dart index 9ea3e1fcedcec..cb71d24c1d01f 100644 --- a/lib/web_ui/lib/ui.dart +++ b/lib/web_ui/lib/ui.dart @@ -23,6 +23,7 @@ export 'src/engine.dart' webOnlyInitializeEngine; part 'src/ui/canvas.dart'; +part 'src/ui/channel_buffers.dart'; part 'src/ui/compositing.dart'; part 'src/ui/geometry.dart'; part 'src/ui/hash_codes.dart'; diff --git a/testing/dart/channel_buffers_test.dart b/testing/dart/channel_buffers_test.dart new file mode 100644 index 0000000000000..ee5312484ee12 --- /dev/null +++ b/testing/dart/channel_buffers_test.dart @@ -0,0 +1,134 @@ +import 'dart:ui' as ui; +import 'dart:typed_data'; +import 'dart:convert'; + +import 'package:test/test.dart'; + +void main() { + + ByteData _makeByteData(String str) { + var list = utf8.encode(str); + var buffer = list is Uint8List ? list.buffer : new Uint8List.fromList(list).buffer; + return ByteData.view(buffer); + } + + String _getString(ByteData data) { + final buffer = data.buffer; + var list = buffer.asUint8List(data.offsetInBytes, data.lengthInBytes); + return utf8.decode(list); + } + + test('push drain', () async { + String channel = "foo"; + ByteData data = _makeByteData('bar'); + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + buffers.push(channel, data, callback); + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + expect(drainedData, equals(data)); + expect(drainedCallback, equals(callback)); + }); + }); + + test('push drain zero', () async { + String channel = "foo"; + ByteData data = _makeByteData('bar'); + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + buffers.resize(channel, 0); + buffers.push(channel, data, callback); + bool didCall = false; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + didCall = true; + }); + expect(didCall, equals(false)); + }); + + test('empty', () async { + String channel = "foo"; + ByteData data = _makeByteData('bar'); + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + bool didCall = false; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + didCall = true; + }); + expect(didCall, equals(false)); + }); + + test('overflow', () async { + String channel = "foo"; + ByteData one = _makeByteData('one'); + ByteData two = _makeByteData('two'); + ByteData three = _makeByteData('three'); + ByteData four = _makeByteData('four'); + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + buffers.resize(channel, 3); + expect(buffers.push(channel, one, callback), equals(false)); + expect(buffers.push(channel, two, callback), equals(false)); + expect(buffers.push(channel, three, callback), equals(false)); + expect(buffers.push(channel, four, callback), equals(true)); + int counter = 0; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + if (counter++ == 0) { + expect(drainedData, equals(two)); + expect(drainedCallback, equals(callback)); + } + }); + expect(counter, equals(3)); + }); + + test('resize drop', () async { + String channel = "foo"; + ByteData one = _makeByteData('one'); + ByteData two = _makeByteData('two'); + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + buffers.resize(channel, 100); + ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + expect(buffers.push(channel, one, callback), equals(false)); + expect(buffers.push(channel, two, callback), equals(false)); + buffers.resize(channel, 1); + int counter = 0; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + if (counter++ == 0) { + expect(drainedData, equals(two)); + expect(drainedCallback, equals(callback)); + } + }); + expect(counter, equals(1)); + }); + + test('resize dropping calls callback', () async { + String channel = "foo"; + ByteData one = _makeByteData('one'); + ByteData two = _makeByteData('two'); + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + bool didCallCallback = false; + ui.PlatformMessageResponseCallback oneCallback = (ByteData responseData) { + didCallCallback = true; + }; + ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) {}; + buffers.resize(channel, 100); + expect(buffers.push(channel, one, oneCallback), equals(false)); + expect(buffers.push(channel, two, twoCallback), equals(false)); + buffers.resize(channel, 1); + expect(didCallCallback, equals(true)); + }); + + test('overflow calls callback', () async { + String channel = "foo"; + ByteData one = _makeByteData('one'); + ByteData two = _makeByteData('two'); + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + bool didCallCallback = false; + ui.PlatformMessageResponseCallback oneCallback = (ByteData responseData) { + didCallCallback = true; + }; + ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) {}; + buffers.resize(channel, 1); + expect(buffers.push(channel, one, oneCallback), equals(false)); + expect(buffers.push(channel, two, twoCallback), equals(true)); + expect(didCallCallback, equals(true)); + }); +} diff --git a/testing/dart/window_hooks_integration_test.dart b/testing/dart/window_hooks_integration_test.dart index 22ee978fc3cc3..51089abff2375 100644 --- a/testing/dart/window_hooks_integration_test.dart +++ b/testing/dart/window_hooks_integration_test.dart @@ -18,6 +18,7 @@ import 'dart:typed_data'; import 'package:test/test.dart'; // HACK: these parts are to get access to private functions tested here. +part '../../lib/ui/channel_buffers.dart'; part '../../lib/ui/compositing.dart'; part '../../lib/ui/geometry.dart'; part '../../lib/ui/hash_codes.dart';