This repository was archived by the owner on Feb 25, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6k
Fix race condition introduced by background platform channels #29377
Merged
Merged
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
45cc4b5
Fix race condition introduced by background platform channels
0cab9a1
Format
ff024ca
Lock
6b02fb0
WIP
a480997
Rework queue
c59266f
Merge remote-tracking branch 'upstream/master' into bg_channel_dart_init
4bae8ac
Log error
5f0cbf8
Flag
fcc5ae1
edits
c90f6ea
Simplify
983943a
Fix formatter
db476b4
missing params
ea25323
Use container
0b794ea
tweaks
4f193f2
Test
d36def9
line
291e88e
Add final
d40a3b7
disableBufferingIncomingMessages
9aad025
Merge remote-tracking branch 'upstream/master' into bg_channel_dart_init
b14cdef
feedback
7d6dd55
final + annotation
4120dbd
Jason feedback
7fce615
Merge remote-tracking branch 'upstream/master' into bg_channel_dart_init
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,8 @@ | |
| import io.flutter.plugin.common.BinaryMessenger; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.WeakHashMap; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
@@ -33,22 +35,41 @@ class DartMessenger implements BinaryMessenger, PlatformMessageHandler { | |
|
|
||
| @NonNull private final FlutterJNI flutterJNI; | ||
|
|
||
| @NonNull private final ConcurrentHashMap<String, HandlerInfo> messageHandlers; | ||
| /** | ||
| * Maps a channel name to an object that contains the task queue and the handler associated with | ||
| * the channel. | ||
| * | ||
| * <p>Reads and writes to this map must lock {@code handlersLock}. | ||
| */ | ||
| @NonNull | ||
| private final ConcurrentHashMap<String, HandlerInfo> messageHandlers = new ConcurrentHashMap<>(); | ||
|
|
||
| /** | ||
| * Maps a channel name to queue of task dispatchers. This queue is processed when the channel | ||
| * handler is registered. | ||
| * | ||
| * <p>Reads and writes to this map must lock {@code handlersLock}. | ||
| */ | ||
| @NonNull | ||
| private final ConcurrentHashMap<String, List<DelayedMessageInfo>> delayedTaskDispatcher = | ||
blasten marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| new ConcurrentHashMap<>(); | ||
|
|
||
| @NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies; | ||
| @NonNull private final Object handlersLock = new Object(); | ||
| private boolean canDelayTasks = false; | ||
|
|
||
| @NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies = new HashMap<>(); | ||
| private int nextReplyId = 1; | ||
|
|
||
| @NonNull private final DartMessengerTaskQueue platformTaskQueue = new PlatformTaskQueue(); | ||
|
|
||
| @NonNull private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues; | ||
| @NonNull | ||
| private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues = | ||
| new WeakHashMap<TaskQueue, DartMessengerTaskQueue>(); | ||
|
|
||
| @NonNull private TaskQueueFactory taskQueueFactory; | ||
|
|
||
| DartMessenger(@NonNull FlutterJNI flutterJNI, @NonNull TaskQueueFactory taskQueueFactory) { | ||
| this.flutterJNI = flutterJNI; | ||
| this.messageHandlers = new ConcurrentHashMap<>(); | ||
| this.pendingReplies = new HashMap<>(); | ||
| this.createdTaskQueues = new WeakHashMap<TaskQueue, DartMessengerTaskQueue>(); | ||
| this.taskQueueFactory = taskQueueFactory; | ||
| } | ||
|
|
||
|
|
@@ -72,6 +93,10 @@ public DartMessengerTaskQueue makeBackgroundTaskQueue() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Holds information about a platform handler, such as the task queue that processes messages from | ||
| * Dart. | ||
| */ | ||
| private static class HandlerInfo { | ||
| @NonNull public final BinaryMessenger.BinaryMessageHandler handler; | ||
| @Nullable public final DartMessengerTaskQueue taskQueue; | ||
|
|
@@ -84,6 +109,22 @@ private static class HandlerInfo { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Holds information that allows to dispatch a Dart message to a platform handler when it becomes | ||
| * available. | ||
| */ | ||
| private static class DelayedMessageInfo { | ||
| @NonNull public final ByteBuffer message; | ||
| int replyId; | ||
| long messageData; | ||
|
|
||
| DelayedMessageInfo(@NonNull ByteBuffer message, int replyId, long messageData) { | ||
| this.message = message; | ||
| this.replyId = replyId; | ||
| this.messageData = messageData; | ||
| } | ||
| } | ||
|
|
||
| private static class DefaultTaskQueue implements DartMessengerTaskQueue { | ||
| @NonNull private final ExecutorService executor; | ||
|
|
||
|
|
@@ -124,19 +165,59 @@ public void setMessageHandler( | |
| @Nullable TaskQueue taskQueue) { | ||
| if (handler == null) { | ||
| Log.v(TAG, "Removing handler for channel '" + channel + "'"); | ||
| messageHandlers.remove(channel); | ||
| } else { | ||
| DartMessengerTaskQueue dartMessengerTaskQueue = null; | ||
| if (taskQueue != null) { | ||
| dartMessengerTaskQueue = createdTaskQueues.get(taskQueue); | ||
| if (dartMessengerTaskQueue == null) { | ||
| throw new IllegalArgumentException( | ||
| "Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue)."); | ||
| } | ||
| synchronized (handlersLock) { | ||
| messageHandlers.remove(channel); | ||
| } | ||
| return; | ||
| } | ||
| DartMessengerTaskQueue dartMessengerTaskQueue = null; | ||
| if (taskQueue != null) { | ||
| dartMessengerTaskQueue = createdTaskQueues.get(taskQueue); | ||
| if (dartMessengerTaskQueue == null) { | ||
| throw new IllegalArgumentException( | ||
| "Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue)."); | ||
| } | ||
| Log.v(TAG, "Setting handler for channel '" + channel + "'"); | ||
| } | ||
| Log.v(TAG, "Setting handler for channel '" + channel + "'"); | ||
| synchronized (handlersLock) { | ||
| messageHandlers.put(channel, new HandlerInfo(handler, dartMessengerTaskQueue)); | ||
| } | ||
| runDelayedTasksForChannel(channel); | ||
| } | ||
|
|
||
| /** | ||
| * Runs the tasks that handle messages received from Dart for the provided channel name. | ||
| * | ||
| * <p>The channel may not have associated tasks if it was registered prior to reciving the first | ||
| * message from Dart. | ||
| * | ||
| * @param channel The channel name. | ||
| */ | ||
| public void runDelayedTasksForChannel(@NonNull String channel) { | ||
blasten marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| LinkedList<DelayedMessageInfo> list; | ||
| synchronized (handlersLock) { | ||
| if (!delayedTaskDispatcher.contains(channel)) { | ||
| return; | ||
| } | ||
| list = (LinkedList) delayedTaskDispatcher.get(channel); | ||
| delayedTaskDispatcher.remove(channel); | ||
| } | ||
| while (!list.isEmpty()) { | ||
| final DelayedMessageInfo info = list.poll(); | ||
blasten marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| dispatchMessageToQueue( | ||
| channel, messageHandlers.get(channel), info.message, info.replyId, info.messageData); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Enables the ability to queue tasks when messages are received from Dart. | ||
| * | ||
| * <p>This is useful when there are pending channel handler registrations. For example, Dart may | ||
| * be initialized concurrently, and prior to the registration of the channel handlers. This | ||
| * implies that Dart may start sending messages while plugins are being registered. | ||
| */ | ||
| public void enableDelayedTaskQueue() { | ||
blasten marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| canDelayTasks = true; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -188,39 +269,65 @@ private void invokeHandler( | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void handleMessageFromDart( | ||
| @NonNull final String channel, | ||
| private void dispatchMessageToQueue( | ||
| @NonNull String channel, | ||
| @Nullable HandlerInfo handlerInfo, | ||
| @Nullable ByteBuffer message, | ||
| final int replyId, | ||
| int replyId, | ||
| long messageData) { | ||
| // Called from the ui thread. | ||
| Log.v(TAG, "Received message from Dart over channel '" + channel + "'"); | ||
| @Nullable final HandlerInfo handlerInfo = messageHandlers.get(channel); | ||
| @Nullable | ||
| final DartMessengerTaskQueue taskQueue = (handlerInfo != null) ? handlerInfo.taskQueue : null; | ||
| Runnable myRunnable = | ||
| () -> { | ||
| Trace.beginSection("DartMessenger#handleMessageFromDart on " + channel); | ||
| try { | ||
| invokeHandler(handlerInfo, message, replyId); | ||
| if (message != null && message.isDirect()) { | ||
| // This ensures that if a user retains an instance to the ByteBuffer and it happens to | ||
| // be direct they will get a deterministic error. | ||
| // This ensures that if a user retains an instance to the ByteBuffer and it | ||
| // happens to be direct they will get a deterministic error. | ||
| message.limit(0); | ||
| } | ||
| } finally { | ||
| Trace.endSection(); | ||
blasten marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // This is deleting the data underneath the message object. | ||
| flutterJNI.cleanupMessageData(messageData); | ||
| Trace.endSection(); | ||
| } | ||
| }; | ||
| @NonNull | ||
| final DartMessengerTaskQueue nonnullTaskQueue = | ||
| taskQueue == null ? platformTaskQueue : taskQueue; | ||
| nonnullTaskQueue.dispatch(myRunnable); | ||
| } | ||
|
|
||
| @Override | ||
| public void handleMessageFromDart( | ||
| @NonNull String channel, @Nullable ByteBuffer message, int replyId, long messageData) { | ||
| // Called from the ui thread. | ||
| Log.v(TAG, "Received message from Dart over channel '" + channel + "'"); | ||
|
|
||
| HandlerInfo handlerInfo; | ||
| synchronized (handlersLock) { | ||
| if (messageHandlers.contains(channel)) { | ||
blasten marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| handlerInfo = messageHandlers.get(channel); | ||
| } else if (canDelayTasks) { | ||
| // The channel is not defined when the Dart VM sends a message before the channels are | ||
| // registered. | ||
| // | ||
| // This is possible if the Dart VM starts before channel registration, and if the thread | ||
| // that registers the channels is busy or slow at registering the channel handlers. | ||
| // | ||
| // In such cases, the task dispatchers are queued, and processed when the channel is | ||
| // defined. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't this also happen if the handler gets unregistered?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is right. In this case, |
||
| if (!delayedTaskDispatcher.contains(channel)) { | ||
| delayedTaskDispatcher.put(channel, new LinkedList<>()); | ||
| } | ||
| List<Runnable> delayedTaskQueue = delayedTaskDispatcher.get(channel); | ||
blasten marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| delayedTaskQueue.add(new DelayedMessageInfo(message, replyId, messageData)); | ||
| } | ||
| } | ||
| if (handlerInfo != null) { | ||
| dispatchMessageToQueue(handlerInfo, message, replyId, messageData); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void handlePlatformMessageResponse(int replyId, @Nullable ByteBuffer reply) { | ||
| Log.v(TAG, "Received message reply from Dart."); | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.