Commit 56eb39d

Drop unused kafka.producer.buffer / SimpleBufferPool (#2580)
1 parent 0024227 commit 56eb39d

File tree

5 files changed: +14 -179 lines changed


kafka/producer/buffer.py (-115)

This file was deleted.
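For reference, the shape of the deleted class can be reconstructed from its call sites removed elsewhere in this commit (the constructor call in RecordAccumulator.__init__, allocate/deallocate in append(), queued() in ready(), and the deleted test_buffer_pool). The stub below is an illustrative sketch of that call surface only, with assumed argument names; it is not the deleted implementation.

import io


class SimpleBufferPool(object):
    """Illustrative stub of the removed pool's call surface."""

    def __init__(self, memory, poolable_size, metrics=None,
                 metric_group_prefix='producer-metrics'):
        self._waiters = []  # threads blocked waiting for a free buffer

    def allocate(self, size, max_time_to_block_ms):
        # Returned a reusable io.BytesIO, blocking up to
        # max_time_to_block_ms when the pool was exhausted.
        return io.BytesIO()

    def deallocate(self, buf):
        # Cleared the buffer and returned it to the pool for reuse.
        buf.seek(0)
        buf.truncate()

    def queued(self):
        # Number of threads currently blocked in allocate().
        return len(self._waiters)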

kafka/producer/kafka.py (+9 -20)

@@ -6,6 +6,7 @@
 import socket
 import threading
 import time
+import warnings
 import weakref
 
 from kafka.vendor import six
@@ -72,11 +73,6 @@ class KafkaProducer(object):
     can lead to fewer, more efficient requests when not under maximal load at
     the cost of a small amount of latency.
 
-    The buffer_memory controls the total amount of memory available to the
-    producer for buffering. If records are sent faster than they can be
-    transmitted to the server then this buffer space will be exhausted. When
-    the buffer space is exhausted additional send calls will block.
-
     The key_serializer and value_serializer instruct how to turn the key and
     value objects the user provides into bytes.
 
@@ -166,12 +162,6 @@ class KafkaProducer(object):
             messages with the same key are assigned to the same partition.
             When a key is None, the message is delivered to a random partition
            (filtered to partitions with available leaders only, if possible).
-        buffer_memory (int): The total bytes of memory the producer should use
-            to buffer records waiting to be sent to the server. If records are
-            sent faster than they can be delivered to the server the producer
-            will block up to max_block_ms, raising an exception on timeout.
-            In the current implementation, this setting is an approximation.
-            Default: 33554432 (32MB)
         connections_max_idle_ms: Close idle connections after the number of
             milliseconds specified by this config. The broker closes idle
             connections after connections.max.idle.ms, so this avoids hitting
@@ -319,7 +309,6 @@ class KafkaProducer(object):
         'batch_size': 16384,
         'linger_ms': 0,
         'partitioner': DefaultPartitioner(),
-        'buffer_memory': 33554432,
         'connections_max_idle_ms': 9 * 60 * 1000,
         'max_block_ms': 60000,
         'max_request_size': 1048576,
@@ -361,6 +350,8 @@ class KafkaProducer(object):
         'kafka_client': KafkaClient,
     }
 
+    DEPRECATED_CONFIGS = ('buffer_memory',)
+
     _COMPRESSORS = {
         'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
         'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
@@ -376,6 +367,11 @@ def __init__(self, **configs):
             if key in configs:
                 self.config[key] = configs.pop(key)
 
+        for key in self.DEPRECATED_CONFIGS:
+            if key in configs:
+                configs.pop(key)
+                warnings.warn('Deprecated Producer config: %s' % (key,), DeprecationWarning)
+
         # Only check for extra config keys in top-level class
         assert not configs, 'Unrecognized configs: %s' % (configs,)
 
@@ -640,9 +636,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
         tp = TopicPartition(topic, partition)
         log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
         result = self._accumulator.append(tp, timestamp_ms,
-                                          key_bytes, value_bytes, headers,
-                                          self.config['max_block_ms'],
-                                          estimated_size=message_size)
+                                          key_bytes, value_bytes, headers)
         future, batch_is_full, new_batch_created = result
         if batch_is_full or new_batch_created:
             log.debug("Waking up the sender since %s is either full or"
@@ -697,11 +691,6 @@ def _ensure_valid_record_size(self, size):
                 "The message is %d bytes when serialized which is larger than"
                 " the maximum request size you have configured with the"
                 " max_request_size configuration" % (size,))
-        if size > self.config['buffer_memory']:
-            raise Errors.MessageSizeTooLargeError(
-                "The message is %d bytes when serialized which is larger than"
-                " the total memory buffer you have configured with the"
-                " buffer_memory configuration." % (size,))
 
     def _wait_on_metadata(self, topic, max_wait):
         """

kafka/producer/record_accumulator.py (+4 -29)

@@ -7,7 +7,6 @@
 import time
 
 import kafka.errors as Errors
-from kafka.producer.buffer import SimpleBufferPool
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
 from kafka.record.memory_records import MemoryRecordsBuilder
 from kafka.structs import TopicPartition
@@ -36,7 +35,7 @@ def get(self):
 
 
 class ProducerBatch(object):
-    def __init__(self, tp, records, buffer):
+    def __init__(self, tp, records):
         self.max_record_size = 0
         now = time.time()
         self.created = now
@@ -48,7 +47,6 @@ def __init__(self, tp, records, buffer):
         self.topic_partition = tp
         self.produce_future = FutureProduceResult(tp)
         self._retry = False
-        self._buffer = buffer  # We only save it, we don't write to it
 
     @property
     def record_count(self):
@@ -123,9 +121,6 @@ def in_retry(self):
     def set_retry(self):
         self._retry = True
 
-    def buffer(self):
-        return self._buffer
-
     def __str__(self):
         return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
             self.topic_partition, self.records.next_offset())
@@ -145,12 +140,6 @@ class RecordAccumulator(object):
            A small batch size will make batching less common and may reduce
            throughput (a batch size of zero will disable batching entirely).
            Default: 16384
-        buffer_memory (int): The total bytes of memory the producer should use
-            to buffer records waiting to be sent to the server. If records are
-            sent faster than they can be delivered to the server the producer
-            will block up to max_block_ms, raising an exception on timeout.
-            In the current implementation, this setting is an approximation.
-            Default: 33554432 (32MB)
         compression_attrs (int): The compression type for all data generated by
             the producer. Valid values are gzip(1), snappy(2), lz4(3), or
             none(0).
@@ -168,7 +157,6 @@ class RecordAccumulator(object):
            all retries in a short period of time. Default: 100
     """
     DEFAULT_CONFIG = {
-        'buffer_memory': 33554432,
         'batch_size': 16384,
         'compression_attrs': 0,
         'linger_ms': 0,
@@ -189,18 +177,13 @@ def __init__(self, **configs):
         self._appends_in_progress = AtomicInteger()
         self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
         self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
-        self._free = SimpleBufferPool(self.config['buffer_memory'],
-                                      self.config['batch_size'],
-                                      metrics=self.config['metrics'],
-                                      metric_group_prefix=self.config['metric_group_prefix'])
         self._incomplete = IncompleteProducerBatches()
         # The following variables should only be accessed by the sender thread,
         # so we don't need to protect them w/ locking.
         self.muted = set()
         self._drain_index = 0
 
-    def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
-               estimated_size=0):
+    def append(self, tp, timestamp_ms, key, value, headers):
         """Add a record to the accumulator, return the append result.
 
         The append result will contain the future metadata, and flag for
@@ -213,8 +196,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
            key (bytes): The key for the record
            value (bytes): The value for the record
            headers (List[Tuple[str, bytes]]): The header fields for the record
-            max_time_to_block_ms (int): The maximum time in milliseconds to
-                block for buffer memory to be available
 
         Returns:
            tuple: (future, batch_is_full, new_batch_created)
@@ -240,9 +221,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                 batch_is_full = len(dq) > 1 or last.records.is_full()
                 return future, batch_is_full, False
 
-        size = max(self.config['batch_size'], estimated_size)
-        log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
-        buf = self._free.allocate(size, max_time_to_block_ms)
         with self._tp_locks[tp]:
             # Need to check if producer is closed again after grabbing the
             # dequeue lock.
@@ -254,7 +232,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
             if future is not None:
                 # Somebody else found us a batch, return the one we
                 # waited for! Hopefully this doesn't happen often...
-                self._free.deallocate(buf)
                 batch_is_full = len(dq) > 1 or last.records.is_full()
                 return future, batch_is_full, False
 
@@ -264,7 +241,7 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                 self.config['batch_size']
             )
 
-            batch = ProducerBatch(tp, records, buf)
+            batch = ProducerBatch(tp, records)
             future = batch.try_append(timestamp_ms, key, value, headers)
             if not future:
                 raise Exception()
@@ -384,7 +361,6 @@ def ready(self, cluster):
         unknown_leaders_exist = False
         now = time.time()
 
-        exhausted = bool(self._free.queued() > 0)
         # several threads are accessing self._batches -- to simplify
         # concurrent access, we iterate over a snapshot of partitions
         # and lock each partition separately as needed
@@ -414,7 +390,7 @@ def ready(self, cluster):
                 full = bool(len(dq) > 1 or batch.records.is_full())
                 expired = bool(waited_time >= time_to_wait)
 
-                sendable = (full or expired or exhausted or self._closed or
+                sendable = (full or expired or self._closed or
                             self._flush_in_progress())
 
                 if sendable and not backing_off:
@@ -506,7 +482,6 @@ def drain(self, cluster, nodes, max_size):
 
     def deallocate(self, batch):
         """Deallocate the record batch."""
         self._incomplete.remove(batch)
-        self._free.deallocate(batch.buffer())
 
     def _flush_in_progress(self):
         """Are there any threads currently waiting on a flush?"""

test/test_producer.py (-13)

@@ -7,22 +7,9 @@
 import pytest
 
 from kafka import KafkaConsumer, KafkaProducer, TopicPartition
-from kafka.producer.buffer import SimpleBufferPool
 from test.testutil import env_kafka_version, random_string, maybe_skip_unsupported_compression
 
 
-def test_buffer_pool():
-    pool = SimpleBufferPool(1000, 1000)
-
-    buf1 = pool.allocate(1000, 1000)
-    message = ''.join(map(str, range(100)))
-    buf1.write(message.encode('utf-8'))
-    pool.deallocate(buf1)
-
-    buf2 = pool.allocate(1000, 1000)
-    assert buf2.read() == b''
-
-
 @contextmanager
 def producer_factory(**kwargs):
     producer = KafkaProducer(**kwargs)

test/test_sender.py (+1 -2)

@@ -42,10 +42,9 @@ def sender(client, accumulator, metrics, mocker):
 def test_produce_request(sender, mocker, api_version, produce_version):
     sender._client._api_versions = BROKER_API_VERSIONS[api_version]
     tp = TopicPartition('foo', 0)
-    buffer = io.BytesIO()
     records = MemoryRecordsBuilder(
         magic=1, compression_type=0, batch_size=100000)
-    batch = ProducerBatch(tp, records, buffer)
+    batch = ProducerBatch(tp, records)
     records.close()
     produce_request = sender._produce_request(0, 0, 0, [batch])
     assert isinstance(produce_request, ProduceRequest[produce_version])
