diff --git a/docs/docs/en/getting-started/serialization/codec.md b/docs/docs/en/getting-started/serialization/codec.md new file mode 100644 index 0000000000..8e49f4534d --- /dev/null +++ b/docs/docs/en/getting-started/serialization/codec.md @@ -0,0 +1,67 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Custom Codec + +A codec provides a unified interface for both encoding (publishing) and decoding (consuming) messages. Unlike the older `decoder=` approach, a codec handles both directions in a single class. + +## Protocol + +Implement the `CodecProto` interface to create a custom codec: + +```python +class CodecProto(Protocol): + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": ... + async def encode( + self, + cmd: "PublishCommand", + serializer: "SerializerProto | None" = None, + ) -> tuple[bytes, str | None]: ... +``` + +- **`decode`** — receives a `StreamMessage` with raw bytes in `msg.body` and returns the decoded Python value. +- **`encode`** — receives a `PublishCommand` containing the message body, destination, and headers. Returns a `(bytes, content_type)` tuple. Access the payload via `cmd.body` and the target topic/subject/queue via `cmd.destination`. + +If no codec is set, `DefaultCodec` is used automatically. It handles JSON objects, plain text, and raw bytes. + +## Example: Schema Registry + +A Confluent Avro codec that encodes and decodes messages using the [Confluent wire format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format){target="_blank"} (magic byte + schema ID + Avro payload). Requires `fastavro` and `confluent-kafka`: + +```bash +pip install fastavro confluent-kafka +``` + +```python linenums="1" hl_lines="22-66 68-76" +{!> docs_src/getting_started/serialization/codec_schema_registry_kafka.py !} +``` + +!!! note + The codec fetches and caches schemas from the registry at startup and on first encounter. The `subject` follows Confluent's naming convention: `{topic}-value`. + +## Priority + +You can set a codec at the broker level or override it per subscriber. The subscriber-level codec always wins: + +```python +broker = KafkaBroker(codec=BrokerCodec()) + +@broker.subscriber("test", codec=SubscriberCodec()) # ← this wins +async def handle(body: str) -> None: + ... + +# If no codec is set at any level, DefaultCodec is used (JSON/text/bytes) +``` + +## Compatibility + +- **`codec=` and `parser=`** work together. The parser controls how the raw broker message is parsed into a `StreamMessage`; the codec then decodes or encodes the body. +- **`codec=` and `decoder=`** cannot be used together. Specifying both raises a `ValueError`. +- For the legacy `decoder=` approach, see [Custom Decoder](./decoder.md){.internal-link}. diff --git a/docs/docs/en/getting-started/serialization/decoder.md b/docs/docs/en/getting-started/serialization/decoder.md index 36a97b7989..47732fe029 100644 --- a/docs/docs/en/getting-started/serialization/decoder.md +++ b/docs/docs/en/getting-started/serialization/decoder.md @@ -8,6 +8,11 @@ search: boost: 10 --- +!!! warning "Superseded by Codec" + The `decoder=` parameter has been superseded by the new **Codec** system, which handles both encoding and decoding in a single interface. See [Custom Codec](./codec.md){.internal-link} for the recommended approach. + + Note: `codec=` and `decoder=` cannot be used together — specifying both will raise a `ValueError`. + # Custom Decoder At this stage, the body of a **StreamMessage** is transformed into the format that it will take when it enters your handler function. This stage is the one you will need to redefine more often. diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt index 8ae793fa49..d30018d7e3 100644 --- a/docs/docs/navigation_template.txt +++ b/docs/docs/navigation_template.txt @@ -23,6 +23,7 @@ search: - [Context](getting-started/context.md) - [Custom Serialization](getting-started/serialization/index.md) - [Parser](getting-started/serialization/parser.md) + - [Codec](getting-started/serialization/codec.md) - [Decoder](getting-started/serialization/decoder.md) - [Examples](getting-started/serialization/examples.md) - [Lifespan](getting-started/lifespan/index.md) diff --git a/docs/docs_src/getting_started/serialization/codec_schema_registry_kafka.py b/docs/docs_src/getting_started/serialization/codec_schema_registry_kafka.py new file mode 100644 index 0000000000..ad23d6bf4f --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_schema_registry_kafka.py @@ -0,0 +1,93 @@ +import io +import json +import struct +from typing import TYPE_CHECKING, Any, Dict + +import fastavro +from confluent_kafka.schema_registry import SchemaRegistryClient + +from faststream import FastStream +from faststream.kafka import KafkaBroker + +if TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage + from faststream.message import StreamMessage + from faststream.response.response import PublishCommand + +HEADER = struct.Struct(">bI") # magic byte (0) + 4-byte schema ID + + +class SchemaRegistryCodec: + def __init__( + self, + registry_url: str, + topics: Dict[str, int], + ) -> None: + self._client = SchemaRegistryClient({"url": registry_url}) + self._schema_cache: Dict[int, Any] = {} + self._topic_schemas: Dict[str, tuple[int, Any]] = {} + + for topic, version in topics.items(): + subject = f"{topic}-value" + meta = self._client.get_version(subject, version) + schema = fastavro.parse_schema(json.loads(meta.schema.schema_str)) + self._topic_schemas[topic] = (meta.schema_id, schema) + self._schema_cache[meta.schema_id] = schema + + def _get_schema(self, schema_id: int) -> Any: + if schema_id not in self._schema_cache: + raw = self._client.get_schema(schema_id) + self._schema_cache[schema_id] = fastavro.parse_schema( + json.loads(raw.schema_str) + ) + return self._schema_cache[schema_id] + + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + schema_id = int.from_bytes(msg.body[1:5], byteorder="big") + schema = self._get_schema(schema_id) + decoded: dict[str, Any] = fastavro.schemaless_reader( + io.BytesIO(msg.body[5:]), schema + ) + return decoded # type: ignore[return-value] + + async def encode( + self, + cmd: "PublishCommand", + serializer: "SerializerProto | None" = None, + ) -> tuple[bytes, str | None]: + schema_id, schema = self._topic_schemas[cmd.destination] + body = cmd.body + data = body.model_dump(mode="json") if hasattr(body, "model_dump") else body + buf = io.BytesIO() + buf.write(HEADER.pack(0, schema_id)) + fastavro.schemaless_writer(buf, schema, data) + return buf.getvalue(), "application/avro" + + +codec = SchemaRegistryCodec( + registry_url="http://localhost:8081", + topics={ + "orders": 1, + "users": 2, + }, +) +broker = KafkaBroker(codec=codec) +app = FastStream(broker) + + +@broker.subscriber("orders") +async def handle_order(body: dict[str, Any]) -> None: + ... + + +@broker.subscriber("users") +async def handle_user(body: dict[str, Any]) -> None: + ... + + +@app.after_startup +async def test() -> None: + await broker.publish({"order_id": "123", "amount": 99.99}, "orders") + await broker.publish({"name": "John", "age": 25}, "users") diff --git a/faststream/_internal/parser.py b/faststream/_internal/parser.py index 815075136a..a70772f6ad 100644 --- a/faststream/_internal/parser.py +++ b/faststream/_internal/parser.py @@ -7,8 +7,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand MsgType = TypeVar("MsgType") @@ -64,7 +65,7 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": @abstractmethod async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, ) -> tuple[bytes, str | None]: ... @@ -74,7 +75,7 @@ class BatchCodecProto(Protocol): @abstractmethod async def encode_batch( self, - msgs: Sequence["SendableMessage"], + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, ) -> list[tuple[bytes, str | None]]: ... @@ -91,7 +92,7 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, ) -> tuple[bytes, str | None]: - return encode_message(msg, serializer) + return encode_message(cmd.body, serializer) diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index 87748ae1ae..a84d6668a0 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -139,7 +139,7 @@ async def publish( cmd: "KafkaPublishCommand", ) -> "asyncio.Future[Message | None] | Message | None": """Publish a message to a topic.""" - message, content_type = await self.codec.encode(cmd.body, self.serializer) + message, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -164,12 +164,20 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: headers_to_send = cmd.headers_to_publish() if isinstance(self.codec, BatchCodecProto): - encoded_batch = await self.codec.encode_batch( - cmd.batch_bodies, self.serializer - ) + encoded_batch = await self.codec.encode_batch(cmd, self.serializer) else: + from faststream.response.response import PublishCommand as _BaseCmd + encoded_batch = [ - await self.codec.encode(msg, self.serializer) for msg in cmd.batch_bodies + await self.codec.encode( + _BaseCmd( + body=msg, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + self.serializer, + ) + for msg in cmd.batch_bodies ] for message_position, (message, content_type) in enumerate(encoded_batch): diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index d637f43ce3..cfea9a7ac0 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -22,6 +22,7 @@ from faststream.confluent.subscriber.usecase import BatchSubscriber from faststream.exceptions import SubscriberNotFound from faststream.message import gen_cor_id +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -195,10 +196,18 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer) + encoded = await self.codec.encode_batch(cmd, serializer) else: encoded = [ - await self.codec.encode(body, serializer) for body in cmd.batch_bodies + await self.codec.encode( + _BasePublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, + ) + for body in cmd.batch_bodies ] for handler in _find_handler( @@ -345,8 +354,14 @@ async def build_message( codec: Optional["CodecProto"] = None, ) -> MockConfluentMessage: """Build a mock confluent_kafka.Message for a sendable message.""" + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + codec_instance = codec or DefaultCodec() - msg, content_type = await codec_instance.encode(message, serializer) + publish_cmd = _BaseCmd( + body=message, destination=topic, _publish_type=PublishType.PUBLISH + ) + msg, content_type = await codec_instance.encode(publish_cmd, serializer) k = key or b"" headers = { "content-type": content_type or "", diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 3a6fd5c512..6a9d95d4ee 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -110,7 +110,7 @@ async def publish( cmd: "KafkaPublishCommand", ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: """Publish a message to a topic.""" - message, content_type = await self.codec.encode(cmd.body, self.serializer) + message, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -141,12 +141,19 @@ async def publish_batch( headers_to_send = cmd.headers_to_publish() if isinstance(self.codec, BatchCodecProto): - encoded_batch = await self.codec.encode_batch( - cmd.batch_bodies, self.serializer - ) + encoded_batch = await self.codec.encode_batch(cmd, self.serializer) else: + from faststream.response.response import PublishCommand as _BaseCmd + encoded_batch = [ - await self.codec.encode(body, self.serializer) + await self.codec.encode( + _BaseCmd( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + self.serializer, + ) for body in cmd.batch_bodies ] diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py index c8c1ef7dc8..7d698e64f9 100755 --- a/faststream/kafka/testing.py +++ b/faststream/kafka/testing.py @@ -25,6 +25,7 @@ from faststream.kafka.publisher.usecase import BatchPublisher from faststream.kafka.subscriber.usecase import BatchSubscriber from faststream.message import gen_cor_id +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -244,10 +245,18 @@ async def publish_batch( serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer) + encoded = await self.codec.encode_batch(cmd, serializer) else: encoded = [ - await self.codec.encode(body, serializer) for body in cmd.batch_bodies + await self.codec.encode( + _BasePublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, + ) + for body in cmd.batch_bodies ] for handler in _find_handler( @@ -309,7 +318,13 @@ async def build_message( codec: Optional["CodecProto"] = None, ) -> "ConsumerRecord": """Build a Kafka ConsumerRecord for a sendable message.""" - msg, content_type = await (codec or DefaultCodec()).encode(message, serializer) + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + + publish_cmd = _BaseCmd( + body=message, destination=topic, _publish_type=PublishType.PUBLISH + ) + msg, content_type = await (codec or DefaultCodec()).encode(publish_cmd, serializer) k = key or b"" diff --git a/faststream/mqtt/publisher/producer.py b/faststream/mqtt/publisher/producer.py index 6cb4dc83ef..8ad0026808 100644 --- a/faststream/mqtt/publisher/producer.py +++ b/faststream/mqtt/publisher/producer.py @@ -92,7 +92,7 @@ async def publish(self, cmd: "MQTTPublishCommand") -> None: if cmd.headers: msg = "MQTT 3.1.1 does not support message headers. Use MQTT 5.0." raise FeatureNotSupportedException(msg) - payload, _ = await self.codec.encode(cmd.body, self.serializer) + payload, _ = await self.codec.encode(cmd, self.serializer) await self._connected_client.publish( cmd.destination, payload, @@ -117,7 +117,7 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": await sub.start() try: - payload, _ = await self.codec.encode(cmd.body, self.serializer) + payload, _ = await self.codec.encode(cmd, self.serializer) await self._connected_client.publish( cmd.destination, payload, @@ -144,7 +144,7 @@ def __init__( @override async def publish(self, cmd: "MQTTPublishCommand") -> None: - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd, self.serializer) user_props: list[tuple[str, str]] = [ (k, str(v)) for k, v in (cmd.headers or {}).items() @@ -174,7 +174,7 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": ID explicitly so the responder echoes it back and the caller can verify it on the response StreamMessage. """ - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd, self.serializer) correlation_id = cmd.correlation_id or gen_cor_id() user_props: list[tuple[str, str]] = [ diff --git a/faststream/mqtt/testing.py b/faststream/mqtt/testing.py index 518bf89202..07077389ad 100644 --- a/faststream/mqtt/testing.py +++ b/faststream/mqtt/testing.py @@ -21,6 +21,8 @@ from faststream.mqtt.parser import MQTTParserV5, MQTTParserV311 from faststream.mqtt.publisher.producer import ZmqttBaseProducer from faststream.mqtt.response import MQTTPublishCommand +from faststream.response.publish_type import PublishType +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -287,7 +289,12 @@ async def build_message( """ if codec is None: codec = DefaultCodec() - payload, content_type = await codec.encode(message, serializer=serializer) + publish_cmd = _BasePublishCommand( + body=message, + destination=topic, + _publish_type=PublishType.PUBLISH, + ) + payload, content_type = await codec.encode(publish_cmd, serializer=serializer) if version == "3.1.1": return zmqtt.Message( diff --git a/faststream/nats/publisher/producer.py b/faststream/nats/publisher/producer.py index 8095efd391..561baa9f4c 100644 --- a/faststream/nats/publisher/producer.py +++ b/faststream/nats/publisher/producer.py @@ -90,7 +90,7 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> None: - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -106,7 +106,7 @@ async def publish(self, cmd: "NatsPublishCommand") -> None: @override async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -157,7 +157,7 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -174,7 +174,7 @@ async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": @override async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd, self.serializer) reply_to = self.__state.connection._nc.new_inbox() future: asyncio.Future[Msg] = asyncio.Future() diff --git a/faststream/nats/testing.py b/faststream/nats/testing.py index f8d8b1c5f2..c0248f1ffc 100644 --- a/faststream/nats/testing.py +++ b/faststream/nats/testing.py @@ -16,6 +16,8 @@ from faststream.nats.parser import NatsParser from faststream.nats.publisher.producer import NatsFastProducer from faststream.nats.schemas.js_stream import is_subject_match_wildcard +from faststream.response.publish_type import PublishType +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -271,7 +273,12 @@ async def build_message( ) -> "PatchedMessage": if codec is None: codec = DefaultCodec() - msg, content_type = await codec.encode(message, serializer=serializer) + publish_cmd = _BasePublishCommand( + body=message, + destination=subject, + _publish_type=PublishType.PUBLISH, + ) + msg, content_type = await codec.encode(publish_cmd, serializer=serializer) return PatchedMessage( _client=None, # type: ignore[arg-type] subject=subject, diff --git a/faststream/rabbit/parser.py b/faststream/rabbit/parser.py index 796b6d41f0..0437ad63e3 100644 --- a/faststream/rabbit/parser.py +++ b/faststream/rabbit/parser.py @@ -60,6 +60,7 @@ async def decode_message( async def encode_message( message: "AioPikaSendableMessage", *, + destination: str = "", persist: bool = False, reply_to: str | None = None, headers: Optional["HeadersType"] = None, @@ -80,8 +81,14 @@ async def encode_message( if isinstance(message, Message): return message + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + + publish_cmd = _BaseCmd( + body=message, destination=destination, _publish_type=PublishType.PUBLISH + ) message_body, generated_content_type = await (codec or DefaultCodec()).encode( - message, serializer + publish_cmd, serializer ) delivery_mode = ( diff --git a/faststream/rabbit/publisher/producer.py b/faststream/rabbit/publisher/producer.py index ddd1caf16a..b1960bd28e 100644 --- a/faststream/rabbit/publisher/producer.py +++ b/faststream/rabbit/publisher/producer.py @@ -195,6 +195,7 @@ async def _publish( ) -> Optional["aiormq.abc.ConfirmationFrameType"]: message = await AioPikaParser.encode_message( message=message, + destination=routing_key, serializer=self.serializer, codec=self.codec, **message_options, diff --git a/faststream/rabbit/testing.py b/faststream/rabbit/testing.py index c4da14febf..38ffbf92c0 100644 --- a/faststream/rabbit/testing.py +++ b/faststream/rabbit/testing.py @@ -186,6 +186,7 @@ async def build_message( correlation_id = correlation_id or gen_cor_id() msg = await AioPikaParser.encode_message( message=message, + destination=routing, persist=persist, reply_to=reply_to, headers=headers, diff --git a/faststream/redis/parser/binary.py b/faststream/redis/parser/binary.py index b64f3c396e..50d965e93a 100644 --- a/faststream/redis/parser/binary.py +++ b/faststream/redis/parser/binary.py @@ -31,6 +31,7 @@ async def encode( reply_to: str | None, headers: dict[str, Any] | None, correlation_id: str, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> bytes: @@ -39,6 +40,7 @@ async def encode( reply_to=reply_to, headers=headers, correlation_id=correlation_id, + destination=destination, serializer=serializer, codec=codec, ) diff --git a/faststream/redis/parser/message.py b/faststream/redis/parser/message.py index b0357f52cd..759b58998b 100644 --- a/faststream/redis/parser/message.py +++ b/faststream/redis/parser/message.py @@ -35,11 +35,18 @@ async def build( reply_to: str | None, headers: dict[str, Any] | None, correlation_id: str, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> "MessageFormat": + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + codec_instance = codec or DefaultCodec() - payload, content_type = await codec_instance.encode(message, serializer) # type: ignore[arg-type] + publish_cmd = _BaseCmd( + body=message, destination=destination, _publish_type=PublishType.PUBLISH + ) + payload, content_type = await codec_instance.encode(publish_cmd, serializer) headers_to_send = { "correlation_id": correlation_id, @@ -68,6 +75,7 @@ async def encode( reply_to: str | None, headers: dict[str, Any] | None, correlation_id: str, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> bytes: diff --git a/faststream/redis/publisher/producer.py b/faststream/redis/publisher/producer.py index fb8bdc73f5..5a218282de 100644 --- a/faststream/redis/publisher/producer.py +++ b/faststream/redis/publisher/producer.py @@ -63,6 +63,7 @@ async def publish_batch(self, cmd: "RedisPublishCommand") -> int: correlation_id=cmd.correlation_id or "", reply_to=cmd.reply_to, headers=cmd.headers, + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -115,6 +116,7 @@ async def publish(self, cmd: "RedisPublishCommand") -> int | bytes: reply_to=cmd.reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -161,6 +163,7 @@ async def request(self, cmd: "RedisPublishCommand") -> "Any": reply_to=reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -218,6 +221,7 @@ async def publish(self, cmd: "RedisPublishCommand") -> int | bytes: reply_to=cmd.reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -252,6 +256,7 @@ async def request(self, cmd: "RedisPublishCommand") -> "Any": reply_to=reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) diff --git a/faststream/redis/testing.py b/faststream/redis/testing.py index 4c40e503b2..7adfd0d044 100644 --- a/faststream/redis/testing.py +++ b/faststream/redis/testing.py @@ -230,6 +230,7 @@ async def publish(self, cmd: "RedisPublishCommand") -> int | bytes: correlation_id=cmd.correlation_id or gen_cor_id(), headers=cmd.headers, message_format=cmd.message_format, + destination=cmd.destination, serializer=self.broker.config.fd_config._serializer, codec=self.codec, ) @@ -257,6 +258,7 @@ async def request(self, cmd: "RedisPublishCommand") -> "PubSubMessage": correlation_id=cmd.correlation_id or gen_cor_id(), headers=cmd.headers, message_format=cmd.message_format, + destination=cmd.destination, serializer=self.broker.config.fd_config._serializer, codec=self.codec, ) @@ -337,6 +339,7 @@ async def build_message( message_format: type["MessageFormat"], reply_to: str = "", headers: dict[str, Any] | None = None, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> bytes: @@ -345,6 +348,7 @@ async def build_message( reply_to=reply_to, headers=headers, correlation_id=correlation_id, + destination=destination, serializer=serializer, codec=codec, ) diff --git a/tests/brokers/base/codec.py b/tests/brokers/base/codec.py index 52ebfa692b..d529d90dd3 100644 --- a/tests/brokers/base/codec.py +++ b/tests/brokers/base/codec.py @@ -103,9 +103,9 @@ async def test_codec_encode_called(self, queue: str) -> None: mock = MagicMock() class TrackingCodec(DefaultCodec): - async def encode(self, msg, serializer=None): + async def encode(self, cmd, serializer=None): mock() - return await super().encode(msg, serializer) + return await super().encode(cmd, serializer) broker = self.get_broker(codec=TrackingCodec()) @@ -121,6 +121,9 @@ async def handle(m) -> None: assert mock.called, "codec.encode was not called on publish" async def test_default_codec_encode_matches_encode_message(self, queue: str) -> None: + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand + codec = DefaultCodec() test_cases = [ @@ -131,7 +134,12 @@ async def test_default_codec_encode_matches_encode_message(self, queue: str) -> ] for msg in test_cases: - codec_result = await codec.encode(msg, None) + codec_result = await codec.encode( + PublishCommand( + body=msg, destination="test", _publish_type=PublishType.PUBLISH + ), + None, + ) direct_result = encode_message(msg, None) assert codec_result == direct_result, ( f"DefaultCodec.encode({msg!r}) = {codec_result!r} " @@ -151,10 +159,23 @@ async def test_batch_codec_decode_batch_called( class TrackingBatchCodec(DefaultCodec): async def encode_batch( self, - msgs: Sequence[Any], + cmd, serializer: Any = None, ) -> list[tuple[bytes, str | None]]: - return [await DefaultCodec.encode(self, m, serializer) for m in msgs] + from faststream.response.response import PublishCommand + + return [ + await DefaultCodec.encode( + self, + PublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, + ) + for body in cmd.batch_bodies + ] async def decode_batch(self, msg: Any) -> list[Any]: decode_batch_mock() @@ -182,11 +203,24 @@ async def test_batch_codec_encode_batch_called( class TrackingBatchCodec(DefaultCodec): async def encode_batch( self, - msgs: Sequence[Any], + cmd, serializer: Any = None, ) -> list[tuple[bytes, str | None]]: + from faststream.response.response import PublishCommand + encode_batch_mock() - return [await DefaultCodec.encode(self, m, serializer) for m in msgs] + return [ + await DefaultCodec.encode( + self, + PublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, + ) + for body in cmd.batch_bodies + ] async def decode_batch(self, msg: Any) -> list[Any]: return [b.decode() if isinstance(b, bytes) else b for b in msg.body]