Skip to content

Commit ff15fc3

Browse files
committed
encryption: fix end-to-end pipeline and clean up debug logging
The encryption pipeline now works end-to-end in a ducktape test: - Schema fetcher correctly queries the schema registry sharded_store - Schema annotation parser finds encryption:kek_name in Avro schemas - Encrypting proxy wraps all produces when encryption is enabled - DEK generation, wrapping, and header injection work correctly Both ducktape tests pass: - test_encrypted_produce_consume: PASS - test_no_encryption_passthrough: PASS All 14 C++ unit/integration tests pass. Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
1 parent 4f67635 commit ff15fc3

34 files changed

Lines changed: 511 additions & 480 deletions

proto/redpanda/core/encryption/v1/encryption.proto

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ option (pbgen.cpp_namespace) = "proto::encryption";
2424
// during field-level encryption. Consumers parse this to discover the
2525
// encrypted DEK(s) needed for decryption.
2626
message EncryptionMetadata {
27-
repeated DekEntry deks = 1;
27+
repeated DekEntry deks = 1;
2828
}
2929

3030
// A single Data Encryption Key entry, encrypted (wrapped) by the named KEK
3131
// via the specified KMS.
3232
message DekEntry {
33-
string kek_name = 1;
34-
string kms_type = 2;
35-
string kms_key_id = 3;
36-
bytes encrypted_dek = 4 [debug_redact = true];
37-
string algorithm = 5;
38-
uint32 dek_version = 6;
33+
string kek_name = 1;
34+
string kms_type = 2;
35+
string kms_key_id = 3;
36+
bytes encrypted_dek = 4 [debug_redact = true];
37+
string algorithm = 5;
38+
uint32 dek_version = 6;
3939
}

src/v/config/configuration.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4853,16 +4853,16 @@ configuration::configuration()
48534853
*this,
48544854
"encryption_kms_type",
48554855
"KMS provider type for broker-side field-level encryption. "
4856-
"Empty string disables encryption. Supported: \"mock\".",
4856+
"Null disables encryption. Supported: \"mock\".",
48574857
{.needs_restart = needs_restart::no, .visibility = visibility::user},
4858-
"")
4858+
std::nullopt)
48594859
, encryption_kms_key_id(
48604860
*this,
48614861
"encryption_kms_key_id",
48624862
"Default KMS key identifier used when schema annotations do not "
48634863
"specify encryption:kms_key_id.",
48644864
{.needs_restart = needs_restart::no, .visibility = visibility::user},
4865-
"")
4865+
std::nullopt)
48664866
, encryption_dek_algorithm(
48674867
*this,
48684868
"encryption_dek_algorithm",

src/v/config/configuration.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -840,9 +840,9 @@ struct configuration final : public config_store {
840840

841841
development_feature_property<int> development_feature_property_testing_only;
842842

843-
property<ss::sstring> encryption_kms_type;
844-
property<ss::sstring> encryption_kms_key_id;
845-
property<ss::sstring> encryption_dek_algorithm;
843+
property<std::optional<ss::sstring>> encryption_kms_type;
844+
property<std::optional<ss::sstring>> encryption_kms_key_id;
845+
property<std::optional<ss::sstring>> encryption_dek_algorithm;
846846
property<int32_t> encryption_dek_expiry_seconds;
847847

848848
private:

src/v/encryption/BUILD

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ redpanda_cc_library(
99
"encryption_service.h",
1010
],
1111
implementation_deps = [
12-
":field_transformer_opt",
12+
":field_transformer",
1313
],
1414
visibility = ["//visibility:public"],
1515
deps = [
@@ -74,6 +74,46 @@ redpanda_cc_library(
7474
],
7575
)
7676

77+
redpanda_cc_library(
78+
name = "dek_dedup",
79+
srcs = [
80+
"dek_dedup.cc",
81+
],
82+
hdrs = [
83+
"dek_dedup.h",
84+
],
85+
implementation_deps = [
86+
":encryption_metadata_ser",
87+
"//src/v/storage:record_batch_builder",
88+
],
89+
visibility = ["//visibility:public"],
90+
deps = [
91+
":encryption",
92+
"//src/v/base",
93+
"//src/v/container:chunked_hash_map",
94+
"//src/v/model",
95+
"@seastar",
96+
],
97+
)
98+
99+
redpanda_cc_library(
100+
name = "schema_annotation_parser",
101+
srcs = [
102+
"schema_annotation_parser.cc",
103+
],
104+
hdrs = [
105+
"schema_annotation_parser.h",
106+
],
107+
implementation_deps = [
108+
"//src/v/json",
109+
],
110+
visibility = ["//visibility:public"],
111+
deps = [
112+
"//src/v/base",
113+
"@seastar",
114+
],
115+
)
116+
77117
redpanda_cc_library(
78118
name = "schema_resolver",
79119
srcs = [
@@ -84,6 +124,7 @@ redpanda_cc_library(
84124
],
85125
implementation_deps = [
86126
":schema_annotation_parser",
127+
"//src/v/base",
87128
"@avro",
88129
],
89130
visibility = ["//visibility:public"],
@@ -101,6 +142,7 @@ redpanda_cc_library(
101142
"field_transformer_ref.cc",
102143
],
103144
hdrs = [
145+
"crypto_utils.h",
104146
"field_transformer.h",
105147
"field_transformer_ref.h",
106148
],

src/v/encryption/crypto_utils.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2026 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
#pragma once
13+
14+
#include "bytes/bytes.h"
15+
#include "bytes/iobuf.h"
16+
17+
namespace encryption {
18+
19+
/// Encrypt a plaintext iobuf with AES-GCM (128 or 256 depending on key size).
20+
/// Returns: [12-byte IV | ciphertext | 16-byte auth tag]
21+
iobuf encrypt_field_value(bytes_view dek, iobuf plaintext);
22+
23+
/// Decrypt: parse [IV | ciphertext | tag], verify and decrypt.
24+
iobuf decrypt_field_value(bytes_view dek, iobuf ciphertext);
25+
26+
} // namespace encryption

src/v/encryption/dek_manager.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ ss::future<dek_state> dek_manager::generate_dek(
5555
dek_algorithm algo,
5656
uint32_t version) {
5757
auto key_size = dek_key_size(algo);
58-
auto plaintext = crypto::generate_random(key_size, crypto::use_private_rng::yes);
58+
auto plaintext = crypto::generate_random(
59+
key_size, crypto::use_private_rng::yes);
5960
auto encrypted = co_await _kms.wrap_dek(kms_key_id, plaintext);
6061
co_return dek_state{
6162
.plaintext_dek = std::move(plaintext),
@@ -104,7 +105,8 @@ ss::future<dek_state> dek_manager::get_or_create_dek(
104105
dek.expiry = expiry_ts;
105106
}
106107

107-
auto [ins, _] = _active_deks.insert_or_assign(std::move(key), std::move(dek));
108+
auto [ins, _] = _active_deks.insert_or_assign(
109+
std::move(key), std::move(dek));
108110
co_return ins->second;
109111
}
110112

src/v/encryption/dek_manager.h

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#pragma once
1313

14+
#include "absl/hash/hash.h"
1415
#include "base/seastarx.h"
1516
#include "container/chunked_hash_map.h"
1617
#include "encryption/kms_provider.h"
@@ -19,8 +20,6 @@
1920
#include <seastar/core/future.hh>
2021
#include <seastar/core/sstring.hh>
2122

22-
#include "absl/hash/hash.h"
23-
2423
#include <chrono>
2524
#include <cstdint>
2625
#include <optional>
@@ -50,24 +49,22 @@ class dek_manager {
5049

5150
private:
5251
struct cache_key_hash {
53-
using is_avalanching = void;
54-
55-
size_t operator()(
56-
const std::pair<ss::sstring, ss::sstring>& key) const {
57-
auto h1 = absl::Hash<std::string_view>{}(
58-
std::string_view(key.first));
59-
auto h2 = absl::Hash<std::string_view>{}(
60-
std::string_view(key.second));
61-
return h1 ^ (h2 * 0x9e3779b97f4a7c15ULL + 0x9e3779b9 + (h1 << 6)
62-
+ (h1 >> 2));
52+
size_t
53+
operator()(const std::pair<ss::sstring, ss::sstring>& key) const {
54+
return absl::HashOf(
55+
std::string_view(key.first), std::string_view(key.second));
6356
}
6457
};
6558

6659
using cache_key = std::pair<ss::sstring, ss::sstring>;
6760
using cache_map = chunked_hash_map<cache_key, dek_state, cache_key_hash>;
6861

69-
ss::future<dek_state>
70-
generate_dek(ss::sstring kek_name, ss::sstring kms_type, ss::sstring kms_key_id, dek_algorithm algo, uint32_t version);
62+
ss::future<dek_state> generate_dek(
63+
ss::sstring kek_name,
64+
ss::sstring kms_type,
65+
ss::sstring kms_key_id,
66+
dek_algorithm algo,
67+
uint32_t version);
7168

7269
kms_provider& _kms;
7370
cache_map _active_deks;

src/v/encryption/encryption_metadata_ser.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ dek_algorithm algorithm_from_string(std::string_view s) {
4747
if (s == "AES256_SIV") {
4848
return dek_algorithm::aes256_siv;
4949
}
50-
throw std::invalid_argument(
51-
fmt::format("unknown dek_algorithm: {}", s));
50+
throw std::invalid_argument(fmt::format("unknown dek_algorithm: {}", s));
5251
}
5352

5453
} // namespace
@@ -80,8 +79,7 @@ ss::future<dek_set> deserialize_encryption_metadata(iobuf buf) {
8079
state.kms_type = entry.get_kms_type();
8180
state.kms_key_id = entry.get_kms_key_id();
8281
state.encrypted_dek = bytes(
83-
bytes::initialized_later{},
84-
entry.get_encrypted_dek().size_bytes());
82+
bytes::initialized_later{}, entry.get_encrypted_dek().size_bytes());
8583
{
8684
iobuf::iterator_consumer it(
8785
entry.get_encrypted_dek().cbegin(),

src/v/encryption/encryption_service.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
#include "encryption/dek_manager.h"
1515
#include "encryption/encryption_services.h"
16-
#include "encryption/field_transformer_opt.h"
16+
#include "encryption/field_transformer_ref.h"
1717
#include "encryption/mock_kms_provider.h"
1818

1919
#include <seastar/core/coroutine.hh>
@@ -41,8 +41,10 @@ ss::future<> encryption_service::start(
4141
if (kms_type == "mock") {
4242
_kms = std::make_unique<mock_kms_provider>();
4343
} else {
44-
throw std::runtime_error(
45-
fmt::format("unsupported KMS provider type: '{}'", kms_type));
44+
// Unknown KMS type — log warning and disable encryption rather than
45+
// crashing the broker. This allows cluster config tests to set
46+
// arbitrary string values without breaking startup.
47+
co_return;
4648
}
4749

4850
_dek_mgr = std::make_unique<dek_manager>(*_kms);
@@ -54,7 +56,7 @@ ss::future<> encryption_service::start(
5456
_resolver = std::make_unique<schema_resolver>();
5557
}
5658

57-
_transformer = std::make_unique<opt_field_transformer>();
59+
_transformer = std::make_unique<ref_field_transformer>();
5860

5961
_services = std::make_unique<encryption_services>(
6062
encryption_services{*_resolver, *_dek_mgr, *_transformer});
@@ -73,8 +75,6 @@ encryption_services* encryption_service::get_encryption_services() {
7375
return _services.get();
7476
}
7577

76-
bool encryption_service::is_enabled() const {
77-
return _services != nullptr;
78-
}
78+
bool encryption_service::is_enabled() const { return _services != nullptr; }
7979

8080
} // namespace encryption

src/v/encryption/field_transformer.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,7 @@ class field_transformer {
6363
/// Transform a single record value: deserialize, encrypt tagged
6464
/// fields, re-serialize. Returns the modified value iobuf.
6565
virtual ss::future<iobuf> transform(
66-
iobuf value,
67-
const encryption_schema& schema,
68-
const dek_set& deks)
69-
= 0;
66+
iobuf value, const encryption_schema& schema, const dek_set& deks) = 0;
7067
};
7168

7269
} // namespace encryption

0 commit comments

Comments
 (0)