Skip to content

Commit 63d1884

Browse files
authored
Gabime/async flush (#3235)
* Revert "Ensure flush callback gets called in move-assign operator (#3232)" This reverts commit b6da594. * Revert "Exchange promise for condition_variable when flushing (fixes #3221) (#3228)" This reverts commit 16e0d2e. * Revert PR #3049
1 parent b6da594 commit 63d1884

File tree

4 files changed

+34
-117
lines changed

4 files changed

+34
-117
lines changed

include/spdlog/async_logger-inl.h

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,26 @@ SPDLOG_INLINE spdlog::async_logger::async_logger(std::string logger_name,
3232

3333
// send the log message to the thread pool
3434
SPDLOG_INLINE void spdlog::async_logger::sink_it_(const details::log_msg &msg){
35-
SPDLOG_TRY {
36-
if (auto pool_ptr = thread_pool_.lock()){
37-
pool_ptr->post_log(shared_from_this(), msg, overflow_policy_);
38-
}
39-
else {
40-
throw_spdlog_ex("async log: thread pool doesn't exist anymore");
41-
}
42-
}
43-
SPDLOG_LOGGER_CATCH(msg.source)
35+
SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){
36+
pool_ptr->post_log(shared_from_this(), msg, overflow_policy_);
37+
}
38+
else {
39+
throw_spdlog_ex("async log: thread pool doesn't exist anymore");
40+
}
41+
}
42+
SPDLOG_LOGGER_CATCH(msg.source)
4443
}
4544

4645
// send flush request to the thread pool
47-
SPDLOG_INLINE void spdlog::async_logger::flush_() {
48-
SPDLOG_TRY {
49-
auto pool_ptr = thread_pool_.lock();
50-
if (!pool_ptr) {
51-
throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
52-
}
53-
54-
// Wait for the flush operation to complete.
55-
// This might throw exception if the flush message get dropped because of overflow.
56-
pool_ptr->post_and_wait_for_flush(shared_from_this(), overflow_policy_);
57-
}
58-
SPDLOG_LOGGER_CATCH(source_loc())
46+
SPDLOG_INLINE void spdlog::async_logger::flush_(){
47+
SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){
48+
pool_ptr->post_flush(shared_from_this(), overflow_policy_);
49+
}
50+
else {
51+
throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
52+
}
53+
}
54+
SPDLOG_LOGGER_CATCH(source_loc())
5955
}
6056

6157
//

include/spdlog/details/thread_pool-inl.h

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -62,25 +62,9 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr,
6262
post_async_msg_(std::move(async_m), overflow_policy);
6363
}
6464

65-
void SPDLOG_INLINE thread_pool::post_and_wait_for_flush(async_logger_ptr &&worker_ptr,
66-
async_overflow_policy overflow_policy) {
67-
std::mutex m;
68-
std::unique_lock<std::mutex> l(m);
69-
std::condition_variable cv;
70-
std::atomic<async_msg_flush> cv_flag{async_msg_flush::not_synced};
71-
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, [&cv, &cv_flag](async_msg_flush flushed) {
72-
cv_flag.store(flushed, std::memory_order_relaxed);
73-
cv.notify_all();
74-
}), overflow_policy);
75-
while(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::not_synced) {
76-
cv.wait_for(l, std::chrono::milliseconds(100), [&cv_flag]() {
77-
return cv_flag.load(std::memory_order_relaxed) != async_msg_flush::not_synced;
78-
});
79-
}
80-
81-
if(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::synced_not_flushed) {
82-
throw spdlog_ex("Request for flushing got dropped.");
83-
}
65+
void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
66+
async_overflow_policy overflow_policy) {
67+
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy);
8468
}
8569

8670
size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); }
@@ -124,10 +108,6 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() {
124108
}
125109
case async_msg_type::flush: {
126110
incoming_async_msg.worker_ptr->backend_flush_();
127-
if(incoming_async_msg.flush_callback) {
128-
incoming_async_msg.flush_callback(async_msg_flush::synced_flushed);
129-
incoming_async_msg.flush_callback = nullptr;
130-
}
131111
return true;
132112
}
133113

include/spdlog/details/thread_pool.h

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,60 +22,46 @@ using async_logger_ptr = std::shared_ptr<spdlog::async_logger>;
2222

2323
enum class async_msg_type { log, flush, terminate };
2424

25-
enum class async_msg_flush { not_synced, synced_flushed, synced_not_flushed };
26-
2725
// Async msg to move to/from the queue
2826
// Movable only. should never be copied
2927
struct async_msg : log_msg_buffer {
3028
async_msg_type msg_type{async_msg_type::log};
3129
async_logger_ptr worker_ptr;
32-
std::function<void(async_msg_flush)> flush_callback;
3330

3431
async_msg() = default;
35-
~async_msg() {
36-
if (flush_callback) {
37-
flush_callback(async_msg_flush::synced_not_flushed);
38-
flush_callback = nullptr;
39-
}
40-
}
32+
~async_msg() = default;
4133

4234
// should only be moved in or out of the queue..
4335
async_msg(const async_msg &) = delete;
4436

45-
async_msg(async_msg &&other) SPDLOG_NOEXCEPT
37+
// support for vs2013 move
38+
#if defined(_MSC_VER) && _MSC_VER <= 1800
39+
async_msg(async_msg &&other)
4640
: log_msg_buffer(std::move(other)),
4741
msg_type(other.msg_type),
48-
worker_ptr(std::move(other.worker_ptr)),
49-
flush_callback(std::move(other.flush_callback)) {
50-
other.flush_callback = nullptr;
51-
}
42+
worker_ptr(std::move(other.worker_ptr)) {}
5243

53-
async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT {
54-
*static_cast<log_msg_buffer *>(this) = static_cast<log_msg_buffer&&>(other);
44+
async_msg &operator=(async_msg &&other) {
45+
*static_cast<log_msg_buffer *>(this) = std::move(other);
5546
msg_type = other.msg_type;
5647
worker_ptr = std::move(other.worker_ptr);
57-
std::swap(flush_callback, other.flush_callback);
5848
return *this;
5949
}
50+
#else // (_MSC_VER) && _MSC_VER <= 1800
51+
async_msg(async_msg &&) = default;
52+
async_msg &operator=(async_msg &&) = default;
53+
#endif
6054

6155
// construct from log_msg with given type
6256
async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m)
6357
: log_msg_buffer{m},
6458
msg_type{the_type},
65-
worker_ptr{std::move(worker)},
66-
flush_callback{} {}
59+
worker_ptr{std::move(worker)} {}
6760

6861
async_msg(async_logger_ptr &&worker, async_msg_type the_type)
6962
: log_msg_buffer{},
7063
msg_type{the_type},
71-
worker_ptr{std::move(worker)},
72-
flush_callback{} {}
73-
74-
async_msg(async_logger_ptr &&worker, async_msg_type the_type, std::function<void(async_msg_flush)> &&callback)
75-
: log_msg_buffer{},
76-
msg_type{the_type},
77-
worker_ptr{std::move(worker)},
78-
flush_callback{std::move(callback)} {}
64+
worker_ptr{std::move(worker)} {}
7965

8066
explicit async_msg(async_msg_type the_type)
8167
: async_msg{nullptr, the_type} {}
@@ -102,8 +88,7 @@ class SPDLOG_API thread_pool {
10288
void post_log(async_logger_ptr &&worker_ptr,
10389
const details::log_msg &msg,
10490
async_overflow_policy overflow_policy);
105-
void post_and_wait_for_flush(async_logger_ptr &&worker_ptr,
106-
async_overflow_policy overflow_policy);
91+
void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy);
10792
size_t overrun_counter();
10893
void reset_overrun_counter();
10994
size_t discard_counter();

tests/test_async.cpp

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -93,50 +93,6 @@ TEST_CASE("flush", "[async]") {
9393
REQUIRE(test_sink->flush_counter() == 1);
9494
}
9595

96-
TEST_CASE("multithread flush", "[async]") {
97-
auto test_sink = std::make_shared<spdlog::sinks::test_sink_mt>();
98-
size_t queue_size = 2;
99-
size_t messages = 10;
100-
size_t n_threads = 10;
101-
size_t flush_count = 1024;
102-
std::mutex mtx;
103-
std::vector<std::string> errmsgs;
104-
{
105-
auto tp = std::make_shared<spdlog::details::thread_pool>(queue_size, 1);
106-
auto logger = std::make_shared<spdlog::async_logger>(
107-
"as", test_sink, tp, spdlog::async_overflow_policy::discard_new);
108-
109-
logger->set_error_handler([&](const std::string &) {
110-
std::unique_lock<std::mutex> lock(mtx);
111-
errmsgs.push_back("Broken promise");
112-
});
113-
114-
for (size_t i = 0; i < messages; i++) {
115-
logger->info("Hello message #{}", i);
116-
}
117-
118-
std::vector<std::thread> threads;
119-
for (size_t i = 0; i < n_threads; i++) {
120-
threads.emplace_back([logger, flush_count] {
121-
for (size_t j = 0; j < flush_count; j++) {
122-
// flush does not throw exception even if failed.
123-
// Instead, the error handler is invoked.
124-
logger->flush();
125-
}
126-
});
127-
}
128-
129-
for (auto &t : threads) {
130-
t.join();
131-
}
132-
}
133-
REQUIRE(test_sink->flush_counter() >= 1);
134-
REQUIRE(test_sink->flush_counter() + errmsgs.size() == n_threads * flush_count);
135-
if (errmsgs.size() > 0) {
136-
REQUIRE(errmsgs[0] == "Broken promise");
137-
}
138-
}
139-
14096
TEST_CASE("async periodic flush", "[async]") {
14197
auto logger = spdlog::create_async<spdlog::sinks::test_sink_mt>("as");
14298
auto test_sink = std::static_pointer_cast<spdlog::sinks::test_sink_mt>(logger->sinks()[0]);

0 commit comments

Comments
 (0)