-
Notifications
You must be signed in to change notification settings - Fork 723
[Store] Add hard pin mechanism for eviction-protected objects #1728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
152fcf4
d50b37b
724af76
0bdc2a7
6acbd56
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -766,7 +766,7 @@ auto MasterService::PutStart(const UUID& client_id, const std::string& key, | |||||||||||||
| shard->metadata.emplace( | ||||||||||||||
| std::piecewise_construct, std::forward_as_tuple(key), | ||||||||||||||
| std::forward_as_tuple(client_id, now, total_length, std::move(replicas), | ||||||||||||||
| config.with_soft_pin)); | ||||||||||||||
| config.with_soft_pin, config.with_hard_pin)); | ||||||||||||||
| // Also insert the metadata into processing set for monitoring. | ||||||||||||||
| shard->processing_keys.insert(key); | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -2886,6 +2886,10 @@ void MasterService::BatchEvict(double evict_ratio_target, | |||||||||||||
| candidates; // can be removed | ||||||||||||||
| for (auto it = shard->metadata.begin(); it != shard->metadata.end(); | ||||||||||||||
| it++) { | ||||||||||||||
| // Hard-pinned objects are never evicted | ||||||||||||||
| if (it->second.IsHardPinned()) { | ||||||||||||||
| continue; | ||||||||||||||
| } | ||||||||||||||
| // Skip objects that are not expired or have incomplete replicas | ||||||||||||||
| if (!it->second.IsLeaseExpired(now) || | ||||||||||||||
| !can_evict_replicas(it->second)) { | ||||||||||||||
|
|
@@ -2920,7 +2924,8 @@ void MasterService::BatchEvict(double evict_ratio_target, | |||||||||||||
| while (it != shard->metadata.end()) { | ||||||||||||||
| // Skip objects that are not allowed to be evicted in the first | ||||||||||||||
| // pass | ||||||||||||||
| if (!it->second.IsLeaseExpired(now) || | ||||||||||||||
| if (it->second.IsHardPinned() || | ||||||||||||||
| !it->second.IsLeaseExpired(now) || | ||||||||||||||
| it->second.IsSoftPinned(now) || | ||||||||||||||
| !can_evict_replicas(it->second)) { | ||||||||||||||
| ++it; | ||||||||||||||
|
|
@@ -2983,7 +2988,8 @@ void MasterService::BatchEvict(double evict_ratio_target, | |||||||||||||
| (start_idx + i) % kNumShards); | ||||||||||||||
| auto it = shard->metadata.begin(); | ||||||||||||||
| while (it != shard->metadata.end() && target_evict_num > 0) { | ||||||||||||||
| if (it->second.lease_timeout <= target_timeout && | ||||||||||||||
| if (!it->second.IsHardPinned() && | ||||||||||||||
| it->second.lease_timeout <= target_timeout && | ||||||||||||||
| !it->second.IsSoftPinned(now) && | ||||||||||||||
| can_evict_replicas(it->second)) { | ||||||||||||||
| // Evict this object | ||||||||||||||
|
|
@@ -3025,9 +3031,9 @@ void MasterService::BatchEvict(double evict_ratio_target, | |||||||||||||
|
|
||||||||||||||
| auto it = shard->metadata.begin(); | ||||||||||||||
| while (it != shard->metadata.end() && target_evict_num > 0) { | ||||||||||||||
| // Skip objects that are not expired or have incomplete | ||||||||||||||
| // replicas | ||||||||||||||
| if (!it->second.IsLeaseExpired(now) || | ||||||||||||||
| // Skip hard-pinned or not-yet-expired objects | ||||||||||||||
| if (it->second.IsHardPinned() || | ||||||||||||||
| !it->second.IsLeaseExpired(now) || | ||||||||||||||
| !can_evict_replicas(it->second)) { | ||||||||||||||
| ++it; | ||||||||||||||
| continue; | ||||||||||||||
|
|
@@ -3485,7 +3491,8 @@ MasterService::MetadataSerializer::DeserializeShard(const msgpack::object& obj, | |||||||||||||
| std::forward_as_tuple( | ||||||||||||||
| metadata_ptr->client_id, metadata_ptr->put_start_time, | ||||||||||||||
| metadata_ptr->size, metadata_ptr->PopReplicas(), | ||||||||||||||
| metadata_ptr->soft_pin_timeout.has_value())); | ||||||||||||||
| metadata_ptr->soft_pin_timeout.has_value(), | ||||||||||||||
| metadata_ptr->IsHardPinned())); | ||||||||||||||
|
|
||||||||||||||
| it->second.lease_timeout = metadata_ptr->lease_timeout; | ||||||||||||||
| it->second.soft_pin_timeout = metadata_ptr->soft_pin_timeout; | ||||||||||||||
|
|
@@ -3500,10 +3507,12 @@ MasterService::MetadataSerializer::SerializeMetadata( | |||||||||||||
| MsgpackPacker& packer) const { | ||||||||||||||
| // Pack ObjectMetadata using array structure for efficiency | ||||||||||||||
| // Format: [client_id, put_start_time, size, lease_timeout, | ||||||||||||||
| // has_soft_pin_timeout, soft_pin_timeout, replicas_count, replicas...] | ||||||||||||||
| // has_soft_pin_timeout, soft_pin_timeout, replicas_count, replicas..., | ||||||||||||||
| // hard_pinned] | ||||||||||||||
|
|
||||||||||||||
| size_t array_size = 7; // size, lease_timeout, has_soft_pin_timeout, | ||||||||||||||
| // soft_pin_timeout, replicas_count | ||||||||||||||
| size_t array_size = 8; // client_id, put_start_time, size, lease_timeout, | ||||||||||||||
| // has_soft_pin_timeout, soft_pin_timeout, | ||||||||||||||
| // replicas_count + hard_pinned | ||||||||||||||
|
Comment on lines
+3513
to
+3515
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment for
Suggested change
|
||||||||||||||
| array_size += metadata.CountReplicas(); // One element per replica | ||||||||||||||
| packer.pack_array(array_size); | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -3552,6 +3561,8 @@ MasterService::MetadataSerializer::SerializeMetadata( | |||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| packer.pack(metadata.IsHardPinned()); | ||||||||||||||
|
|
||||||||||||||
| return {}; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -3567,6 +3578,7 @@ MasterService::MetadataSerializer::DeserializeMetadata( | |||||||||||||
|
|
||||||||||||||
| // Need at least 7 elements: client_id, put_start_time, size, lease_timeout, | ||||||||||||||
| // has_soft_pin_timeout, soft_pin_timeout, replicas_count | ||||||||||||||
| // (8th element = hard_pinned is optional for backward compat) | ||||||||||||||
| if (obj.via.array.size < 7) { | ||||||||||||||
| return tl::unexpected(SerializationError( | ||||||||||||||
| ErrorCode::DESERIALIZE_FAIL, | ||||||||||||||
|
|
@@ -3599,8 +3611,10 @@ MasterService::MetadataSerializer::DeserializeMetadata( | |||||||||||||
| // Deserialize replicas count | ||||||||||||||
| uint32_t replicas_count = array[index++].as<uint32_t>(); | ||||||||||||||
|
|
||||||||||||||
| // Check if array size matches replicas_count | ||||||||||||||
| if (obj.via.array.size != 7 + replicas_count) { | ||||||||||||||
| // Array size: 7 + replicas_count (old format) or 8 + replicas_count (new | ||||||||||||||
| // format with hard_pinned) | ||||||||||||||
| if (obj.via.array.size != 7 + replicas_count && | ||||||||||||||
| obj.via.array.size != 8 + replicas_count) { | ||||||||||||||
| return tl::unexpected(SerializationError( | ||||||||||||||
| ErrorCode::DESERIALIZE_FAIL, | ||||||||||||||
| "deserialize ObjectMetadata array size mismatch")); | ||||||||||||||
|
|
@@ -3619,13 +3633,19 @@ MasterService::MetadataSerializer::DeserializeMetadata( | |||||||||||||
| replicas.emplace_back(std::move(*result.value())); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Deserialize hard_pinned (if present, otherwise default to false) | ||||||||||||||
| bool is_hard_pinned = false; | ||||||||||||||
| if (index < obj.via.array.size) { | ||||||||||||||
| is_hard_pinned = array[index++].as<bool>(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Create ObjectMetadata instance | ||||||||||||||
| bool enable_soft_pin = has_soft_pin_timeout; | ||||||||||||||
| auto metadata = std::make_unique<ObjectMetadata>( | ||||||||||||||
| client_id, | ||||||||||||||
| std::chrono::system_clock::time_point( | ||||||||||||||
| std::chrono::milliseconds(put_start_time_timestamp)), | ||||||||||||||
| size, std::move(replicas), enable_soft_pin); | ||||||||||||||
| size, std::move(replicas), enable_soft_pin, is_hard_pinned); | ||||||||||||||
| metadata->lease_timeout = std::chrono::system_clock::time_point( | ||||||||||||||
|
Comment on lines
3642
to
3649
|
||||||||||||||
| std::chrono::milliseconds(lease_timestamp)); | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we avoid hardcoding here? Use
size_t array_size = sizeof(struct xxx) + sizeof(struct xxx)