Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions src/v/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
namespace iceberg {
namespace {

chunked_hash_map<nested_field::id_t, size_t>
copy_map(const chunked_hash_map<nested_field::id_t, size_t>& m) {
chunked_hash_map<nested_field::id_t, size_t> ret;
constexpr auto copy_map = [](const auto& m) {
std::decay_t<decltype(m)> ret;
ret.reserve(m.size());
for (auto& [k, v] : m) {
ret.emplace(k, v);
if constexpr (requires { v.copy(); }) {
ret.emplace(k, v.copy());
} else {
ret.emplace(k, v);
}
}
return ret;
}
};

} // namespace
data_file data_file::copy() const {
return data_file{
Expand All @@ -31,10 +35,17 @@ data_file data_file::copy() const {
.partition = partition.copy(),
.record_count = record_count,
.file_size_bytes = file_size_bytes,
.column_sizes = copy_map(column_sizes),
.value_counts = copy_map(value_counts),
.null_value_counts = copy_map(null_value_counts),
.nan_value_counts = copy_map(nan_value_counts),
.column_sizes = column_sizes.transform(copy_map),
.value_counts = value_counts.transform(copy_map),
.null_value_counts = null_value_counts.transform(copy_map),
.nan_value_counts = nan_value_counts.transform(copy_map),
.lower_bounds = lower_bounds.transform(copy_map),
.upper_bounds = upper_bounds.transform(copy_map),
.key_metadata = key_metadata.transform(&iobuf::copy),
.split_offsets = split_offsets.transform(&chunked_vector<int64_t>::copy),
.equality_ids = equality_ids.transform(
&chunked_vector<nested_field::id_t>::copy),
.sort_order_id = sort_order_id,
Comment thread
nvartolomei marked this conversation as resolved.
};
}

Expand Down
25 changes: 12 additions & 13 deletions src/v/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,19 @@ struct data_file {

size_t record_count;
size_t file_size_bytes;
Comment on lines 41 to 42

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these also be int64 as well? I think they will get assigned to one in the snapshot

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Began changing them but then didn't want to get too distracted from the main task. To be done.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool sg

chunked_hash_map<nested_field::id_t, size_t> column_sizes;
chunked_hash_map<nested_field::id_t, size_t> value_counts;
chunked_hash_map<nested_field::id_t, size_t> null_value_counts;
chunked_hash_map<nested_field::id_t, size_t> nan_value_counts;
std::optional<chunked_hash_map<nested_field::id_t, int64_t>> column_sizes;
std::optional<chunked_hash_map<nested_field::id_t, int64_t>> value_counts;
std::optional<chunked_hash_map<nested_field::id_t, int64_t>>
null_value_counts;
std::optional<chunked_hash_map<nested_field::id_t, int64_t>>
nan_value_counts;
std::optional<chunked_hash_map<nested_field::id_t, iobuf>> lower_bounds;
std::optional<chunked_hash_map<nested_field::id_t, iobuf>> upper_bounds;
std::optional<iobuf> key_metadata;
std::optional<chunked_vector<int64_t>> split_offsets;
std::optional<chunked_vector<nested_field::id_t>> equality_ids;
std::optional<int32_t> sort_order_id;

// TODO: The following fields are not supported, and are serialized as
// empty options.
// - distinct_counts
Comment thread
oleiman marked this conversation as resolved.
// - lower_bounds
// - upper_bounds
// - key_metadata
// - split_offsets
// - equality_ids
// - sort_order_ids
friend bool operator==(const data_file&, const data_file&) = default;
data_file copy() const;
};
Expand Down
129 changes: 97 additions & 32 deletions src/v/iceberg/manifest_entry_values.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,29 @@ get_required_struct(std::optional<value> v, std::string_view name) {
return ret;
}

chunked_hash_map<nested_field::id_t, size_t>
get_counts_map(std::optional<value> v, std::string_view name) {
template<
typename PrimitiveT,
typename ValT = decltype(get_required_primitive<PrimitiveT>(
std::declval<std::optional<value>>(), ""))>
std::optional<chunked_hash_map<nested_field::id_t, ValT>>
get_map(std::optional<value> v, std::string_view name) {
if (!v.has_value()) {
return {};
return std::nullopt;
}
if (!holds_alternative<std::unique_ptr<map_value>>(*v)) {
throw std::invalid_argument(
fmt::format("Value for {} is not a map: {}", name, *v));
}
auto& as_map = std::get<std::unique_ptr<map_value>>(*v);
chunked_hash_map<nested_field::id_t, size_t> ret;
chunked_hash_map<nested_field::id_t, ValT> ret;
ret.reserve(as_map->kvs.size());
for (auto& kv : as_map->kvs) {
try {
auto k = get_required_primitive<int_value>(
std::move(kv.key), "key");
auto v = get_required_primitive<long_value>(
auto val = get_required_primitive<PrimitiveT>(
std::move(kv.val), "val");
ret.emplace(nested_field::id_t{k}, v);
ret.emplace(nested_field::id_t{k}, std::move(val));
} catch (const std::exception& e) {
throw std::runtime_error(
fmt::format("Error parsing '{}' map: {}", name, e.what()));
Expand All @@ -111,6 +116,56 @@ std::optional<value> to_optional_value(std::optional<T> v) {
return ValueT{*v};
}

template<typename PrimitiveT>
constexpr auto map_to_value = [](const auto& m) -> value {
auto mv = std::make_unique<map_value>();
mv->kvs.reserve(m.size());
for (const auto& [k, v] : m) {
if constexpr (requires { v.copy(); }) {
mv->kvs.emplace_back(int_value{k}, PrimitiveT{v.copy()});
} else {
mv->kvs.emplace_back(int_value{k}, PrimitiveT{v});
}
}
return std::move(mv);
};

template<typename T, typename PrimitiveV>
std::optional<chunked_vector<T>>
get_primitive_list(std::optional<value> v, std::string_view name) {
if (!v.has_value()) {
return std::nullopt;
}
if (!holds_alternative<std::unique_ptr<list_value>>(*v)) {
throw std::invalid_argument(
fmt::format("Value for {} is not a list: {}", name, *v));
}
auto& as_list = std::get<std::unique_ptr<list_value>>(*v);
chunked_vector<T> ret;
ret.reserve(as_list->elements.size());
for (auto& e : as_list->elements) {
try {
auto e_val = get_required_primitive<PrimitiveV>(
std::move(e), "element");
ret.emplace_back(e_val);
} catch (const std::exception& e) {
throw std::runtime_error(
fmt::format("Error parsing '{}' list: {}", name, e.what()));
}
}
return ret;
}

template<typename PrimitiveV>
constexpr auto list_to_value = [](const auto& vec) -> value {
auto lv = std::make_unique<list_value>();
lv->elements.reserve(vec.size());
for (const auto& e : vec) {
lv->elements.emplace_back(PrimitiveV(e));
}
return std::move(lv);
};

int status_to_int(manifest_entry_status s) {
switch (s) {
case manifest_entry_status::existing:
Expand Down Expand Up @@ -185,6 +240,7 @@ data_file_format format_from_str(std::string_view s) {

std::unique_ptr<struct_value> data_file_to_value(const data_file& file) {
auto ret = std::make_unique<struct_value>();
ret->fields.reserve(17);
ret->fields.emplace_back(int_value(content_to_int(file.content_type)));
ret->fields.emplace_back(string_value(iobuf::from(file.file_path())));
ret->fields.emplace_back(string_value(format_to_str(file.file_format)));
Expand All @@ -194,34 +250,31 @@ std::unique_ptr<struct_value> data_file_to_value(const data_file& file) {
ret->fields.emplace_back(
long_value(static_cast<int64_t>(file.file_size_bytes)));

// TODO: serialize the rest of the optional fields.
// column_sizes
ret->fields.emplace_back(std::nullopt);
// value_counts
ret->fields.emplace_back(std::nullopt);
// null_value_counts
ret->fields.emplace_back(std::nullopt);
// nan_value_counts
ret->fields.emplace_back(std::nullopt);
// lower_bounds
ret->fields.emplace_back(std::nullopt);
// upper_bounds
ret->fields.emplace_back(std::nullopt);
// key_metadata
ret->fields.emplace_back(std::nullopt);
// split_offsets
ret->fields.emplace_back(std::nullopt);
// equality_ids
ret->fields.emplace_back(std::nullopt);
// sort_order_id
ret->fields.emplace_back(std::nullopt);
ret->fields.emplace_back(
file.column_sizes.transform(map_to_value<long_value>));
ret->fields.emplace_back(
file.value_counts.transform(map_to_value<long_value>));
ret->fields.emplace_back(
file.null_value_counts.transform(map_to_value<long_value>));
ret->fields.emplace_back(
file.nan_value_counts.transform(map_to_value<long_value>));
ret->fields.emplace_back(
file.lower_bounds.transform(map_to_value<binary_value>));
ret->fields.emplace_back(
file.upper_bounds.transform(map_to_value<binary_value>));
ret->fields.emplace_back(file.key_metadata.transform(
[](const iobuf& b) -> value { return binary_value{b.copy()}; }));
ret->fields.emplace_back(
file.sort_order_id
? std::make_optional<value>(int_value{*file.sort_order_id})
: std::nullopt);
return ret;
}

data_file data_file_from_value(struct_value v) {
data_file file;
auto& fs = v.fields;
if (fs.size() < 10) {
if (fs.size() < 16) {
throw std::invalid_argument("Expected more values");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: is it worth naming this constant and/or sticking it in the exception content? not sure the path this exception takes or whether it's at all actionable 🤷

}
file.content_type = content_from_int(
Expand All @@ -235,12 +288,24 @@ data_file data_file_from_value(struct_value v) {
std::move(fs[4]), "record_count");
file.file_size_bytes = get_required_primitive<long_value>(
std::move(fs[5]), "file_size_bytes");
file.column_sizes = get_counts_map(std::move(fs[6]), "column_sizes");
file.value_counts = get_counts_map(std::move(fs[7]), "value_counts");
file.null_value_counts = get_counts_map(
file.column_sizes = get_map<long_value>(std::move(fs[6]), "column_sizes");
file.value_counts = get_map<long_value>(std::move(fs[7]), "value_counts");
file.null_value_counts = get_map<long_value>(
std::move(fs[8]), "null_value_counts");
file.nan_value_counts = get_counts_map(
file.nan_value_counts = get_map<long_value>(
std::move(fs[9]), "nan_value_counts");
file.lower_bounds = get_map<binary_value>(
std::move(fs[10]), "lower_bounds");
file.upper_bounds = get_map<binary_value>(
std::move(fs[11]), "upper_bounds");
file.key_metadata = get_optional_primitive<iobuf, binary_value>(
std::move(fs[12]), "key_metadata");
file.split_offsets = get_primitive_list<int64_t, long_value>(
std::move(fs[13]), "split_offsets");
file.equality_ids = get_primitive_list<nested_field::id_t, int_value>(
std::move(fs[14]), "equality_ids");
file.sort_order_id = get_optional_primitive<int32_t, int_value>(
std::move(fs[15]), "sort_order_id");
return file;
}

Expand Down
1 change: 1 addition & 0 deletions src/v/iceberg/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ redpanda_cc_gtest(
"//src/v/iceberg:manifest_list_avro",
"//src/v/iceberg:partition_key_type",
"//src/v/iceberg:schema_json",
"//src/v/serde/avro/tests:avro_comparator",
"//src/v/test_utils:gtest",
"//src/v/test_utils:runfiles",
"//src/v/utils:file_io",
Expand Down
48 changes: 37 additions & 11 deletions src/v/iceberg/tests/gen_test_iceberg_manifest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
#!/usr/bin/python
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "pyiceberg==0.9.*",
# ]
# ///

# Copyright 2024 Redpanda Data, Inc.
#
Expand Down Expand Up @@ -95,25 +101,45 @@


def make_manifest_entries(num_entries: int) -> list[ManifestEntry]:
assert num_entries >= 2, (
"Need at least 2 entries to cover both null and non-null cases"
)
manifest_entries: list[ManifestEntry] = []
for i in range(num_entries):
# Even entries: populated optional fields; odd entries: null (None).
if i % 2 == 0:
counts = {1: 100 + i, 2: 200 + i}
lower = {1: b"a", 2: b"b"}
upper = {1: b"y", 2: b"z"}
split = []
eq_ids = []
sort_id = i
key_meta = b"some_key"
else:
counts = None
lower = None
upper = None
split = None
eq_ids = None
sort_id = None
key_meta = None
data_file = DataFile(
content=DataFileContent.DATA,
file_path=f"data/path/file-{i}.parquet",
file_format="PARQUET",
partition={},
record_count=i,
file_size_in_bytes=i,
column_sizes={},
value_counts={},
null_value_counts={},
nan_value_counts={},
lower_bounds={1: b"a", 2: b"b"},
upper_bounds={1: b"y", 2: b"z"},
key_metadata=None,
split_offsets=[],
equality_ids=[],
sort_order_id=i,
column_sizes=counts,
value_counts=counts,
null_value_counts=counts,
nan_value_counts=counts,
lower_bounds=lower,
upper_bounds=upper,
key_metadata=key_meta,
split_offsets=split,
equality_ids=eq_ids,
sort_order_id=sort_id,
)
manifest_entry = ManifestEntry(
status=0,
Expand Down
Loading