Skip to content

Using additional params for MP consumer child process #336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits on Mar 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka/consumer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1


class Consumer(object):
Expand Down
85 changes: 50 additions & 35 deletions kafka/consumer/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

import logging
import time
from multiprocessing import Process, Queue as MPQueue, Event, Value

from collections import namedtuple
from multiprocessing import Process, Manager as MPManager

try:
from Queue import Empty
from Queue import Empty, Full
except ImportError: # python 2
from queue import Empty
from queue import Empty, Full

from .base import (
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
NO_MESSAGES_WAIT_TIME_SECONDS
NO_MESSAGES_WAIT_TIME_SECONDS,
FULL_QUEUE_WAIT_TIME_SECONDS
)
from .simple import Consumer, SimpleConsumer

log = logging.getLogger("kafka")
Events = namedtuple("Events", ["start", "pause", "exit"])

log = logging.getLogger("kafka")

def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
Expand All @@ -34,20 +38,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(client, group, topic,
partitions=chunk,
auto_commit=False,
auto_commit_every_n=None,
auto_commit_every_t=None)
auto_commit_every_t=None,
**consumer_options)

# Ensure that the consumer provides the partition information
consumer.provide_partition_info()

while True:
# Wait till the controller indicates us to start consumption
start.wait()
events.start.wait()

# If we are asked to quit, do so
if exit.is_set():
if events.exit.is_set():
break

# Consume messages and add them to the queue. If the controller
Expand All @@ -56,7 +60,13 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):

message = consumer.get_message()
if message:
queue.put(message)
while True:
try:
queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
break
except Full:
if events.exit.is_set(): break

count += 1

# We have reached the required size. The controller might have
Expand All @@ -65,7 +75,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# loop consuming all available messages before the controller
# can reset the 'start' event
if count == size.value:
pause.wait()
events.pause.wait()

else:
# In case we did not receive any message, give up the CPU for
Expand Down Expand Up @@ -105,7 +115,8 @@ class MultiProcessConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
num_procs=1, partitions_per_proc=0):
num_procs=1, partitions_per_proc=0,
**simple_consumer_options):

# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
Expand All @@ -117,11 +128,13 @@ def __init__(self, client, group, topic, auto_commit=True,

# Variables for managing and controlling the data flow from
# consumer child process to master
self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
self.size = Value('i', 0) # Indicator of number of messages to fetch
manager = MPManager()
self.queue = manager.Queue(1024) # Child consumers dump messages into this
self.events = Events(
start = manager.Event(), # Indicates the consumers to start fetch
exit = manager.Event(), # Requests the consumers to shutdown
pause = manager.Event()) # Requests the consumers to pause fetch
self.size = manager.Value('i', 0) # Indicator of number of messages to fetch

# dict.keys() returns a view in py3 + it's not a thread-safe operation
# http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
Expand All @@ -143,12 +156,14 @@ def __init__(self, client, group, topic, auto_commit=True,

self.procs = []
for chunk in chunks:
args = (client.copy(),
group, topic, chunk,
self.queue, self.start, self.exit,
self.pause, self.size)

proc = Process(target=_mp_consume, args=args)
options = {'partitions': list(chunk)}
if simple_consumer_options:
simple_consumer_options.pop('partitions', None)
options.update(simple_consumer_options)

args = (client.copy(), group, topic, self.queue,
self.size, self.events)
proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
proc.start()
self.procs.append(proc)
Expand All @@ -159,9 +174,9 @@ def __repr__(self):

def stop(self):
# Set exit and start off all waiting consumers
self.exit.set()
self.pause.set()
self.start.set()
self.events.exit.set()
self.events.pause.set()
self.events.start.set()

for proc in self.procs:
proc.join()
Expand All @@ -176,10 +191,10 @@ def __iter__(self):
# Trigger the consumer procs to start off.
# We will iterate till there are no more messages available
self.size.value = 0
self.pause.set()
self.events.pause.set()

while True:
self.start.set()
self.events.start.set()
try:
# We will block for a small while so that the consumers get
# a chance to run and put some messages in the queue
Expand All @@ -191,12 +206,12 @@ def __iter__(self):

# Count, check and commit messages if necessary
self.offsets[partition] = message.offset + 1
self.start.clear()
self.events.start.clear()
self.count_since_commit += 1
self._auto_commit()
yield message

self.start.clear()
self.events.start.clear()

def get_messages(self, count=1, block=True, timeout=10):
"""
Expand All @@ -216,7 +231,7 @@ def get_messages(self, count=1, block=True, timeout=10):
# necessary, but these will not be committed to kafka. Also, the extra
# messages can be provided in subsequent runs
self.size.value = count
self.pause.clear()
self.events.pause.clear()

if timeout is not None:
max_time = time.time() + timeout
Expand All @@ -228,7 +243,7 @@ def get_messages(self, count=1, block=True, timeout=10):
# go into overdrive and keep consuming thousands of
# messages when the user might need only a few
if self.queue.empty():
self.start.set()
self.events.start.set()

try:
partition, message = self.queue.get(block, timeout)
Expand All @@ -242,8 +257,8 @@ def get_messages(self, count=1, block=True, timeout=10):
timeout = max_time - time.time()

self.size.value = 0
self.start.clear()
self.pause.set()
self.events.start.clear()
self.events.pause.set()

# Update and commit offsets if necessary
self.offsets.update(new_offsets)
Expand Down
2 changes: 2 additions & 0 deletions pylint.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[TYPECHECK]
ignored-classes=SyncManager
5 changes: 3 additions & 2 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def consumer(self, **kwargs):
group = kwargs.pop('group', self.id().encode('utf-8'))
topic = kwargs.pop('topic', self.topic)

if consumer_class == SimpleConsumer:
if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
kwargs.setdefault('iter_timeout', 0)

return consumer_class(self.client, group, topic, **kwargs)
Expand Down Expand Up @@ -243,7 +243,8 @@ def test_multi_proc_pending(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))

consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
consumer = MultiProcessConsumer(self.client, "group1", self.topic,
auto_commit=False, iter_timeout=0)

self.assertEqual(consumer.pending(), 20)
self.assertEqual(consumer.pending(partitions=[0]), 10)
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ deps =
unittest2
mock
pylint
commands = pylint {posargs: -E kafka test}
commands = pylint --rcfile=pylint.rc {posargs: -E kafka test}

[testenv:docs]
deps =
Expand Down