Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4f2c2bf
feat(Store): Support get local ssd object
zhuxinjie-nz Dec 12, 2025
33dd3bd
Merge remote-tracking branch 'origin/main' into get-offload-object-dev
zhuxinjie-nz Dec 12, 2025
0ce2f48
Merge branch 'main' into get-offload-object-dev
zhuxinjie-nz Jan 7, 2026
597354e
feat(Store): Optimize get local SSD objects by changing from transfer…
zhuxinjie-nz Jan 13, 2026
af5fe64
fix(Store): compilation issues in FileStorageTest.
zhuxinjie-nz Jan 13, 2026
b568e2f
Merge remote-tracking branch 'origin/main' into get-offload-object-dev
zhuxinjie-nz Jan 15, 2026
b2926d4
fix(Store): resolve the conflicts
zhuxinjie-nz Jan 15, 2026
c1ef296
Merge remote-tracking branch 'origin/main' into get-offload-object-dev
zhuxinjie-nz Jan 16, 2026
4bd4a1a
fix(Store): compilation issues
zhuxinjie-nz Jan 16, 2026
ec9f085
fixed misplacint in rpc address
zhangzuo21 Jan 22, 2026
e3bbaa8
fixed missing values of results in batch_get_into_internal
zhangzuo21 Jan 22, 2026
521da5c
fixed locking in gc thread
zhangzuo21 Jan 22, 2026
0674a18
format
zhangzuo21 Jan 22, 2026
234ad88
uring file opt
zhangzuo21 Feb 5, 2026
8fa691d
Merge branch 'main' of https://github.com/kvcache-ai/Mooncake into ge…
zhangzuo21 Feb 5, 2026
9f9ca29
add notification for complete transfer of ssd objects
zhangzuo21 Feb 6, 2026
aa47cad
add read mutex
zhangzuo21 Feb 6, 2026
c93173f
remote temp file for registration
zhangzuo21 Feb 6, 2026
e24d51d
Merge branch 'main' of https://github.com/kvcache-ai/Mooncake into pr…
zhangzuo21 Feb 11, 2026
832aa92
add asio support for tent
zhangzuo21 Feb 11, 2026
abb6122
add protection from eviction during offloading
zhangzuo21 Feb 11, 2026
a2d3843
bug fix
zhangzuo21 Feb 11, 2026
92148ad
bug fix
zhangzuo21 Feb 19, 2026
9ed7d4e
create a shared uring ring
zhangzuo21 Feb 21, 2026
d9af31e
Merge branch 'main' of https://github.com/kvcache-ai/Mooncake into pr…
zhangzuo21 Feb 25, 2026
a44fad3
revert changes in TE
zhangzuo21 Feb 25, 2026
49c365d
add datasync after write
zhangzuo21 Feb 25, 2026
6a80296
format
zhangzuo21 Feb 25, 2026
ed24821
ring per thread
zhangzuo21 Feb 25, 2026
576e8a1
Merge branch 'pr1500' into better-loadbuffer-management
zhangzuo21 Feb 25, 2026
9fa2206
better register buffer
zhangzuo21 Feb 26, 2026
9c3cc15
bug fix
zhangzuo21 Feb 26, 2026
76b35e9
format
zhangzuo21 Feb 26, 2026
089486d
Merge branch 'main' of https://github.com/kvcache-ai/Mooncake into be…
zhangzuo21 Feb 26, 2026
8c50860
modified OffloadingTask::start_time to system_clock
zhangzuo21 Mar 5, 2026
eed10b6
spell check
zhangzuo21 Mar 9, 2026
ebbe987
Merge branch 'main' of https://github.com/kvcache-ai/Mooncake into be…
zhangzuo21 Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 35 additions & 41 deletions mooncake-store/include/file_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ class PosixFile : public StorageFile {
};

#ifdef USE_URING
/**
* @class UringFile
* @brief StorageFile backed by a process-wide shared io_uring ring.
*
* All UringFile instances share a single SharedUringRing singleton, so
* construction and destruction only register/unregister an fd slot — no
* per-file io_uring_queue_init / io_uring_queue_exit (no mmap/munmap,
* no TLB shootdown).
*/
class UringFile : public StorageFile {
public:
UringFile(const std::string &filename, int fd, unsigned queue_depth = 32,
Expand All @@ -198,57 +207,42 @@ class UringFile : public StorageFile {
size_t length,
off_t offset = 0);

// Flush data to stable storage via
// io_uring_prep_fsync(IORING_FSYNC_DATASYNC). Must be called after write
// and before writing dependent metadata files.
// Batch read: submit up to 32 independent reads at once (each at its own
// offset) before waiting for completions, giving NVMe queue depth > 1.
// Use in BatchLoadBucket instead of per-key read_aligned() loops.
struct ReadDesc {
void *buf;
size_t len;
off_t off;
};
tl::expected<size_t, ErrorCode> batch_read(const ReadDesc *descs, int cnt);

// Flush data to stable storage via IORING_FSYNC_DATASYNC.
// Must be called after write_aligned and before writing dependent metadata.
tl::expected<void, ErrorCode> datasync();

// Buffer registration interface for high-performance I/O
// Register a single buffer with io_uring to avoid get_user_pages() overhead
// Returns true on success, false on failure
bool register_buffer(void *buffer, size_t length);
// Buffer registration — delegates to the shared ring (process-wide).
// Static variant: no file instance needed. Must be called once from a
// single thread before I/O threads begin. Other threads lazily pick up
// the registration on their first I/O call via ensure_buf_registered().
static bool register_global_buffer(void *buffer, size_t length);
static void unregister_global_buffer();

// Unregister previously registered buffer
bool register_buffer(void *buffer, size_t length);
void unregister_buffer();

// Check if a buffer is currently registered
bool is_buffer_registered() const { return buffer_registered_; }
bool is_buffer_registered() const;

private:
struct io_uring ring_;
bool ring_initialized_;
bool files_registered_;
bool buffer_registered_;
unsigned queue_depth_;
bool use_direct_io_;
static constexpr size_t ALIGNMENT_ =
4096; // O_DIRECT alignment requirement

// Registered buffer info
void *registered_buffer_;
size_t registered_buffer_size_;
struct iovec registered_iovec_;

/// Submit all pending SQEs and wait for exactly @p n completions.
/// Returns the total bytes transferred, or an error.
tl::expected<size_t, ErrorCode> submit_and_wait_n(int n);

/// Calculate optimal chunk size for parallel I/O based on:
/// - total_len: remaining bytes to transfer
/// - available_depth: number of queue slots available
/// - min_chunk_size: minimum chunk size (must be power of 2)
/// Returns a power-of-2 chunk size that maximizes queue utilization.
size_t calculate_chunk_size(size_t total_len, unsigned available_depth,
size_t min_chunk_size) const;

/// Allocate aligned buffer for O_DIRECT
void *alloc_aligned_buffer(size_t size) const;
static constexpr size_t ALIGNMENT_ = 4096;

/// Free aligned buffer
/// Allocate / free an O_DIRECT aligned bounce buffer.
void *alloc_aligned_buffer(size_t size) const;
void free_aligned_buffer(void *ptr) const;

/// Mutex to serialize concurrent access to ring_
mutable Mutex ring_mutex_;
/// Return true if @p buf falls entirely within the shared registered
/// buffer.
bool in_registered_buffer(const void *buf, size_t len) const;
};
#endif // USE_URING

Expand Down
29 changes: 24 additions & 5 deletions mooncake-store/include/file_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,48 @@ class FileStorage {

tl::expected<void, ErrorCode> Init();

/**
* @brief Result of BatchGet operation containing batch_id and buffer
* pointers.
*/
struct BatchGetResult {
uint64_t batch_id;
std::vector<uint64_t> pointers;
};

/**
* @brief Reads multiple key-value (KV) entries from local storage and
* forwards them to a remote node.
* @param keys List of keys to read from the local KV store
* @param sizes Expected size in bytes for each value
* @return tl::expected<std::vector<uint64_t>, ErrorCode> indicating
* operation status.
* @return tl::expected<BatchGetResult, ErrorCode> containing batch_id and
* buffer pointers.
*/
tl::expected<std::vector<uint64_t>, ErrorCode> BatchGet(
tl::expected<BatchGetResult, ErrorCode> BatchGet(
const std::vector<std::string>& keys,
const std::vector<int64_t>& sizes);

FileStorageConfig config_;

/**
* @brief Releases buffer associated with a specific batch_id.
* Called by remote client after transfer completion.
* @param batch_id The unique identifier of the batch to release
* @return true if batch was found and released, false otherwise
*/
bool ReleaseBuffer(uint64_t batch_id);

private:
friend class FileStorageTest;
struct AllocatedBatch {
uint64_t batch_id;
std::vector<BufferHandle> handles;
std::unordered_map<std::string, Slice> slices;
std::chrono::steady_clock::time_point lease_timeout;
std::vector<uint64_t> pointers;
uint64_t total_size;

AllocatedBatch() = default;
AllocatedBatch() : batch_id(0), total_size(0) {}
AllocatedBatch(AllocatedBatch&&) = default;
AllocatedBatch& operator=(AllocatedBatch&&) = default;

Expand Down Expand Up @@ -86,8 +104,9 @@ class FileStorage {
std::shared_ptr<StorageBackendInterface> storage_backend_;
std::shared_ptr<ClientBufferAllocator> client_buffer_allocator_;
mutable Mutex client_buffer_mutex_;
std::vector<std::shared_ptr<AllocatedBatch>> GUARDED_BY(
std::unordered_map<uint64_t, std::shared_ptr<AllocatedBatch>> GUARDED_BY(
client_buffer_mutex_) client_buffer_allocated_batches_;
std::atomic<uint64_t> next_batch_id_{1};

mutable Mutex offloading_mutex_;
bool GUARDED_BY(offloading_mutex_) enable_offloading_;
Expand Down
9 changes: 8 additions & 1 deletion mooncake-store/include/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,11 @@ class MasterService {
std::vector<ReplicaID> replica_ids;
};

struct OffloadingTask {
ReplicaID source_id;
std::chrono::system_clock::time_point start_time;
};

static constexpr size_t kNumShards = 1024; // Number of metadata shards

// Sharded metadata maps and their mutexes
Expand All @@ -722,6 +727,8 @@ class MasterService {
std::unordered_set<std::string> processing_keys GUARDED_BY(mutex);
std::unordered_map<std::string, const ReplicationTask> replication_tasks
GUARDED_BY(mutex);
std::unordered_map<std::string, const OffloadingTask> offloading_tasks
GUARDED_BY(mutex);
};
std::array<MetadataShard, kNumShards> metadata_shards_;

Expand Down Expand Up @@ -783,7 +790,7 @@ class MasterService {
void EvictionThreadFunc();

tl::expected<void, ErrorCode> PushOffloadingQueue(const std::string& key,
const Replica& replica);
Replica& replica);

// Lease related members
const uint64_t default_kv_lease_ttl_; // in milliseconds
Expand Down
10 changes: 10 additions & 0 deletions mooncake-store/include/pyclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ class ClientRequester {
const std::vector<std::string> &keys,
const std::vector<int64_t> sizes);

/**
* @brief Notifies remote FileStorage to release buffer after transfer
* completion. This is a fire-and-forget call - errors are logged but not
* propagated.
* @param client_addr Network address of the remote FileStorage service.
* @param batch_id The batch_id returned from batch_get_offload_object.
*/
void release_offload_buffer(const std::string &client_addr,
uint64_t batch_id);

private:
/**
* @brief A batch of allocated memory buffers, tracking both handles and
Expand Down
8 changes: 8 additions & 0 deletions mooncake-store/include/real_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,14 @@ class RealClient : public PyClient {
batch_get_offload_object(const std::vector<std::string> &keys,
const std::vector<int64_t> &sizes);

/**
* @brief Releases buffer associated with a specific batch_id.
* Called by remote client after transfer completion.
* @param batch_id The unique identifier of the batch to release
* @return true if batch was found and released, false otherwise
*/
bool release_offload_buffer(uint64_t batch_id);

/**
* @brief Retrieves multiple stored objects from a remote service.
* @param target_rpc_service_addr Address of the remote RPC service (e.g.,
Expand Down
13 changes: 8 additions & 5 deletions mooncake-store/include/rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,22 @@ struct TaskCompleteRequest {
YLT_REFL(TaskCompleteRequest, id, status, message);

struct BatchGetOffloadObjectResponse {
uint64_t batch_id;
std::vector<uint64_t> pointers;
std::string transfer_engine_addr;
uint64_t gc_ttl_ms;

BatchGetOffloadObjectResponse() = default;
BatchGetOffloadObjectResponse(std::vector<uint64_t>&& pointers_param,
BatchGetOffloadObjectResponse() : batch_id(0), gc_ttl_ms(0) {}
BatchGetOffloadObjectResponse(uint64_t batch_id_param,
std::vector<uint64_t>&& pointers_param,
std::string transfer_engine_addr_param,
uint64_t gc_ttl_ms_param)
: pointers(std::move(pointers_param)),
: batch_id(batch_id_param),
pointers(std::move(pointers_param)),
transfer_engine_addr(std::move(transfer_engine_addr_param)),
gc_ttl_ms(gc_ttl_ms_param) {}
};
YLT_REFL(BatchGetOffloadObjectResponse, pointers, transfer_engine_addr,
gc_ttl_ms);
YLT_REFL(BatchGetOffloadObjectResponse, batch_id, pointers,
transfer_engine_addr, gc_ttl_ms);

} // namespace mooncake
2 changes: 1 addition & 1 deletion mooncake-store/include/storage_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ class BucketStorageBackend : public StorageBackendInterface {

// Aligned buffer for O_DIRECT I/O operations
// We use a fixed-size buffer to avoid frequent allocations
static constexpr size_t kAlignedBufferSize = 16 * 1024 * 1024; // 16MB
static constexpr size_t kAlignedBufferSize = 32 * 1024 * 1024; // 16MB
std::unique_ptr<void, void (*)(void*)> aligned_io_buffer_{nullptr,
[](void*) {}};
/**
Expand Down
Loading
Loading