67 changes: 67 additions & 0 deletions docs/docs/en/getting-started/serialization/codec.md
@@ -0,0 +1,67 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
  boost: 10
---

# Custom Codec

A codec provides a unified interface for both encoding (publishing) and decoding (consuming) messages. Unlike the older `decoder=` approach, a codec handles both directions in a single class.

## Protocol

Implement the `CodecProto` interface to create a custom codec:

```python
class CodecProto(Protocol):
    async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": ...
    async def encode(
        self,
        cmd: "PublishCommand",
        serializer: "SerializerProto | None" = None,
    ) -> tuple[bytes, str | None]: ...
```

- **`decode`** — receives a `StreamMessage` with raw bytes in `msg.body` and returns the decoded Python value.
- **`encode`** — receives a `PublishCommand` containing the message body, destination, and headers. Returns a `(bytes, content_type)` tuple. Access the payload via `cmd.body` and the target topic/subject/queue via `cmd.destination`.

If no codec is set, `DefaultCodec` is used automatically. It handles JSON objects, plain text, and raw bytes.
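
As a rough illustration of the protocol shape, the sketch below implements both methods for a JSON-only codec. To keep it runnable without a broker, `FakePublishCommand` and `FakeStreamMessage` are hypothetical stand-ins that model only the attributes described above (`body`, `destination`); they are not FastStream classes, and `JsonCodec` is an illustrative name rather than part of the library.

```python
from __future__ import annotations

import asyncio
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class FakePublishCommand:
    """Hypothetical stand-in for PublishCommand (only the fields used here)."""

    body: Any
    destination: str


@dataclass
class FakeStreamMessage:
    """Hypothetical stand-in for StreamMessage (only `body` is modelled)."""

    body: bytes


class JsonCodec:
    """Encodes every payload as compact UTF-8 JSON and decodes it back."""

    async def decode(self, msg: FakeStreamMessage) -> Any:
        # msg.body holds the raw bytes received from the broker
        return json.loads(msg.body)

    async def encode(
        self,
        cmd: FakePublishCommand,
        serializer: Any = None,  # unused in this sketch
    ) -> tuple[bytes, str | None]:
        payload = json.dumps(cmd.body, separators=(",", ":")).encode("utf-8")
        return payload, "application/json"


async def roundtrip(body: Any) -> tuple[bytes, str | None, Any]:
    """Encode a body, then decode the resulting bytes again."""
    codec = JsonCodec()
    raw, content_type = await codec.encode(
        FakePublishCommand(body=body, destination="orders")
    )
    decoded = await codec.decode(FakeStreamMessage(body=raw))
    return raw, content_type, decoded
```

With the real types, the same class would presumably be passed as `codec=JsonCodec()`; the stand-ins exist only so that the round trip can be exercised in isolation.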

## Example: Schema Registry

The following example is a Confluent Avro codec that encodes and decodes messages using the [Confluent wire format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format){target="_blank"} (magic byte + schema ID + Avro payload). It requires `fastavro` and `confluent-kafka`:

```bash
pip install fastavro confluent-kafka
```

```python linenums="1" hl_lines="22-66 68-76"
{!> docs_src/getting_started/serialization/codec_schema_registry_kafka.py !}
```

!!! note
    The codec fetches the schemas for the configured topics from the registry at startup and caches them. A schema ID it has not seen before is fetched and cached on first encounter. The `subject` follows Confluent's naming convention: `{topic}-value`.
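
The wire-format framing mentioned in this section (magic byte + schema ID + Avro payload) can be sketched with the standard library alone. The helper names `frame` and `unframe` are hypothetical, and the magic-byte check is an addition made for this sketch; it assumes the same `">bI"` header layout as the example file.

```python
from __future__ import annotations

import struct

# 1-byte magic value followed by a 4-byte big-endian schema ID (5 bytes total)
HEADER = struct.Struct(">bI")
MAGIC_BYTE = 0


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an already-encoded Avro payload with the wire-format header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(data: bytes) -> tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_payload)."""
    if len(data) < HEADER.size:
        raise ValueError("message too short for the Confluent wire format")
    magic, schema_id = HEADER.unpack_from(data)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[HEADER.size:]
```

Validating the magic byte before looking up the schema is one way to reject messages that were not produced with this framing, instead of failing later inside the Avro reader.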

## Priority

You can set a codec at the broker level or override it per subscriber. The subscriber-level codec always wins:

```python
broker = KafkaBroker(codec=BrokerCodec())

@broker.subscriber("test", codec=SubscriberCodec())  # ← this wins
async def handle(body: str) -> None:
    ...

# If no codec is set at any level, DefaultCodec is used (JSON/text/bytes)
```
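
The resolution order described above can be pictured as a simple fallback chain. This is only a sketch of the documented behaviour, not the library's internal code: `resolve_codec` is a hypothetical helper and `FallbackCodec` is a stand-in for `DefaultCodec`.

```python
from __future__ import annotations

from typing import Any


class FallbackCodec:
    """Hypothetical stand-in for DefaultCodec (JSON/text/bytes)."""


def resolve_codec(subscriber_codec: Any | None, broker_codec: Any | None) -> Any:
    """Pick the codec for a subscriber: subscriber level, then broker level, then default."""
    if subscriber_codec is not None:
        return subscriber_codec
    if broker_codec is not None:
        return broker_codec
    return FallbackCodec()
```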

## Compatibility

- **`codec=` and `parser=`** work together. The parser controls how the raw broker message is parsed into a `StreamMessage`; the codec then decodes or encodes the body.
- **`codec=` and `decoder=`** cannot be used together. Specifying both raises a `ValueError`.
- For the legacy `decoder=` approach, see [Custom Decoder](./decoder.md){.internal-link}.
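
The mutual-exclusion rule above amounts to a guard like the following. The function name `check_serialization_options` is hypothetical and the error message is illustrative; only the `ValueError` itself comes from the rule stated in this section.

```python
from __future__ import annotations

from typing import Any


def check_serialization_options(
    codec: Any | None = None,
    decoder: Any | None = None,
) -> None:
    """Reject configurations that set both `codec=` and `decoder=`."""
    if codec is not None and decoder is not None:
        raise ValueError("`codec` and `decoder` cannot be used together")
```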
5 changes: 5 additions & 0 deletions docs/docs/en/getting-started/serialization/decoder.md
@@ -8,6 +8,11 @@ search:
   boost: 10
 ---

+!!! warning "Superseded by Codec"
+    The `decoder=` parameter has been superseded by the new **Codec** system, which handles both encoding and decoding in a single interface. See [Custom Codec](./codec.md){.internal-link} for the recommended approach.
+
+    Note: `codec=` and `decoder=` cannot be used together; specifying both raises a `ValueError`.
+
 # Custom Decoder

 At this stage, the body of a **StreamMessage** is transformed into the format that it will take when it enters your handler function. This stage is the one you will need to redefine more often.
1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
@@ -23,6 +23,7 @@ search:
 - [Context](getting-started/context.md)
 - [Custom Serialization](getting-started/serialization/index.md)
 - [Parser](getting-started/serialization/parser.md)
+- [Codec](getting-started/serialization/codec.md)
 - [Decoder](getting-started/serialization/decoder.md)
 - [Examples](getting-started/serialization/examples.md)
 - [Lifespan](getting-started/lifespan/index.md)
@@ -0,0 +1,93 @@
import io
import json
import struct
from typing import TYPE_CHECKING, Any, Dict

import fastavro
from confluent_kafka.schema_registry import SchemaRegistryClient

from faststream import FastStream
from faststream.kafka import KafkaBroker

if TYPE_CHECKING:
    from fast_depends.library.serializer import SerializerProto

    from faststream._internal.basic_types import DecodedMessage
    from faststream.message import StreamMessage
    from faststream.response.response import PublishCommand

HEADER = struct.Struct(">bI") # magic byte (0) + 4-byte schema ID


class SchemaRegistryCodec:
    def __init__(
        self,
        registry_url: str,
        topics: Dict[str, int],
    ) -> None:
        self._client = SchemaRegistryClient({"url": registry_url})
        self._schema_cache: Dict[int, Any] = {}
        self._topic_schemas: Dict[str, tuple[int, Any]] = {}

        for topic, version in topics.items():
            subject = f"{topic}-value"
            meta = self._client.get_version(subject, version)
            schema = fastavro.parse_schema(json.loads(meta.schema.schema_str))
            self._topic_schemas[topic] = (meta.schema_id, schema)
            self._schema_cache[meta.schema_id] = schema

    def _get_schema(self, schema_id: int) -> Any:
        if schema_id not in self._schema_cache:
            raw = self._client.get_schema(schema_id)
            self._schema_cache[schema_id] = fastavro.parse_schema(
                json.loads(raw.schema_str)
            )
        return self._schema_cache[schema_id]

    async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
        schema_id = int.from_bytes(msg.body[1:5], byteorder="big")
        schema = self._get_schema(schema_id)
        decoded: dict[str, Any] = fastavro.schemaless_reader(
            io.BytesIO(msg.body[5:]), schema
        )
        return decoded  # type: ignore[return-value]

    async def encode(
        self,
        cmd: "PublishCommand",
        serializer: "SerializerProto | None" = None,
    ) -> tuple[bytes, str | None]:
        schema_id, schema = self._topic_schemas[cmd.destination]
        body = cmd.body
        data = body.model_dump(mode="json") if hasattr(body, "model_dump") else body
        buf = io.BytesIO()
        buf.write(HEADER.pack(0, schema_id))
        fastavro.schemaless_writer(buf, schema, data)
        return buf.getvalue(), "application/avro"


codec = SchemaRegistryCodec(
    registry_url="http://localhost:8081",
    topics={
        "orders": 1,
        "users": 2,
    },
)
broker = KafkaBroker(codec=codec)
app = FastStream(broker)


@broker.subscriber("orders")
async def handle_order(body: dict[str, Any]) -> None:
    ...


@broker.subscriber("users")
async def handle_user(body: dict[str, Any]) -> None:
    ...


@app.after_startup
async def test() -> None:
    await broker.publish({"order_id": "123", "amount": 99.99}, "orders")
    await broker.publish({"name": "John", "age": 25}, "users")
11 changes: 6 additions & 5 deletions faststream/_internal/parser.py
@@ -7,8 +7,9 @@
 if TYPE_CHECKING:
     from fast_depends.library.serializer import SerializerProto

-    from faststream._internal.basic_types import DecodedMessage, SendableMessage
+    from faststream._internal.basic_types import DecodedMessage
     from faststream.message import StreamMessage
+    from faststream.response.response import PublishCommand

 MsgType = TypeVar("MsgType")

@@ -64,7 +65,7 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
     @abstractmethod
     async def encode(
         self,
-        msg: "SendableMessage",
+        cmd: "PublishCommand",
         serializer: "SerializerProto | None" = None,
     ) -> tuple[bytes, str | None]: ...

@@ -74,7 +75,7 @@ class BatchCodecProto(Protocol):
     @abstractmethod
     async def encode_batch(
         self,
-        msgs: Sequence["SendableMessage"],
+        cmd: "PublishCommand",
         serializer: "SerializerProto | None" = None,
     ) -> list[tuple[bytes, str | None]]: ...

@@ -91,7 +92,7 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage":

     async def encode(
         self,
-        msg: "SendableMessage",
+        cmd: "PublishCommand",
         serializer: "SerializerProto | None" = None,
     ) -> tuple[bytes, str | None]:
-        return encode_message(msg, serializer)
+        return encode_message(cmd.body, serializer)
18 changes: 13 additions & 5 deletions faststream/confluent/publisher/producer.py
@@ -139,7 +139,7 @@ async def publish(
         cmd: "KafkaPublishCommand",
     ) -> "asyncio.Future[Message | None] | Message | None":
         """Publish a message to a topic."""
-        message, content_type = await self.codec.encode(cmd.body, self.serializer)
+        message, content_type = await self.codec.encode(cmd, self.serializer)

         headers_to_send = {
             "content-type": content_type or "",
@@ -164,12 +164,20 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
         headers_to_send = cmd.headers_to_publish()

         if isinstance(self.codec, BatchCodecProto):
-            encoded_batch = await self.codec.encode_batch(
-                cmd.batch_bodies, self.serializer
-            )
+            encoded_batch = await self.codec.encode_batch(cmd, self.serializer)
         else:
+            from faststream.response.response import PublishCommand as _BaseCmd
+

Review comment (Member): I don't see a reason to use function-level imports here (or anywhere).

             encoded_batch = [
-                await self.codec.encode(msg, self.serializer) for msg in cmd.batch_bodies
+                await self.codec.encode(
+                    _BaseCmd(
+                        body=msg,
+                        destination=cmd.destination,
+                        _publish_type=cmd.publish_type,
+                    ),
+                    self.serializer,
+                )
+                for msg in cmd.batch_bodies
             ]

         for message_position, (message, content_type) in enumerate(encoded_batch):
21 changes: 18 additions & 3 deletions faststream/confluent/testing.py
@@ -22,6 +22,7 @@
 from faststream.confluent.subscriber.usecase import BatchSubscriber
 from faststream.exceptions import SubscriberNotFound
 from faststream.message import gen_cor_id
+from faststream.response.response import PublishCommand as _BasePublishCommand

Review comment (Member): In the previous file this was imported as `_BaseCmd`; we should use the same naming for the same cases.

 if TYPE_CHECKING:
     from fast_depends.library.serializer import SerializerProto
@@ -195,10 +196,18 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
         serializer = self.broker.config.fd_config._serializer

         if isinstance(self.codec, BatchCodecProto):
-            encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer)
+            encoded = await self.codec.encode_batch(cmd, serializer)
         else:
             encoded = [
-                await self.codec.encode(body, serializer) for body in cmd.batch_bodies
+                await self.codec.encode(
+                    _BasePublishCommand(
+                        body=body,
+                        destination=cmd.destination,
+                        _publish_type=cmd.publish_type,
+                    ),
+                    serializer,
+                )
+                for body in cmd.batch_bodies
             ]

         for handler in _find_handler(
@@ -345,8 +354,14 @@ async def build_message(
     codec: Optional["CodecProto"] = None,
 ) -> MockConfluentMessage:
     """Build a mock confluent_kafka.Message for a sendable message."""
+    from faststream.response.publish_type import PublishType

Review comment (Member): Function-level imports again.

+    from faststream.response.response import PublishCommand as _BaseCmd
+
     codec_instance = codec or DefaultCodec()
-    msg, content_type = await codec_instance.encode(message, serializer)
+    publish_cmd = _BaseCmd(
+        body=message, destination=topic, _publish_type=PublishType.PUBLISH
+    )
+    msg, content_type = await codec_instance.encode(publish_cmd, serializer)
     k = key or b""
     headers = {
         "content-type": content_type or "",
17 changes: 12 additions & 5 deletions faststream/kafka/publisher/producer.py
@@ -110,7 +110,7 @@ async def publish(
         cmd: "KafkaPublishCommand",
     ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
         """Publish a message to a topic."""
-        message, content_type = await self.codec.encode(cmd.body, self.serializer)
+        message, content_type = await self.codec.encode(cmd, self.serializer)

         headers_to_send = {
             "content-type": content_type or "",
@@ -141,12 +141,19 @@ async def publish_batch(
         headers_to_send = cmd.headers_to_publish()

         if isinstance(self.codec, BatchCodecProto):
-            encoded_batch = await self.codec.encode_batch(
-                cmd.batch_bodies, self.serializer
-            )
+            encoded_batch = await self.codec.encode_batch(cmd, self.serializer)
         else:
+            from faststream.response.response import PublishCommand as _BaseCmd
+
             encoded_batch = [
-                await self.codec.encode(body, self.serializer)
+                await self.codec.encode(
+                    _BaseCmd(
+                        body=body,
+                        destination=cmd.destination,
+                        _publish_type=cmd.publish_type,
+                    ),
+                    self.serializer,
+                )
                 for body in cmd.batch_bodies
             ]

21 changes: 18 additions & 3 deletions faststream/kafka/testing.py
@@ -25,6 +25,7 @@
 from faststream.kafka.publisher.usecase import BatchPublisher
 from faststream.kafka.subscriber.usecase import BatchSubscriber
 from faststream.message import gen_cor_id
+from faststream.response.response import PublishCommand as _BasePublishCommand

 if TYPE_CHECKING:
     from fast_depends.library.serializer import SerializerProto
@@ -244,10 +245,18 @@ async def publish_batch(
         serializer = self.broker.config.fd_config._serializer

         if isinstance(self.codec, BatchCodecProto):
-            encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer)
+            encoded = await self.codec.encode_batch(cmd, serializer)
         else:
             encoded = [
-                await self.codec.encode(body, serializer) for body in cmd.batch_bodies
+                await self.codec.encode(
+                    _BasePublishCommand(
+                        body=body,
+                        destination=cmd.destination,
+                        _publish_type=cmd.publish_type,
+                    ),
+                    serializer,
+                )
+                for body in cmd.batch_bodies
             ]

         for handler in _find_handler(
@@ -309,7 +318,13 @@ async def build_message(
     codec: Optional["CodecProto"] = None,
 ) -> "ConsumerRecord":
     """Build a Kafka ConsumerRecord for a sendable message."""
-    msg, content_type = await (codec or DefaultCodec()).encode(message, serializer)
+    from faststream.response.publish_type import PublishType

Review comment (Member): And again (another function-level import).

+    from faststream.response.response import PublishCommand as _BaseCmd
+
+    publish_cmd = _BaseCmd(
+        body=message, destination=topic, _publish_type=PublishType.PUBLISH
+    )
+    msg, content_type = await (codec or DefaultCodec()).encode(publish_cmd, serializer)

     k = key or b""
