Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.

[Impeller] Destroy all per-thread command pools tied to a context before deleting the context #46286

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 72 additions & 13 deletions impeller/renderer/backend/vulkan/command_pool_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class BackgroundCommandPoolVK final {
static bool kResetOnBackgroundThread = false;

CommandPoolVK::~CommandPoolVK() {
if (!pool_) {
return;
}

auto const context = context_.lock();
if (!context) {
return;
Expand All @@ -84,6 +88,11 @@ vk::UniqueCommandBuffer CommandPoolVK::CreateCommandBuffer() {
return {};
}

Lock lock(pool_mutex_);
if (!pool_) {
return {};
}

auto const device = context->GetDevice();
vk::CommandBufferAllocateInfo info;
info.setCommandPool(pool_.get());
Expand All @@ -97,17 +106,39 @@ vk::UniqueCommandBuffer CommandPoolVK::CreateCommandBuffer() {
}

void CommandPoolVK::CollectCommandBuffer(vk::UniqueCommandBuffer&& buffer) {
Lock lock(pool_mutex_);
if (!pool_) {
// If the command pool has already been destroyed, just free the buffer.
// If the command pool has already been destroyed, then its buffers have
// already been freed.
buffer.release();
return;
}
collected_buffers_.push_back(std::move(buffer));
}

void CommandPoolVK::Destroy() {
Lock lock(pool_mutex_);
pool_.reset();

// When the command pool is destroyed, all of its command buffers are freed.
// Handles allocated from that pool are now invalid and must be discarded.
for (auto& buffer : collected_buffers_) {
buffer.release();
}
collected_buffers_.clear();
}

// Associates a resource with a thread and context.
using CommandPoolMap =
std::unordered_map<uint64_t, std::shared_ptr<CommandPoolVK>>;
FML_THREAD_LOCAL fml::ThreadLocalUniquePtr<CommandPoolMap> resources_;
FML_THREAD_LOCAL fml::ThreadLocalUniquePtr<CommandPoolMap> tls_command_pool_map;

// Map each context to a list of all thread-local command pools associated
// with that context.
static Mutex g_all_pools_map_mutex;
static std::unordered_map<const ContextVK*,
std::vector<std::weak_ptr<CommandPoolVK>>>
g_all_pools_map IPLR_GUARDED_BY(g_all_pools_map_mutex);

// TODO(matanlurey): Return a status_or<> instead of nullptr when we have one.
std::shared_ptr<CommandPoolVK> CommandPoolRecyclerVK::Get() {
Expand All @@ -117,14 +148,13 @@ std::shared_ptr<CommandPoolVK> CommandPoolRecyclerVK::Get() {
}

// If there is a resource in used for this thread and context, return it.
auto resources = resources_.get();
if (!resources) {
resources = new CommandPoolMap();
resources_.reset(resources);
if (!tls_command_pool_map.get()) {
tls_command_pool_map.reset(new CommandPoolMap());
}
CommandPoolMap& pool_map = *tls_command_pool_map.get();
auto const hash = strong_context->GetHash();
auto const it = resources->find(hash);
if (it != resources->end()) {
auto const it = pool_map.find(hash);
if (it != pool_map.end()) {
return it->second;
}

Expand All @@ -136,7 +166,13 @@ std::shared_ptr<CommandPoolVK> CommandPoolRecyclerVK::Get() {

auto const resource =
std::make_shared<CommandPoolVK>(std::move(*pool), context_);
resources->emplace(hash, resource);
pool_map.emplace(hash, resource);

{
Lock all_pools_lock(g_all_pools_map_mutex);
g_all_pools_map[strong_context.get()].push_back(resource);
}

return resource;
}

Expand Down Expand Up @@ -199,11 +235,34 @@ CommandPoolRecyclerVK::~CommandPoolRecyclerVK() {
}

void CommandPoolRecyclerVK::Dispose() {
auto const resources = resources_.get();
if (!resources) {
return;
CommandPoolMap* pool_map = tls_command_pool_map.get();
if (pool_map) {
pool_map->clear();
}
}

void CommandPoolRecyclerVK::DestroyThreadLocalPools(const ContextVK* context) {
// Delete the context's entry in this thread's command pool map.
if (tls_command_pool_map.get()) {
tls_command_pool_map.get()->erase(context->GetHash());
}

// Destroy all other thread-local CommandPoolVK instances associated with
// this context.
Lock all_pools_lock(g_all_pools_map_mutex);
auto found = g_all_pools_map.find(context);
if (found != g_all_pools_map.end()) {
for (auto& weak_pool : found->second) {
auto pool = weak_pool.lock();
if (!pool) {
continue;
}
// Delete all objects held by this pool. The destroyed pool will still
// remain in its thread's TLS map until that thread exits.
pool->Destroy();
}
g_all_pools_map.erase(found);
}
resources->clear();
}

} // namespace impeller
16 changes: 13 additions & 3 deletions impeller/renderer/backend/vulkan/command_pool_vk.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class CommandPoolRecyclerVK;
/// @see |CommandPoolRecyclerVK|
class CommandPoolVK final {
public:
CommandPoolVK(CommandPoolVK&&) = default;
~CommandPoolVK();

/// @brief Creates a resource that manages the life of a command pool.
Expand All @@ -54,14 +53,19 @@ class CommandPoolVK final {
/// @see |GarbageCollectBuffersIfAble|
void CollectCommandBuffer(vk::UniqueCommandBuffer&& buffer);

/// @brief Delete all Vulkan objects in this command pool.
void Destroy();

private:
FML_DISALLOW_COPY_AND_ASSIGN(CommandPoolVK);

vk::UniqueCommandPool pool_;
Mutex pool_mutex_;
vk::UniqueCommandPool pool_ IPLR_GUARDED_BY(pool_mutex_);
std::weak_ptr<ContextVK>& context_;

// Used to retain a reference on these until the pool is reset.
std::vector<vk::UniqueCommandBuffer> collected_buffers_;
std::vector<vk::UniqueCommandBuffer> collected_buffers_
IPLR_GUARDED_BY(pool_mutex_);
};

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -93,6 +97,12 @@ class CommandPoolRecyclerVK final
public:
~CommandPoolRecyclerVK();

/// @brief Clean up resources held by all per-thread command pools
/// associated with the given context.
///
/// @param[in] context The context.
static void DestroyThreadLocalPools(const ContextVK* context);

/// @brief Creates a recycler for the given |ContextVK|.
///
/// @param[in] context The context to create the recycler for.
Expand Down
4 changes: 1 addition & 3 deletions impeller/renderer/backend/vulkan/context_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ ContextVK::~ContextVK() {
if (device_holder_ && device_holder_->device) {
[[maybe_unused]] auto result = device_holder_->device->waitIdle();
}
if (command_pool_recycler_) {
command_pool_recycler_.get()->Dispose();
}
CommandPoolRecyclerVK::DestroyThreadLocalPools(this);
}

Context::BackendType ContextVK::GetBackendType() const {
Expand Down
32 changes: 32 additions & 0 deletions impeller/renderer/backend/vulkan/context_vk_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "flutter/fml/synchronization/waitable_event.h"
#include "flutter/testing/testing.h" // IWYU pragma: keep
#include "impeller/renderer/backend/vulkan/command_pool_vk.h"
#include "impeller/renderer/backend/vulkan/context_vk.h"
Expand All @@ -25,6 +26,37 @@ TEST(ContextVKTest, DeletesCommandPools) {
ASSERT_FALSE(weak_context.lock());
}

TEST(ContextVKTest, DeletesCommandPoolsOnAllThreads) {
std::weak_ptr<ContextVK> weak_context;
std::weak_ptr<CommandPoolVK> weak_pool_main;

std::shared_ptr<ContextVK> context = MockVulkanContextBuilder().Build();
weak_pool_main = context->GetCommandPoolRecycler()->Get();
weak_context = context;
ASSERT_TRUE(weak_pool_main.lock());
ASSERT_TRUE(weak_context.lock());

// Start a second thread that obtains a command pool.
fml::AutoResetWaitableEvent latch1, latch2;
std::weak_ptr<CommandPoolVK> weak_pool_thread;
std::thread thread([&]() {
weak_pool_thread = context->GetCommandPoolRecycler()->Get();
latch1.Signal();
latch2.Wait();
});

// Delete the ContextVK on the main thread.
latch1.Wait();
context.reset();
ASSERT_FALSE(weak_pool_main.lock());
ASSERT_FALSE(weak_context.lock());

// Stop the second thread and check that its command pool has been deleted.
latch2.Signal();
thread.join();
ASSERT_FALSE(weak_pool_thread.lock());
}

TEST(ContextVKTest, DeletePipelineAfterContext) {
std::shared_ptr<Pipeline<PipelineDescriptor>> pipeline;
std::shared_ptr<std::vector<std::string>> functions;
Expand Down