Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ redpanda_cc_library(
],
visibility = [":__subpackages__"],
deps = [
"//src/v/random:generators",
"//src/v/serde",
"//src/v/utils:named_type",
"//src/v/utils:uuid",
Expand Down
25 changes: 21 additions & 4 deletions src/v/cloud_topics/level_zero/gc/level_zero_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ level_zero_gc::try_to_collect() {
// used to detect unsorted object listings
seastar::sstring last_key;
std::optional<cluster_epoch> last_epoch;
object_id::prefix_t last_prefix{0};

for (const auto& object : candidate_objects.value().contents) {
const auto object_epoch = object_path_factory::level_zero_path_to_epoch(
Expand All @@ -492,15 +493,30 @@ level_zero_gc::try_to_collect() {
co_return std::unexpected(collection_error::invalid_object_name);
}

// detect non-lexicographic ordering. this may indicate that GC will not
// operate efficiently with the underlying storage system. see the class
// comment for more details about what this means in practice.
const auto object_pfx = object_path_factory::level_zero_path_to_prefix(
object.key);

if (!object_pfx.has_value()) {
vlog(
cd_log.error,
"Unable to parse prefix during L0 GC: {}",
object_pfx.error());
co_return std::unexpected(collection_error::invalid_object_name);
}

// detect non-lexicographic ordering. this may indicate that GC will
// not operate efficiently with the underlying storage system. see
// the class comment for more details about what this means in
// practice.
if (!last_epoch.has_value()) {
last_key = object.key;
last_epoch = object_epoch.value();
last_prefix = object_pfx.value();
}

if (object_epoch.value() < last_epoch) {
if (
object_pfx.value() < last_prefix
|| (object_pfx.value() == last_prefix && object_epoch.value() < last_epoch)) {
constexpr std::chrono::minutes rate_limit{1};
static seastar::logger::rate_limit rate(rate_limit);
vloglr(
Expand All @@ -514,6 +530,7 @@ level_zero_gc::try_to_collect() {

last_key = object.key;
last_epoch = object_epoch.value();
last_prefix = object_pfx.value();

// object's epoch is not yet eligible
if (object_epoch.value() > max_gc_epoch.value()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class LevelZeroGCTest : public testing::Test {
cloud_topics::object_id{
.epoch = cloud_topics::cluster_epoch(epoch),
.name = uuid_t::create(),
.prefix = 0,
});
cloud_storage_clients::client::list_bucket_item item{
.key = key().string(),
Expand Down
43 changes: 41 additions & 2 deletions src/v/cloud_topics/object_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,22 @@ constexpr auto level_zero_data_dir_str = "level_zero/data/";
constexpr size_t epoch_digits = 18;
static_assert(std::numeric_limits<int64_t>::digits10 == epoch_digits);

constexpr size_t prefix_digits = 3;

namespace cloud_topics {

cloud_storage_clients::object_key
object_path_factory::level_zero_path(object_id id) {
vassert(id.epoch() >= 0, "level zero object has negative epoch: {}", id);
return cloud_storage_clients::object_key(
ssx::sformat(
"{0}{2:0{1}}/{3}",
"{0}{5:0{4}}/{2:0{1}}/{3}",
level_zero_data_dir_str,
epoch_digits,
id.epoch(),
id.name));
id.name,
prefix_digits,
id.prefix));
}

cloud_storage_clients::object_key object_path_factory::level_zero_data_dir() {
Expand All @@ -53,6 +57,13 @@ object_path_factory::level_zero_path_to_epoch(std::string_view key) {
fmt::format("L0 object name missing prefix: {}", key));
}
name.remove_prefix(it + std::strlen(level_zero_data_dir_str));

if (name.size() < prefix_digits + 1) {
return std::unexpected(
fmt::format("L0 object name is too short: {}", key));
}
name.remove_prefix(prefix_digits + 1);

if (name.size() < epoch_digits) {
return std::unexpected(
fmt::format("L0 object name is too short: {}", key));
Expand All @@ -72,4 +83,32 @@ object_path_factory::level_zero_path_to_epoch(std::string_view key) {
return cluster_epoch(epoch);
}

/// Extract the numeric prefix component from a level zero object key.
///
/// The prefix is the fixed-width (`prefix_digits`) decimal field that directly
/// follows the level zero data directory in the key and is terminated by a
/// '/' path separator.
///
/// Returns the parsed prefix, or a descriptive error string when the key does
/// not contain the level zero data directory, is too short to hold a prefix
/// and its separator, or does not carry a well-formed prefix.
std::expected<object_id::prefix_t, std::string>
object_path_factory::level_zero_path_to_prefix(std::string_view key) {
    // find the level zero data directory and chop off everything up to and
    // including it
    auto name = key;
    const auto it = name.find(level_zero_data_dir_str);
    if (it == std::string_view::npos) {
        return std::unexpected(
          fmt::format("L0 object name missing prefix: {}", key));
    }
    name.remove_prefix(it + std::strlen(level_zero_data_dir_str));

    // the remainder must hold the prefix digits plus the trailing separator
    if (name.size() < prefix_digits + 1) {
        return std::unexpected(
          fmt::format("L0 object name is too short: {}", key));
    }

    // the prefix field must be terminated by a path separator. without this
    // check a key whose first path component is longer than the prefix field
    // would be silently truncated and accepted.
    if (name[prefix_digits] != '/') {
        return std::unexpected(
          fmt::format("L0 object name has invalid prefix: {}", key));
    }
    name = name.substr(0, prefix_digits);

    // parse the prefix digits; every character must be consumed, which
    // rejects any non-digit inside the field
    object_id::prefix_t pfx{0};
    const auto res = std::from_chars(
      name.data(), name.data() + name.size(), pfx);
    if (res.ptr != name.data() + name.size() || res.ec != std::errc{}) {
        return std::unexpected(
          fmt::format("L0 object name has invalid prefix: {}", key));
    }

    return pfx;
}

} // namespace cloud_topics
3 changes: 3 additions & 0 deletions src/v/cloud_topics/object_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class object_path_factory {
*/
static std::expected<cluster_epoch, std::string>
level_zero_path_to_epoch(std::string_view);

/**
 * Parse the numeric prefix component out of a level zero object key.
 *
 * On success the parsed prefix is returned. Otherwise an error string is
 * returned describing the failure: the key does not contain the level zero
 * data directory, the key is too short to contain a prefix, or the prefix
 * characters are not a valid number.
 */
static std::expected<object_id::prefix_t, std::string>
level_zero_path_to_prefix(std::string_view);
};

} // namespace cloud_topics
59 changes: 50 additions & 9 deletions src/v/cloud_topics/tests/object_utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
#define UUID_REGEX \
"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"

#define PREFIX_REGEX "[0-9]{3}"

TEST(ObjectPathFactory, LevelZeroPathFormat) {
auto path = cloud_topics::object_path_factory::level_zero_path(
cloud_topics::object_id::create(cloud_topics::cluster_epoch{42}));
EXPECT_THAT(
path().string(),
::testing::MatchesRegex(
"^level_zero/data/000000000000000042/" UUID_REGEX "$"));
"^level_zero/data/" PREFIX_REGEX "/000000000000000042/" UUID_REGEX
"$"));
}

TEST(ObjectPathFactory, LevelZeroDataDir) {
Expand All @@ -33,31 +36,69 @@ TEST(ObjectPathFactory, LevelZeroDataDir) {
TEST(ObjectPathFactory, LevelZeroParseEpoch) {
EXPECT_EQ(
cloud_topics::object_path_factory::level_zero_path_to_epoch(
"level_zero/data/000000000000010042/"),
"level_zero/data/000/000000000000010042/"),
cloud_topics::cluster_epoch(10042));

EXPECT_EQ(
cloud_topics::object_path_factory::level_zero_path_to_epoch(
"level_zero/data/000000000000010042/asdfalksjdflkjsdflkj"),
"level_zero/data/000/000000000000010042/asdfalksjdflkjsdflkj"),
cloud_topics::cluster_epoch(10042));

EXPECT_EQ(
cloud_topics::object_path_factory::level_zero_path_to_epoch(
"level_asdf_zero/data/000000000000010042/asdfasdf")
"level_asdf_zero/data/000/000000000000010042/asdfasdf")
.error(),
"L0 object name missing prefix: "
"level_asdf_zero/data/000000000000010042/asdfasdf");
"level_asdf_zero/data/000/000000000000010042/asdfasdf");

EXPECT_EQ(
cloud_topics::object_path_factory::level_zero_path_to_epoch(
"level_zero/data/000/0000000000010042/")
.error(),
"L0 object name is too short: level_zero/data/000/0000000000010042/");

EXPECT_EQ(
cloud_topics::object_path_factory::level_zero_path_to_epoch(
"level_zero/data/0000000000010042/")
"level_zero/data/00/")
.error(),
"L0 object name is too short: level_zero/data/0000000000010042/");
"L0 object name is too short: level_zero/data/00/");

EXPECT_EQ(
cloud_topics::object_path_factory::level_zero_path_to_epoch(
"level_zero/data/00000X0000000010042/asdfasdf")
"level_zero/data/000/00000X0000000010042/asdfasdf")
.error(),
"L0 object name has invalid epoch: "
"level_zero/data/00000X0000000010042/asdfasdf");
"level_zero/data/000/00000X0000000010042/asdfasdf");
}

// Covers prefix extraction from level zero keys: two well-formed keys followed
// by one case for each error the parser reports.
TEST(ObjectPathFactory, LevelZeroParsePrefix) {
    const auto parse_prefix = [](std::string_view key) {
        return cloud_topics::object_path_factory::level_zero_path_to_prefix(
          key);
    };

    // well-formed keys: the prefix alone, and the prefix followed by an epoch
    EXPECT_EQ(parse_prefix("level_zero/data/123/"), 123);
    EXPECT_EQ(parse_prefix("level_zero/data/123/000000000000010042/"), 123);

    // the level zero data directory is not present in the key
    EXPECT_EQ(
      parse_prefix("level_asdf_zero/data/000/000000000000010042/asdfasdf")
        .error(),
      "L0 object name missing prefix: "
      "level_asdf_zero/data/000/000000000000010042/asdfasdf");

    // not enough characters after the data directory to hold a prefix
    EXPECT_EQ(
      parse_prefix("level_zero/data/00/").error(),
      "L0 object name is too short: level_zero/data/00/");

    // a non-digit character inside the prefix field
    EXPECT_EQ(
      parse_prefix("level_zero/data/0X0/0000000000000010042/asdfasdf").error(),
      "L0 object name has invalid prefix: "
      "level_zero/data/0X0/0000000000000010042/asdfasdf");
}
19 changes: 14 additions & 5 deletions src/v/cloud_topics/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include "random/generators.h"
#include "serde/envelope.h"
#include "utils/named_type.h"
#include "utils/uuid.h"
Expand Down Expand Up @@ -38,22 +39,30 @@ inline constexpr cluster_epoch prev_cluster_epoch(cluster_epoch e) {
}

/// The identifier of a cloud topics L0 object. It is a combination
/// of a unique name (UUIDv4) and a cluster epoch.
/// of a unique name (UUIDv4), a cluster epoch, and a random 3-digit numeric
/// prefix.
struct object_id
: serde::envelope<object_id, serde::version<0>, serde::version<0>> {
: serde::envelope<object_id, serde::version<1>, serde::compat_version<0>> {
cluster_epoch epoch;
uuid_t name;
using prefix_t = uint16_t;
prefix_t prefix;
static object_id create(cluster_epoch epoch) {
return {.epoch = epoch, .name = uuid_t::create()};
return {
.epoch = epoch,
.name = uuid_t::create(),
.prefix = random_generators::get_int<prefix_t>(0, prefix_max)};
}
auto serde_fields() { return std::tie(epoch, name); }
auto serde_fields() { return std::tie(epoch, name, prefix); }
bool operator==(const object_id& other) const = default;
auto operator<=>(const object_id& other) const = default;

template<typename H>
friend H AbslHashValue(H h, const object_id& id) {
return H::combine(std::move(h), id.epoch(), id.name);
return H::combine(std::move(h), id.epoch(), id.name, id.prefix);
}

static constexpr prefix_t prefix_max = 999;
};

/// Type of ownership
Expand Down