Skip to content

Commit 5868d31

Browse files
committed
Allow injecting clients into Kafka consumers and producers.
1 parent 4dc0899 commit 5868d31

File tree

2 files changed

+19
-4
lines changed

2 files changed

+19
-4
lines changed

kafka/consumer/group.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ class KafkaConsumer(six.Iterator):
4646
It just needs to have at least one broker that will respond to a
4747
Metadata API Request. Default port is 9092. If no servers are
4848
specified, will default to localhost:9092.
49-
client_id (str): A name for this client. This string is passed in
49+
client (kafka.client_async.KafkaClient): a kafka client to
50+
use, or if unprovided, one is constructed from the provided
51+
configuration.
52+
client_id (str): a name for this client. This string is passed in
5053
each request to servers and can be used to identify specific
5154
server-side log entries that correspond to this client. Also
5255
submitted to GroupCoordinator for logging with respect to
@@ -243,6 +246,7 @@ class KafkaConsumer(six.Iterator):
243246
"""
244247
DEFAULT_CONFIG = {
245248
'bootstrap_servers': 'localhost',
249+
'client': None,
246250
'client_id': 'kafka-python-' + __version__,
247251
'group_id': None,
248252
'key_deserializer': None,
@@ -338,7 +342,11 @@ def __init__(self, *topics, **configs):
338342
log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
339343
str(self.config['api_version']), str_version)
340344

341-
self._client = KafkaClient(metrics=self._metrics, **self.config)
345+
client = self.config.pop('client', None) or KafkaClient(
346+
metrics=self._metrics,
347+
**self.config
348+
)
349+
self._client = client
342350

343351
# Get auto-discovered version from client if necessary
344352
if self.config['api_version'] is None:

kafka/producer/kafka.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class KafkaProducer(object):
8686
It just needs to have at least one broker that will respond to a
8787
Metadata API Request. Default port is 9092. If no servers are
8888
specified, will default to localhost:9092.
89+
client (kafka.client_async.KafkaClient): a kafka client to
90+
use, or if unprovided, one is constructed from the provided
91+
configuration.
8992
client_id (str): a name for this client. This string is passed in
9093
each request to servers and can be used to identify specific
9194
server-side log entries that correspond to this client.
@@ -273,6 +276,7 @@ class KafkaProducer(object):
273276
"""
274277
DEFAULT_CONFIG = {
275278
'bootstrap_servers': 'localhost',
279+
'client': None,
276280
'client_id': None,
277281
'key_serializer': None,
278282
'value_serializer': None,
@@ -359,8 +363,11 @@ def __init__(self, **configs):
359363
reporters = [reporter() for reporter in self.config['metric_reporters']]
360364
self._metrics = Metrics(metric_config, reporters)
361365

362-
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
363-
**self.config)
366+
client = self.config['client'] or KafkaClient(
367+
metrics=self._metrics,
368+
metric_group_prefix='producer',
369+
**self.config
370+
)
364371

365372
# Get auto-discovered version from client if necessary
366373
if self.config['api_version'] is None:

0 commit comments

Comments
 (0)