diff --git a/kafka/consumer.py b/kafka/consumer.py index 8ac28daf4..a2775a739 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -207,7 +207,7 @@ class SimpleConsumer(Consumer): topic: the topic to consume partitions: An optional list of partitions to consume the data from - auto_commit: default True. Whether or not to auto commit the offsets + auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to @@ -450,7 +450,7 @@ def _fetch(self): log.debug("Done iterating over partition %s" % partition) partitions = retry_partitions -def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): +def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size, fetch_size, buf_size, max_buf_size): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -469,7 +469,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): partitions=chunk, auto_commit=False, auto_commit_every_n=None, - auto_commit_every_t=None) + auto_commit_every_t=None, + fetch_size_bytes=fetch_size, + buffer_size=buf_size, + max_buffer_size=max_buf_size) # Ensure that the consumer provides the partition information consumer.provide_partition_info() @@ -516,15 +519,20 @@ class MultiProcessConsumer(Consumer): group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume - auto_commit: default True. Whether or not to auto commit the offsets + auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) + (overrides num_procs) + fetch_size_bytes: number of bytes to request in a FetchRequest + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -535,7 +543,11 @@ class MultiProcessConsumer(Consumer): def __init__(self, client, group, topic, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - num_procs=1, partitions_per_proc=0): + num_procs=1, + partitions_per_proc=0, + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES): # Initiate the base consumer class super(MultiProcessConsumer, self).__init__( @@ -574,7 +586,8 @@ def __init__(self, client, group, topic, auto_commit=True, args = (client.copy(), group, topic, chunk, self.queue, self.start, self.exit, - self.pause, self.size) + self.pause, self.size, fetch_size_bytes, + buffer_size, max_buffer_size) proc = Process(target=_mp_consume, args=args) proc.daemon = True