Feature: unified codec configuration #2837

@ce1ebrimbor

Description

Problem

Encoding and decoding use two different patterns in FastStream. Decoding is a function passed to the broker constructor (decoder=), called inside the handler DI chain with no middleware hook. Encoding is done through middleware via publish_scope. There is no decode_scope equivalent.

from faststream.kafka import KafkaBroker

broker = KafkaBroker(
    "localhost:9092",
    decoder=my_avro_decoder,                     # function, wired into DI
    middlewares=[MyAvroEncoderMiddleware(...)],  # middleware class, wired into publish_scope
)

This means a developer working with a binary format like Confluent Avro or Protobuf has to implement the same serialization concern in two different places, with two different lifecycles, often awkwardly sharing state (such as a schema-registry client) between them.

The workaround today is to decode in on_consume middleware and set a passthrough decoder= to prevent the default JSON decoder from re-processing the already-decoded body. It works, but requires understanding FastStream internals.
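To make the workaround concrete, here is a toy model of it in plain Python. It does not use FastStream: ToyMessage, decoding_on_consume, passthrough_decoder and consume are hypothetical stand-ins, the consume path is reduced to "run on_consume middlewares, then the decoder", and zlib-compressed JSON plays the role of a binary format such as Avro.

```python
import asyncio
import json
import zlib
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List


@dataclass
class ToyMessage:
    """Toy stand-in for FastStream's StreamMessage (not the real class)."""
    body: bytes
    decoded: Any = None  # hypothetical slot filled in by the middleware


def binary_decode(raw: bytes) -> Any:
    # Stand-in for an Avro/Protobuf decoder: zlib-compressed JSON.
    return json.loads(zlib.decompress(raw))


async def decoding_on_consume(msg: ToyMessage) -> ToyMessage:
    # Workaround, part 1: decode the binary body in consume middleware.
    msg.decoded = binary_decode(msg.body)
    return msg


async def passthrough_decoder(msg: ToyMessage) -> Any:
    # Workaround, part 2: return the value decoded earlier, so the
    # default JSON decoder never runs on the binary body.
    return msg.decoded


async def consume(
    msg: ToyMessage,
    middlewares: List[Callable[[ToyMessage], Awaitable[ToyMessage]]],
    decoder: Callable[[ToyMessage], Awaitable[Any]],
) -> Any:
    # Simplified consume path: on_consume middlewares first, then the decoder.
    for on_consume in middlewares:
        msg = await on_consume(msg)
    return await decoder(msg)


raw = zlib.compress(json.dumps({"order_id": 42}).encode())
print(asyncio.run(consume(ToyMessage(raw), [decoding_on_consume], passthrough_decoder)))
```

The decoding logic lives in the middleware, while the decoder= slot exists only to switch off the default behaviour; that split is the coupling the proposal aims to remove.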

Proposal

Add a codec= parameter to the broker that handles both directions through a single object:

broker = KafkaBroker(
    "localhost:9092",
    codec=ConfluentAvroCodec(schema_registry_url="http://localhost:8081"),
)

Codec protocol

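from typing import Any, Protocol, runtime_checkable

# StreamMessage, DecodedMessage, PublishCommand: FastStream types (import paths depend on the version)
@runtime_checkable
class Codec(Protocol):
    async def decode(self, msg: StreamMessage[Any]) -> DecodedMessage: ...
    async def encode(self, cmd: PublishCommand) -> PublishCommand: ...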

Both methods are async so that codecs can fetch schemas or perform other I/O during serialization.
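A minimal sketch of a codec satisfying this protocol, assuming simplified dataclass stand-ins for StreamMessage and PublishCommand (the real FastStream classes carry much more state). ZlibJsonCodec is a hypothetical example codec, not a proposed built-in; it round-trips JSON compressed with zlib.

```python
import asyncio
import json
import zlib
from dataclasses import dataclass, replace
from typing import Any, Protocol, runtime_checkable


# Simplified stand-ins for FastStream's StreamMessage and PublishCommand.
@dataclass
class StreamMessage:
    body: bytes


@dataclass
class PublishCommand:
    body: Any
    destination: str


@runtime_checkable
class Codec(Protocol):
    async def decode(self, msg: StreamMessage) -> Any: ...
    async def encode(self, cmd: PublishCommand) -> PublishCommand: ...


class ZlibJsonCodec:
    """Hypothetical codec: JSON, zlib-compressed on the wire."""

    async def decode(self, msg: StreamMessage) -> Any:
        return json.loads(zlib.decompress(msg.body))

    async def encode(self, cmd: PublishCommand) -> PublishCommand:
        # Return a new command with a serialized body; the input is left untouched.
        return replace(cmd, body=zlib.compress(json.dumps(cmd.body).encode()))


async def roundtrip() -> Any:
    codec = ZlibJsonCodec()
    cmd = await codec.encode(PublishCommand({"order_id": 42}, "orders.events"))
    return await codec.decode(StreamMessage(cmd.body))


print(isinstance(ZlibJsonCodec(), Codec))  # structural check: method names only
print(asyncio.run(roundtrip()))
```

Because runtime_checkable only verifies that decode and encode exist, not that they are coroutines, a broker validating codec= might additionally check inspect.iscoroutinefunction(codec.decode) to reject synchronous implementations early.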

Internal wiring

When codec= is provided, FastStream:

  • Wires codec.decode as the decoder function, slotting into the existing _get_parser_and_decoder chain
  • Auto-generates a middleware that calls codec.encode in publish_scope

This requires no changes to the DI chain, middleware lifecycle, or parser composition: the codec is syntactic sugar over existing extension points.
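The wiring described above can be sketched outside FastStream with a toy publish pipeline. Everything here is hypothetical: StreamMessage and PublishCommand are simplified stand-ins, and JsonCodec, CodecEncodeMiddleware, wire_codec and publish are illustrative names. Placing the generated middleware innermost, so user middlewares still see the unencoded body, is one possible design choice rather than something the proposal specifies.

```python
import asyncio
import json
from dataclasses import dataclass, replace
from functools import partial
from typing import Any, Awaitable, Callable, List, Tuple


@dataclass
class StreamMessage:  # simplified stand-in
    body: bytes


@dataclass
class PublishCommand:  # simplified stand-in
    body: Any
    destination: str


class JsonCodec:
    """Hypothetical codec used only to exercise the wiring."""

    async def decode(self, msg: StreamMessage) -> Any:
        return json.loads(msg.body)

    async def encode(self, cmd: PublishCommand) -> PublishCommand:
        return replace(cmd, body=json.dumps(cmd.body).encode())


class CodecEncodeMiddleware:
    """The auto-generated middleware: runs codec.encode in publish_scope."""

    def __init__(self, codec: Any) -> None:
        self.codec = codec

    async def publish_scope(self, call_next: Callable[[PublishCommand], Awaitable[Any]], cmd: PublishCommand) -> Any:
        return await call_next(await self.codec.encode(cmd))


def wire_codec(codec: Any, middlewares: List[Any]) -> Tuple[Callable[[StreamMessage], Awaitable[Any]], List[Any]]:
    # Decode side: codec.decode is used as the decoder function as-is.
    # Encode side: append the generated middleware so it runs innermost.
    return codec.decode, [*middlewares, CodecEncodeMiddleware(codec)]


async def publish(cmd: PublishCommand, middlewares: List[Any], send: Callable[[PublishCommand], Awaitable[Any]]) -> Any:
    # Toy pipeline: the first middleware is outermost; the last one calls send.
    async def call(index: int, current: PublishCommand) -> Any:
        if index == len(middlewares):
            return await send(current)
        return await middlewares[index].publish_scope(partial(call, index + 1), current)

    return await call(0, cmd)


class RecordingMiddleware:
    """User middleware that records the body it sees."""

    def __init__(self) -> None:
        self.seen: List[Any] = []

    async def publish_scope(self, call_next, cmd):
        self.seen.append(cmd.body)
        return await call_next(cmd)


async def demo() -> Tuple[List[Any], List[bytes], Any]:
    recorder = RecordingMiddleware()
    decoder, middlewares = wire_codec(JsonCodec(), [recorder])
    wire: List[bytes] = []

    async def send(cmd: PublishCommand) -> None:
        wire.append(cmd.body)

    await publish(PublishCommand({"order_id": 42}, "orders.events"), middlewares, send)
    return recorder.seen, wire, await decoder(StreamMessage(wire[0]))


print(asyncio.run(demo()))
```

The user middleware records the original dict, the wire receives JSON bytes, and the wired decoder turns those bytes back into the dict.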

Deprecation of decoder=

The codec= parameter replaces decoder=. Once codec= is available, decoder= should be deprecated with a warning that points users to codec= instead. The decoder= parameter would remain functional for one deprecation cycle and be removed in a future major version.

If both codec= and decoder= are provided, raise ValueError: the two are mutually exclusive, and accepting both would make the decode path ambiguous.
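The mutual-exclusion and deprecation rules could be enforced at construction time roughly as follows. resolve_decoder is a hypothetical helper, not existing FastStream code; it returns None when neither argument is given, standing in for "use the broker's default decoder".

```python
import warnings
from typing import Any, Callable, Optional


def resolve_decoder(
    codec: Optional[Any] = None,
    decoder: Optional[Callable[..., Any]] = None,
) -> Optional[Callable[..., Any]]:
    """Hypothetical handling of codec=/decoder= in a broker constructor."""
    if codec is not None and decoder is not None:
        raise ValueError("codec= and decoder= are mutually exclusive; pass only codec=")
    if decoder is not None:
        # Legacy path: still works during the deprecation cycle, but warns.
        warnings.warn(
            "decoder= is deprecated; use codec= instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return decoder
    if codec is not None:
        return codec.decode
    return None  # neither given: fall back to the default decoder
```

Warning only on the legacy path keeps code that already uses codec= silent, while the ValueError surfaces the ambiguous configuration at startup rather than on the first consumed message.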

Example

from faststream.kafka import KafkaBroker

codec = ConfluentAvroCodec(
    schema_registry_url="http://localhost:8081",
    topics_versions={"orders.events": "latest"},
)
broker = KafkaBroker("localhost:9092", codec=codec)

@broker.subscriber("orders.events")
async def handle_order(msg: OrderEvent) -> None:
    ...

# Publishing needs a connected broker, e.g. inside `async with broker:`
await broker.publish(OrderEvent(...), topic="orders.events")

One object, one configuration site, both directions.

P.S. If the maintainers consider this a valuable addition, I'm happy to implement it and open a PR.

Metadata

Assignees

No one assigned

    Labels

    enhancement (New feature or request)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
