
PNLP-7915: replace confluent_kafka by aiokafka #155


Merged: 14 commits, merged Oct 31, 2023
2 changes: 1 addition & 1 deletion .github/kafka_config.yml
@@ -7,7 +7,7 @@ template-engine:
bootstrap.servers: "localhost:9092"
debug: all
default.topic.config:
auto.offset.reset: largest
auto.offset.reset: latest
enable.auto.commit: true
enable.auto.offset.store: false
group.id: dialog_policy1
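librdkafka (the engine behind confluent_kafka) accepts `largest`/`smallest` for `auto.offset.reset`, while aiokafka only takes `earliest`, `latest` or `none`, which is why the sample config switches to `latest`. A minimal sketch of how this value is used on the aiokafka side — the broker address and group id come from the config above, the topic name is an assumption:

```python
import asyncio

from aiokafka import AIOKafkaConsumer


async def main():
    # Assumes a broker is reachable on localhost:9092; the topic name is hypothetical.
    consumer = AIOKafkaConsumer(
        "template_engine_in",
        bootstrap_servers="localhost:9092",
        group_id="dialog_policy1",
        enable_auto_commit=True,
        auto_offset_reset="latest",  # aiokafka accepts "earliest"/"latest"/"none", not "largest"
    )
    await consumer.start()
    try:
        msg = await consumer.getone()
        print(msg.topic, msg.offset, msg.value)
    finally:
        await consumer.stop()


asyncio.run(main())
```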
6 changes: 3 additions & 3 deletions core/message/from_message.py
@@ -1,5 +1,5 @@
# coding=utf-8
from typing import Iterable, Dict, Optional, Set, Any, List, Union, Tuple
from typing import Iterable, Dict, Optional, Any, List, Union, Sequence, Tuple
import json
import uuid

@@ -56,8 +56,8 @@ class SmartAppFromMessage:
payload: dict
uuid: dict

def __init__(self, value: Dict[str, Any], topic_key: str = None, creation_time: Optional[int] = None,
kafka_key: Optional[str] = None, headers: Optional[Iterable[Tuple[Any, Any]]] = None,
def __init__(self, value: Dict[str, Any], topic_key: Optional[str] = None, creation_time: Optional[int] = None,
kafka_key: Optional[str] = None, headers: Optional[Sequence[Tuple[str, bytes]]] = None,
masking_fields: Optional[Union[Dict[str, int], List[str]]] = None, headers_required: bool = True,
validators: Iterable[MessageValidator] = ()):
self.logging_uuid = str(uuid.uuid4())
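The `headers` annotation narrows to `Sequence[Tuple[str, bytes]]` because that is the shape in which aiokafka's `ConsumerRecord.headers` hands headers over. A construction sketch — the payload dict and header value are hypothetical, and it assumes the constructor merely stores its arguments:

```python
from core.message.from_message import SmartAppFromMessage

# aiokafka delivers record headers as (str, bytes) pairs.
headers = [("kafka_correlationId", b"7f1d6a42")]  # hypothetical header value

message = SmartAppFromMessage(
    value={"payload": {}, "uuid": {}},  # hypothetical minimal payload
    headers=headers,
    headers_required=True,
)
print(message.logging_uuid)
```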
66 changes: 0 additions & 66 deletions core/mq/kafka/async_kafka_publisher.py

This file was deleted.
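The confluent-based async publisher goes away; its aiokafka-based replacement is not part of this diff. For orientation only, a minimal aiokafka producer sketch — topic, payload and header values are assumptions, not taken from the PR:

```python
import asyncio

from aiokafka import AIOKafkaProducer


async def main():
    # Assumes a broker is reachable on localhost:9092.
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        # send_and_wait returns once the broker acknowledges the record.
        await producer.send_and_wait(
            "template_engine_out",                      # hypothetical topic
            value=b'{"messageName": "ANSWER_TO_USER"}',  # hypothetical payload
            headers=[("kafka_correlationId", b"7f1d6a42")],
        )
    finally:
        await producer.stop()


asyncio.run(main())
```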

23 changes: 23 additions & 0 deletions core/mq/kafka/consumer_rebalance_listener.py
@@ -0,0 +1,23 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from aiokafka.abc import ConsumerRebalanceListener

if TYPE_CHECKING:
from typing import List, Callable
from kafka import TopicPartition
from aiokafka import AIOKafkaConsumer


class CoreConsumerRebalanceListener(ConsumerRebalanceListener):
def __init__(self, consumer: AIOKafkaConsumer,
on_assign_callback: Callable[[AIOKafkaConsumer, List[TopicPartition]], None]):
self._consumer = consumer
self._on_assign_callback = on_assign_callback

def on_partitions_assigned(self, assigned: List[TopicPartition]):
self._on_assign_callback(self._consumer, assigned)

def on_partitions_revoked(self, revoked):
pass
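This adapter lets the old confluent-style `on_assign` callbacks plug into aiokafka's rebalance-listener protocol. A usage sketch mirroring how `KafkaConsumer.subscribe` below wires it in — broker address, group id and topic are assumptions:

```python
import asyncio

from aiokafka import AIOKafkaConsumer

from core.mq.kafka.consumer_rebalance_listener import CoreConsumerRebalanceListener


def log_assignment(consumer, partitions):
    # Stand-in for KafkaConsumer.on_assign_log / on_assign_offset_end.
    print("assigned:", partitions)


async def main():
    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:9092", group_id="demo_group")
    await consumer.start()
    try:
        consumer.subscribe(
            ["template_engine_in"],  # hypothetical topic
            listener=CoreConsumerRebalanceListener(
                consumer=consumer,
                on_assign_callback=log_assignment,
            ),
        )
        record = await consumer.getone()
        print(record.topic, record.partition, record.offset)
    finally:
        await consumer.stop()


asyncio.run(main())
```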
202 changes: 122 additions & 80 deletions core/mq/kafka/kafka_consumer.py
@@ -1,108 +1,124 @@
# coding: utf-8
from __future__ import annotations

import logging
import os
import time
import uuid
from typing import TYPE_CHECKING

from confluent_kafka import Consumer, TIMESTAMP_NOT_AVAILABLE
from confluent_kafka.cimpl import KafkaError, KafkaException, OFFSET_END, Message as KafkaMessage
from aiokafka import AIOKafkaConsumer, TopicPartition
from aiokafka.helpers import create_ssl_context
from kafka.errors import KafkaError

import core.logging.logger_constants as log_const
from core.logging.logger_utils import log
from core.monitoring.monitoring import monitoring
from core.mq.kafka.base_kafka_consumer import BaseKafkaConsumer
from core.mq.kafka.consumer_rebalance_listener import CoreConsumerRebalanceListener

if TYPE_CHECKING:
from aiokafka import ConsumerRecord
from typing import Optional, Callable, Iterable, AsyncGenerator, Any, Dict, List
from asyncio import AbstractEventLoop


class KafkaConsumer(BaseKafkaConsumer):
def __init__(self, config):
def __init__(self, config: Dict[str, Any], loop: AbstractEventLoop):
self._config = config["consumer"]
self.assign_offset_end = self._config.get("assign_offset_end", False)
conf = self._config["conf"]
conf.setdefault("group.id", str(uuid.uuid1()))
self.autocommit_enabled = conf.get("enable.auto.commit", True)
self._update_old_config(conf)
self._setup_ssl(conf, self._config.get("ssl"))
conf.setdefault("group_id", str(uuid.uuid1()))
self.autocommit_enabled = conf.get("enable_auto_commit", True)
internal_log_path = self._config.get("internal_log_path")
conf["error_cb"] = self._error_callback
if internal_log_path:
debug_logger = logging.getLogger("debug_consumer")
debug_logger = logging.getLogger("debug_consumer") # TODO add debug logger to _consumer events
timestamp = time.strftime("_%d%m%Y_")
debug_logger.addHandler(logging.FileHandler(
"{}/kafka_consumer_debug{}{}.log".format(internal_log_path, timestamp, os.getpid())))
conf["logger"] = debug_logger
self._consumer = Consumer(**conf)
"{}/kafka_consumer_debug{}{}.log".format(internal_log_path, timestamp, os.getpid())
))
self._consumer = AIOKafkaConsumer(**conf, loop=loop)
loop.run_until_complete(self._consumer.start())

@staticmethod
def on_assign_offset_end(consumer, partitions):
def on_assign_offset_end(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None:
for p in partitions:
p.offset = OFFSET_END
KafkaConsumer.on_assign_log(consumer, partitions)
consumer.assign(partitions)

@staticmethod
def on_coop_assign_offset_end(consumer, partitions):
p.offset = consumer.last_stable_offset(p)
self.on_assign_log(consumer, partitions)
try:
consumer.assign(partitions)
except KafkaError as e:
self._error_callback(e)

def on_coop_assign_offset_end(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None:
for p in partitions:
p.offset = OFFSET_END
KafkaConsumer.on_assign_log(consumer, partitions)
consumer.incremental_assign(partitions)
p.offset = consumer.last_stable_offset(p)
self.on_assign_log(consumer, partitions)
consumer.assign(consumer.assignment().update(partitions))

@staticmethod
def on_assign_log(consumer, partitions):
def on_assign_log(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None:
log_level = "WARNING"
for p in partitions:
if p.error:
log_level = "ERROR"
params = {
"partitions": str(list([str(partition) for partition in partitions or []])),
log_const.KEY_NAME: log_const.KAFKA_ON_ASSIGN_VALUE,
"log_level": log_level
}
log("KafkaConsumer.subscribe<on_assign>: assign %(partitions)s %(log_level)s", params=params, level=log_level)

def subscribe(self, topics=None):
def subscribe(self, topics: Optional[Iterable[str]] = None) -> None:
topics = list(set(topics or list(self._config["topics"].values())))

params = {
"topics": topics
}
log("Topics to subscribe: %(topics)s", params=params)

self._consumer.subscribe(
topics,
on_assign=self.get_on_assign_callback() if self.assign_offset_end else KafkaConsumer.on_assign_log
)

def get_on_assign_callback(self):
if "cooperative" in self._config["conf"].get("partition.assignment.strategy", ""):
callback = KafkaConsumer.on_coop_assign_offset_end
try:
self._consumer.subscribe(topics, listener=CoreConsumerRebalanceListener(
consumer=self._consumer,
on_assign_callback=(self.get_on_assign_callback() if self.assign_offset_end
else self.on_assign_log)
))
except KafkaError as e:
self._error_callback(e)

def get_on_assign_callback(self) -> Callable[[AIOKafkaConsumer, List[TopicPartition]], None]:
if "cooperative" in self._config["conf"].get("partition_assignment_strategy", ""):
callback = self.on_coop_assign_offset_end
else:
callback = KafkaConsumer.on_assign_offset_end
callback = self.on_assign_offset_end
return callback

def unsubscribe(self):
def unsubscribe(self) -> None:
self._consumer.unsubscribe()

def poll(self):
msg = self._consumer.poll(self._config["poll_timeout"])
if msg is not None:
return self._process_message(msg)
async def poll(self) -> Optional[ConsumerRecord]:
msg = await self._consumer.getone()
return self._process_message(msg)

def consume(self, num_messages: int = 1):
messages = self._consumer.consume(num_messages=num_messages, timeout=self._config["poll_timeout"])
for msg in messages:
yield self._process_message(msg)
async def consume(self, num_messages: int = 1) -> AsyncGenerator[Optional[ConsumerRecord], None]:
timeout_ms = self._config["poll_timeout"] * 1000
messages = await self._consumer.getmany(max_records=num_messages, timeout_ms=timeout_ms)
for partition_messages in messages.values():
for msg in partition_messages:
processed = self._process_message(msg)
yield processed

def commit_offset(self, msg):
async def commit_offset(self, msg: ConsumerRecord) -> None:
if msg is not None:
if self.autocommit_enabled:
self._consumer.store_offsets(msg)
else:
self._consumer.commit(msg, **{"async": True})

def get_msg_create_time(self, mq_message):
timestamp_type, timestamp = mq_message.timestamp()
return timestamp if timestamp_type is not TIMESTAMP_NOT_AVAILABLE else None

def _error_callback(self, err):
if not self.autocommit_enabled:
tp = TopicPartition(msg.topic, msg.partition)
try:
await self._consumer.commit({tp: msg.offset + 1})
except KafkaError as e:
self._error_callback(e)

def get_msg_create_time(self, mq_message: ConsumerRecord) -> int:
timestamp = mq_message.timestamp
return timestamp

def _error_callback(self, err: Any) -> None:
params = {
"error": str(err),
log_const.KEY_NAME: log_const.EXCEPTION_VALUE
@@ -111,30 +127,56 @@ def _error_callback(self, err):
monitoring.got_counter("kafka_consumer_exception")

# noinspection PyMethodMayBeStatic
def _process_message(self, msg: KafkaMessage):
err = msg.error()
if err:
if err.code() == KafkaError._PARTITION_EOF:
return None
else:
monitoring.got_counter("kafka_consumer_exception")
params = {
"code": err.code(),
"pid": os.getpid(),
"topic": msg.topic(),
"partition": msg.partition(),
log_const.KEY_NAME: log_const.EXCEPTION_VALUE
}
log(
"KafkaConsumer Error %(code)s at pid %(pid)s: topic=%(topic)s partition=[%(partition)s]\n",
params=params, level="WARNING")
raise KafkaException(err)

if msg.value():
if msg.headers() is None:
msg.set_headers([])
def _process_message(self, msg: ConsumerRecord) -> Optional[ConsumerRecord]:
if msg.value:
if msg.headers is None:
msg.headers = list()
return msg

def close(self):
self._consumer.close()
async def close(self) -> None:
await self._consumer.stop()
log(f"consumer to topics {self._config['topics']} closed.")

def _setup_ssl(self, conf: Dict[str, Any], ssl_config: Optional[Dict[str, Any]] = None) -> None:
if ssl_config:
context = create_ssl_context(**ssl_config)
conf["security_protocol"] = "SSL"
conf["ssl_context"] = context

def _update_old_config(self, conf: Dict[str, Any]) -> None:
if "default.topic.config" in conf:
conf.update(conf["default.topic.config"])
del conf["default.topic.config"]
if "ssl.ca.location" in conf:
context = create_ssl_context(
cafile=conf["ssl.ca.location"],
certfile=conf["ssl.certificate.location"],
keyfile=conf["ssl.key.location"]
)
conf["security_protocol"] = "SSL"
conf["ssl_context"] = context
param_old_to_new = {
"group.id": "group_id",
"enable.auto.commit": "enable_auto_commit",
"partition.assignment.strategy": "partition_assignment_strategy",
"bootstrap.servers": "bootstrap_servers",
"topic.metadata.refresh.interval.ms": "metadata_max_age_ms",
"session.timeout.ms": "session_timeout_ms",
"auto.commit.interval.ms": "auto_commit_interval_ms",
"enable.auto.offset.store": None,
"auto.offset.reset": "auto_offset_reset",
"debug": None,
"security.protocol": "security_protocol",
"broker.version.fallback": None,
"api.version.fallback.ms": None,
}
for old, new in param_old_to_new.items():
if old in conf:
value = conf[old]
if value in ("smallest", "beginning"):
value = "earliest"
elif value in ("largest", "end"):
value = "latest"
if new:
conf[new] = value
del conf[old]
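`_update_old_config` keeps the old librdkafka-style YAML working: `default.topic.config` is flattened, SSL file paths become an `ssl_context`, dotted keys are renamed to their aiokafka keyword equivalents, and the old offset-reset spellings are normalized. A condensed, standalone paraphrase of that translation (not the exact code from the PR):

```python
# Old librdkafka-style settings, as they might appear in kafka_config.yml.
conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "dialog_policy1",
    "enable.auto.commit": True,
    "auto.offset.reset": "largest",
    "debug": "all",  # no aiokafka equivalent: dropped
}

OLD_TO_NEW = {
    "bootstrap.servers": "bootstrap_servers",
    "group.id": "group_id",
    "enable.auto.commit": "enable_auto_commit",
    "auto.offset.reset": "auto_offset_reset",
    "debug": None,
}

for old, new in OLD_TO_NEW.items():
    if old in conf:
        value = conf.pop(old)
        # Normalize librdkafka offset-reset spellings to aiokafka's vocabulary.
        if value in ("smallest", "beginning"):
            value = "earliest"
        elif value in ("largest", "end"):
            value = "latest"
        if new:
            conf[new] = value

print(conf)
# {'bootstrap_servers': 'localhost:9092', 'group_id': 'dialog_policy1',
#  'enable_auto_commit': True, 'auto_offset_reset': 'latest'}
```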