diff --git a/mooncake-store/include/file_interface.h b/mooncake-store/include/file_interface.h index 550d5670f9..c2ab05ce6e 100644 --- a/mooncake-store/include/file_interface.h +++ b/mooncake-store/include/file_interface.h @@ -174,6 +174,15 @@ class PosixFile : public StorageFile { }; #ifdef USE_URING +/** + * @class UringFile + * @brief StorageFile backed by a per-thread io_uring ring. + * + * UringFile owns no ring; it wraps SharedUringRing, which keeps one + * thread_local ring per calling thread, so construction and destruction + * need no per-file io_uring_queue_init / io_uring_queue_exit (no + * mmap/munmap, no TLB shootdown). File descriptors are not registered. + */ class UringFile : public StorageFile { public: UringFile(const std::string &filename, int fd, unsigned queue_depth = 32, @@ -198,57 +207,42 @@ class UringFile : public StorageFile { size_t length, off_t offset = 0); - // Flush data to stable storage via - // io_uring_prep_fsync(IORING_FSYNC_DATASYNC). Must be called after write - // and before writing dependent metadata files. + // Batch read: submit up to 32 independent reads at once (each at its own + // offset) before waiting for completions, giving NVMe queue depth > 1. + // Use in BatchLoadBucket instead of per-key read_aligned() loops. + struct ReadDesc { + void *buf; + size_t len; + off_t off; + }; + tl::expected batch_read(const ReadDesc *descs, int cnt); + + // Flush data to stable storage via IORING_FSYNC_DATASYNC. + // Must be called after write_aligned and before writing dependent metadata. tl::expected datasync(); - // Buffer registration interface for high-performance I/O - // Register a single buffer with io_uring to avoid get_user_pages() overhead - // Returns true on success, false on failure - bool register_buffer(void *buffer, size_t length); + // Buffer registration — delegates to the shared ring (process-wide). + // Static variant: no file instance needed. Must be called once from a + // single thread before I/O threads begin. 
Other threads lazily pick up + // the registration on their first I/O call via ensure_buf_registered(). + static bool register_global_buffer(void *buffer, size_t length); + static void unregister_global_buffer(); - // Unregister previously registered buffer + bool register_buffer(void *buffer, size_t length); void unregister_buffer(); - - // Check if a buffer is currently registered - bool is_buffer_registered() const { return buffer_registered_; } + bool is_buffer_registered() const; private: - struct io_uring ring_; - bool ring_initialized_; - bool files_registered_; - bool buffer_registered_; - unsigned queue_depth_; bool use_direct_io_; - static constexpr size_t ALIGNMENT_ = - 4096; // O_DIRECT alignment requirement - - // Registered buffer info - void *registered_buffer_; - size_t registered_buffer_size_; - struct iovec registered_iovec_; - - /// Submit all pending SQEs and wait for exactly @p n completions. - /// Returns the total bytes transferred, or an error. - tl::expected submit_and_wait_n(int n); - - /// Calculate optimal chunk size for parallel I/O based on: - /// - total_len: remaining bytes to transfer - /// - available_depth: number of queue slots available - /// - min_chunk_size: minimum chunk size (must be power of 2) - /// Returns a power-of-2 chunk size that maximizes queue utilization. - size_t calculate_chunk_size(size_t total_len, unsigned available_depth, - size_t min_chunk_size) const; - - /// Allocate aligned buffer for O_DIRECT - void *alloc_aligned_buffer(size_t size) const; + static constexpr size_t ALIGNMENT_ = 4096; - /// Free aligned buffer + /// Allocate / free an O_DIRECT aligned bounce buffer. + void *alloc_aligned_buffer(size_t size) const; void free_aligned_buffer(void *ptr) const; - /// Mutex to serialize concurrent access to ring_ - mutable Mutex ring_mutex_; + /// Return true if @p buf falls entirely within the shared registered + /// buffer. 
+ bool in_registered_buffer(const void *buf, size_t len) const; }; #endif // USE_URING diff --git a/mooncake-store/include/file_storage.h b/mooncake-store/include/file_storage.h index 8399b8fc11..921fb6b0db 100644 --- a/mooncake-store/include/file_storage.h +++ b/mooncake-store/include/file_storage.h @@ -14,30 +14,48 @@ class FileStorage { tl::expected Init(); + /** + * @brief Result of BatchGet operation containing batch_id and buffer + * pointers. + */ + struct BatchGetResult { + uint64_t batch_id; + std::vector pointers; + }; + /** * @brief Reads multiple key-value (KV) entries from local storage and * forwards them to a remote node. * @param keys List of keys to read from the local KV store * @param sizes Expected size in bytes for each value - * @return tl::expected, ErrorCode> indicating - * operation status. + * @return tl::expected containing batch_id and + * buffer pointers. */ - tl::expected, ErrorCode> BatchGet( + tl::expected BatchGet( const std::vector& keys, const std::vector& sizes); FileStorageConfig config_; + /** + * @brief Releases buffer associated with a specific batch_id. + * Called by remote client after transfer completion. 
+ * @param batch_id The unique identifier of the batch to release + * @return true if batch was found and released, false otherwise + */ + bool ReleaseBuffer(uint64_t batch_id); + private: friend class FileStorageTest; struct AllocatedBatch { + uint64_t batch_id; std::vector handles; std::unordered_map slices; std::chrono::steady_clock::time_point lease_timeout; std::vector pointers; uint64_t total_size; - AllocatedBatch() = default; + AllocatedBatch() : batch_id(0), total_size(0) {} AllocatedBatch(AllocatedBatch&&) = default; AllocatedBatch& operator=(AllocatedBatch&&) = default; @@ -86,8 +104,9 @@ class FileStorage { std::shared_ptr storage_backend_; std::shared_ptr client_buffer_allocator_; mutable Mutex client_buffer_mutex_; - std::vector> GUARDED_BY( + std::unordered_map> GUARDED_BY( client_buffer_mutex_) client_buffer_allocated_batches_; + std::atomic next_batch_id_{1}; mutable Mutex offloading_mutex_; bool GUARDED_BY(offloading_mutex_) enable_offloading_; diff --git a/mooncake-store/include/master_service.h b/mooncake-store/include/master_service.h index cb236d9d22..c71be8e555 100644 --- a/mooncake-store/include/master_service.h +++ b/mooncake-store/include/master_service.h @@ -712,6 +712,11 @@ class MasterService { std::vector replica_ids; }; + struct OffloadingTask { + ReplicaID source_id; + std::chrono::system_clock::time_point start_time; + }; + static constexpr size_t kNumShards = 1024; // Number of metadata shards // Sharded metadata maps and their mutexes @@ -722,6 +727,8 @@ class MasterService { std::unordered_set processing_keys GUARDED_BY(mutex); std::unordered_map replication_tasks GUARDED_BY(mutex); + std::unordered_map offloading_tasks + GUARDED_BY(mutex); }; std::array metadata_shards_; @@ -783,7 +790,7 @@ class MasterService { void EvictionThreadFunc(); tl::expected PushOffloadingQueue(const std::string& key, - const Replica& replica); + Replica& replica); // Lease related members const uint64_t default_kv_lease_ttl_; // in milliseconds diff 
--git a/mooncake-store/include/pyclient.h b/mooncake-store/include/pyclient.h index 40f6b73325..ca17290bbc 100644 --- a/mooncake-store/include/pyclient.h +++ b/mooncake-store/include/pyclient.h @@ -73,6 +73,16 @@ class ClientRequester { const std::vector &keys, const std::vector sizes); + /** + * @brief Notifies remote FileStorage to release buffer after transfer + * completion. This is a fire-and-forget call - errors are logged but not + * propagated. + * @param client_addr Network address of the remote FileStorage service. + * @param batch_id The batch_id returned from batch_get_offload_object. + */ + void release_offload_buffer(const std::string &client_addr, + uint64_t batch_id); + private: /** * @brief A batch of allocated memory buffers, tracking both handles and diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index ea7c449b74..17ea674e71 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -485,6 +485,14 @@ class RealClient : public PyClient { batch_get_offload_object(const std::vector &keys, const std::vector &sizes); + /** + * @brief Releases buffer associated with a specific batch_id. + * Called by remote client after transfer completion. + * @param batch_id The unique identifier of the batch to release + * @return true if batch was found and released, false otherwise + */ + bool release_offload_buffer(uint64_t batch_id); + /** * @brief Retrieves multiple stored objects from a remote service. 
* @param target_rpc_service_addr Address of the remote RPC service (e.g., diff --git a/mooncake-store/include/rpc_types.h b/mooncake-store/include/rpc_types.h index acab02090f..24e0b8254a 100644 --- a/mooncake-store/include/rpc_types.h +++ b/mooncake-store/include/rpc_types.h @@ -144,19 +144,22 @@ struct TaskCompleteRequest { YLT_REFL(TaskCompleteRequest, id, status, message); struct BatchGetOffloadObjectResponse { + uint64_t batch_id; std::vector pointers; std::string transfer_engine_addr; uint64_t gc_ttl_ms; - BatchGetOffloadObjectResponse() = default; - BatchGetOffloadObjectResponse(std::vector&& pointers_param, + BatchGetOffloadObjectResponse() : batch_id(0), gc_ttl_ms(0) {} + BatchGetOffloadObjectResponse(uint64_t batch_id_param, + std::vector&& pointers_param, std::string transfer_engine_addr_param, uint64_t gc_ttl_ms_param) - : pointers(std::move(pointers_param)), + : batch_id(batch_id_param), + pointers(std::move(pointers_param)), transfer_engine_addr(std::move(transfer_engine_addr_param)), gc_ttl_ms(gc_ttl_ms_param) {} }; -YLT_REFL(BatchGetOffloadObjectResponse, pointers, transfer_engine_addr, - gc_ttl_ms); +YLT_REFL(BatchGetOffloadObjectResponse, batch_id, pointers, + transfer_engine_addr, gc_ttl_ms); } // namespace mooncake diff --git a/mooncake-store/include/storage_backend.h b/mooncake-store/include/storage_backend.h index e9f1f9275a..a94e8af087 100644 --- a/mooncake-store/include/storage_backend.h +++ b/mooncake-store/include/storage_backend.h @@ -903,7 +903,7 @@ class BucketStorageBackend : public StorageBackendInterface { // Aligned buffer for O_DIRECT I/O operations // We use a fixed-size buffer to avoid frequent allocations - static constexpr size_t kAlignedBufferSize = 16 * 1024 * 1024; // 16MB + static constexpr size_t kAlignedBufferSize = 32 * 1024 * 1024; // 32MB std::unique_ptr aligned_io_buffer_{nullptr, [](void*) {}}; /** diff --git a/mooncake-store/src/file_storage.cpp b/mooncake-store/src/file_storage.cpp index 65abc2f7bd..100ff0ddb8 
100644 --- a/mooncake-store/src/file_storage.cpp +++ b/mooncake-store/src/file_storage.cpp @@ -163,39 +163,22 @@ FileStorage::FileStorage(const FileStorageConfig& config, storage_backend_ = create_storage_backend_result.value(); - // Register buffer with UringFile if using BucketStorageBackend + // Register the client buffer with the process-wide io_uring fixed-buffer + // mechanism. This must happen before any I/O threads start so that they + // can lazily pick up the registration on their first I/O call. #ifdef USE_URING - if (config.storage_backend_type == StorageBackendType::kBucket) { - auto bucket_backend = - std::dynamic_pointer_cast(storage_backend_); - if (bucket_backend) { - auto file_result = bucket_backend->GetFileInstance(); - if (file_result) { - auto file = file_result.value(); - auto uring_file = std::dynamic_pointer_cast(file); - if (uring_file) { - auto aligned_allocator = - std::static_pointer_cast( - client_buffer_allocator_); - if (aligned_allocator) { - void* base_ptr = aligned_allocator->get_base_pointer(); - size_t size = aligned_allocator->get_total_size(); - - if (uring_file->register_buffer(base_ptr, size)) { - LOG(INFO) - << "Successfully registered buffer with " - "UringFile: " - << "base=" << base_ptr << ", size=" << size; - } else { - LOG(WARNING) - << "Failed to register buffer with UringFile"; - } - } - } + if (config.use_uring) { + auto aligned_allocator = + std::static_pointer_cast( + client_buffer_allocator_); + if (aligned_allocator) { + void* base_ptr = aligned_allocator->get_base_pointer(); + size_t size = aligned_allocator->get_total_size(); + if (UringFile::register_global_buffer(base_ptr, size)) { + LOG(INFO) << "Successfully registered buffer with UringFile: " + << "base=" << base_ptr << ", size=" << size; } else { - LOG(WARNING) - << "Failed to get file instance for buffer registration: " - << file_result.error(); + LOG(WARNING) << "Failed to register buffer with UringFile"; } } } @@ -284,7 +267,7 @@ tl::expected 
FileStorage::Init() { return {}; } -tl::expected, ErrorCode> FileStorage::BatchGet( +tl::expected FileStorage::BatchGet( const std::vector& keys, const std::vector& sizes) { auto start_time = std::chrono::steady_clock::now(); auto allocate_res = AllocateBatch(keys, sizes); @@ -310,15 +293,19 @@ tl::expected, ErrorCode> FileStorage::BatchGet( } } + uint64_t batch_id = allocated_batch->batch_id; + BatchGetResult batch_result{batch_id, allocated_batch->pointers}; + MutexLocker locker(&client_buffer_mutex_); - client_buffer_allocated_batches_.emplace_back(std::move(allocated_batch)); + client_buffer_allocated_batches_.emplace(batch_id, + std::move(allocated_batch)); auto end_time = std::chrono::steady_clock::now(); auto elapsed_time = std::chrono::duration_cast( end_time - start_time) .count(); VLOG(1) << "Time taken for FileStorage::BatchGet: " << elapsed_time - << "us, key size: " << keys.size(); - return allocate_res.value<>()->pointers; + << "us, key size: " << keys.size() << ", batch_id: " << batch_id; + return batch_result; } tl::expected FileStorage::OffloadObjects( @@ -512,6 +499,7 @@ tl::expected, ErrorCode> FileStorage::AllocateBatch(const std::vector& keys, const std::vector& sizes) { auto result = std::make_shared(); + result->batch_id = next_batch_id_.fetch_add(1, std::memory_order_relaxed); std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); auto lease_timeout = @@ -537,9 +525,9 @@ FileStorage::AllocateBatch(const std::vector& keys, { MutexLocker locker(&client_buffer_mutex_); auto gc_now = std::chrono::steady_clock::now(); - auto it = client_buffer_allocated_batches_.begin(); - while (it != client_buffer_allocated_batches_.end()) { - if (gc_now >= (*it)->lease_timeout) { + for (auto it = client_buffer_allocated_batches_.begin(); + it != client_buffer_allocated_batches_.end();) { + if (gc_now >= it->second->lease_timeout) { it = client_buffer_allocated_batches_.erase(it); } else { ++it; @@ -582,14 +570,16 @@ void 
FileStorage::ClientBufferGCThreadFunc() { MutexLocker locker(&client_buffer_mutex_); if (!client_buffer_allocated_batches_.empty()) { auto now = std::chrono::steady_clock::now(); - client_buffer_allocated_batches_.erase( - std::remove_if( - client_buffer_allocated_batches_.begin(), - client_buffer_allocated_batches_.end(), - [&](const std::shared_ptr& batch) { - return now >= batch->lease_timeout; - }), - client_buffer_allocated_batches_.end()); + for (auto it = client_buffer_allocated_batches_.begin(); + it != client_buffer_allocated_batches_.end();) { + if (now >= it->second->lease_timeout) { + VLOG(1) << "GC releasing batch_id: " << it->first + << " (lease expired)"; + it = client_buffer_allocated_batches_.erase(it); + } else { + ++it; + } + } } } std::this_thread::sleep_for( @@ -598,4 +588,18 @@ void FileStorage::ClientBufferGCThreadFunc() { LOG(INFO) << "action=client_buffer_gc_thread_stopped"; } +bool FileStorage::ReleaseBuffer(uint64_t batch_id) { + MutexLocker locker(&client_buffer_mutex_); + auto it = client_buffer_allocated_batches_.find(batch_id); + if (it != client_buffer_allocated_batches_.end()) { + VLOG(1) << "Releasing buffer for batch_id: " << batch_id + << " (transfer completed)"; + client_buffer_allocated_batches_.erase(it); + return true; + } + VLOG(1) << "batch_id " << batch_id + << " not found (may have been GC'd already)"; + return false; +} + } // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/master_service.cpp b/mooncake-store/src/master_service.cpp index 422c81e96f..8db202e666 100644 --- a/mooncake-store/src/master_service.cpp +++ b/mooncake-store/src/master_service.cpp @@ -268,10 +268,11 @@ void MasterService::ClearInvalidHandles() { while (it != shard->metadata.end()) { if (CleanupStaleHandles(it->second)) { // If the object is empty, we need to erase the iterator and - // also erase the key from processing_keys and - // replication_tasks. 
+ // also erase the key from processing_keys, + // replication_tasks, and offloading_tasks. shard->processing_keys.erase(it->first); shard->replication_tasks.erase(it->first); + shard->offloading_tasks.erase(it->first); it = shard->metadata.erase(it); } else { ++it; @@ -796,10 +797,17 @@ auto MasterService::PutEnd(const UUID& client_id, const std::string& key, [](Replica& replica) { replica.mark_complete(); }); if (enable_offload_) { - metadata.VisitReplicas(&Replica::fn_is_completed, - [this, &key](const Replica& replica) { - PushOffloadingQueue(key, replica); - }); + auto& shard = accessor.GetShard(); + metadata.VisitReplicas( + &Replica::fn_is_completed, [this, &key, &shard](Replica& replica) { + auto result = PushOffloadingQueue(key, replica); + if (result) { + // offloading_tasks keeps a single task per key and only + // that task's source replica is un-pinned later, so pin + // the replica only when its task was actually recorded. + const bool inserted = + shard->offloading_tasks + .emplace(key, OffloadingTask{ + replica.id(), + std::chrono::system_clock::now()}) + .second; + if (inserted) { + replica.inc_refcnt(); + } + } + }); } // If the object is completed, remove it from the processing set. @@ -1642,6 +1650,26 @@ auto MasterService::NotifyOffloadSuccess( for (size_t i = 0; i < keys.size(); ++i) { const auto& key = keys[i]; const auto& metadata = metadatas[i]; + + // Release refcnt and clear offloading task. + { + MetadataAccessorRW accessor(this, key); + if (accessor.Exists()) { + auto& obj_metadata = accessor.Get(); + auto& shard = accessor.GetShard(); + auto task_it = shard->offloading_tasks.find(key); + if (task_it != shard->offloading_tasks.end()) { + auto source = + obj_metadata.GetReplicaByID(task_it->second.source_id); + if (source != nullptr) { + source->dec_refcnt(); + } + shard->offloading_tasks.erase(task_it); + } + } + } + + // Add LOCAL_DISK replica. 
Replica replica(client_id, metadata.data_size, metadata.transport_endpoint, ReplicaStatus::COMPLETE); auto res = AddReplica(client_id, key, replica); @@ -1655,7 +1683,7 @@ auto MasterService::NotifyOffloadSuccess( } tl::expected MasterService::PushOffloadingQueue( - const std::string& key, const Replica& replica) { + const std::string& key, Replica& replica) { const auto& segment_names = replica.get_segment_names(); if (segment_names.empty()) { return {}; @@ -1846,6 +1874,29 @@ void MasterService::DiscardExpiredProcessingReplicas( task_it = shard->replication_tasks.erase(task_it); } + // Part 3: Discard expired offloading operations. + for (auto task_it = shard->offloading_tasks.begin(); + task_it != shard->offloading_tasks.end();) { + const auto ttl = + task_it->second.start_time + put_start_release_timeout_sec_; + if (ttl > now) { + task_it++; + continue; + } + + auto metadata_it = shard->metadata.find(task_it->first); + if (metadata_it != shard->metadata.end()) { + auto source = + metadata_it->second.GetReplicaByID(task_it->second.source_id); + if (source != nullptr) { + source->dec_refcnt(); + } + } + + LOG(WARNING) << "Offloading task expired for key: " << task_it->first; + task_it = shard->offloading_tasks.erase(task_it); + } + if (!discarded_replicas.empty()) { std::lock_guard lock(discarded_replicas_mutex_); discarded_replicas_.splice(discarded_replicas_.end(), diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index c9381d7e55..ef5d9394a6 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -2656,10 +2656,20 @@ RealClient::batch_get_offload_object(const std::vector &keys, return tl::make_unexpected(result.error()); } return BatchGetOffloadObjectResponse( - std::move(result.value()), client_->GetTransportEndpoint(), + result.value().batch_id, std::move(result.value().pointers), + client_->GetTransportEndpoint(), file_storage_->config_.client_buffer_gc_ttl_ms); } +bool 
RealClient::release_offload_buffer(uint64_t batch_id) { + if (!file_storage_) { + LOG(WARNING) + << "release_offload_buffer called but file_storage_ is null"; + return false; + } + return file_storage_->ReleaseBuffer(batch_id); +} + tl::expected RealClient::batch_get_into_offload_object_internal( const std::string &target_rpc_service_addr, @@ -2690,7 +2700,14 @@ RealClient::batch_get_into_offload_object_internal( << elapsed_time << "ms, with target_rpc_service_addr: " << target_rpc_service_addr << ", key size: " << objects.size() - << "gc ttl: " << batchGetResp->gc_ttl_ms << "ms."; + << ", batch_id: " << batchGetResp->batch_id + << ", gc ttl: " << batchGetResp->gc_ttl_ms << "ms."; + + // Release buffer immediately after transfer completion (fire-and-forget) + // This allows early buffer reclamation instead of waiting for GC lease + client_requester_->release_offload_buffer(target_rpc_service_addr, + batchGetResp->batch_id); + if (!result) { LOG(ERROR) << "Batch get into offload object failed with error: " << result.error(); @@ -2729,6 +2746,23 @@ ClientRequester::batch_get_offload_object(const std::string &client_addr, return result; } +void ClientRequester::release_offload_buffer(const std::string &client_addr, + uint64_t batch_id) { + // Fire-and-forget: attempt to release buffer, log errors but don't block + auto result = invoke_rpc<&RealClient::release_offload_buffer, bool>( + client_addr, batch_id); + if (!result) { + // This is expected in some cases (e.g., network issues, buffer already + // GC'd) Log at INFO level since GC will eventually clean up anyway + VLOG(1) << "Failed to release_offload_buffer for batch_id=" << batch_id + << " at " << client_addr + << " (will be GC'd): " << result.error(); + } else { + VLOG(1) << "Successfully released buffer for batch_id=" << batch_id + << " at " << client_addr; + } +} + template tl::expected ClientRequester::invoke_rpc( const std::string &client_addr, Args &&...args) { diff --git 
a/mooncake-store/src/real_client_main.cpp b/mooncake-store/src/real_client_main.cpp index b5ef9e3702..cdb49a90e3 100644 --- a/mooncake-store/src/real_client_main.cpp +++ b/mooncake-store/src/real_client_main.cpp @@ -56,6 +56,7 @@ void RegisterClientRpcService(coro_rpc::coro_rpc_server &server, server.register_handler<&RealClient::query_task>(&real_client); server.register_handler<&RealClient::batch_get_offload_object>( &real_client); + server.register_handler<&RealClient::release_offload_buffer>(&real_client); } } // namespace mooncake diff --git a/mooncake-store/src/storage_backend.cpp b/mooncake-store/src/storage_backend.cpp index 0552bbe2f7..4dce96541b 100644 --- a/mooncake-store/src/storage_backend.cpp +++ b/mooncake-store/src/storage_backend.cpp @@ -757,6 +757,15 @@ std::unique_ptr StorageBackend::create_file( break; } +#ifdef USE_URING + // Use O_DIRECT only for reads: write latency is not sensitive in this + // scenario, and O_DIRECT writes require 4096-byte alignment padding which + // corrupts meta file parsing and wastes disk space on data files. + if (use_uring_ && mode == FileMode::Read) { + flags |= O_DIRECT; + } +#endif + int fd = open(path.c_str(), flags | access_mode, 0644); if (fd < 0) { return nullptr; @@ -776,7 +785,11 @@ std::unique_ptr StorageBackend::create_file( #ifdef USE_URING if (use_uring_) { - return std::make_unique(path, fd, 32, true); + // use_direct_io mirrors the O_DIRECT flag: true for reads, false for + // writes. This avoids unnecessary bounce-buffer allocation on the write + // path while keeping correct alignment enforcement on the read path. 
+ bool use_direct_io = (mode == FileMode::Read); + return std::make_unique(path, fd, 32, use_direct_io); } #endif return std::make_unique(path, fd); @@ -2277,8 +2290,10 @@ BucketStorageBackend::OpenFile(const std::string& path, FileMode mode) const { } #ifdef USE_URING - // Add O_DIRECT flag when using uring for direct I/O - if (file_storage_config_.use_uring) { + // Use O_DIRECT only for reads: write latency is not sensitive in this + // scenario, and O_DIRECT writes require 4096-byte alignment padding which + // corrupts meta file parsing and wastes disk space on data files. + if (file_storage_config_.use_uring && mode == FileMode::Read) { flags |= O_DIRECT; } #endif @@ -2290,7 +2305,7 @@ BucketStorageBackend::OpenFile(const std::string& path, FileMode mode) const { return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); } #ifdef USE_URING - if (file_storage_config_.use_uring) { + if (file_storage_config_.use_uring && mode == FileMode::Read) { return std::make_unique(path, fd, 32, true); } #endif diff --git a/mooncake-store/src/uring_file.cpp b/mooncake-store/src/uring_file.cpp index 3a53976af4..870eeba2f0 100644 --- a/mooncake-store/src/uring_file.cpp +++ b/mooncake-store/src/uring_file.cpp @@ -1,13 +1,15 @@ #ifdef USE_URING #include +#include #include #include #include #include +#include #include #include -#include +#include #include #include @@ -16,854 +18,724 @@ namespace mooncake { -// --------------------------------------------------------------------------- -// Construction / Destruction -// --------------------------------------------------------------------------- +// ============================================================================ +// GlobalBufInfo — process-wide buffer registration state. +// +// register_buffer() is called once from the storage-backend init path. +// Each thread-local ring picks up the registration lazily on first I/O. 
+// ============================================================================ +struct GlobalBufInfo { + std::atomic base{nullptr}; + std::atomic size{0}; +}; +static GlobalBufInfo g_buf; + +// ============================================================================ +// SharedUringRing — per-thread io_uring ring. +// +// Design goals: +// - One ring per *thread*: eliminates all mutex contention between threads. +// Different threads can perform I/O fully concurrently. +// - Within one thread: no lock needed (single owner), so multiple SQEs can +// be batched before waiting, exposing NVMe queue depth > 1. +// - Buffer registration: stored globally in g_buf, registered lazily on +// each thread-local ring on first use (ensure_buf_registered()). +// - File-descriptor registration (IOSQE_FIXED_FILE) intentionally omitted: +// the per-I/O fdget() overhead (~50 ns) is negligible compared to the +// mutex contention that the old global ring imposed (> 1 ms per read). +// ============================================================================ +class SharedUringRing { + public: + static constexpr unsigned QUEUE_DEPTH = 32; + static constexpr size_t MIN_CHUNK = 4096; + + static SharedUringRing& instance() { + thread_local SharedUringRing tl_ring; + return tl_ring; + } + + SharedUringRing(const SharedUringRing&) = delete; + SharedUringRing& operator=(const SharedUringRing&) = delete; + + bool is_initialized() const { return initialized_; } + bool is_buffer_registered() const { return buf_registered_; } + void* buffer_base() const { return buf_base_; } + size_t buffer_size() const { return buf_size_; } + + // ----------------------------------------------------------------- + // Buffer registration + // ----------------------------------------------------------------- + + // Register a buffer on THIS thread's ring. Called lazily from each + // thread before the first fixed-buffer I/O. 
+ bool ensure_buf_registered() { + if (buf_registered_) return true; + if (buf_register_failed_) return false; // don't retry after failure + if (!initialized_) return false; + void* b = g_buf.base.load(std::memory_order_acquire); + size_t s = g_buf.size.load(std::memory_order_acquire); + if (!b || !s) return false; + struct iovec iov{b, s}; + int ret = io_uring_register_buffers(&ring_, &iov, 1); + if (ret < 0) { + int err = -ret; + LOG(WARNING) << "[SharedUringRing] io_uring_register_buffers failed" + << " errno=" << err << " (" << strerror(err) << ")" + << " buf=" << b << " size=" << s + << " pages=" << (s >> 12) + << " — falling back to non-fixed-buffer I/O"; + buf_register_failed_ = true; + return false; + } + buf_registered_ = true; + buf_base_ = b; + buf_size_ = s; + LOG(INFO) << "[SharedUringRing] tid registered buffer addr=" << b + << " size=" << s; + return true; + } -UringFile::UringFile(const std::string &filename, int fd, unsigned queue_depth, - bool use_direct_io) - : StorageFile(filename, fd), - ring_initialized_(false), - files_registered_(false), - buffer_registered_(false), - queue_depth_(queue_depth), - use_direct_io_(use_direct_io), - registered_buffer_(nullptr), - registered_buffer_size_(0) { - if (fd < 0) { - error_code_ = ErrorCode::FILE_INVALID_HANDLE; - return; + void unregister_buf_local() { + if (!initialized_ || !buf_registered_) return; + io_uring_unregister_buffers(&ring_); + buf_registered_ = false; + buf_base_ = nullptr; + buf_size_ = 0; } - int ret = io_uring_queue_init(queue_depth, &ring_, 0); - if (ret < 0) { - LOG(ERROR) << "Failed to initialize io_uring: " << strerror(-ret); - error_code_ = ErrorCode::FILE_INVALID_HANDLE; - return; + // ----------------------------------------------------------------- + // I/O primitives (no mutex — caller is the sole owner of this ring) + // ----------------------------------------------------------------- + + tl::expected read(int fd, void* buf, size_t len, + off_t off) { + 
// ring_ is only usable after a successful io_uring_queue_init + // (same guard as fsync()). + if (!initialized_) + return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); + ensure_buf_registered(); + bool fix = in_registered_buf(buf, len); + return submit_rw(/*write=*/false, fd, buf, len, off, fix); } - ring_initialized_ = true; - // Register the file descriptor to avoid per-I/O fd lookup overhead. - ret = io_uring_register_files(&ring_, &fd_, 1); - if (ret >= 0) { - files_registered_ = true; - } else { - LOG(WARNING) << "[UringFile] io_uring_register_files failed: " - << strerror(-ret) << " (continuing without)"; + tl::expected write(int fd, const void* buf, size_t len, + off_t off) { + // Refuse to touch an uninitialised ring. + if (!initialized_) + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); + return submit_rw(/*write=*/true, fd, const_cast(buf), len, off, + /*use_fixed_buf=*/false); } - if (use_direct_io_) { - LOG(INFO) << "[UringFile] O_DIRECT mode enabled for " << filename; + tl::expected vector_read(int fd, const iovec* iovs, + int cnt, off_t off) { + // Refuse to touch an uninitialised ring. + if (!initialized_) + return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); + return submit_vector(/*write=*/false, fd, iovs, cnt, off); } -} -UringFile::~UringFile() { - auto dtor_start = std::chrono::steady_clock::now(); + tl::expected vector_write(int fd, const iovec* iovs, + int cnt, off_t off) { + // Refuse to touch an uninitialised ring. + if (!initialized_) + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); + return submit_vector(/*write=*/true, fd, iovs, cnt, off); + } + + // Descriptor for one independently-addressed read in a batch. + struct ReadDesc { + void* buf; + size_t len; + off_t off; + }; + + /// Submit up to QUEUE_DEPTH reads at once (each at its own offset), then + /// collect completions. Repeat until all @p cnt descs are done. + /// This gives the NVMe device queue depth > 1 within a single thread. 
+ tl::expected batch_read(int fd, const ReadDesc* descs, + int cnt) { + ensure_buf_registered(); + size_t total = 0; + int remaining = cnt; + int idx = 0; + + while (remaining > 0) { + int batch = std::min(remaining, static_cast(QUEUE_DEPTH)); - if (ring_initialized_) { - if (buffer_registered_) { - io_uring_unregister_buffers(&ring_); + for (int i = 0; i < batch; ++i) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + LOG(ERROR) << "[SharedUringRing] SQ full (batch_read)"; + return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); + } + const auto& d = descs[idx + i]; + if (buf_registered_ && in_registered_buf(d.buf, d.len)) + io_uring_prep_read_fixed(sqe, fd, d.buf, d.len, d.off, 0); + else + io_uring_prep_read(sqe, fd, d.buf, d.len, d.off); + } + + auto res = collect(batch); + if (!res) return res; + total += res.value(); + idx += batch; + remaining -= batch; } - if (files_registered_) { - io_uring_unregister_files(&ring_); + return total; + } + + /// Issue IORING_FSYNC_DATASYNC. Blocks until complete. 
+ tl::expected fsync(int fd) { + if (!initialized_) + return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); + + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + LOG(ERROR) << "[SharedUringRing] SQ full (fsync)"; + return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); } + io_uring_prep_fsync(sqe, fd, IORING_FSYNC_DATASYNC); + + auto res = collect(1); + if (!res) return tl::make_unexpected(res.error()); + return {}; + } + + private: + // ----------------------------------------------------------------- + // Construction / destruction + // ----------------------------------------------------------------- + + SharedUringRing() { + int ret = io_uring_queue_init(QUEUE_DEPTH, &ring_, 0); + if (ret < 0) { + LOG(ERROR) << "[SharedUringRing] io_uring_queue_init failed: " + << strerror(-ret); + return; + } + initialized_ = true; + LOG(INFO) << "[SharedUringRing] thread-local ring initialised " + "queue_depth=" + << QUEUE_DEPTH; + } + + ~SharedUringRing() { + if (!initialized_) return; + if (buf_registered_) io_uring_unregister_buffers(&ring_); io_uring_queue_exit(&ring_); } + // ----------------------------------------------------------------- + // Internal helpers + // ----------------------------------------------------------------- + + bool in_registered_buf(const void* buf, size_t len) const { + if (!buf_registered_ || !buf_base_ || !buf_size_) return false; + uintptr_t buf_addr = reinterpret_cast(buf); + uintptr_t rb = reinterpret_cast(buf_base_); + return buf_addr >= rb && (buf_addr + len) <= (rb + buf_size_); + } + + static size_t next_pow2(size_t n) { + if (n == 0) return 1; + --n; + n |= n >> 1; + n |= n >> 2; + n |= n >> 4; + n |= n >> 8; + n |= n >> 16; + n |= n >> 32; + return n + 1; + } + + static size_t calc_chunk(size_t remaining, unsigned depth) { + size_t s = (remaining + depth - 1) / depth; + s = next_pow2(s); + return std::max(s, MIN_CHUNK); + } + + // Drain exactly @expected CQEs and accumulate bytes. 
+ tl::expected collect(int expected) { + int ret = io_uring_submit_and_wait(&ring_, expected); + if (ret < 0) { + LOG(ERROR) << "[SharedUringRing] io_uring_submit_and_wait: " + << strerror(-ret); + return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); + } + size_t total = 0; + bool err = false; + unsigned head, cnt = 0; + struct io_uring_cqe* cqe; + io_uring_for_each_cqe(&ring_, head, cqe) { + if (cqe->res < 0) { + LOG(ERROR) << "[SharedUringRing] CQE error: " + << strerror(-cqe->res); + err = true; + } else { + total += static_cast(cqe->res); + } + ++cnt; + } + io_uring_cq_advance(&ring_, cnt); + if (err) return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); + return total; + } + + // Chunked contiguous read or write. + tl::expected submit_rw(bool is_write, int fd, void* buf, + size_t len, off_t off, + bool use_fixed_buf) { + const ErrorCode err_code = + is_write ? ErrorCode::FILE_WRITE_FAIL : ErrorCode::FILE_READ_FAIL; + const bool fix_buf = (use_fixed_buf && buf_registered_); + + char* ptr = static_cast(buf); + size_t total = 0; + size_t remaining = len; + off_t cur = off; + + while (remaining > 0) { + size_t cs = calc_chunk(remaining, QUEUE_DEPTH); + unsigned n = std::min( + static_cast((remaining + cs - 1) / cs), QUEUE_DEPTH); + + for (unsigned i = 0; i < n; ++i) { + size_t chunk = std::min(cs, remaining); + + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + LOG(ERROR) << "[SharedUringRing] SQ full"; + return tl::make_unexpected(err_code); + } + + if (is_write) { + if (fix_buf) + io_uring_prep_write_fixed(sqe, fd, ptr, chunk, cur, 0); + else + io_uring_prep_write(sqe, fd, ptr, chunk, cur); + } else { + if (fix_buf) + io_uring_prep_read_fixed(sqe, fd, ptr, chunk, cur, 0); + else + io_uring_prep_read(sqe, fd, ptr, chunk, cur); + } + + ptr += chunk; + cur += static_cast(chunk); + remaining -= chunk; + if (remaining == 0) break; + } + + auto res = collect(static_cast(n)); + if (!res) return res; + total += res.value(); + if (!is_write && 
res.value() == 0) break; // read EOF + if (is_write && res.value() == 0) { + LOG(ERROR) << "[SharedUringRing] zero bytes written"; + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); + } + } + return total; + } + + // Scatter/gather read or write (one SQE per iovec, sequential offsets). + tl::expected submit_vector(bool is_write, int fd, + const iovec* iovs, int cnt, + off_t off) { + const ErrorCode err_code = + is_write ? ErrorCode::FILE_WRITE_FAIL : ErrorCode::FILE_READ_FAIL; + + size_t total = 0; + off_t cur = off; + int remaining = cnt; + int idx = 0; + + while (remaining > 0) { + int batch = std::min(remaining, static_cast(QUEUE_DEPTH)); + + for (int i = 0; i < batch; ++i) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + LOG(ERROR) << "[SharedUringRing] SQ full (vector)"; + return tl::make_unexpected(err_code); + } + + if (is_write) + io_uring_prep_write(sqe, fd, iovs[idx].iov_base, + iovs[idx].iov_len, cur); + else + io_uring_prep_read(sqe, fd, iovs[idx].iov_base, + iovs[idx].iov_len, cur); + + cur += static_cast(iovs[idx].iov_len); + ++idx; + } + + auto res = collect(batch); + if (!res) return res; + total += res.value(); + remaining -= batch; + } + return total; + } + + // ----------------------------------------------------------------- + // Data members + // ----------------------------------------------------------------- + struct io_uring ring_{}; + bool initialized_ = false; + + bool buf_registered_ = false; + bool buf_register_failed_ = false; // set on first failure; skip retries + void* buf_base_ = nullptr; + size_t buf_size_ = 0; +}; + +// ============================================================================ +// UringFile — thin wrapper over SharedUringRing +// ============================================================================ + +UringFile::UringFile(const std::string& filename, int fd, + unsigned /*queue_depth*/, bool use_direct_io) + : StorageFile(filename, fd), use_direct_io_(use_direct_io) { + if 
(fd < 0) { + error_code_ = ErrorCode::FILE_INVALID_HANDLE; + return; + } + if (!SharedUringRing::instance().is_initialized()) { + LOG(WARNING) << "[UringFile] thread-local ring not available for " + << filename; + } + if (use_direct_io_) { + LOG(INFO) << "[UringFile] O_DIRECT mode enabled for " << filename; + } +} + +UringFile::~UringFile() { + auto t0 = std::chrono::steady_clock::now(); + if (fd_ >= 0) { if (close(fd_) != 0) { - LOG(WARNING) << "Failed to close file: " << filename_; + LOG(WARNING) << "[UringFile] close failed: " << filename_; } if (error_code_ == ErrorCode::FILE_WRITE_FAIL) { - if (::unlink(filename_.c_str()) == -1) { - LOG(ERROR) << "Failed to delete corrupted file: " << filename_; - } else { - LOG(INFO) << "Deleted corrupted file: " << filename_; - } + if (::unlink(filename_.c_str()) == -1) + LOG(ERROR) << "[UringFile] failed to delete corrupted file: " + << filename_; + else + LOG(INFO) << "[UringFile] deleted corrupted file: " + << filename_; } } fd_ = -1; - auto dtor_end = std::chrono::steady_clock::now(); - auto dtor_elapsed_ms = - std::chrono::duration_cast(dtor_end - - dtor_start) - .count(); - if (dtor_elapsed_ms > 1) { - LOG(WARNING) << "[UringFile::~UringFile] cleanup took " - << dtor_elapsed_ms << "ms for " << filename_; + auto elapsed_ms = std::chrono::duration_cast( + std::chrono::steady_clock::now() - t0) + .count(); + if (elapsed_ms > 1) { + LOG(WARNING) << "[UringFile::~UringFile] cleanup took " << elapsed_ms + << "ms for " << filename_; } } // --------------------------------------------------------------------------- -// Internal helpers +// Helpers // --------------------------------------------------------------------------- -namespace { -inline size_t next_power_of_2(size_t n) { - if (n == 0) return 1; - n--; - n |= n >> 1; - n |= n >> 2; - n |= n >> 4; - n |= n >> 8; - n |= n >> 16; - n |= n >> 32; - return n + 1; -} -} // namespace - -void *UringFile::alloc_aligned_buffer(size_t size) const { - // Align size up to 
ALIGNMENT_ - size_t aligned_size = ((size + ALIGNMENT_ - 1) / ALIGNMENT_) * ALIGNMENT_; - void *ptr = nullptr; - if (posix_memalign(&ptr, ALIGNMENT_, aligned_size) != 0) { - LOG(ERROR) << "[UringFile] Failed to allocate aligned buffer of size " - << aligned_size; +void* UringFile::alloc_aligned_buffer(size_t size) const { + size_t aligned = ((size + ALIGNMENT_ - 1) / ALIGNMENT_) * ALIGNMENT_; + void* ptr = nullptr; + if (posix_memalign(&ptr, ALIGNMENT_, aligned) != 0) { + LOG(ERROR) << "[UringFile] posix_memalign(" << aligned << ") failed"; return nullptr; } return ptr; } -void UringFile::free_aligned_buffer(void *ptr) const { - if (ptr) { - free(ptr); - } +void UringFile::free_aligned_buffer(void* ptr) const { + if (ptr) free(ptr); } -/** - * @brief Calculate optimal chunk size for parallel I/O - * - * Strategy: - * 1. Chunk size must be a power of 2 - * 2. Chunk size = max(min_chunk_size, optimal_size) - * 3. optimal_size = the size that can fully utilize remaining queue depth - * - * @param total_len Total length to transfer - * @param available_depth Available queue depth slots - * @param min_chunk_size Minimum chunk size (must be power of 2) - * @return Optimal chunk size (power of 2) - */ -size_t UringFile::calculate_chunk_size(size_t total_len, - unsigned available_depth, - size_t min_chunk_size) const { - if (total_len == 0 || available_depth == 0) { - return min_chunk_size; - } - - // Calculate the size that would fully utilize available queue depth - size_t optimal_size = (total_len + available_depth - 1) / available_depth; - - // Round up to next power of 2 - optimal_size = next_power_of_2(optimal_size); - - // Return max(min_chunk_size, optimal_size) - return std::max(min_chunk_size, optimal_size); +bool UringFile::in_registered_buffer(const void* buf, size_t len) const { + auto& r = SharedUringRing::instance(); + if (!r.is_buffer_registered()) return false; + uintptr_t buf_addr = reinterpret_cast(buf); + uintptr_t rb = 
reinterpret_cast(r.buffer_base()); + return buf_addr >= rb && (buf_addr + len) <= (rb + r.buffer_size()); } // --------------------------------------------------------------------------- -// Internal helpers - I/O submission +// write // --------------------------------------------------------------------------- -tl::expected UringFile::submit_and_wait_n(int n) { - int ret = io_uring_submit_and_wait(&ring_, n); - if (ret < 0) { - LOG(ERROR) << "[UringFile] io_uring_submit_and_wait failed: " - << strerror(-ret); - return make_error(ErrorCode::INTERNAL_ERROR); - } - - size_t total_bytes = 0; - bool has_error = false; - unsigned head; - unsigned count = 0; - struct io_uring_cqe *cqe; - - io_uring_for_each_cqe(&ring_, head, cqe) { - if (cqe->res < 0) { - LOG(ERROR) << "[UringFile] I/O failed: " << strerror(-cqe->res); - has_error = true; - } else { - total_bytes += static_cast(cqe->res); - } - count++; - } - io_uring_cq_advance(&ring_, count); - - if (has_error) { - return make_error(ErrorCode::INTERNAL_ERROR); - } - return total_bytes; -} - -// --------------------------------------------------------------------------- -// Write implementation with intelligent chunking -// --------------------------------------------------------------------------- - -tl::expected UringFile::write(const std::string &buffer, +tl::expected UringFile::write(const std::string& buffer, size_t length) { return write(std::span(buffer.data(), length), length); } tl::expected UringFile::write(std::span data, size_t length) { - if (fd_ < 0 || !ring_initialized_) { - return make_error(ErrorCode::FILE_NOT_FOUND); - } - if (length == 0) { - return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - - constexpr size_t MIN_CHUNK_SIZE = 4096; + if (fd_ < 0) return make_error(ErrorCode::FILE_NOT_FOUND); + if (length == 0) return make_error(ErrorCode::FILE_INVALID_BUFFER); - // If using O_DIRECT, allocate aligned buffer and copy data - void *aligned_buffer = nullptr; - const char *source_ptr = 
data.data(); - size_t actual_length = length; + void* bounce = nullptr; + const char* src = data.data(); + size_t write_len = length; if (use_direct_io_) { - // Align length up to ALIGNMENT_ - actual_length = ((length + ALIGNMENT_ - 1) / ALIGNMENT_) * ALIGNMENT_; - aligned_buffer = alloc_aligned_buffer(actual_length); - if (!aligned_buffer) { - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - // Copy data to aligned buffer and zero-pad if necessary - std::memcpy(aligned_buffer, data.data(), length); - if (actual_length > length) { - std::memset(static_cast(aligned_buffer) + length, 0, - actual_length - length); - } - source_ptr = static_cast(aligned_buffer); - } - - size_t total_written = 0; - const char *ptr = source_ptr; - size_t remaining = actual_length; - off_t current_offset = 0; - int target_fd = files_registered_ ? 0 : fd_; - - while (remaining > 0) { - size_t chunk_size = - calculate_chunk_size(remaining, queue_depth_, MIN_CHUNK_SIZE); - unsigned num_chunks = std::min( - static_cast((remaining + chunk_size - 1) / chunk_size), - queue_depth_); - - for (unsigned i = 0; i < num_chunks; i++) { - size_t this_chunk = std::min(chunk_size, remaining); - - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); - if (!sqe) { - LOG(ERROR) << "[UringFile::write] Failed to get SQE"; - if (aligned_buffer) free_aligned_buffer(aligned_buffer); - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - - io_uring_prep_write(sqe, target_fd, ptr, this_chunk, - current_offset); - if (files_registered_) { - io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - } - - ptr += this_chunk; - current_offset += this_chunk; - remaining -= this_chunk; - if (remaining == 0) break; - } - - auto result = submit_and_wait_n(num_chunks); - if (!result) { - if (aligned_buffer) free_aligned_buffer(aligned_buffer); - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - - size_t bytes_written = result.value(); - if (bytes_written == 0) { - LOG(ERROR) << "[UringFile::write] Zero bytes written"; - if 
(aligned_buffer) free_aligned_buffer(aligned_buffer); - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - - total_written += bytes_written; + write_len = ((length + ALIGNMENT_ - 1) / ALIGNMENT_) * ALIGNMENT_; + bounce = alloc_aligned_buffer(write_len); + if (!bounce) return make_error(ErrorCode::FILE_WRITE_FAIL); + std::memcpy(bounce, data.data(), length); + if (write_len > length) + std::memset(static_cast(bounce) + length, 0, + write_len - length); + src = static_cast(bounce); } - if (aligned_buffer) { - free_aligned_buffer(aligned_buffer); - } + auto res = SharedUringRing::instance().write(fd_, src, write_len, 0); - // For O_DIRECT, we may have written more than requested (due to alignment) - // but return the original length - if (total_written < length) { - LOG(WARNING) << "[UringFile::write] Incomplete write: " << total_written - << " / " << length; - return make_error(ErrorCode::FILE_WRITE_FAIL); - } + if (bounce) free_aligned_buffer(bounce); - return length; // Return original length, not aligned length + if (!res) return make_error(res.error()); + if (res.value() < length) + return make_error(ErrorCode::FILE_WRITE_FAIL); + return length; } // --------------------------------------------------------------------------- -// Read implementation with intelligent chunking +// read // --------------------------------------------------------------------------- -tl::expected UringFile::read(std::string &buffer, +tl::expected UringFile::read(std::string& buffer, size_t length) { - if (fd_ < 0 || !ring_initialized_) { - return make_error(ErrorCode::FILE_NOT_FOUND); - } - if (length == 0) { - return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - - constexpr size_t MIN_CHUNK_SIZE = 4096; + if (fd_ < 0) return make_error(ErrorCode::FILE_NOT_FOUND); + if (length == 0) return make_error(ErrorCode::FILE_INVALID_BUFFER); - // If using O_DIRECT, allocate aligned buffer - void *aligned_buffer = nullptr; - char *read_ptr = nullptr; - size_t actual_length = length; + 
void* bounce = nullptr; + char* read_ptr = nullptr; + size_t read_len = length; if (use_direct_io_) { - // Align length up to ALIGNMENT_ - actual_length = ((length + ALIGNMENT_ - 1) / ALIGNMENT_) * ALIGNMENT_; - aligned_buffer = alloc_aligned_buffer(actual_length); - if (!aligned_buffer) { - return make_error(ErrorCode::FILE_READ_FAIL); - } - read_ptr = static_cast(aligned_buffer); + read_len = ((length + ALIGNMENT_ - 1) / ALIGNMENT_) * ALIGNMENT_; + bounce = alloc_aligned_buffer(read_len); + if (!bounce) return make_error(ErrorCode::FILE_READ_FAIL); + read_ptr = static_cast(bounce); } else { buffer.resize(length); read_ptr = buffer.data(); } - char *ptr = read_ptr; - size_t remaining = actual_length; - size_t total_read = 0; - off_t current_offset = 0; - int target_fd = files_registered_ ? 0 : fd_; - - while (remaining > 0) { - size_t chunk_size = - calculate_chunk_size(remaining, queue_depth_, MIN_CHUNK_SIZE); - unsigned num_chunks = std::min( - static_cast((remaining + chunk_size - 1) / chunk_size), - queue_depth_); - - size_t batch_size = 0; - for (unsigned i = 0; i < num_chunks; i++) { - size_t this_chunk = std::min(chunk_size, remaining); - - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); - if (!sqe) { - LOG(ERROR) << "[UringFile::read] Failed to get SQE"; - if (aligned_buffer) { - free_aligned_buffer(aligned_buffer); - } else { - buffer.clear(); - } - return make_error(ErrorCode::FILE_READ_FAIL); - } - - io_uring_prep_read(sqe, target_fd, ptr, this_chunk, current_offset); - if (files_registered_) { - io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - } - - ptr += this_chunk; - current_offset += this_chunk; - batch_size += this_chunk; - remaining -= this_chunk; - if (remaining == 0) break; - } - - auto result = submit_and_wait_n(num_chunks); - if (!result) { - if (aligned_buffer) { - free_aligned_buffer(aligned_buffer); - } else { - buffer.clear(); - } - return make_error(ErrorCode::FILE_READ_FAIL); - } - - size_t bytes_read = result.value(); - if 
(bytes_read == 0) { - break; // EOF - } - - total_read += bytes_read; - if (bytes_read < batch_size) { - break; // EOF - } - } + auto res = SharedUringRing::instance().read(fd_, read_ptr, read_len, 0); - // Copy from aligned buffer to std::string if using O_DIRECT if (use_direct_io_) { - size_t actual_read = std::min(total_read, length); - buffer.assign(static_cast(aligned_buffer), actual_read); - free_aligned_buffer(aligned_buffer); - total_read = actual_read; - } else { - buffer.resize(total_read); - } - - if (total_read != length && total_read == 0) { - return make_error(ErrorCode::FILE_READ_FAIL); + if (res) { + size_t actual = std::min(res.value(), length); + buffer.assign(static_cast(bounce), actual); + } + free_aligned_buffer(bounce); } - return total_read; + if (!res) return make_error(res.error()); + size_t got = use_direct_io_ ? std::min(res.value(), length) : res.value(); + if (!use_direct_io_) buffer.resize(got); + if (got == 0) return make_error(ErrorCode::FILE_READ_FAIL); + return got; } // --------------------------------------------------------------------------- -// Zero-copy aligned I/O interface for O_DIRECT +// write_aligned / read_aligned // --------------------------------------------------------------------------- -tl::expected UringFile::read_aligned(void *buffer, - size_t length, - off_t offset) { - MutexLocker lock(&ring_mutex_); - if (fd_ < 0 || !ring_initialized_) { - return make_error(ErrorCode::FILE_NOT_FOUND); - } - if (length == 0 || buffer == nullptr) { +tl::expected UringFile::write_aligned(const void* buffer, + size_t length, + off_t offset) { + if (fd_ < 0) return make_error(ErrorCode::FILE_NOT_FOUND); + if (!buffer || length == 0) return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - // Verify alignment when using O_DIRECT if (use_direct_io_) { - if (reinterpret_cast(buffer) % ALIGNMENT_ != 0) { - LOG(ERROR) << "[UringFile::read_aligned] Buffer not aligned to " - << ALIGNMENT_; + if (reinterpret_cast(buffer) % ALIGNMENT_) 
return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - if (length % ALIGNMENT_ != 0) { - LOG(ERROR) << "[UringFile::read_aligned] Length not aligned to " - << ALIGNMENT_; + if (length % ALIGNMENT_ || offset % ALIGNMENT_) return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - if (offset % ALIGNMENT_ != 0) { - LOG(ERROR) << "[UringFile::read_aligned] Offset not aligned to " - << ALIGNMENT_; - return make_error(ErrorCode::FILE_INVALID_BUFFER); - } } - constexpr size_t MIN_CHUNK_SIZE = 4096; - - char *ptr = static_cast(buffer); - size_t remaining = length; - size_t total_read = 0; - off_t current_offset = offset; - int target_fd = files_registered_ ? 0 : fd_; - - // Check if this buffer falls within the registered buffer range - bool use_fixed_buffer = - (buffer_registered_ && - reinterpret_cast(buffer) >= - reinterpret_cast(registered_buffer_) && - reinterpret_cast(buffer) + length <= - reinterpret_cast(registered_buffer_) + - registered_buffer_size_); - - while (remaining > 0) { - size_t chunk_size = - calculate_chunk_size(remaining, queue_depth_, MIN_CHUNK_SIZE); - unsigned num_chunks = std::min( - static_cast((remaining + chunk_size - 1) / chunk_size), - queue_depth_); - - size_t batch_size = 0; - for (unsigned i = 0; i < num_chunks; i++) { - size_t this_chunk = std::min(chunk_size, remaining); - - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); - if (!sqe) { - LOG(ERROR) << "[UringFile::read_aligned] Failed to get SQE"; - return make_error(ErrorCode::FILE_READ_FAIL); - } - - if (use_fixed_buffer) { - // Use registered fixed buffer - avoids get_user_pages() - // overhead - io_uring_prep_read_fixed(sqe, target_fd, ptr, this_chunk, - current_offset, 0); - } else { - // Use regular buffer - io_uring_prep_read(sqe, target_fd, ptr, this_chunk, - current_offset); - } - - if (files_registered_) { - io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - } - - ptr += this_chunk; - current_offset += this_chunk; - batch_size += this_chunk; - remaining -= this_chunk; - if 
(remaining == 0) break; - } - - auto result = submit_and_wait_n(num_chunks); - if (!result) { - return make_error(ErrorCode::FILE_READ_FAIL); - } - - size_t bytes_read = result.value(); - if (bytes_read == 0) { - break; // EOF - } - - total_read += bytes_read; - if (bytes_read < batch_size) { - break; // EOF - } - } - - return total_read; + return SharedUringRing::instance().write(fd_, buffer, length, offset); } -tl::expected UringFile::write_aligned(const void *buffer, - size_t length, - off_t offset) { - if (fd_ < 0 || !ring_initialized_) { - return make_error(ErrorCode::FILE_NOT_FOUND); - } - if (length == 0 || buffer == nullptr) { +tl::expected UringFile::read_aligned(void* buffer, + size_t length, + off_t offset) { + if (fd_ < 0) return make_error(ErrorCode::FILE_NOT_FOUND); + if (!buffer || length == 0) return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - // Verify alignment when using O_DIRECT if (use_direct_io_) { - if (reinterpret_cast(buffer) % ALIGNMENT_ != 0) { - LOG(ERROR) << "[UringFile::write_aligned] Buffer not aligned to " - << ALIGNMENT_; + if (reinterpret_cast(buffer) % ALIGNMENT_) return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - if (length % ALIGNMENT_ != 0) { - LOG(ERROR) << "[UringFile::write_aligned] Length not aligned to " - << ALIGNMENT_; + if (length % ALIGNMENT_ || offset % ALIGNMENT_) return make_error(ErrorCode::FILE_INVALID_BUFFER); - } - if (offset % ALIGNMENT_ != 0) { - LOG(ERROR) << "[UringFile::write_aligned] Offset not aligned to " - << ALIGNMENT_; - return make_error(ErrorCode::FILE_INVALID_BUFFER); - } } - constexpr size_t MIN_CHUNK_SIZE = 4096; - - const char *ptr = static_cast(buffer); - size_t remaining = length; - size_t total_written = 0; - off_t current_offset = offset; - int target_fd = files_registered_ ? 
0 : fd_; - - // Check if this buffer falls within the registered buffer range - bool use_fixed_buffer = - (buffer_registered_ && - reinterpret_cast(buffer) >= - reinterpret_cast(registered_buffer_) && - reinterpret_cast(buffer) + length <= - reinterpret_cast(registered_buffer_) + - registered_buffer_size_); - - while (remaining > 0) { - size_t chunk_size = - calculate_chunk_size(remaining, queue_depth_, MIN_CHUNK_SIZE); - unsigned num_chunks = std::min( - static_cast((remaining + chunk_size - 1) / chunk_size), - queue_depth_); - - for (unsigned i = 0; i < num_chunks; i++) { - size_t this_chunk = std::min(chunk_size, remaining); - - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); - if (!sqe) { - LOG(ERROR) << "[UringFile::write_aligned] Failed to get SQE"; - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - - if (use_fixed_buffer) { - // Use registered fixed buffer - avoids get_user_pages() - // overhead - io_uring_prep_write_fixed(sqe, target_fd, ptr, this_chunk, - current_offset, 0); - } else { - // Use regular buffer - io_uring_prep_write(sqe, target_fd, ptr, this_chunk, - current_offset); - } - - if (files_registered_) { - io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - } - - ptr += this_chunk; - current_offset += this_chunk; - remaining -= this_chunk; - if (remaining == 0) break; - } - - auto result = submit_and_wait_n(num_chunks); - if (!result) { - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - - size_t bytes_written = result.value(); - if (bytes_written == 0) { - LOG(ERROR) << "[UringFile::write_aligned] Zero bytes written"; - return make_error(ErrorCode::FILE_WRITE_FAIL); - } + return SharedUringRing::instance().read(fd_, buffer, length, offset); +} - total_written += bytes_written; - } +// --------------------------------------------------------------------------- +// batch_read — submit multiple independent reads in one ring submission +// --------------------------------------------------------------------------- - if (total_written != 
length) { - LOG(WARNING) << "[UringFile::write_aligned] Incomplete write: " - << total_written << " / " << length; - return make_error(ErrorCode::FILE_WRITE_FAIL); - } +tl::expected UringFile::batch_read(const ReadDesc* descs, + int cnt) { + if (fd_ < 0) return make_error(ErrorCode::FILE_NOT_FOUND); + if (!descs || cnt <= 0) + return make_error(ErrorCode::FILE_INVALID_BUFFER); - return total_written; + // Map UringFile::ReadDesc → SharedUringRing::ReadDesc (same layout, but + // ensure they stay in sync if either changes). + static_assert(sizeof(ReadDesc) == sizeof(SharedUringRing::ReadDesc), + "ReadDesc layout mismatch"); + const auto* ring_descs = + reinterpret_cast(descs); + return SharedUringRing::instance().batch_read(fd_, ring_descs, cnt); } // --------------------------------------------------------------------------- -// Vectored I/O — multi-SQE parallel submission -// -// Each iovec is submitted as an independent SQE so the NVMe device can -// serve them concurrently, matching the pipelining approach used in the -// benchmark. When iovcnt > queue_depth_, requests are batched. +// vector_write / vector_read // --------------------------------------------------------------------------- -tl::expected UringFile::vector_write(const iovec *iov, +tl::expected UringFile::vector_write(const iovec* iov, int iovcnt, off_t offset) { - if (fd_ < 0 || !ring_initialized_) { - return make_error(ErrorCode::FILE_NOT_FOUND); - } - - int target_fd = files_registered_ ? 
0 : fd_; - size_t total_written = 0; - off_t cur_offset = offset; - int remaining = iovcnt; - int idx = 0; - - while (remaining > 0) { - int batch = std::min(remaining, static_cast(queue_depth_)); - - for (int i = 0; i < batch; i++) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); - if (!sqe) { - LOG(ERROR) << "[UringFile::vector_write] Failed to get SQE"; - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - - io_uring_prep_write(sqe, target_fd, iov[idx].iov_base, - iov[idx].iov_len, cur_offset); - if (files_registered_) { - io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - } - - cur_offset += static_cast(iov[idx].iov_len); - idx++; - } - - auto result = submit_and_wait_n(batch); - if (!result) { - return make_error(ErrorCode::FILE_WRITE_FAIL); - } - total_written += result.value(); - remaining -= batch; - } - - return total_written; + if (fd_ < 0) return make_error(ErrorCode::FILE_NOT_FOUND); + auto start = std::chrono::steady_clock::now(); + auto res = + SharedUringRing::instance().vector_write(fd_, iov, iovcnt, offset); + auto us = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count(); + if (us > 1000) + LOG(INFO) << "[UringFile::vector_write] fd=" << fd_ + << " iovcnt=" << iovcnt << " time=" << us << "us"; + return res; } -tl::expected UringFile::vector_read(const iovec *iov, +tl::expected UringFile::vector_read(const iovec* iov, int iovcnt, off_t offset) { - if (fd_ < 0 || !ring_initialized_) { - return make_error(ErrorCode::FILE_NOT_FOUND); - } + if (fd_ < 0) return make_error(ErrorCode::FILE_NOT_FOUND); size_t expected_bytes = 0; - for (int i = 0; i < iovcnt; ++i) { - expected_bytes += iov[i].iov_len; - } - + for (int i = 0; i < iovcnt; ++i) expected_bytes += iov[i].iov_len; auto start = std::chrono::steady_clock::now(); - int target_fd = files_registered_ ? 
0 : fd_; - size_t total_read = 0; - off_t cur_offset = offset; - int remaining = iovcnt; - int idx = 0; - - while (remaining > 0) { - int batch = std::min(remaining, static_cast(queue_depth_)); - - for (int i = 0; i < batch; i++) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); - if (!sqe) { - LOG(ERROR) << "[UringFile::vector_read] Failed to get SQE"; - return make_error(ErrorCode::FILE_READ_FAIL); - } - - io_uring_prep_read(sqe, target_fd, iov[idx].iov_base, - iov[idx].iov_len, cur_offset); - if (files_registered_) { - io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - } - - cur_offset += static_cast(iov[idx].iov_len); - idx++; - } + auto res = + SharedUringRing::instance().vector_read(fd_, iov, iovcnt, offset); + + auto us = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count(); + if (us > 1000 || expected_bytes > 1024 * 1024) { + double mbps = (us > 0) + ? (static_cast(expected_bytes) / 1048576.0) / + (static_cast(us) / 1e6) + : 0; + LOG(INFO) << "[UringFile::vector_read] fd=" << fd_ + << " iovcnt=" << iovcnt << " bytes=" << expected_bytes + << " time=" << us << "us (" << (us / 1000.0) << "ms)" + << " throughput=" << mbps << "MB/s"; + } - auto result = submit_and_wait_n(batch); - if (!result) { - auto end = std::chrono::steady_clock::now(); - auto elapsed_us = - std::chrono::duration_cast(end - - start) - .count(); - LOG(ERROR) << "[UringFile::vector_read] FAILED: fd=" << fd_ - << ", offset=" << offset << ", iovcnt=" << iovcnt - << ", expected_bytes=" << expected_bytes - << ", time=" << elapsed_us << "us"; - return make_error(ErrorCode::FILE_READ_FAIL); - } - total_read += result.value(); - remaining -= batch; + if (!res) { + LOG(ERROR) << "[UringFile::vector_read] FAILED fd=" << fd_ + << " offset=" << offset << " iovcnt=" << iovcnt + << " expected=" << expected_bytes; } + return res; +} - auto end = std::chrono::steady_clock::now(); - auto elapsed_us = - std::chrono::duration_cast(end - start) - .count(); +// 
---------------------------------------------------------------------------
+// datasync
+// ---------------------------------------------------------------------------
 
-    if (elapsed_us > 1000 || expected_bytes > 1024 * 1024) {
-        double throughput_mbps =
-            (elapsed_us > 0)
-                ? (static_cast<double>(total_read) / (1024.0 * 1024.0)) /
-                      (static_cast<double>(elapsed_us) / 1000000.0)
-                : 0;
-        LOG(INFO) << "[UringFile::vector_read] fd=" << fd_
-                  << ", offset=" << offset << ", iovcnt=" << iovcnt
-                  << ", bytes=" << total_read << ", time=" << elapsed_us
-                  << "us (" << (elapsed_us / 1000.0) << "ms)"
-                  << ", throughput=" << throughput_mbps << "MB/s";
+/// Flush this file's data by delegating to SharedUringRing::fsync(fd_).
+///
+/// @return an empty expected on success; FILE_NOT_FOUND when fd_ is negative;
+///         FILE_WRITE_FAIL when the shared ring reports a failed fsync.
+tl::expected<void, ErrorCode> UringFile::datasync() {
+    if (fd_ < 0) return make_error<void>(ErrorCode::FILE_NOT_FOUND);
+    auto res = SharedUringRing::instance().fsync(fd_);
+    if (!res) {
+        LOG(ERROR) << "[UringFile::datasync] fsync failed for: " << filename_;
+        return make_error<void>(ErrorCode::FILE_WRITE_FAIL);
     }
-
-    return total_read;
+    return {};
 }
 
 // ---------------------------------------------------------------------------
-// Buffer registration for high-performance I/O
+// Buffer registration
 // ---------------------------------------------------------------------------
 
-bool UringFile::register_buffer(void *buffer, size_t length) {
-    if (!ring_initialized_) {
-        LOG(ERROR) << "[UringFile::register_buffer] io_uring not initialized";
+/// Publish @p buffer / @p length in g_buf and register the buffer on the
+/// calling thread's ring via SharedUringRing::ensure_buf_registered().
+///
+/// A failed madvise() is logged and otherwise ignored. A failed registration
+/// is reported through the return value, but g_buf is left published.
+///
+/// NOTE(review): base is stored before size; a concurrent reader could
+/// presumably observe the new base together with the previous size. Confirm
+/// that ensure_buf_registered() tolerates this, or publish size first.
+///
+/// @param buffer start of the region to register; must be non-null.
+/// @param length size of the region in bytes; must be non-zero.
+/// @return true if the calling thread's ring registered the buffer.
+bool UringFile::register_global_buffer(void* buffer, size_t length) {
+    if (!buffer || length == 0) {
+        LOG(ERROR)
+            << "[UringFile::register_global_buffer] invalid buffer or length";
         return false;
     }
-
-    if (buffer_registered_) {
-        LOG(WARNING) << "[UringFile::register_buffer] Buffer already "
-                        "registered, unregistering first";
-        unregister_buffer();
+    // Disable Transparent Huge Pages on this region before pinning.
+    // io_uring uses FOLL_LONGTERM to pin pages; on kernel 5.15 the kernel
+    // must split any 2MB THP into 4KB pages before long-term pinning, which
+    // can fail (ENOMEM) when huge pages are in use or memory is fragmented.
+    // MADV_NOHUGEPAGE prevents the kernel from backing this range with THPs,
+    // making pin_user_pages() reliable regardless of system THP policy.
+    if (madvise(buffer, length, MADV_NOHUGEPAGE) != 0) {
+        // Snapshot errno before building the log message: constructing the
+        // logger may itself make library calls that overwrite errno.
+        const int saved_errno = errno;
+        LOG(WARNING)
+            << "[UringFile::register_global_buffer] madvise(NOHUGEPAGE)"
+            << " failed errno=" << saved_errno << " (" << strerror(saved_errno)
+            << ") — continuing anyway";
+    }
+    g_buf.base.store(buffer, std::memory_order_release);
+    g_buf.size.store(length, std::memory_order_release);
+    bool ok = SharedUringRing::instance().ensure_buf_registered();
+    if (ok) {
+        LOG(INFO) << "[UringFile::register_global_buffer] registered"
+                  << " addr=" << buffer << " size=" << length
+                  << " pages=" << (length >> 12);
+    } else {
+        LOG(WARNING)
+            << "[UringFile::register_global_buffer] registration failed"
+            << " addr=" << buffer << " size=" << length
+            << " — I/O will use regular (non-fixed-buffer) io_uring,"
+            << " which is correct but slightly less optimal";
     }
+    return ok;
+}
+
+/// Clear the published buffer in g_buf, then drop the registration on the
+/// calling thread's ring via SharedUringRing::unregister_buf_local().
+///
+/// NOTE(review): only the caller's ring is unregistered here; rings used by
+/// other threads presumably keep their registration until they unregister
+/// themselves. Confirm against SharedUringRing.
+void UringFile::unregister_global_buffer() {
+    g_buf.base.store(nullptr, std::memory_order_release);
+    g_buf.size.store(0, std::memory_order_release);
+    SharedUringRing::instance().unregister_buf_local();
+}
 
+/// Instance-level entry point for buffer registration.
+///
+/// Rejects a null or empty buffer and, when this file uses O_DIRECT, a buffer
+/// whose address or length is not a multiple of ALIGNMENT_. Valid requests
+/// are forwarded to register_global_buffer().
+///
+/// @return true if the calling thread's ring registered the buffer.
+bool UringFile::register_buffer(void* buffer, size_t length) {
     if (!buffer || length == 0) {
-        LOG(ERROR) << "[UringFile::register_buffer] Invalid buffer or length";
+        LOG(ERROR) << "[UringFile::register_buffer] invalid buffer or length";
         return false;
     }
-
-    // Verify alignment when using O_DIRECT
     if (use_direct_io_) {
-        if (reinterpret_cast<uintptr_t>(buffer) % ALIGNMENT_ != 0) {
-            LOG(ERROR) << "[UringFile::register_buffer] Buffer not aligned to "
-                       << ALIGNMENT_;
+        if (reinterpret_cast<uintptr_t>(buffer) % ALIGNMENT_) {
+            LOG(ERROR) << "[UringFile::register_buffer] buffer not aligned";
             return false;
         }
-        if (length % ALIGNMENT_ != 0) {
-            LOG(ERROR) << "[UringFile::register_buffer] Length not aligned to "
-                       << ALIGNMENT_;
+        if (length % ALIGNMENT_) {
+            LOG(ERROR) << "[UringFile::register_buffer] length not aligned";
             return false;
         }
     }
-
-    registered_iovec_.iov_base = buffer;
-    registered_iovec_.iov_len = length;
-
-    int ret = io_uring_register_buffers(&ring_, &registered_iovec_, 1);
-    if (ret < 0) {
-        LOG(ERROR)
-            << "[UringFile::register_buffer] io_uring_register_buffers failed: "
-            << strerror(-ret);
-        return false;
-    }
-
-    registered_buffer_ = buffer;
-    registered_buffer_size_ = length;
-    buffer_registered_ = true;
-
-    LOG(INFO) << "[UringFile::register_buffer] Successfully registered buffer: "
-              << "addr=" << buffer << ", size=" << length;
-    return true;
+    // Reuse the static path so this entry point gets the same THP mitigation,
+    // the same global publication and the same success/failure logging,
+    // instead of a second copy that skipped madvise() and logged failures at
+    // INFO level.
+    return register_global_buffer(buffer, length);
 }
 
+/// Instance-level counterpart of unregister_global_buffer(); performs exactly
+/// the same steps by delegating to it.
 void UringFile::unregister_buffer() {
-    if (!buffer_registered_) {
-        return;
-    }
-
-    if (ring_initialized_) {
-        int ret = io_uring_unregister_buffers(&ring_);
-        if (ret < 0) {
-            LOG(ERROR) << "[UringFile::unregister_buffer] "
-                          "io_uring_unregister_buffers failed: "
-                       << strerror(-ret);
-        } else {
-            LOG(INFO) << "[UringFile::unregister_buffer] Successfully "
-                         "unregistered buffer";
-        }
-    }
-
-    registered_buffer_ = nullptr;
-    registered_buffer_size_ = 0;
-    buffer_registered_ = false;
+    // Clears the global state so no new thread picks it up, then unregisters
+    // on the calling thread's ring.
+    unregister_global_buffer();
 }
 
-// ---------------------------------------------------------------------------
-// datasync — flush bucket data to stable storage before metadata write
-// ---------------------------------------------------------------------------
-
-tl::expected<void, ErrorCode> UringFile::datasync() {
-    if (fd_ < 0 || !ring_initialized_) {
-        return make_error<void>(ErrorCode::FILE_NOT_FOUND);
-    }
-
-    int target_fd = files_registered_ ? 0 : fd_;
-
-    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
-    if (!sqe) {
-        LOG(ERROR) << "[UringFile::datasync] Failed to get SQE";
-        return make_error<void>(ErrorCode::INTERNAL_ERROR);
-    }
-
-    io_uring_prep_fsync(sqe, target_fd, IORING_FSYNC_DATASYNC);
-    if (files_registered_) {
-        io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
-    }
-
-    auto result = submit_and_wait_n(1);
-    if (!result) {
-        LOG(ERROR) << "[UringFile::datasync] fsync failed for: " << filename_;
-        return make_error<void>(ErrorCode::FILE_WRITE_FAIL);
-    }
-    return {};
+/// Report whether a buffer is registered, as answered by
+/// SharedUringRing::is_buffer_registered().
+bool UringFile::is_buffer_registered() const {
+    return SharedUringRing::instance().is_buffer_registered();
 }
 
 }  // namespace mooncake