Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.

Allow embedders to schedule a callback on all engine managed threads. #15980

Merged
merged 3 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 59 additions & 12 deletions fml/concurrent_message_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
WorkerMain();
});
}

for (const auto& worker : workers_) {
worker_thread_ids_.emplace_back(worker.get_id());
}
}

ConcurrentMessageLoop::~ConcurrentMessageLoop() {
Expand Down Expand Up @@ -73,25 +77,43 @@ void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
void ConcurrentMessageLoop::WorkerMain() {
while (true) {
std::unique_lock lock(tasks_mutex_);
tasks_condition_.wait(lock,
[&]() { return tasks_.size() > 0 || shutdown_; });
tasks_condition_.wait(lock, [&]() {
return tasks_.size() > 0 || shutdown_ || HasThreadTasksLocked();
});

if (tasks_.size() == 0) {
// This can only be caused by shutdown.
FML_DCHECK(shutdown_);
break;
// Shutdown cannot be read with the task mutex unlocked.
bool shutdown_now = shutdown_;
fml::closure task;
std::vector<fml::closure> thread_tasks;

if (tasks_.size() != 0) {
task = tasks_.front();
tasks_.pop();
}

auto task = tasks_.front();
tasks_.pop();
if (HasThreadTasksLocked()) {
thread_tasks = GetThreadTasksLocked();
FML_DCHECK(!HasThreadTasksLocked());
}

// Don't hold onto the mutex while the task is being executed as it could
// itself try to post another tasks to this message loop.
// Don't hold onto the mutex while tasks are being executed as they could
// themselves try to post more tasks to the message loop.
lock.unlock();

TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
// Execute the one tasks we woke up for.
task();
// Execute the primary task we woke up for.
if (task) {
task();
}

// Execute any thread tasks.
for (const auto& thread_task : thread_tasks) {
thread_task();
}

if (shutdown_now) {
break;
}
}
}

Expand All @@ -101,6 +123,31 @@ void ConcurrentMessageLoop::Terminate() {
tasks_condition_.notify_all();
}

void ConcurrentMessageLoop::PostTaskToAllWorkers(fml::closure task) {
if (!task) {
return;
}

std::scoped_lock lock(tasks_mutex_);
for (const auto& worker_thread_id : worker_thread_ids_) {
thread_tasks_[worker_thread_id].emplace_back(task);
}
tasks_condition_.notify_all();
}

bool ConcurrentMessageLoop::HasThreadTasksLocked() const {
return thread_tasks_.count(std::this_thread::get_id()) > 0;
}

std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
auto found = thread_tasks_.find(std::this_thread::get_id());
FML_DCHECK(found != thread_tasks_.end());
std::vector<fml::closure> pending_tasks;
std::swap(pending_tasks, found->second);
thread_tasks_.erase(found);
return pending_tasks;
}

ConcurrentTaskRunner::ConcurrentTaskRunner(
std::weak_ptr<ConcurrentMessageLoop> weak_loop)
: weak_loop_(std::move(weak_loop)) {}
Expand Down
9 changes: 9 additions & 0 deletions fml/concurrent_message_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_

#include <condition_variable>
#include <map>
#include <queue>
#include <thread>

Expand All @@ -30,6 +31,8 @@ class ConcurrentMessageLoop

void Terminate();

void PostTaskToAllWorkers(fml::closure task);

private:
friend ConcurrentTaskRunner;

Expand All @@ -38,6 +41,8 @@ class ConcurrentMessageLoop
std::mutex tasks_mutex_;
std::condition_variable tasks_condition_;
std::queue<fml::closure> tasks_;
std::vector<std::thread::id> worker_thread_ids_;
std::map<std::thread::id, std::vector<fml::closure>> thread_tasks_;
bool shutdown_ = false;

ConcurrentMessageLoop(size_t worker_count);
Expand All @@ -46,6 +51,10 @@ class ConcurrentMessageLoop

void PostTask(const fml::closure& task);

bool HasThreadTasksLocked() const;

std::vector<fml::closure> GetThreadTasksLocked();

FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentMessageLoop);
};

Expand Down
4 changes: 4 additions & 0 deletions runtime/dart_vm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,4 +504,8 @@ DartVM::GetConcurrentWorkerTaskRunner() const {
return concurrent_message_loop_->GetTaskRunner();
}

std::shared_ptr<fml::ConcurrentMessageLoop> DartVM::GetConcurrentMessageLoop() {
return concurrent_message_loop_;
}

} // namespace flutter
13 changes: 13 additions & 0 deletions runtime/dart_vm.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ class DartVM {
std::shared_ptr<fml::ConcurrentTaskRunner> GetConcurrentWorkerTaskRunner()
const;

//----------------------------------------------------------------------------
/// @brief The concurrent message loop hosts threads that are used by the
/// engine to perform tasks long running background tasks.
/// Typically, to post tasks to this message loop, the
/// `GetConcurrentWorkerTaskRunner` method may be used.
///
/// @see GetConcurrentWorkerTaskRunner
///
/// @return The concurrent message loop used by this running Dart VM
/// instance.
///
std::shared_ptr<fml::ConcurrentMessageLoop> GetConcurrentMessageLoop();

private:
const Settings settings_;
std::shared_ptr<fml::ConcurrentMessageLoop> concurrent_message_loop_;
Expand Down
10 changes: 8 additions & 2 deletions shell/common/shell.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,14 @@ class Shell final : public PlatformView::Delegate,
/// @brief Accessor for the disable GPU SyncSwitch
std::shared_ptr<fml::SyncSwitch> GetIsGpuDisabledSyncSwitch() const;

//----------------------------------------------------------------------------
/// @brief Get a pointer to the Dart VM used by this running shell
/// instance.
///
/// @return The Dart VM pointer.
///
DartVM* GetDartVM();

private:
using ServiceProtocolHandler =
std::function<bool(const ServiceProtocol::Handler::ServiceProtocolMap&,
Expand Down Expand Up @@ -424,8 +432,6 @@ class Shell final : public PlatformView::Delegate,
std::unique_ptr<Rasterizer> rasterizer,
std::unique_ptr<ShellIOManager> io_manager);

DartVM* GetDartVM();

void ReportTimings();

// |PlatformView::Delegate|
Expand Down
25 changes: 24 additions & 1 deletion shell/platform/embedder/embedder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,6 @@ FlutterEngineResult FlutterEnginePostDartObject(
return kSuccess;
}

FLUTTER_EXPORT
FlutterEngineResult FlutterEngineNotifyLowMemoryWarning(
FLUTTER_API_SYMBOL(FlutterEngine) raw_engine) {
auto engine = reinterpret_cast<flutter::EmbedderEngine*>(raw_engine);
Expand All @@ -1747,3 +1746,27 @@ FlutterEngineResult FlutterEngineNotifyLowMemoryWarning(
kInternalInconsistency,
"Could not dispatch the low memory notification message.");
}

FlutterEngineResult FlutterEnginePostCallbackOnAllNativeThreads(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question -- I didn't look through the two bugs immediately before reviewing but what's the rationale for posting to all threads and having the user potentially do something conditionally on the type param in the closure rather than having this take a thread-type and posting only to the specified task runner? Is the use-case such that you want to post on all or none?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see -- so we can provide some guarantee of synchronicity, looks like.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use case requires posting to all threads. But, I can see your approach as being viable as well. Posting to specific threads repeatedly is not a use case that we want to handle. This is supposed to be called once very early on in engine lifecycle.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be warning people not to attempt to do any cross-thread latching in the closures they pass in?

FLUTTER_API_SYMBOL(FlutterEngine) engine,
FlutterNativeThreadCallback callback,
void* user_data) {
if (engine == nullptr) {
return LOG_EMBEDDER_ERROR(kInvalidArguments, "Invalid engine handle.");
}

if (callback == nullptr) {
return LOG_EMBEDDER_ERROR(kInvalidArguments,
"Invalid native thread callback.");
}

return reinterpret_cast<flutter::EmbedderEngine*>(engine)
->PostTaskOnEngineManagedNativeThreads(
[callback, user_data](FlutterNativeThreadType type) {
callback(type, user_data);
})
? kSuccess
: LOG_EMBEDDER_ERROR(kInvalidArguments,
"Internal error while attempting to post "
"tasks to all threads.");
}
64 changes: 64 additions & 0 deletions shell/platform/embedder/embedder.h
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,31 @@ typedef struct {
};
} FlutterEngineDartObject;

/// This enum allows embedders to determine the type of the engine thread in the
/// FlutterNativeThreadCallback. Based on the thread type, the embedder may be
/// able to tweak the thread priorities for optimum performance.
typedef enum {
/// The Flutter Engine considers the thread on which the FlutterEngineRun call
/// is made to be the platform thread. There is only one such thread per
/// engine instance.
kFlutterNativeThreadTypePlatform,
/// This is the thread the Flutter Engine uses to execute rendering commands
/// based on the selected client rendering API. There is only one such thread
/// per engine instance.
kFlutterNativeThreadTypeRender,
/// This is a dedicated thread on which the root Dart isolate is serviced.
/// There is only one such thread per engine instance.
kFlutterNativeThreadTypeUI,
/// Multiple threads are used by the Flutter engine to perform long running
/// background tasks.
kFlutterNativeThreadTypeWorker,
} FlutterNativeThreadType;

/// A callback made by the engine in response to
/// `FlutterEnginePostCallbackOnAllNativeThreads` on all internal thread.
typedef void (*FlutterNativeThreadCallback)(FlutterNativeThreadType type,
void* user_data);

typedef struct {
/// The size of this struct. Must be sizeof(FlutterProjectArgs).
size_t struct_size;
Expand Down Expand Up @@ -1667,6 +1692,45 @@ FLUTTER_EXPORT
FlutterEngineResult FlutterEngineNotifyLowMemoryWarning(
FLUTTER_API_SYMBOL(FlutterEngine) engine);

//------------------------------------------------------------------------------
/// @brief Schedule a callback to be run on all engine managed threads.
/// The engine will attempt to service this callback the next time
/// the message loop for each managed thread is idle. Since the
/// engine manages the entire lifecycle of multiple threads, there
/// is no opportunity for the embedders to finely tune the
/// priorities of threads directly, or, perform other thread
/// specific configuration (for example, setting thread names for
/// tracing). This callback gives embedders a chance to affect such
/// tuning.
///
/// @attention This call is expensive and must be made as few times as
/// possible. The callback must also return immediately as not doing
/// so may risk performance issues (especially for callbacks of type
/// kFlutterNativeThreadTypeUI and kFlutterNativeThreadTypeRender).
///
/// @attention Some callbacks (especially the ones of type
/// kFlutterNativeThreadTypeWorker) may be called after the
/// FlutterEngine instance has shut down. Embedders must be careful
/// in handling the lifecycle of objects associated with the user
/// data baton.
///
/// @attention In case there are multiple running Flutter engine instances,
/// their workers are shared.
///
/// @param[in] engine A running engine instance.
/// @param[in] callback The callback that will get called multiple times on
/// each engine managed thread.
/// @param[in] user_data A baton passed by the engine to the callback. This
/// baton is not interpreted by the engine in any way.
///
/// @return Returns if the callback was successfully posted to all threads.
///
FLUTTER_EXPORT
FlutterEngineResult FlutterEnginePostCallbackOnAllNativeThreads(
FLUTTER_API_SYMBOL(FlutterEngine) engine,
FlutterNativeThreadCallback callback,
void* user_data);

#if defined(__cplusplus)
} // extern "C"
#endif
Expand Down
29 changes: 28 additions & 1 deletion shell/platform/embedder/embedder_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,34 @@ bool EmbedderEngine::RunTask(const FlutterTask* task) {
task->task);
}

const Shell& EmbedderEngine::GetShell() const {
bool EmbedderEngine::PostTaskOnEngineManagedNativeThreads(
std::function<void(FlutterNativeThreadType)> closure) const {
if (!IsValid() || closure == nullptr) {
return false;
}

const auto trampoline = [closure](FlutterNativeThreadType type,
fml::RefPtr<fml::TaskRunner> runner) {
runner->PostTask([closure, type] { closure(type); });
};

// Post the task to all thread host threads.
const auto& task_runners = shell_->GetTaskRunners();
trampoline(kFlutterNativeThreadTypeRender, task_runners.GetGPUTaskRunner());
trampoline(kFlutterNativeThreadTypeWorker, task_runners.GetIOTaskRunner());
trampoline(kFlutterNativeThreadTypeUI, task_runners.GetUITaskRunner());
trampoline(kFlutterNativeThreadTypePlatform,
task_runners.GetPlatformTaskRunner());

// Post the task to all worker threads.
auto vm = shell_->GetDartVM();
vm->GetConcurrentMessageLoop()->PostTaskToAllWorkers(
[closure]() { closure(kFlutterNativeThreadTypeWorker); });

return true;
}

Shell& EmbedderEngine::GetShell() {
FML_DCHECK(shell_);
return *shell_.get();
}
Expand Down
5 changes: 4 additions & 1 deletion shell/platform/embedder/embedder_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class EmbedderEngine {

bool RunTask(const FlutterTask* task);

const Shell& GetShell() const;
bool PostTaskOnEngineManagedNativeThreads(
std::function<void(FlutterNativeThreadType)> closure) const;

Shell& GetShell();

private:
const std::unique_ptr<EmbedderThreadHost> thread_host_;
Expand Down
Loading