Skip to content

Commit cbe00a6

Browse files
wolfkdy and claude
committed
async-flush-wal: extract ManualFlushWritableFileWriter subclass
Move manual-flush double-buffering (SwapBuffer/FlushSwappedBuffer) and the unbounded-append-buffer policy out of WritableFileWriter into a dedicated ManualFlushWritableFileWriter subclass. This keeps the base class free of manual_wal_flush concerns and replaces runtime boolean checks with virtual dispatch (AppendBufferSizeLimit, ShouldImplicitFlushOnAppend). CreateWAL now instantiates the subclass when manual_wal_flush is enabled, and FlushWAL uses static_cast to call the double-buffer methods.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8422edf commit cbe00a6

File tree

6 files changed

+179
-117
lines changed

6 files changed

+179
-117
lines changed

db/db_impl/db_impl.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include "env/unique_id_gen.h"
5757
#include "file/file_util.h"
5858
#include "file/filename.h"
59+
#include "file/manual_flush_writable_file_writer.h"
5960
#include "file/random_access_file_reader.h"
6061
#include "file/sst_file_manager_impl.h"
6162
#include "logging/auto_roll_logger.h"
@@ -1442,10 +1443,14 @@ Status DBImpl::FlushWAL(bool sync) {
14421443
cur_log_number = log.number;
14431444
// Swap the buffer under the lock so AddRecord() can continue
14441445
// writing to a fresh buffer without blocking.
1445-
cur_log_writer->file()->SwapBuffer();
1446+
// Safe: CreateWAL() instantiates ManualFlushWritableFileWriter when
1447+
// manual_wal_flush is true (which is a precondition for this branch).
1448+
static_cast<ManualFlushWritableFileWriter*>(cur_log_writer->file())->SwapBuffer();
14461449
}
14471450
// Write the swapped buffer to disk without holding the mutex.
1448-
io_s = cur_log_writer->file()->FlushSwappedBuffer();
1451+
// Safe: CreateWAL() instantiates ManualFlushWritableFileWriter when
1452+
// manual_wal_flush is true (which is a precondition for this branch).
1453+
io_s = static_cast<ManualFlushWritableFileWriter*>(cur_log_writer->file())->FlushSwappedBuffer();
14491454
{
14501455
InstrumentedMutexLock wl(&log_write_mutex_);
14511456
// Find the log we marked, since logs_.back() may have changed

db/db_impl/db_impl_open.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "file/filename.h"
1717
#include "file/read_write_util.h"
1818
#include "file/sst_file_manager_impl.h"
19+
#include "file/manual_flush_writable_file_writer.h"
1920
#include "file/writable_file_writer.h"
2021
#include "logging/logging.h"
2122
#include "monitoring/persistent_stats_history.h"
@@ -1739,12 +1740,20 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
17391740

17401741
const auto& listeners = immutable_db_options_.listeners;
17411742
FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
1742-
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
1743-
std::move(lfile), log_fname, opt_file_options,
1744-
immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
1745-
nullptr, tmp_set.Contains(FileType::kWalFile),
1746-
tmp_set.Contains(FileType::kWalFile),
1747-
immutable_db_options_.manual_wal_flush));
1743+
std::unique_ptr<WritableFileWriter> file_writer;
1744+
if (immutable_db_options_.manual_wal_flush) {
1745+
file_writer.reset(new ManualFlushWritableFileWriter(
1746+
std::move(lfile), log_fname, opt_file_options,
1747+
immutable_db_options_.clock, io_tracer_, nullptr /* stats */,
1748+
listeners, nullptr, tmp_set.Contains(FileType::kWalFile),
1749+
tmp_set.Contains(FileType::kWalFile)));
1750+
} else {
1751+
file_writer.reset(new WritableFileWriter(
1752+
std::move(lfile), log_fname, opt_file_options,
1753+
immutable_db_options_.clock, io_tracer_, nullptr /* stats */,
1754+
listeners, nullptr, tmp_set.Contains(FileType::kWalFile),
1755+
tmp_set.Contains(FileType::kWalFile)));
1756+
}
17481757
*new_log = new log::Writer(std::move(file_writer), log_file_num,
17491758
immutable_db_options_.recycle_log_file_num > 0,
17501759
immutable_db_options_.manual_wal_flush,

file/manual_flush_writable_file_writer.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
6+
#include "file/manual_flush_writable_file_writer.h"
7+
8+
#include "rocksdb/io_status.h"
9+
10+
namespace ROCKSDB_NAMESPACE {
11+
12+
void ManualFlushWritableFileWriter::SwapBuffer() {
13+
// Double-buffering is only for buffered I/O; direct I/O has its own path.
14+
assert(!use_direct_io());
15+
assert(flush_buf_.CurrentSize() == 0);
16+
// Ping-pong: move buf_ → flush_buf_ for writing; reuse flush_buf_'s old
17+
// allocation (now empty) as the new buf_ for incoming Append()s.
18+
std::swap(buf_, flush_buf_);
19+
// Ensure buf_ has an allocation (first swap, or after flush_buf_ was
20+
// freshly constructed).
21+
if (buf_.Capacity() == 0) {
22+
buf_.Alignment(writable_file()->GetRequiredBufferAlignment());
23+
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
24+
}
25+
}
26+
27+
IOStatus ManualFlushWritableFileWriter::FlushSwappedBuffer(
28+
Env::IOPriority op_rate_limiter_priority) {
29+
if (seen_error()) {
30+
return AssertFalseAndGetStatusForPrevError();
31+
}
32+
// Double-buffering is only for buffered I/O; direct I/O has its own path.
33+
assert(!use_direct_io());
34+
IOStatus s;
35+
36+
if (flush_buf_.CurrentSize() > 0) {
37+
// Use WriteToFile() rather than WriteBuffered() to avoid touching buf_,
38+
// which may be receiving new Append()s concurrently.
39+
s = WriteToFile(flush_buf_.BufferStart(), flush_buf_.CurrentSize(),
40+
op_rate_limiter_priority);
41+
if (!s.ok()) {
42+
set_seen_error();
43+
flush_buf_.Clear();
44+
return s;
45+
}
46+
}
47+
flush_buf_.Clear();
48+
49+
{
50+
IOOptions io_options;
51+
io_options.rate_limiter_priority = DecideRateLimiterPriority(
52+
writable_file()->GetIOPriority(), op_rate_limiter_priority);
53+
s = writable_file()->Flush(io_options, nullptr);
54+
}
55+
56+
if (!s.ok()) {
57+
set_seen_error();
58+
}
59+
return s;
60+
}
61+
62+
} // namespace ROCKSDB_NAMESPACE

file/manual_flush_writable_file_writer.h

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
6+
#pragma once
7+
#include "file/writable_file_writer.h"
8+
#include "util/aligned_buffer.h"
9+
10+
namespace ROCKSDB_NAMESPACE {
11+
12+
// ManualFlushWritableFileWriter extends WritableFileWriter for use with
13+
// manual_wal_flush mode. It suppresses all implicit I/O during Append()
14+
// and provides double-buffering so FlushWAL() can swap the write buffer
15+
// under the mutex and then write to disk without holding it.
16+
//
17+
// Double-buffering protocol:
18+
// 1. Call SwapBuffer() while holding the write mutex:
19+
// moves buf_ → flush_buf_, resets buf_ for new Append()s.
20+
// 2. Call FlushSwappedBuffer() WITHOUT holding the write mutex:
21+
// writes flush_buf_ to the OS and issues Flush(); does not touch buf_.
22+
class ManualFlushWritableFileWriter : public WritableFileWriter {
23+
public:
24+
// Same constructor arguments as WritableFileWriter, forwarded to the base
25+
// class; always manual-flush mode (manual_flush parameter is not exposed).
26+
ManualFlushWritableFileWriter(
27+
std::unique_ptr<FSWritableFile>&& file,
28+
const std::string& file_name,
29+
const FileOptions& options,
30+
SystemClock* clock = nullptr,
31+
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
32+
Statistics* stats = nullptr,
33+
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
34+
FileChecksumGenFactory* file_checksum_gen_factory = nullptr,
35+
bool perform_data_verification = false,
36+
bool buffered_data_with_checksum = false)
37+
: WritableFileWriter(std::move(file), file_name, options, clock,
38+
io_tracer, stats, listeners,
39+
file_checksum_gen_factory,
40+
perform_data_verification,
41+
buffered_data_with_checksum) {}
42+
43+
// Swap buf_ into flush_buf_ so that FlushSwappedBuffer() can write it to
44+
// disk without holding the mutex while Append()s continue into the fresh
45+
// buf_. Must be called under the write mutex.
46+
void SwapBuffer();
47+
48+
// Write flush_buf_ to the OS and issue Flush(). Safe to call without any
49+
// mutex because it does not touch buf_. Must follow SwapBuffer().
50+
IOStatus FlushSwappedBuffer(
51+
Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL);
52+
53+
protected:
54+
// Allow buf_ to grow without bound so that Append() never triggers
55+
// implicit I/O. FlushWAL() is responsible for draining the buffer.
56+
size_t AppendBufferSizeLimit() const override { return size_t(-1); }
57+
58+
// Never flush implicitly during Append(); the caller is responsible for
59+
// explicitly calling SwapBuffer() + FlushSwappedBuffer().
60+
bool ShouldImplicitFlushOnAppend() const override { return false; }
61+
62+
private:
63+
// Second buffer for ping-pong double-buffering.
64+
// Non-empty only between SwapBuffer() and FlushSwappedBuffer() completing.
65+
AlignedBuffer flush_buf_;
66+
};
67+
68+
} // namespace ROCKSDB_NAMESPACE

file/writable_file_writer.cc

Lines changed: 4 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
7171

7272
// See whether we need to enlarge the buffer to avoid the flush
7373
if (buf_.Capacity() - buf_.CurrentSize() < left) {
74-
// When manual_flush_ is true, allow buffer to grow beyond
75-
// max_buffer_size_ to avoid any implicit I/O during Append().
76-
size_t limit = manual_flush_ ? size_t(-1) : max_buffer_size_;
74+
size_t limit = AppendBufferSizeLimit();
7775
for (size_t cap = buf_.Capacity();
7876
cap < limit; // There is still room to increase
7977
cap *= 2) {
@@ -87,7 +85,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
8785
}
8886

8987
// Flush only when buffered I/O
90-
if (!use_direct_io() && !manual_flush_ &&
88+
if (!use_direct_io() && ShouldImplicitFlushOnAppend() &&
9189
(buf_.Capacity() - buf_.CurrentSize()) < left) {
9290
if (buf_.CurrentSize() > 0) {
9391
s = Flush(op_rate_limiter_priority);
@@ -334,70 +332,12 @@ IOStatus WritableFileWriter::Close() {
334332
return s;
335333
}
336334

337-
void WritableFileWriter::SwapBuffer() {
338-
assert(manual_flush_);
339-
// Double-buffering is only for buffered I/O; direct I/O has its own path.
340-
assert(!use_direct_io());
341-
assert(flush_buf_.CurrentSize() == 0);
342-
// Ping-pong: reuse flush_buf_'s allocation as the new buf_,
343-
// and move the full buf_ to flush_buf_ for writing to disk.
344-
std::swap(buf_, flush_buf_);
345-
// If buf_ has no allocation (first swap or after move), allocate one.
346-
if (buf_.Capacity() == 0) {
347-
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
348-
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
349-
}
350-
}
351-
352-
IOStatus WritableFileWriter::FlushSwappedBuffer(
353-
Env::IOPriority op_rate_limiter_priority) {
354-
if (seen_error()) {
355-
return AssertFalseAndGetStatusForPrevError();
356-
}
357-
assert(manual_flush_);
358-
// Double-buffering is only for buffered I/O; direct I/O has its own path.
359-
assert(!use_direct_io());
360-
IOStatus s;
361-
362-
if (flush_buf_.CurrentSize() > 0) {
363-
// Use WriteToFile() (not WriteBuffered()) to avoid touching buf_.
364-
s = WriteToFile(flush_buf_.BufferStart(), flush_buf_.CurrentSize(),
365-
op_rate_limiter_priority);
366-
if (!s.ok()) {
367-
set_seen_error();
368-
flush_buf_.Clear();
369-
return s;
370-
}
371-
}
372-
flush_buf_.Clear();
373-
374-
{
375-
IOOptions io_options;
376-
io_options.rate_limiter_priority =
377-
WritableFileWriter::DecideRateLimiterPriority(
378-
writable_file_->GetIOPriority(), op_rate_limiter_priority);
379-
s = writable_file_->Flush(io_options, nullptr);
380-
}
381-
382-
if (!s.ok()) {
383-
set_seen_error();
384-
}
385-
return s;
386-
}
387-
388335
// write out the cached data to the OS cache or storage if direct I/O
389336
// enabled
390337
IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
391338
if (seen_error()) {
392339
return AssertFalseAndGetStatusForPrevError();
393340
}
394-
// In manual_flush_ mode flush_buf_ is only non-empty while
395-
// FlushSwappedBuffer() is running without the write mutex. Callers that
396-
// subsequently call Flush() (e.g. Sync -> Flush) must wait for
397-
// IsFlushing() to clear before proceeding, so flush_buf_ must be empty by
398-
// the time we get here.
399-
assert(!manual_flush_ || flush_buf_.CurrentSize() == 0);
400-
401341
IOStatus s;
402342
TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
403343

@@ -708,31 +648,13 @@ IOStatus WritableFileWriter::WriteBuffered(
708648
if (seen_error()) {
709649
return AssertFalseAndGetStatusForPrevError();
710650
}
711-
// In manual_flush_ mode flush_buf_ is only non-empty while
712-
// FlushSwappedBuffer() is running concurrently. WriteBuffered() must not
713-
// be called in that window — it would race on the underlying file with
714-
// FlushSwappedBuffer()'s WriteToFile(). Callers must wait for IsFlushing()
715-
// to clear (via wal_io_cv_) before issuing any Flush/Sync that reaches here.
716-
assert(!manual_flush_ || flush_buf_.CurrentSize() == 0);
717651

718652
IOStatus s = WriteToFile(data, size, op_rate_limiter_priority);
653+
buf_.Size(0);
654+
buffered_data_crc32c_checksum_ = 0;
719655
if (!s.ok()) {
720-
// If writable_file_->Append() failed, then the data may or may not
721-
// exist in the underlying memory buffer, OS page cache, remote file
722-
// system's buffer, etc. If WritableFileWriter keeps the data in
723-
// buf_, then a future Close() or write retry may send the data to
724-
// the underlying file again. If the data does exist in the
725-
// underlying buffer and gets written to the file eventually despite
726-
// returning error, the file may end up with two duplicate pieces of
727-
// data. Therefore, clear the buf_ at the WritableFileWriter layer
728-
// and let caller determine error handling.
729-
buf_.Size(0);
730-
buffered_data_crc32c_checksum_ = 0;
731656
set_seen_error();
732-
return s;
733657
}
734-
buf_.Size(0);
735-
buffered_data_crc32c_checksum_ = 0;
736658
return s;
737659
}
738660

0 commit comments

Comments
 (0)