Format-preserving encryption for Kafka Connect — Single Message Transforms (SMTs) powered by Cyphera.
Built on io.cyphera:cyphera from Maven Central.
docker compose up -d
# Wait ~30s for Kafka + Connect to startKafka Connect REST API at http://localhost:8083. See DEMO.md for the full walkthrough.
| SMT | Config | Description |
|---|---|---|
CypheraProtect$Value |
field.name, policy.name |
Protect a field in message values |
CypheraProtect$Key |
field.name, policy.name |
Protect a field in message keys |
CypheraAccess$Value |
field.name |
Access a field in message values (tag-based) |
CypheraAccess$Key |
field.name |
Access a field in message keys (tag-based) |
mvn package -DskipTestsProduces target/cyphera-kafka-connect-0.1.0.jar (fat JAR, excludes Kafka Connect API).
docker build -t cyphera-kafka-connect .- Copy the JAR to a directory under Kafka Connect's
plugin.path:mkdir -p /opt/kafka-connect-plugins/cyphera cp target/cyphera-kafka-connect-0.1.0.jar /opt/kafka-connect-plugins/cyphera/
- Place
cyphera.jsonat/etc/cyphera/cyphera.json(or setCYPHERA_POLICY_FILE) - Restart Kafka Connect workers
Add the SMTs to any connector config:
{
"name": "my-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"...": "...",
"transforms": "protect-ssn",
"transforms.protect-ssn.type": "io.cyphera.kafka.connect.CypheraProtect$Value",
"transforms.protect-ssn.field.name": "ssn",
"transforms.protect-ssn.policy.name": "ssn"
}
}{
"name": "my-sink-connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"...": "...",
"transforms": "access-ssn",
"transforms.access-ssn.type": "io.cyphera.kafka.connect.CypheraAccess$Value",
"transforms.access-ssn.field.name": "ssn"
}
}{
"transforms": "protect-ssn,protect-cc",
"transforms.protect-ssn.type": "io.cyphera.kafka.connect.CypheraProtect$Value",
"transforms.protect-ssn.field.name": "ssn",
"transforms.protect-ssn.policy.name": "ssn",
"transforms.protect-cc.type": "io.cyphera.kafka.connect.CypheraProtect$Value",
"transforms.protect-cc.field.name": "credit_card",
"transforms.protect-cc.policy.name": "credit_card"
}- Policy file:
/etc/cyphera/cyphera.json(orCYPHERA_POLICY_FILEenv var) - Set env var in the Kafka Connect worker config or Docker environment
- Policy loaded on first transform call — restart Connect workers to reload
- SMT errors follow Kafka Connect error handling (
errors.tolerance,errors.deadletterqueue.topic.name) - Check Connect worker logs for
CypheraLoaderentries - REST API:
GET http://localhost:8083/connectors/{name}/status
- Build a new JAR with the updated SDK version
- Replace the JAR in the plugin directory
- Rolling restart Connect workers (zero downtime in distributed mode)
- Plugin not found — JAR not in
plugin.path. CheckGET http://localhost:8083/connector-pluginsfor registered transforms. - "Unknown policy" — policy file not found or name misspelled. Check
CYPHERA_POLICY_FILEon the worker. - ClassNotFoundException — JAR missing or corrupt. Re-copy and restart.
{
"policies": {
"ssn": { "engine": "ff1", "key_ref": "demo-key", "tag": "T01" },
"credit_card": { "engine": "ff1", "key_ref": "demo-key", "tag": "T02" }
},
"keys": {
"demo-key": { "material": "2B7E151628AED2A6ABF7158809CF4F3C" }
}
}- Multi-field support (protect multiple fields in one SMT instance)
- Schema Registry / Avro support
- ksqlDB UDF companion (same JAR, registered as ksqlDB functions)
- Confluent Hub listing
- Metrics via JMX (records protected/accessed per second)
Apache 2.0 — Copyright 2026 Horizon Digital Engineering LLC