Skip to content

Added pass through parameters to the multiprocess consumer. #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class SimpleConsumer(Consumer):
topic: the topic to consume
partitions: An optional list of partitions to consume the data from

auto_commit: default True. Whether or not to auto commit the offsets
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
Expand Down Expand Up @@ -450,7 +450,7 @@ def _fetch(self):
log.debug("Done iterating over partition %s" % partition)
partitions = retry_partitions

def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size, fetch_size, buf_size, max_buf_size):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
Expand All @@ -469,7 +469,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
partitions=chunk,
auto_commit=False,
auto_commit_every_n=None,
auto_commit_every_t=None)
auto_commit_every_t=None,
fetch_size_bytes=fetch_size,
buffer_size=buf_size,
max_buffer_size=max_buf_size)

# Ensure that the consumer provides the partition information
consumer.provide_partition_info()
Expand Down Expand Up @@ -516,15 +519,20 @@ class MultiProcessConsumer(Consumer):
group: a name for this consumer, used for offset storage and must be unique
topic: the topic to consume

auto_commit: default True. Whether or not to auto commit the offsets
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
num_procs: Number of processes to start for consuming messages.
The available partitions will be divided among these processes
num_procs: Number of processes to start for consuming messages.
The available partitions will be divided among these processes
partitions_per_proc: Number of partitions to be allocated per process
(overrides num_procs)
(overrides num_procs)
fetch_size_bytes: number of bytes to request in a FetchRequest
buffer_size: default 4K. Initial number of bytes to tell kafka we
have available. This will double as needed.
max_buffer_size: default 16K. Max number of bytes to tell kafka we have
available. None means no limit.

Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
Expand All @@ -535,7 +543,11 @@ class MultiProcessConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
num_procs=1, partitions_per_proc=0):
num_procs=1,
partitions_per_proc=0,
fetch_size_bytes=FETCH_MIN_BYTES,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES):

# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
Expand Down Expand Up @@ -574,7 +586,8 @@ def __init__(self, client, group, topic, auto_commit=True,
args = (client.copy(),
group, topic, chunk,
self.queue, self.start, self.exit,
self.pause, self.size)
self.pause, self.size, fetch_size_bytes,
buffer_size, max_buffer_size)

proc = Process(target=_mp_consume, args=args)
proc.daemon = True
Expand Down