Skip to content

Commit 4678e05

Browse files
feat(ISV-6425): extend base consumer with retry mechanism (#5)
Signed-off-by: Marek Szymutko <mszymutk@redhat.com> Assisted-by: Claude-4.5-Sonnet (Cursor)
1 parent 179825b commit 4678e05

27 files changed

+3138
-827
lines changed

README.md

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,47 @@ additional functionalities to these selected parts.
66

77
## Features
88

9-
As this library is currently under development, this library should not be
10-
used.
11-
129
The aim is to provide a fault-tolerant platform for parallel message
1310
processing.
1411

12+
### Parallel processing
13+
14+
Each consumer requires an executor pool, which will be used for message
15+
processing. Each consumer can consume from multiple topics and processes
16+
the messages from these topics by a single callable. The callable must be
17+
specified by the user of this library.
18+
19+
The library also ensures exactly-once processing when used correctly.
20+
To ensure this, the tasks should take short enough time that all
21+
of them finish before the cluster forces rebalancing. The library tries to
22+
finish tasks from revoked partitions before the rebalance while stopping
23+
additional non-started tasks. The default timeout for each task to finish is
24+
30 seconds, but can be changed. Kafka cluster behavior change may also be
25+
needed with longer tasks. This behavior only appears during rebalancing and
26+
graceful stopping of the consumer.
27+
28+
### Fault-tolerance
29+
30+
Each consumer accepts configuration with retry topics. A retry topic is
31+
a Kafka topic used for asynchronous retrying of message processing. If the
32+
specified target callable fails, the consumer will commit the original message
33+
and resends the same message to a retry topic with special headers. The headers
34+
include information about the next timestamp at which the message should be
35+
processed again (to give some time to the error to disappear if the processing
36+
depends on some outside infrastructure).
37+
38+
The retry topic is polled alongside the original topic. If a message contains
39+
the special timestamp header, its Kafka partition of origin will be paused and
40+
the message will be stored locally. The processing will resume only after the
41+
specified timestamp passes. The message will not be processed before the
42+
timestamp, it can only gather delay (depending on the occupation of the
43+
worker pool). Once the message is sent to the pool for re-processing, the
44+
consumption of the blocked partition is resumed.
45+
46+
This whole mechanism **does not ensure message ordering**. When a message is
47+
sent to be retried, another message processing from the same topic is still
48+
unblocked.
49+
1550
## Local testing
1651

1752
This project uses [`uv`][1]. To set up the project locally, use
@@ -33,7 +68,7 @@ For integration tests you also need [`podman`][3] or [`docker`][4] with
3368
`compose`. Run:
3469

3570
```bash
36-
docker compose up -d
71+
podman compose up -d
3772
```
3873

3974
Wait a while and then run:
Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
"""Retriable Kafka client module"""
22

33
from .consumer import BaseConsumer
4-
from .types import ConsumerConfig
5-
from .orchestrate import consume_topics
4+
from .config import ConsumerConfig, ProducerConfig, ConsumeTopicConfig
5+
from .orchestrate import consume_topics, ConsumerThread
6+
from .producer import BaseProducer
67

7-
__all__ = ("BaseConsumer", "ConsumerConfig", "consume_topics")
8+
__all__ = (
9+
"BaseConsumer",
10+
"BaseProducer",
11+
"consume_topics",
12+
"ConsumerConfig",
13+
"ConsumerThread",
14+
"ProducerConfig",
15+
"ConsumeTopicConfig",
16+
)
Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,21 @@
44
from typing import Callable, Any
55

66

7-
@dataclass
7+
@dataclass(kw_only=True)
88
class _CommonConfig:
99
"""
1010
Topic configuration common for consumers and producers.
1111
Attributes:
1212
kafka_hosts: list of Kafka node URLs to connect to
13-
topics: list of topic names to connect to
1413
username: Kafka username
1514
password: Kafka password
15+
additional_settings: additional settings to pass directly to Kafka
1616
"""
1717

1818
kafka_hosts: list[str]
19-
topics: list[str]
2019
username: str
2120
password: str
21+
additional_settings: dict[str, Any] = field(default_factory=dict)
2222

2323

2424
@dataclass
@@ -30,28 +30,57 @@ class ProducerConfig(_CommonConfig):
3030
topics: list of topic names to publish to
3131
username: producer username
3232
password: producer password
33+
additional_settings: additional settings to pass directly to Kafka consumer
3334
retries: number of attempts to publish the message
3435
fallback_factor: how many times longer should each backoff take
3536
fallback_base: what is the starting backoff in seconds
3637
"""
3738

39+
topics: list[str]
3840
retries: int = field(default=3)
3941
fallback_factor: float = field(default=2.0)
4042
fallback_base: float = field(default=5.0)
4143

4244

45+
@dataclass
46+
class ConsumeTopicConfig:
47+
"""
48+
Configuration for retry mechanism of a consumer.
49+
Must be used from within ConsumerConfig.
50+
Attributes:
51+
base_topic: Topic that this consumer subscribes to
52+
retry_topic: Topic used for resending failed messages
53+
retries: maximal number of attempts to re-process the
54+
message originated from base_topic
55+
fallback_delay: Number of seconds to wait before a message
56+
should be re-processed. This is a non-blocking event.
57+
"""
58+
59+
base_topic: str
60+
retry_topic: str | None = field(default=None)
61+
retries: int = field(default=5)
62+
fallback_delay: float = field(default=15.0)
63+
64+
4365
@dataclass
4466
class ConsumerConfig(_CommonConfig):
4567
"""
4668
Topic configuration for each consumer.
4769
Attributes:
4870
kafka_hosts: list of Kafka node URLs to connect to
49-
topics: list of topic names to connect to
71+
topics: list of configuration for topics and their
72+
retry policies
73+
cancel_future_wait_time: Maximal time to wait for a task
74+
to finish before discarding it on rebalance or soft shutdown.
75+
Doesn't affect tasks which are ran in normal circumstances.
5076
username: consumer username
5177
password: consumer password
78+
additional_settings: additional settings to pass directly to Kafka producer
5279
group_id: consumer group ID to use when consuming
5380
target: Callable to execute on all parsed messages
5481
"""
5582

5683
group_id: str
5784
target: Callable[[dict[str, Any]], Any]
85+
topics: list[ConsumeTopicConfig] = field(default_factory=list)
86+
cancel_future_wait_time: float = field(default=30.0)

0 commit comments

Comments
 (0)