Skip to content

Commit 7d0978b

Browse files
authored
[Store] Add Upsert API for in-place object updates (#1662)
Co-authored-by: fatSheep <tzh2005t@gmail.com>
1 parent 5ff565d commit 7d0978b

24 files changed

Lines changed: 3670 additions & 81 deletions

docs/source/design/mooncake-store.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,25 @@ struct ReplicateConfig {
105105
};
106106
```
107107

108+
### Upsert
109+
110+
```C++
111+
tl::expected<void, ErrorCode> Upsert(const ObjectKey& key,
112+
std::vector<Slice>& slices,
113+
const ReplicateConfig& config);
114+
115+
std::vector<tl::expected<void, ErrorCode>> BatchUpsert(
116+
const std::vector<ObjectKey>& keys,
117+
std::vector<std::vector<Slice>>& batched_slices,
118+
const ReplicateConfig& config);
119+
```
120+
121+
`Upsert` inserts `key` if it does not exist and updates the existing object if
122+
it does. It uses the same replication configuration model as `Put`, while
123+
allowing the store to reuse existing placement for in-place updates when the
124+
current layout permits it. `BatchUpsert` performs the same operation for
125+
multiple keys using a shared replication configuration.
126+
108127
### Remove
109128
110129
```C++
@@ -516,6 +535,40 @@ The Master Service handles object-related interfaces as follows:
516535
517536
Before writing an object, the Client calls PutStart to request storage space allocation from the Master Service. After completing data writing, the Client calls PutEnd to notify the Master Service to mark the object write as completed.
518537
538+
- Upsert
539+
540+
```C++
541+
tl::expected<std::vector<Replica::Descriptor>, ErrorCode> UpsertStart(
542+
const std::string& key,
543+
const std::vector<size_t>& slice_lengths,
544+
const ReplicateConfig& config);
545+
546+
std::vector<tl::expected<std::vector<Replica::Descriptor>, ErrorCode>>
547+
BatchUpsertStart(const std::vector<std::string>& keys,
548+
const std::vector<std::vector<uint64_t>>& slice_lengths,
549+
const ReplicateConfig& config);
550+
551+
tl::expected<void, ErrorCode> UpsertEnd(
552+
const std::string& key, ReplicaType replica_type);
553+
554+
std::vector<tl::expected<void, ErrorCode>> BatchUpsertEnd(
555+
const std::vector<std::string>& keys);
556+
557+
tl::expected<void, ErrorCode> UpsertRevoke(
558+
const std::string& key, ReplicaType replica_type);
559+
560+
std::vector<tl::expected<void, ErrorCode>> BatchUpsertRevoke(
561+
const std::vector<std::string>& keys);
562+
```
563+
564+
`UpsertStart` / `UpsertEnd` / `UpsertRevoke` mirror the existing put lifecycle
565+
but operate on insert-or-update semantics. If the key does not exist, the flow
566+
behaves like `PutStart`. If the key already exists, the Master may reuse the
567+
current allocation for an in-place update or allocate new space when the object
568+
layout changes. The batch variants provide the same control flow for multiple
569+
keys and are the lower-level primitives used by the high-level `BatchUpsert`
570+
path.
571+
519572
- GetReplicaList
520573

521574
```C++

docs/source/python-api-reference/mooncake-store.md

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,120 @@ result = store.put_batch(keys, values)
629629

630630
---
631631

632+
#### upsert()
633+
634+
Insert a new object if the key does not exist, or update the existing object in place when possible. They use the same replication configuration model as `put()`.
635+
636+
Upsert binary data in the distributed storage.
637+
638+
```python
639+
def upsert(self, key: str, value: bytes, config: ReplicateConfig = None) -> int
640+
```
641+
642+
**Parameters:**
643+
- `key` (str): Unique object identifier
644+
- `value` (bytes): Binary data to insert or update
645+
- `config` (ReplicateConfig, optional): Replication configuration
646+
647+
**Returns:**
648+
- `int`: Status code (0 = success, non-zero = error code)
649+
650+
**Example:**
651+
```python
652+
config = ReplicateConfig()
653+
config.replica_num = 2
654+
655+
rc = store.upsert("weights", b"new-bytes", config)
656+
if rc == 0:
657+
print("Upsert succeeded")
658+
```
659+
660+
#### upsert_from()
661+
662+
Upsert object data directly from a pre-allocated buffer (zero-copy).
663+
664+
```python
665+
def upsert_from(self, key: str, buffer_ptr: int, size: int, config: ReplicateConfig = None) -> int
666+
```
667+
668+
**Parameters:**
669+
- `key` (str): Object identifier
670+
- `buffer_ptr` (int): Memory address of the source buffer
671+
- `size` (int): Number of bytes to insert or update
672+
- `config` (ReplicateConfig, optional): Replication configuration
673+
674+
**Returns:**
675+
- `int`: Status code (0 = success, non-zero = error code)
676+
677+
**Note:** This is the zero-copy counterpart of `upsert()`. As with
678+
`put_from()`, register the buffer before issuing the request.
679+
680+
#### batch_upsert_from()
681+
682+
Upsert multiple objects directly from pre-allocated buffers.
683+
684+
```python
685+
def batch_upsert_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int],
686+
config: ReplicateConfig = None) -> List[int]
687+
```
688+
689+
**Parameters:**
690+
- `keys` (List[str]): List of object identifiers
691+
- `buffer_ptrs` (List[int]): List of source buffer addresses
692+
- `sizes` (List[int]): List of byte lengths for each buffer
693+
- `config` (ReplicateConfig, optional): Replication configuration shared by all objects
694+
695+
**Returns:**
696+
- `List[int]`: List of status codes for each upsert
697+
698+
#### upsert_parts()
699+
700+
Upsert data from multiple buffer parts as a single object (insert or update).
701+
702+
```python
703+
def upsert_parts(self, key: str, *parts, config: ReplicateConfig = None) -> int
704+
```
705+
706+
**Parameters:**
707+
- `key` (str): Object identifier
708+
- `*parts`: Variable number of bytes-like objects to concatenate
709+
- `config` (ReplicateConfig, optional): Replication configuration
710+
711+
**Returns:**
712+
- `int`: Status code (0 = success, non-zero = error code)
713+
714+
**Example:**
715+
```python
716+
part1 = b"Hello, "
717+
part2 = b"World!"
718+
result = store.upsert_parts("greeting", part1, part2)
719+
```
720+
721+
#### upsert_batch()
722+
723+
Upsert multiple objects in a single batch operation.
724+
725+
```python
726+
def upsert_batch(self, keys: List[str], values: List[bytes], config: ReplicateConfig = None) -> int
727+
```
728+
729+
**Parameters:**
730+
- `keys` (List[str]): List of object identifiers
731+
- `values` (List[bytes]): List of binary data to insert or update
732+
- `config` (ReplicateConfig, optional): Replication configuration for all objects
733+
734+
**Returns:**
735+
- `int`: Status code (0 = success, non-zero = error code)
736+
737+
**Example:**
738+
```python
739+
keys = ["key1", "key2", "key3"]
740+
values = [b"value1", b"value2", b"value3"]
741+
result = store.upsert_batch(keys, values)
742+
```
743+
744+
---
745+
632746
#### get_batch()
633747
Retrieve multiple objects in a single batch operation.
634748

@@ -1533,6 +1647,133 @@ def batch_pub_tensor(self, keys: List[str], tensors_list: List[torch.Tensor], co
15331647

15341648
---
15351649

1650+
#### upsert_tensor()
1651+
1652+
Insert a tensor if its key is missing, or update the existing tensor if the key already exists. The current tensor upsert helpers use the default `ReplicateConfig` and therefore do not take a `config` parameter.
1653+
1654+
Upsert a PyTorch tensor into the store.
1655+
1656+
```python
1657+
def upsert_tensor(self, key: str, tensor: torch.Tensor) -> int
1658+
```
1659+
1660+
**Parameters:**
1661+
- `key` (str): Object identifier
1662+
- `tensor` (torch.Tensor): The PyTorch tensor to insert or update
1663+
1664+
**Returns:**
1665+
- `int`: Status code (0 = success, non-zero = error code)
1666+
1667+
**Note:** This function requires `torch` to be installed and available in the environment.
1668+
1669+
#### upsert_tensor_from()
1670+
1671+
Upsert a tensor directly from a pre-allocated buffer. The buffer layout must be
1672+
`[TensorMetadata][tensor data]`, matching the layout used by
1673+
`get_tensor_into()`.
1674+
1675+
```python
1676+
def upsert_tensor_from(self, key: str, buffer_ptr: int, size: int) -> int
1677+
```
1678+
1679+
**Parameters:**
1680+
- `key` (str): Object identifier
1681+
- `buffer_ptr` (int): Buffer pointer containing serialized tensor metadata and payload
1682+
- `size` (int): Actual serialized byte length of the tensor buffer
1683+
1684+
**Returns:**
1685+
- `int`: Status code (0 = success, non-zero = error code)
1686+
1687+
**Note:** This function is not supported for dummy client.
1688+
1689+
#### batch_upsert_tensor_from()
1690+
1691+
Upsert multiple tensors directly from pre-allocated buffers. Each buffer must
1692+
use layout `[TensorMetadata][tensor data]`.
1693+
1694+
```python
1695+
def batch_upsert_tensor_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[int]
1696+
```
1697+
1698+
**Parameters:**
1699+
- `keys` (List[str]): List of object identifiers
1700+
- `buffer_ptrs` (List[int]): List of serialized tensor buffer pointers
1701+
- `sizes` (List[int]): List of actual serialized byte lengths
1702+
1703+
**Returns:**
1704+
- `List[int]`: List of status codes for each tensor upsert
1705+
1706+
#### batch_upsert_tensor()
1707+
1708+
Upsert a batch of PyTorch tensors into the store (insert or update).
1709+
1710+
```python
1711+
def batch_upsert_tensor(self, keys: List[str], tensors_list: List[torch.Tensor]) -> List[int]
1712+
```
1713+
1714+
**Parameters:**
1715+
- `keys` (List[str]): List of object identifiers
1716+
- `tensors_list` (List[torch.Tensor]): List of tensors to insert or update
1717+
1718+
**Returns:**
1719+
- `List[int]`: List of status codes for each tensor operation.
1720+
1721+
**Note:** This function requires `torch` to be installed and available in the environment. Not supported for dummy client.
1722+
1723+
#### upsert_pub_tensor()
1724+
1725+
Upsert a PyTorch tensor with configurable replication settings (insert or update).
1726+
1727+
```python
1728+
def upsert_pub_tensor(self, key: str, tensor: torch.Tensor, config: ReplicateConfig = None) -> int
1729+
```
1730+
1731+
**Parameters:**
1732+
- `key` (str): Unique object identifier
1733+
- `tensor` (torch.Tensor): PyTorch tensor to insert or update
1734+
- `config` (ReplicateConfig, optional): Replication configuration
1735+
1736+
**Returns:**
1737+
- `int`: Status code (0 = success, non-zero = error code)
1738+
1739+
**Note:** This function requires `torch` to be installed and available in the environment. Not supported for dummy client.
1740+
1741+
**Example:**
1742+
```python
1743+
import torch
1744+
from mooncake.store import ReplicateConfig
1745+
1746+
tensor = torch.randn(100, 100)
1747+
1748+
config = ReplicateConfig()
1749+
config.replica_num = 2
1750+
config.with_soft_pin = True
1751+
1752+
result = store.upsert_pub_tensor("my_tensor", tensor, config)
1753+
if result == 0:
1754+
print("Tensor upserted successfully")
1755+
```
1756+
1757+
#### batch_upsert_pub_tensor()
1758+
1759+
Batch upsert PyTorch tensors with configurable replication settings (insert or update).
1760+
1761+
```python
1762+
def batch_upsert_pub_tensor(self, keys: List[str], tensors_list: List[torch.Tensor], config: ReplicateConfig = None) -> List[int]
1763+
```
1764+
1765+
**Parameters:**
1766+
- `keys` (List[str]): List of object identifiers
1767+
- `tensors_list` (List[torch.Tensor]): List of tensors to insert or update
1768+
- `config` (ReplicateConfig, optional): Replication configuration
1769+
1770+
**Returns:**
1771+
- `List[int]`: List of status codes for each tensor operation.
1772+
1773+
**Note:** This function requires `torch` to be installed and available in the environment. Not supported for dummy client.
1774+
1775+
---
1776+
15361777
### PyTorch Tensor Operations (Zero Copy)
15371778

15381779
These methods provide direct support for storing and retrieving PyTorch tensors. They automatically handle serialization and metadata, and include built-in support for **Tensor Parallelism (TP)** by automatically splitting and reconstructing tensor shards.

0 commit comments

Comments
 (0)