Skip to content

Commit 179825b

Browse files
ISV-6424: Implement base producer (#4)
* add integration tests * fix out-of-order offset commits Signed-off-by: Marek Szymutko <mszymutk@redhat.com> Assisted-by: Claude-4.5-Sonnet (Cursor)
1 parent 26b1b81 commit 179825b

21 files changed

+1537
-155
lines changed

.github/workflows/integration.yml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
name: Integration Tests
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
pull_request:
8+
9+
jobs:
10+
integration:
11+
runs-on: ubuntu-latest
12+
strategy:
13+
matrix:
14+
python-version: ["3.13"]
15+
16+
steps:
17+
- uses: actions/checkout@v4
18+
19+
- name: Set up Python ${{ matrix.python-version }}
20+
uses: actions/setup-python@v5
21+
with:
22+
python-version: ${{ matrix.python-version }}
23+
24+
- name: Install uv
25+
uses: astral-sh/setup-uv@v5
26+
with:
27+
version: "latest"
28+
29+
- name: Install tox
30+
run: |
31+
uv venv
32+
uv pip install tox
33+
34+
- name: Start Kafka
35+
run: |
36+
docker compose up -d
37+
echo "Waiting for Kafka to be healthy..."
38+
timeout 120 bash -c 'until docker compose ps kafka | grep -q "healthy"; do sleep 2; done'
39+
echo "Kafka is healthy and ready for tests"
40+
41+
- name: Run integration tests
42+
run: |
43+
uv run tox -e integration
44+
45+
- name: Show Kafka logs on failure
46+
if: failure()
47+
run: |
48+
docker compose logs kafka
49+
50+
- name: Cleanup
51+
if: always()
52+
run: |
53+
docker compose down -v

README.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,46 @@ used.
1111

1212
The aim is to provide a fault-tolerant platform for parallel message
1313
processing.
14+
15+
## Local testing
16+
17+
This project uses [`uv`][1]. To set up the project locally, use
18+
19+
```bash
20+
uv pip install .
21+
```
22+
23+
To test, you also need development tools, add them to your local environment
24+
using this command:
25+
26+
```bash
27+
uv pip install --group dev
28+
```
29+
30+
Then you can use [`tox`][2] to run linting and unit tests.
31+
32+
For integration tests you also need [`podman`][3] or [`docker`][4] with
33+
`compose`. Run:
34+
35+
```bash
36+
docker compose up -d
37+
```
38+
39+
Wait a while and then run:
40+
41+
```bash
42+
tox -e integration
43+
```
44+
45+
Don't forget to clean up the test environment afterward. Use
46+
47+
```bash
48+
podman compose down
49+
```
50+
51+
to do that (or switch `podman` with `docker` depending on your tool of choice).
52+
53+
[1]: https://docs.astral.sh/uv/
54+
[2]: https://tox.wiki/en/4.32.0/
55+
[3]: https://podman.io/
56+
[4]: https://www.docker.com/

compose.yml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
services:
2+
kafka:
3+
image: apache/kafka:latest
4+
hostname: kafka
5+
container_name: kafka
6+
ports:
7+
- "9092:9092"
8+
- "9093:9093"
9+
environment:
10+
# KRaft settings
11+
KAFKA_NODE_ID: 1
12+
KAFKA_PROCESS_ROLES: broker,controller
13+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094
14+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
15+
16+
# Listener configuration
17+
KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092,PLAINTEXT://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094
18+
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093
19+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
20+
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
21+
22+
# SASL/SCRAM configuration
23+
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
24+
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512
25+
26+
# Dynamic topic creation
27+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
28+
29+
# Compression settings
30+
KAFKA_COMPRESSION_TYPE: snappy
31+
32+
# Log settings
33+
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
34+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
35+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
36+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
37+
38+
# JAAS configuration for SASL
39+
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
40+
41+
volumes:
42+
- ./kafka_config/kafka-startup.sh:/tmp/kafka-startup.sh:ro,z
43+
- ./kafka_config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf:ro,z
44+
tmpfs:
45+
- /tmp/kraft-combined-logs:uid=1000,gid=1000
46+
command: ["/bin/bash", "/tmp/kafka-startup.sh"]
47+
healthcheck:
48+
test: ["CMD", "/opt/kafka/bin/kafka-broker-api-versions.sh", "--bootstrap-server", "localhost:9093"]
49+
interval: 10s
50+
timeout: 10s
51+
retries: 5
52+
start_period: 30s

kafka_config/kafka-startup.sh

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#!/bin/bash
2+
set -e
3+
4+
echo "Starting Kafka in background..."
5+
/etc/kafka/docker/run &
6+
KAFKA_PID=$!
7+
8+
# Wait for Kafka to be ready
9+
echo "Waiting for Kafka to be ready..."
10+
for i in {1..60}; do
11+
if /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9093 > /dev/null 2>&1; then
12+
echo "Kafka is ready!"
13+
break
14+
fi
15+
if [ $i -eq 60 ]; then
16+
echo "Kafka failed to start in time"
17+
exit 1
18+
fi
19+
sleep 2
20+
done
21+
22+
echo "Creating SCRAM credentials..."
23+
24+
# Create SCRAM credentials for test user
25+
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9093 \
26+
--alter \
27+
--add-config 'SCRAM-SHA-512=[password=testpassword]' \
28+
--entity-type users \
29+
--entity-name testuser
30+
31+
echo "================================"
32+
echo "SCRAM credentials created successfully!"
33+
echo "Username: testuser"
34+
echo "Password: testpassword"
35+
echo "================================"
36+
37+
# Keep container running
38+
wait $KAFKA_PID
39+
40+
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
KafkaServer {
2+
org.apache.kafka.common.security.scram.ScramLoginModule required;
3+
};
4+

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ exclude = ["tests*"]
2323

2424
[tool.pytest.ini_options]
2525
pythonpath = ["src"]
26+
2627
[dependency-groups]
2728
dev = [
2829
"coverage>=7.12.0",
2930
"mypy>=1.18.2",
3031
"pytest>=9.0.1",
32+
"pytest-asyncio>=0.24.0",
3133
"pytest-cov>=7.0.0",
3234
"ruff>=0.14.5",
3335
"tox>=4.32.0",
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Retriable Kafka client module"""
22

33
from .consumer import BaseConsumer
4-
from .types import TopicConfig
4+
from .types import ConsumerConfig
55
from .orchestrate import consume_topics
66

7-
__all__ = ("BaseConsumer", "TopicConfig", "consume_topics")
7+
__all__ = ("BaseConsumer", "ConsumerConfig", "consume_topics")

0 commit comments

Comments
 (0)