-
Notifications
You must be signed in to change notification settings - Fork 356
feat(codec): add destination parameter to encode #2902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c8afed6
a4a0bbc
3e0d1d1
1239f71
f7334c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| --- | ||
| # 0.5 - API | ||
| # 2 - Release | ||
| # 3 - Contributing | ||
| # 5 - Template Page | ||
| # 10 - Default | ||
| search: | ||
| boost: 10 | ||
| --- | ||
|
|
||
| # Custom Codec | ||
|
|
||
| A codec provides a unified interface for both encoding (publishing) and decoding (consuming) messages. Unlike the older `decoder=` approach, a codec handles both directions in a single class. | ||
|
|
||
| ## Protocol | ||
|
|
||
| Implement the `CodecProto` interface to create a custom codec: | ||
|
|
||
| ```python | ||
| class CodecProto(Protocol): | ||
| async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": ... | ||
| async def encode( | ||
| self, | ||
| cmd: "PublishCommand", | ||
| serializer: "SerializerProto | None" = None, | ||
| ) -> tuple[bytes, str | None]: ... | ||
| ``` | ||
|
|
||
| - **`decode`** — receives a `StreamMessage` with raw bytes in `msg.body` and returns the decoded Python value. | ||
| - **`encode`** — receives a `PublishCommand` containing the message body, destination, and headers. Returns a `(bytes, content_type)` tuple. Access the payload via `cmd.body` and the target topic/subject/queue via `cmd.destination`. | ||
|
|
||
| If no codec is set, `DefaultCodec` is used automatically. It handles JSON objects, plain text, and raw bytes. | ||
|
|
||
| ## Example: Schema Registry | ||
|
|
||
| A Confluent Avro codec that encodes and decodes messages using the [Confluent wire format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format){target="_blank"} (magic byte + schema ID + Avro payload). Requires `fastavro` and `confluent-kafka`: | ||
|
|
||
| ```bash | ||
| pip install fastavro confluent-kafka | ||
| ``` | ||
|
|
||
| ```python linenums="1" hl_lines="22-66 68-76" | ||
| {!> docs_src/getting_started/serialization/codec_schema_registry_kafka.py !} | ||
| ``` | ||
|
|
||
| !!! note | ||
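| The codec fetches and caches the schemas for the configured topics at startup; a schema ID that is not yet cached is fetched from the registry the first time it is encountered while decoding. The `subject` follows Confluent's naming convention: `{topic}-value`. | ||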
|
|
||
| ## Priority | ||
|
|
||
| You can set a codec at the broker level or override it per subscriber. The subscriber-level codec always wins: | ||
|
|
||
| ```python | ||
| broker = KafkaBroker(codec=BrokerCodec()) | ||
|
|
||
| @broker.subscriber("test", codec=SubscriberCodec()) # ← this wins | ||
| async def handle(body: str) -> None: | ||
| ... | ||
|
|
||
| # If no codec is set at any level, DefaultCodec is used (JSON/text/bytes) | ||
| ``` | ||
|
|
||
| ## Compatibility | ||
|
|
||
| - **`codec=` and `parser=`** work together. The parser controls how the raw broker message is parsed into a `StreamMessage`; the codec then decodes or encodes the body. | ||
| - **`codec=` and `decoder=`** cannot be used together. Specifying both raises a `ValueError`. | ||
| - For the legacy `decoder=` approach, see [Custom Decoder](./decoder.md){.internal-link}. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| import io | ||
| import json | ||
| import struct | ||
| from typing import TYPE_CHECKING, Any, Dict | ||
|
|
||
| import fastavro | ||
| from confluent_kafka.schema_registry import SchemaRegistryClient | ||
|
|
||
| from faststream import FastStream | ||
| from faststream.kafka import KafkaBroker | ||
|
|
||
| if TYPE_CHECKING: | ||
| from fast_depends.library.serializer import SerializerProto | ||
|
|
||
| from faststream._internal.basic_types import DecodedMessage | ||
| from faststream.message import StreamMessage | ||
| from faststream.response.response import PublishCommand | ||
|
|
||
| HEADER = struct.Struct(">bI") # magic byte (0) + 4-byte schema ID | ||
|
|
||
|
|
||
| class SchemaRegistryCodec: | ||
| def __init__( | ||
| self, | ||
| registry_url: str, | ||
| topics: Dict[str, int], | ||
| ) -> None: | ||
| self._client = SchemaRegistryClient({"url": registry_url}) | ||
| self._schema_cache: Dict[int, Any] = {} | ||
| self._topic_schemas: Dict[str, tuple[int, Any]] = {} | ||
|
|
||
| for topic, version in topics.items(): | ||
| subject = f"{topic}-value" | ||
| meta = self._client.get_version(subject, version) | ||
| schema = fastavro.parse_schema(json.loads(meta.schema.schema_str)) | ||
| self._topic_schemas[topic] = (meta.schema_id, schema) | ||
| self._schema_cache[meta.schema_id] = schema | ||
|
|
||
| def _get_schema(self, schema_id: int) -> Any: | ||
| if schema_id not in self._schema_cache: | ||
| raw = self._client.get_schema(schema_id) | ||
| self._schema_cache[schema_id] = fastavro.parse_schema( | ||
| json.loads(raw.schema_str) | ||
| ) | ||
| return self._schema_cache[schema_id] | ||
|
|
||
| async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": | ||
| schema_id = int.from_bytes(msg.body[1:5], byteorder="big") | ||
| schema = self._get_schema(schema_id) | ||
| decoded: dict[str, Any] = fastavro.schemaless_reader( | ||
| io.BytesIO(msg.body[5:]), schema | ||
| ) | ||
| return decoded # type: ignore[return-value] | ||
|
|
||
| async def encode( | ||
| self, | ||
| cmd: "PublishCommand", | ||
| serializer: "SerializerProto | None" = None, | ||
| ) -> tuple[bytes, str | None]: | ||
| schema_id, schema = self._topic_schemas[cmd.destination] | ||
| body = cmd.body | ||
| data = body.model_dump(mode="json") if hasattr(body, "model_dump") else body | ||
| buf = io.BytesIO() | ||
| buf.write(HEADER.pack(0, schema_id)) | ||
| fastavro.schemaless_writer(buf, schema, data) | ||
| return buf.getvalue(), "application/avro" | ||
|
|
||
|
|
||
# Preload one schema version per topic; all broker traffic goes through this codec.
codec = SchemaRegistryCodec(
    topics={"orders": 1, "users": 2},
    registry_url="http://localhost:8081",
)
broker = KafkaBroker(codec=codec)
app = FastStream(broker)
|
|
||
|
|
||
@broker.subscriber("orders")
async def handle_order(body: dict[str, Any]) -> None:
    """Receive messages from the ``orders`` topic (no-op placeholder)."""
|
|
||
|
|
||
@broker.subscriber("users")
async def handle_user(body: dict[str, Any]) -> None:
    """Receive messages from the ``users`` topic (no-op placeholder)."""
|
|
||
|
|
||
@app.after_startup
async def test() -> None:
    """Publish one sample message to each topic after the app has started."""
    order = {"order_id": "123", "amount": 99.99}
    user = {"name": "John", "age": 25}
    await broker.publish(order, "orders")
    await broker.publish(user, "users")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| from faststream.confluent.subscriber.usecase import BatchSubscriber | ||
| from faststream.exceptions import SubscriberNotFound | ||
| from faststream.message import gen_cor_id | ||
| from faststream.response.response import PublishCommand as _BasePublishCommand | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previous file it was imported as |
||
|
|
||
| if TYPE_CHECKING: | ||
| from fast_depends.library.serializer import SerializerProto | ||
|
|
@@ -195,10 +196,18 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: | |
| serializer = self.broker.config.fd_config._serializer | ||
|
|
||
| if isinstance(self.codec, BatchCodecProto): | ||
| encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer) | ||
| encoded = await self.codec.encode_batch(cmd, serializer) | ||
| else: | ||
| encoded = [ | ||
| await self.codec.encode(body, serializer) for body in cmd.batch_bodies | ||
| await self.codec.encode( | ||
| _BasePublishCommand( | ||
| body=body, | ||
| destination=cmd.destination, | ||
| _publish_type=cmd.publish_type, | ||
| ), | ||
| serializer, | ||
| ) | ||
| for body in cmd.batch_bodies | ||
| ] | ||
|
|
||
| for handler in _find_handler( | ||
|
|
@@ -345,8 +354,14 @@ async def build_message( | |
| codec: Optional["CodecProto"] = None, | ||
| ) -> MockConfluentMessage: | ||
| """Build a mock confluent_kafka.Message for a sendable message.""" | ||
| from faststream.response.publish_type import PublishType | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function-level imports again |
||
| from faststream.response.response import PublishCommand as _BaseCmd | ||
|
|
||
| codec_instance = codec or DefaultCodec() | ||
| msg, content_type = await codec_instance.encode(message, serializer) | ||
| publish_cmd = _BaseCmd( | ||
| body=message, destination=topic, _publish_type=PublishType.PUBLISH | ||
| ) | ||
| msg, content_type = await codec_instance.encode(publish_cmd, serializer) | ||
| k = key or b"" | ||
| headers = { | ||
| "content-type": content_type or "", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| from faststream.kafka.publisher.usecase import BatchPublisher | ||
| from faststream.kafka.subscriber.usecase import BatchSubscriber | ||
| from faststream.message import gen_cor_id | ||
| from faststream.response.response import PublishCommand as _BasePublishCommand | ||
|
|
||
| if TYPE_CHECKING: | ||
| from fast_depends.library.serializer import SerializerProto | ||
|
|
@@ -244,10 +245,18 @@ async def publish_batch( | |
| serializer = self.broker.config.fd_config._serializer | ||
|
|
||
| if isinstance(self.codec, BatchCodecProto): | ||
| encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer) | ||
| encoded = await self.codec.encode_batch(cmd, serializer) | ||
| else: | ||
| encoded = [ | ||
| await self.codec.encode(body, serializer) for body in cmd.batch_bodies | ||
| await self.codec.encode( | ||
| _BasePublishCommand( | ||
| body=body, | ||
| destination=cmd.destination, | ||
| _publish_type=cmd.publish_type, | ||
| ), | ||
| serializer, | ||
| ) | ||
| for body in cmd.batch_bodies | ||
| ] | ||
|
|
||
| for handler in _find_handler( | ||
|
|
@@ -309,7 +318,13 @@ async def build_message( | |
| codec: Optional["CodecProto"] = None, | ||
| ) -> "ConsumerRecord": | ||
| """Build a Kafka ConsumerRecord for a sendable message.""" | ||
| msg, content_type = await (codec or DefaultCodec()).encode(message, serializer) | ||
| from faststream.response.publish_type import PublishType | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And again |
||
| from faststream.response.response import PublishCommand as _BaseCmd | ||
|
|
||
| publish_cmd = _BaseCmd( | ||
| body=message, destination=topic, _publish_type=PublishType.PUBLISH | ||
| ) | ||
| msg, content_type = await (codec or DefaultCodec()).encode(publish_cmd, serializer) | ||
|
|
||
| k = key or b"" | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a reason to use function-level imports here (or anywhere)