Skip to content

Commit 3b71016

Browse files
authored
Merge pull request #130 from salute-developers/PNLP-7328_config_kafka_replyTopic
PNLP-7328: map reply_topic_key to kafka_replyTopic before kafka request
2 parents 590d7a9 + 63c2066 commit 3b71016

File tree

7 files changed

+40
-23
lines changed

7 files changed

+40
-23
lines changed

core/basic_models/actions/client_profile_actions.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
from core.basic_models.actions.command import Command
44
from core.basic_models.actions.string_actions import StringAction
5-
from core.configs.global_constants import KAFKA
5+
from core.configs.config_constants import REPLY_TOPIC_KEY
6+
from core.configs.global_constants import KAFKA, KAFKA_REPLY_TOPIC
67
from core.text_preprocessing.base import BaseTextPreprocessingResult
78
from core.utils.pickle_copy import pickle_deepcopy
89
from scenarios.user.user_model import User
@@ -62,12 +63,14 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
6263
})
6364
settings_kafka_key = config["template_settings"].get("client_profile_kafka_key")
6465
self.kafka_key: str = self.kafka_key or settings_kafka_key or self.DEFAULT_KAFKA_KEY
65-
self.request_data = {
66-
"topic_key": "client_info",
67-
"kafka_key": self.kafka_key,
68-
"kafka_replyTopic":
69-
config["template_settings"]["consumer_topic"]
70-
}
66+
if self.request_data is None:
67+
self.request_data = dict()
68+
if "topic_key" not in self.request_data:
69+
self.request_data["topic_key"] = "client_info"
70+
if "kafka_key" not in self.request_data:
71+
self.request_data["kafka_key"] = self.kafka_key
72+
if REPLY_TOPIC_KEY not in self.request_data and KAFKA_REPLY_TOPIC not in self.request_data:
73+
self.request_data[KAFKA_REPLY_TOPIC] = config["template_settings"]["consumer_topic"]
7174

7275
async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
7376
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
@@ -160,14 +163,16 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
160163
"projectId": user.settings["template_settings"]["project_id"]
161164
}
162165
})
163-
settings_kafka_key: Optional[str] = user.settings["template_settings"].get("client_profile_kafka_key")
164-
kafka_key: str = self.kafka_key or settings_kafka_key or self.DEFAULT_KAFKA_KEY
165-
self.request_data = {
166-
"topic_key": "client_info_remember",
167-
"kafka_key": kafka_key,
168-
"kafka_replyTopic":
169-
user.settings["template_settings"]["consumer_topic"]
170-
}
166+
if self.request_data is None:
167+
self.request_data = dict()
168+
if "topic_key" not in self.request_data:
169+
self.request_data["topic_key"] = "client_info_remember"
170+
if "kafka_key" not in self.request_data:
171+
settings_kafka_key: Optional[str] = user.settings["template_settings"].get("client_profile_kafka_key")
172+
kafka_key: str = self.kafka_key or settings_kafka_key or self.DEFAULT_KAFKA_KEY
173+
self.request_data["kafka_key"] = kafka_key
174+
if REPLY_TOPIC_KEY not in self.request_data and KAFKA_REPLY_TOPIC not in self.request_data:
175+
self.request_data[KAFKA_REPLY_TOPIC] = user.settings["template_settings"]["consumer_topic"]
171176

172177
commands = await super().run(user, text_preprocessing_result, params)
173178
return commands

core/configs/config_constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
REPLY_TOPIC = "reply_topic"
2+
REPLY_TOPIC_KEY = "reply_topic_key"

core/logging/logger_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import core.logging.logger_constants as log_const
1111
import scenarios.logging.logger_constants as scenarios_log_const
1212
from core.basic_models.classifiers.basic_classifiers import Classifier
13+
from core.configs.global_constants import KAFKA_REPLY_TOPIC
1314
from core.utils.masking_message import masking
1415
from core.utils.stats_timer import StatsTimer
1516

@@ -22,7 +23,7 @@
2223

2324

2425
class LoggerMessageCreator:
25-
LOGGER_HEADERS = ["kafka_replyTopic", "app_callback_id"]
26+
LOGGER_HEADERS = [KAFKA_REPLY_TOPIC, "app_callback_id"]
2627
ART_NAMES = [
2728
"channel", "type", "device_channel", "device_channel_version", "device_platform", "group",
2829
"device_platform_version", "device_platform_client_type", "csa_profile_id", "test_deploy"

core/mq/kafka/kafka_publisher.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import os
44
import time
5-
from typing import Union
5+
from typing import Union, Optional, List, Tuple, Any
66

77
from confluent_kafka import Producer
88

@@ -28,7 +28,7 @@ def __init__(self, config):
2828
conf["logger"] = debug_logger
2929
self._producer = Producer(**conf)
3030

31-
def send(self, value: Union[str, bytes], key=None, topic_key=None, headers=None):
31+
def send(self, value: Union[str, bytes], key=None, topic_key=None, headers: Optional[List[Tuple[Any, Any]]] = None):
3232
try:
3333
topic = self._config["topic"]
3434
if topic_key is not None:
@@ -47,7 +47,7 @@ def send(self, value: Union[str, bytes], key=None, topic_key=None, headers=None)
4747
monitoring.got_counter("kafka_producer_exception")
4848
self._poll()
4949

50-
def send_to_topic(self, value, key=None, topic=None, headers=None):
50+
def send_to_topic(self, value, key=None, topic=None, headers: Optional[List[Tuple[Any, Any]]] = None):
5151
try:
5252
if topic is None:
5353
params = {

core/request/kafka_request.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class KafkaRequest(BaseRequest):
1313
TOPIC = "topic"
1414

1515
def __init__(self, items, id=None):
16-
super(KafkaRequest, self).__init__(items)
16+
super().__init__(items)
1717
items = items or {}
1818
self.topic_key = items.get(self.TOPIC_KEY)
1919
self.kafka_key = items.get(self.KAFKA_KEY)

smart_kit/request/kafka_request.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,36 @@
11
from core.configs.global_constants import CALLBACK_ID_HEADER, KAFKA_REPLY_TOPIC
22
from core.request.kafka_request import KafkaRequest
3+
from core.configs.config_constants import REPLY_TOPIC_KEY, REPLY_TOPIC
4+
from smart_kit.configs.settings import Settings
35

46

57
class SmartKitKafkaRequest(KafkaRequest):
68
KAFKA_REPLY_TOPIC = KAFKA_REPLY_TOPIC
79
KAFKA_EXTRA_HEADERS = "kafka_extraHeaders"
10+
REPLY_TOPIC_KEY = REPLY_TOPIC_KEY
811

912
def __init__(self, items, id=None):
10-
super(SmartKitKafkaRequest, self).__init__(items)
13+
super().__init__(items)
1114
items = items or {}
1215
self._callback_id = items.get(self._callback_id_header_name)
1316
self._kafka_replyTopic = items.get(self.KAFKA_REPLY_TOPIC)
1417
self._kafka_extraHeaders = items.get(self.KAFKA_EXTRA_HEADERS) or {}
18+
# _reply_topic_key has priority over _kafka_replyTopic
19+
self._reply_topic_key = items.get(self.REPLY_TOPIC_KEY)
1520

1621
@property
1722
def _callback_id_header_name(self):
1823
return CALLBACK_ID_HEADER
1924

2025
def _get_new_headers(self, source_mq_message):
21-
headers_dict = dict(super(SmartKitKafkaRequest, self)._get_new_headers(source_mq_message))
26+
headers_dict = dict(super()._get_new_headers(source_mq_message))
2227
if self._callback_id:
2328
headers_dict[self._callback_id_header_name] = str(self._callback_id).encode()
24-
if self._kafka_replyTopic:
29+
if self._reply_topic_key:
30+
reply_topics = Settings()["template_settings"][REPLY_TOPIC]
31+
mapped_reply_topic = reply_topics[self._reply_topic_key]
32+
headers_dict[self.KAFKA_REPLY_TOPIC] = str(mapped_reply_topic).encode()
33+
elif self._kafka_replyTopic:
2534
headers_dict[self.KAFKA_REPLY_TOPIC] = str(self._kafka_replyTopic).encode()
2635
if self._kafka_extraHeaders:
2736
for k, v in self._kafka_extraHeaders.items():

tests/core_tests/mq_test/kafka_test/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)