Skip to content

Iceberg header translation#30866

Open
wdberkeley wants to merge 3 commits into
devfrom
header-translation
Open

Iceberg header translation#30866
wdberkeley wants to merge 3 commits into
devfrom
header-translation

Conversation

@wdberkeley

Copy link
Copy Markdown
Contributor

Adds translation of headers into UTF-8 strings instead of bytes.

Three commits

  1. Rename "schemaless" to "base" to correct a misnomer, because now even key-value mode's schema has two variations.
  2. Implement header value translation to string. Uses the utf8 sanitization machinery.
  3. Ducktape test

Saving release notes for the follow-up with key translation.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

`schemaless_struct_type` was a misnomer: all translators (key-value and
schema-based) extend the same base struct, not just the schemaless mode.
Rename to `rp_base_struct_type` and `rp_base_desc` to reflect the role.
This is especially important with upcoming header and key translation,
where even key-value mode has binary header and string header schemas.

Removes dead `default_schema()` code too.
When `headers:value_type=string` is set on a topic, decode header values
as UTF-8 strings rather than storing raw bytes. Invalid byte sequences
are sanitized to U+FFFD via `utf8_sanitize`. The Iceberg schema is
patched by `apply_headers_config` to use `string_type` for the header
value field instead of `binary_type`.

Adds unit tests for schema type and value construction under binary and
string configs, null header handling, and UTF-8 sanitization wiring.
Adds `test_header_string_mode`: creates a topic with
`headers:value_type=string`, produces a message with one valid UTF-8
header and one with a leading invalid byte, and verifies via pyiceberg
that string values round-trip correctly and invalid bytes are sanitized
to U+FFFD. Also asserts via Spark SQL that the header value column is
`varchar` and queryable as a string literal.
Copilot AI review requested due to automatic review settings June 22, 2026 23:48

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends the datalake/Iceberg translation pipeline to support storing Kafka record header values as UTF-8 strings (with invalid sequences sanitized), while also renaming the “schemaless” base schema concept to “rp_base” to better reflect current usage. It adds both unit and end-to-end coverage to validate binary vs string header behavior.

Changes:

  • Rename the canonical “schemaless” row/schema concept to a “base” Redpanda system-field descriptor (rp_base_*) and update downstream call sites.
  • Add configurable header value translation (binary vs UTF-8 string with utf8_sanitize) and thread the header config through translators/coordinator/schema creation.
  • Add new C++ gtest coverage for schema/type/value wiring and a Ducktape e2e test for header string mode (including sanitization behavior).

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/rptest/tests/datalake/datalake_e2e_test.py Adds header assertions for default (binary) behavior and a new e2e test for headers:value_type=string including sanitization.
src/v/datalake/table_definition.h Renames canonical descriptors to rp_base_*, adds apply_headers_config, and plumbs header config into build_rp_struct.
src/v/datalake/table_definition.cc Implements schema patching for header value type + header value translation using utf8_sanitize.
src/v/datalake/record_translator.h Adds header-config-aware constructors and stores per-translator header config.
src/v/datalake/record_translator.cc Applies header config to both schema construction and row translation.
src/v/datalake/datalake_manager.cc Passes topic iceberg header config into the selected translator.
src/v/datalake/coordinator/coordinator.cc Threads topic metadata into schema provider so header config can affect the table schema.
src/v/datalake/partitioning_writer.h Updates comment to reflect “base redpanda system fields” terminology.
src/v/datalake/catalog_schema_manager.h Updates comment wording from “schemaless” to “key-value”.
src/v/datalake/tests/partitioning_writer_test.cc Updates to use rp_base_struct_type().
src/v/datalake/tests/table_definition_test.cc New unit tests for header schema/value typing and UTF-8 sanitization wiring.
src/v/datalake/tests/BUILD Registers the new table_definition_test gtest target.
src/v/datalake/BUILD Adds //src/v/strings:utf8 dependency needed by table_definition.cc.
src/v/datalake/coordinator/tests/iceberg_snapshot_remover_test.cc Updates to rp_base_struct_type().
src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc Updates to rp_base_struct_type().

Comment on lines +45 to +49
const iceberg::list_value& get_headers_list(const iceberg::struct_value& rp) {
const auto& hdr_opt = rp.fields[3];
EXPECT_TRUE(hdr_opt.has_value());
return *std::get<std::unique_ptr<iceberg::list_value>>(*hdr_opt);
}
Comment on lines +52 to +56
const iceberg::struct_value&
get_kv_struct(const iceberg::list_value& list, size_t idx) {
return *std::get<std::unique_ptr<iceberg::struct_value>>(
*list.elements[idx]);
}

// ---- rp_base_struct_type ------------------------------------------------

TEST(SchemalessStructType, BinaryConfigProducesBinaryHeaderValueType) {
std::get<iceberg::primitive_type>(val_field_type)));
}

TEST(SchemalessStructType, StringConfigProducesStringHeaderValueType) {
@@ -12,24 +12,24 @@
#include "datalake/schema_descriptor.h"
#include "iceberg/schema.h"
@vbotbuildovich

Copy link
Copy Markdown
Collaborator

Retry command for Build#86115

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_auto_prefix_trimming@{"source_cluster_spec":{"cluster_type":"redpanda"},"storage_mode":"tiered","with_failures":false}

@vbotbuildovich

Copy link
Copy Markdown
Collaborator

CI test results

test results on build#86115
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(FAIL) ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "tiered", "with_failures": false} integration https://buildkite.com/redpanda/redpanda/builds/86115#019ef1cb-a290-443c-a486-891228be70cc 15/21 Test FAILS after retries.Significant increase in flaky rate(baseline=0.0398, p0=0.0009, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants