
Commit 6e211d4

Author: Tincu Gabriel
Merge branch 'master' into master
2 parents 359116e + 53dc740

18 files changed: +377 -92 lines

.travis.yml (+1)

@@ -17,6 +17,7 @@ env:
   - KAFKA_VERSION=1.1.1
   - KAFKA_VERSION=2.4.0
   - KAFKA_VERSION=2.5.0
+  - KAFKA_VERSION=2.6.0

 addons:
   apt:

README.rst (+19 -8)

@@ -1,7 +1,7 @@
 Kafka Python client
 ------------------------

-.. image:: https://img.shields.io/badge/kafka-2.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
+.. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
    :target: https://kafka-python.readthedocs.io/en/master/compatibility.html
 .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
    :target: https://pypi.python.org/pypi/kafka-python
@@ -34,6 +34,7 @@ documentation, please see readthedocs and/or python's inline help.

 >>> pip install kafka-python

+
 KafkaConsumer
 *************

@@ -78,6 +79,7 @@ that expose basic message attributes: topic, partition, offset, key, and value:
 >>> # Get consumer metrics
 >>> metrics = consumer.metrics()

+
 KafkaProducer
 *************

@@ -124,6 +126,7 @@ for more details.
 >>> # Get producer performance metrics
 >>> metrics = producer.metrics()

+
 Thread safety
 *************

@@ -133,22 +136,30 @@ KafkaConsumer which cannot.
 While it is possible to use the KafkaConsumer in a thread-local manner,
 multiprocessing is recommended.

+
 Compression
 ***********

-kafka-python supports gzip compression/decompression natively. To produce or consume lz4
-compressed messages, you should install python-lz4 (pip install lz4).
-To enable snappy compression/decompression install python-snappy (also requires snappy library).
-See <https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install>
-for more information.
+kafka-python supports the following compression formats:
+
+- gzip
+- LZ4
+- Snappy
+- Zstandard (zstd)
+
+gzip is supported natively, the others require installing additional libraries.
+See <https://kafka-python.readthedocs.io/en/master/install.html> for more information.
+

 Optimized CRC32 Validation
 **************************

 Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure
 python implementation for compatibility. To improve performance for high-throughput
 applications, kafka-python will use `crc32c` for optimized native code if installed.
-See https://pypi.org/project/crc32c/
+See <https://kafka-python.readthedocs.io/en/master/install.html> for installation instructions.
+See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.
+

 Protocol
 ********
@@ -158,4 +169,4 @@ for interacting with kafka brokers via the python repl. This is useful for
 testing, probing, and general experimentation. The protocol support is
 leveraged to enable a KafkaClient.check_version() method that
 probes a kafka broker and attempts to identify which version it is running
-(0.8.0 to 2.4+).
+(0.8.0 to 2.6+).
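For context on the compression list above, a minimal sketch of producing with one of the optional codecs (topic and bootstrap server are placeholders; 'lz4' assumes the python-lz4 dependency is installed):

    from kafka import KafkaProducer

    # compression_type accepts 'gzip', 'snappy', 'lz4', or 'zstd';
    # every codec except gzip needs its optional library installed.
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             compression_type='lz4')
    producer.send('my-topic', b'payload')
    producer.flush()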

docs/compatibility.rst (+2 -2)

@@ -1,12 +1,12 @@
 Compatibility
 -------------

-.. image:: https://img.shields.io/badge/kafka-2.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
+.. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
    :target: https://kafka-python.readthedocs.io/compatibility.html
 .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
    :target: https://pypi.python.org/pypi/kafka-python

-kafka-python is compatible with (and tested against) broker versions 2.5
+kafka-python is compatible with (and tested against) broker versions 2.6
 through 0.8.0 . kafka-python is not compatible with the 0.8.2-beta release.

 Because the kafka server protocol is backwards compatible, kafka-python is

docs/index.rst (+22 -7)

@@ -1,7 +1,7 @@
 kafka-python
 ############

-.. image:: https://img.shields.io/badge/kafka-2.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
+.. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
    :target: https://kafka-python.readthedocs.io/compatibility.html
 .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
    :target: https://pypi.python.org/pypi/kafka-python
@@ -33,6 +33,7 @@ documentation, please see readthedocs and/or python's inline help.

 >>> pip install kafka-python

+
 KafkaConsumer
 *************

@@ -122,12 +123,26 @@ multiprocessing is recommended.
 Compression
 ***********

-kafka-python supports multiple compression types:
+kafka-python supports the following compression formats:
+
+- gzip
+- LZ4
+- Snappy
+- Zstandard (zstd)
+
+gzip is supported natively, the others require installing additional libraries.
+See `Install <install.html>`_ for more information.
+
+
+Optimized CRC32 Validation
+**************************
+
+Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure
+python implementation for compatibility. To improve performance for high-throughput
+applications, kafka-python will use `crc32c` for optimized native code if installed.
+See `Install <install.html>`_ for installation instructions and
+https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.

-- gzip : supported natively
-- lz4 : requires `python-lz4 <https://pypi.org/project/lz4/>`_ installed
-- snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which requires the snappy C library)
-- zstd : requires the `python-zstandard <https://github.com/indygreg/python-zstandard>`_ package installed

 Protocol
 ********
@@ -137,7 +152,7 @@ for interacting with kafka brokers via the python repl. This is useful for
 testing, probing, and general experimentation. The protocol support is
 leveraged to enable a :meth:`~kafka.KafkaClient.check_version()`
 method that probes a kafka broker and
-attempts to identify which version it is running (0.8.0 to 2.4+).
+attempts to identify which version it is running (0.8.0 to 2.6+).


 .. toctree::
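The version probe referenced in the Protocol section can be exercised directly. A minimal sketch (the bootstrap server is a placeholder):

    from kafka import KafkaClient

    client = KafkaClient(bootstrap_servers='localhost:9092')
    print(client.check_version())   # e.g. (2, 6, 0) against a 2.6 broker
    client.close()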

docs/install.rst (+20 -20)

@@ -23,20 +23,33 @@ Bleeding-Edge
     pip install ./kafka-python


-Optional LZ4 install
+Optional crc32c install
+***********************
+Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python`
+uses a new message protocol version, that requires calculation of `crc32c`,
+which differs from the `zlib.crc32` hash implementation. By default `kafka-python`
+calculates it in pure python, which is quite slow. To speed it up we optionally
+support https://pypi.python.org/pypi/crc32c package if it's installed.
+
+.. code:: bash
+
+    pip install 'kafka-python[crc32c]'
+
+
+Optional ZSTD install
 ********************

-To enable LZ4 compression/decompression, install python-lz4:
+To enable ZSTD compression/decompression, install python-zstandard:

->>> pip install lz4
+>>> pip install 'kafka-python[zstd]'


-Optional crc32c install
+Optional LZ4 install
 ********************

-To enable optimized CRC32 checksum validation, install crc32c:
+To enable LZ4 compression/decompression, install python-lz4:

->>> pip install crc32c
+>>> pip install 'kafka-python[lz4]'


 Optional Snappy install
@@ -77,17 +90,4 @@ Install the `python-snappy` module

 .. code:: bash

-    pip install python-snappy
-
-
-Optional crc32c install
-***********************
-Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python`
-uses a new message protocol version, that requires calculation of `crc32c`,
-which differs from `zlib.crc32` hash implementation. By default `kafka-python`
-calculates it in pure python, which is quite slow. To speed it up we optionally
-support https://pypi.python.org/pypi/crc32c package if it's installed.
-
-.. code:: bash
-
-    pip install crc32c
+    pip install 'kafka-python[snappy]'
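Since each optional feature is now a setuptools extra, several can be installed at once using standard pip extras syntax (a sketch, assuming the extras named above):

    pip install 'kafka-python[crc32c,lz4,snappy,zstd]'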

kafka/conn.py (+2 -1)

@@ -24,7 +24,7 @@
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.oauth.abstract import AbstractTokenProvider
-from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2
+from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2, DescribeClientQuotasRequest
 from kafka.protocol.commit import OffsetFetchRequest
 from kafka.protocol.offset import OffsetRequest
 from kafka.protocol.produce import ProduceRequest
@@ -1169,6 +1169,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
         # in reverse order. As soon as we find one that works, return it
         test_cases = [
             # format (<broker version>, <needed struct>)
+            ((2, 6, 0), DescribeClientQuotasRequest[0]),
             ((2, 5, 0), DescribeAclsRequest_v2),
             ((2, 4, 0), ProduceRequest[8]),
             ((2, 3, 0), FetchRequest[11]),
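The new entry works because a 2.6.0 broker is the first to advertise API key 48 (DescribeClientQuotas) in its ApiVersionsResponse. A condensed sketch of how such test cases are evaluated (simplified, not the verbatim implementation):

    def infer_broker_version(api_versions, test_cases):
        # api_versions: {api_key: (min_version, max_version)} as advertised
        # by the broker; test_cases are ordered newest-first.
        for broker_version, struct in test_cases:
            if struct.API_KEY in api_versions:
                min_v, max_v = api_versions[struct.API_KEY]
                if min_v <= struct.API_VERSION <= max_v:
                    return broker_version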

kafka/consumer/fetcher.py (+3 -2)

@@ -817,8 +817,9 @@ def _parse_fetched_data(self, completed_fetch):
                     position)
             unpacked = list(self._unpack_message_set(tp, records))
             parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
-            last_offset = unpacked[-1].offset
-            self._sensors.records_fetch_lag.record(highwater - last_offset)
+            if unpacked:
+                last_offset = unpacked[-1].offset
+                self._sensors.records_fetch_lag.record(highwater - last_offset)
             num_bytes = records.valid_bytes()
             records_count = len(unpacked)
         elif records.size_in_bytes() > 0:
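The guard matters because a batch can legitimately unpack to zero messages (for example, when every record in a compacted batch falls before the fetch offset), and the old code then raised an uncaught IndexError. A toy illustration of the failure mode:

    unpacked = []                      # batch yielded no consumable messages
    try:
        last_offset = unpacked[-1].offset
    except IndexError:
        pass                           # the unguarded code crashed here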

kafka/consumer/group.py (+4 -2)

@@ -651,7 +651,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         # Poll for new data until the timeout expires
         start = time.time()
         remaining = timeout_ms
-        while True:
+        while not self._closed:
             records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
             if records:
                 return records
@@ -660,7 +660,9 @@
             remaining = timeout_ms - elapsed_ms

             if remaining <= 0:
-                return {}
+                break
+
+        return {}

     def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         """Do one round of polling. In addition to checking for new data, this does

kafka/coordinator/assignors/sticky/sticky_assignor.py (+7 -3)

@@ -648,15 +648,19 @@ def parse_member_metadata(cls, metadata):

     @classmethod
     def metadata(cls, topics):
-        if cls.member_assignment is None:
+        return cls._metadata(topics, cls.member_assignment, cls.generation)
+
+    @classmethod
+    def _metadata(cls, topics, member_assignment_partitions, generation=-1):
+        if member_assignment_partitions is None:
             log.debug("No member assignment available")
             user_data = b''
         else:
             log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation))
             partitions_by_topic = defaultdict(list)
-            for topic_partition in cls.member_assignment:  # pylint: disable=not-an-iterable
+            for topic_partition in member_assignment_partitions:
                 partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
-            data = StickyAssignorUserDataV1(six.iteritems(partitions_by_topic), cls.generation)
+            data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
             user_data = data.encode()
         return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)

kafka/protocol/__init__.py (+1)

@@ -45,4 +45,5 @@
     42: 'DeleteGroups',
     45: 'AlterPartitionReassignments',
     46: 'ListPartitionReassignments',
+    48: 'DescribeClientQuotas',
 }

kafka/protocol/admin.py (+44 -2)

@@ -1,7 +1,8 @@
 from __future__ import absolute_import

 from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, CompactString, CompactArray, TaggedFields
+
+from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields


 class ApiVersionResponse_v0(Response):
@@ -1011,4 +1012,45 @@ class ListPartitionReassignmentsRequest_v0(Request):

 ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

-ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]
+ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]
+
+
+class DescribeClientQuotasResponse_v0(Request):
+    API_KEY = 48
+    API_VERSION = 0
+    SCHEMA = Schema(
+        ('throttle_time_ms', Int32),
+        ('error_code', Int16),
+        ('error_message', String('utf-8')),
+        ('entries', Array(
+            ('entity', Array(
+                ('entity_type', String('utf-8')),
+                ('entity_name', String('utf-8')))),
+            ('values', Array(
+                ('name', String('utf-8')),
+                ('value', Float64))))),
+    )
+
+
+class DescribeClientQuotasRequest_v0(Request):
+    API_KEY = 48
+    API_VERSION = 0
+    RESPONSE_TYPE = DescribeClientQuotasResponse_v0
+    SCHEMA = Schema(
+        ('components', Array(
+            ('entity_type', String('utf-8')),
+            ('match_type', Int8),
+            ('match', String('utf-8')),
+        )),
+        ('strict', Boolean)
+    )
+
+
+DescribeClientQuotasRequest = [
+    DescribeClientQuotasRequest_v0,
+]
+
+DescribeClientQuotasResponse = [
+    DescribeClientQuotasResponse_v0,
+]

kafka/protocol/types.py (+16 -2)

@@ -77,6 +77,19 @@ def decode(cls, data):
         return _unpack(cls._unpack, data.read(8))


+class Float64(AbstractType):
+    _pack = struct.Struct('>d').pack
+    _unpack = struct.Struct('>d').unpack
+
+    @classmethod
+    def encode(cls, value):
+        return _pack(cls._pack, value)
+
+    @classmethod
+    def decode(cls, data):
+        return _unpack(cls._unpack, data.read(8))
+
+
 class String(AbstractType):
     def __init__(self, encoding='utf-8'):
         self.encoding = encoding
@@ -181,9 +194,10 @@ def __init__(self, *array_of):
     def encode(self, items):
         if items is None:
             return Int32.encode(-1)
+        encoded_items = [self.array_of.encode(item) for item in items]
         return b''.join(
-            [Int32.encode(len(items))] +
-            [self.array_of.encode(item) for item in items]
+            [Int32.encode(len(encoded_items))] +
+            encoded_items
         )

     def decode(self, data):
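A round-trip sketch for the new Float64 type, plus the case the Array.encode change enables (io.BytesIO stands in for the buffer object decode expects; Array(Int32) is a form already used in other protocol schemas):

    import io
    from kafka.protocol.types import Array, Float64, Int32

    raw = Float64.encode(1.5)                  # 8-byte big-endian IEEE-754 double
    assert Float64.decode(io.BytesIO(raw)) == 1.5

    # Taking len() of the materialized list instead of the raw argument
    # lets one-shot iterables (generators, dict views) encode correctly.
    payload = Array(Int32).encode(i for i in range(3))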
