Skip to content

Commit f35995a

Browse files
committed
Merge pull request #336 from scrapinghub/feature-mp-consumer-params
Using additional params for MP consumer child process
2 parents 9a8cd15 + fb118fb commit f35995a

File tree

5 files changed

+57
-38
lines changed

5 files changed

+57
-38
lines changed

kafka/consumer/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
ITER_TIMEOUT_SECONDS = 60
2727
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
28+
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
2829

2930

3031
class Consumer(object):

kafka/consumer/multiprocess.py

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,27 @@
22

33
import logging
44
import time
5-
from multiprocessing import Process, Queue as MPQueue, Event, Value
5+
6+
from collections import namedtuple
7+
from multiprocessing import Process, Manager as MPManager
68

79
try:
8-
from Queue import Empty
10+
from Queue import Empty, Full
911
except ImportError: # python 2
10-
from queue import Empty
12+
from queue import Empty, Full
1113

1214
from .base import (
1315
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
14-
NO_MESSAGES_WAIT_TIME_SECONDS
16+
NO_MESSAGES_WAIT_TIME_SECONDS,
17+
FULL_QUEUE_WAIT_TIME_SECONDS
1518
)
1619
from .simple import Consumer, SimpleConsumer
1720

18-
log = logging.getLogger("kafka")
21+
Events = namedtuple("Events", ["start", "pause", "exit"])
1922

23+
log = logging.getLogger("kafka")
2024

21-
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
25+
def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
2226
"""
2327
A child process worker which consumes messages based on the
2428
notifications given by the controller process
@@ -34,20 +38,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
3438
# We will start consumers without auto-commit. Auto-commit will be
3539
# done by the master controller process.
3640
consumer = SimpleConsumer(client, group, topic,
37-
partitions=chunk,
3841
auto_commit=False,
3942
auto_commit_every_n=None,
40-
auto_commit_every_t=None)
43+
auto_commit_every_t=None,
44+
**consumer_options)
4145

4246
# Ensure that the consumer provides the partition information
4347
consumer.provide_partition_info()
4448

4549
while True:
4650
# Wait till the controller indicates us to start consumption
47-
start.wait()
51+
events.start.wait()
4852

4953
# If we are asked to quit, do so
50-
if exit.is_set():
54+
if events.exit.is_set():
5155
break
5256

5357
# Consume messages and add them to the queue. If the controller
@@ -56,7 +60,13 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
5660

5761
message = consumer.get_message()
5862
if message:
59-
queue.put(message)
63+
while True:
64+
try:
65+
queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
66+
break
67+
except Full:
68+
if events.exit.is_set(): break
69+
6070
count += 1
6171

6272
# We have reached the required size. The controller might have
@@ -65,7 +75,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
6575
# loop consuming all available messages before the controller
6676
# can reset the 'start' event
6777
if count == size.value:
68-
pause.wait()
78+
events.pause.wait()
6979

7080
else:
7181
# In case we did not receive any message, give up the CPU for
@@ -105,7 +115,8 @@ class MultiProcessConsumer(Consumer):
105115
def __init__(self, client, group, topic, auto_commit=True,
106116
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
107117
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
108-
num_procs=1, partitions_per_proc=0):
118+
num_procs=1, partitions_per_proc=0,
119+
**simple_consumer_options):
109120

110121
# Initiate the base consumer class
111122
super(MultiProcessConsumer, self).__init__(
@@ -117,11 +128,13 @@ def __init__(self, client, group, topic, auto_commit=True,
117128

118129
# Variables for managing and controlling the data flow from
119130
# consumer child process to master
120-
self.queue = MPQueue(1024) # Child consumers dump messages into this
121-
self.start = Event() # Indicates the consumers to start fetch
122-
self.exit = Event() # Requests the consumers to shutdown
123-
self.pause = Event() # Requests the consumers to pause fetch
124-
self.size = Value('i', 0) # Indicator of number of messages to fetch
131+
manager = MPManager()
132+
self.queue = manager.Queue(1024) # Child consumers dump messages into this
133+
self.events = Events(
134+
start = manager.Event(), # Indicates the consumers to start fetch
135+
exit = manager.Event(), # Requests the consumers to shutdown
136+
pause = manager.Event()) # Requests the consumers to pause fetch
137+
self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
125138

126139
# dict.keys() returns a view in py3 + it's not a thread-safe operation
127140
# http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
@@ -143,12 +156,14 @@ def __init__(self, client, group, topic, auto_commit=True,
143156

144157
self.procs = []
145158
for chunk in chunks:
146-
args = (client.copy(),
147-
group, topic, chunk,
148-
self.queue, self.start, self.exit,
149-
self.pause, self.size)
150-
151-
proc = Process(target=_mp_consume, args=args)
159+
options = {'partitions': list(chunk)}
160+
if simple_consumer_options:
161+
simple_consumer_options.pop('partitions', None)
162+
options.update(simple_consumer_options)
163+
164+
args = (client.copy(), group, topic, self.queue,
165+
self.size, self.events)
166+
proc = Process(target=_mp_consume, args=args, kwargs=options)
152167
proc.daemon = True
153168
proc.start()
154169
self.procs.append(proc)
@@ -159,9 +174,9 @@ def __repr__(self):
159174

160175
def stop(self):
161176
# Set exit and start off all waiting consumers
162-
self.exit.set()
163-
self.pause.set()
164-
self.start.set()
177+
self.events.exit.set()
178+
self.events.pause.set()
179+
self.events.start.set()
165180

166181
for proc in self.procs:
167182
proc.join()
@@ -176,10 +191,10 @@ def __iter__(self):
176191
# Trigger the consumer procs to start off.
177192
# We will iterate till there are no more messages available
178193
self.size.value = 0
179-
self.pause.set()
194+
self.events.pause.set()
180195

181196
while True:
182-
self.start.set()
197+
self.events.start.set()
183198
try:
184199
# We will block for a small while so that the consumers get
185200
# a chance to run and put some messages in the queue
@@ -191,12 +206,12 @@ def __iter__(self):
191206

192207
# Count, check and commit messages if necessary
193208
self.offsets[partition] = message.offset + 1
194-
self.start.clear()
209+
self.events.start.clear()
195210
self.count_since_commit += 1
196211
self._auto_commit()
197212
yield message
198213

199-
self.start.clear()
214+
self.events.start.clear()
200215

201216
def get_messages(self, count=1, block=True, timeout=10):
202217
"""
@@ -216,7 +231,7 @@ def get_messages(self, count=1, block=True, timeout=10):
216231
# necessary, but these will not be committed to kafka. Also, the extra
217232
# messages can be provided in subsequent runs
218233
self.size.value = count
219-
self.pause.clear()
234+
self.events.pause.clear()
220235

221236
if timeout is not None:
222237
max_time = time.time() + timeout
@@ -228,7 +243,7 @@ def get_messages(self, count=1, block=True, timeout=10):
228243
# go into overdrive and keep consuming thousands of
229244
# messages when the user might need only a few
230245
if self.queue.empty():
231-
self.start.set()
246+
self.events.start.set()
232247

233248
try:
234249
partition, message = self.queue.get(block, timeout)
@@ -242,8 +257,8 @@ def get_messages(self, count=1, block=True, timeout=10):
242257
timeout = max_time - time.time()
243258

244259
self.size.value = 0
245-
self.start.clear()
246-
self.pause.set()
260+
self.events.start.clear()
261+
self.events.pause.set()
247262

248263
# Update and commit offsets if necessary
249264
self.offsets.update(new_offsets)

pylint.rc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[TYPECHECK]
2+
ignored-classes=SyncManager

test/test_consumer_integration.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def consumer(self, **kwargs):
6161
group = kwargs.pop('group', self.id().encode('utf-8'))
6262
topic = kwargs.pop('topic', self.topic)
6363

64-
if consumer_class == SimpleConsumer:
64+
if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
6565
kwargs.setdefault('iter_timeout', 0)
6666

6767
return consumer_class(self.client, group, topic, **kwargs)
@@ -243,7 +243,8 @@ def test_multi_proc_pending(self):
243243
self.send_messages(0, range(0, 10))
244244
self.send_messages(1, range(10, 20))
245245

246-
consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
246+
consumer = MultiProcessConsumer(self.client, "group1", self.topic,
247+
auto_commit=False, iter_timeout=0)
247248

248249
self.assertEqual(consumer.pending(), 20)
249250
self.assertEqual(consumer.pending(partitions=[0]), 10)

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ deps =
3737
unittest2
3838
mock
3939
pylint
40-
commands = pylint {posargs: -E kafka test}
40+
commands = pylint --rcfile=pylint.rc {posargs: -E kafka test}
4141

4242
[testenv:docs]
4343
deps =

0 commit comments

Comments (0)