Skip to content

Commit 9feeb79

Browse files
author
Tincu Gabriel
authored
Core Protocol: Add support for flexible versions (dpkp#2151)
- Add support for new request and response headers, supporting flexible versions / tagged fields - Add List / Alter partition reassignments APIs - Add support for varints - Add support for compact collections (byte array, string, array)
1 parent c48817e commit 9feeb79

File tree

6 files changed

+347
-17
lines changed

6 files changed

+347
-17
lines changed

kafka/protocol/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,7 @@
4343
40: 'ExpireDelegationToken',
4444
41: 'DescribeDelegationToken',
4545
42: 'DeleteGroups',
46+
45: 'AlterPartitionReassignments',
47+
46: 'ListPartitionReassignments',
4648
48: 'DescribeClientQuotas',
4749
}

kafka/protocol/admin.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.protocol.api import Request, Response
4-
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64
4+
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
55

66

77
class ApiVersionResponse_v0(Response):
@@ -963,3 +963,92 @@ class DescribeClientQuotasRequest_v0(Request):
963963
DescribeClientQuotasResponse = [
964964
DescribeClientQuotasResponse_v0,
965965
]
966+
967+
968+
class AlterPartitionReassignmentsResponse_v0(Response):
969+
API_KEY = 45
970+
API_VERSION = 0
971+
SCHEMA = Schema(
972+
("throttle_time_ms", Int32),
973+
("error_code", Int16),
974+
("error_message", CompactString("utf-8")),
975+
("responses", CompactArray(
976+
("name", CompactString("utf-8")),
977+
("partitions", CompactArray(
978+
("partition_index", Int32),
979+
("error_code", Int16),
980+
("error_message", CompactString("utf-8")),
981+
("tags", TaggedFields)
982+
)),
983+
("tags", TaggedFields)
984+
)),
985+
("tags", TaggedFields)
986+
)
987+
988+
989+
class AlterPartitionReassignmentsRequest_v0(Request):
990+
FLEXIBLE_VERSION = True
991+
API_KEY = 45
992+
API_VERSION = 0
993+
RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
994+
SCHEMA = Schema(
995+
("timeout_ms", Int32),
996+
("topics", CompactArray(
997+
("name", CompactString("utf-8")),
998+
("partitions", CompactArray(
999+
("partition_index", Int32),
1000+
("replicas", CompactArray(Int32)),
1001+
("tags", TaggedFields)
1002+
)),
1003+
("tags", TaggedFields)
1004+
)),
1005+
("tags", TaggedFields)
1006+
)
1007+
1008+
1009+
AlterPartitionReassignmentsRequest = [AlterPartitionReassignmentsRequest_v0]
1010+
1011+
AlterPartitionReassignmentsResponse = [AlterPartitionReassignmentsResponse_v0]
1012+
1013+
1014+
class ListPartitionReassignmentsResponse_v0(Response):
1015+
API_KEY = 46
1016+
API_VERSION = 0
1017+
SCHEMA = Schema(
1018+
("throttle_time_ms", Int32),
1019+
("error_code", Int16),
1020+
("error_message", CompactString("utf-8")),
1021+
("topics", CompactArray(
1022+
("name", CompactString("utf-8")),
1023+
("partitions", CompactArray(
1024+
("partition_index", Int32),
1025+
("replicas", CompactArray(Int32)),
1026+
("adding_replicas", CompactArray(Int32)),
1027+
("removing_replicas", CompactArray(Int32)),
1028+
("tags", TaggedFields)
1029+
)),
1030+
("tags", TaggedFields)
1031+
)),
1032+
("tags", TaggedFields)
1033+
)
1034+
1035+
1036+
class ListPartitionReassignmentsRequest_v0(Request):
1037+
FLEXIBLE_VERSION = True
1038+
API_KEY = 46
1039+
API_VERSION = 0
1040+
RESPONSE_TYPE = ListPartitionReassignmentsResponse_v0
1041+
SCHEMA = Schema(
1042+
("timeout_ms", Int32),
1043+
("topics", CompactArray(
1044+
("name", CompactString("utf-8")),
1045+
("partition_index", CompactArray(Int32)),
1046+
("tags", TaggedFields)
1047+
)),
1048+
("tags", TaggedFields)
1049+
)
1050+
1051+
1052+
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]
1053+
1054+
ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]

kafka/protocol/api.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import abc
44

55
from kafka.protocol.struct import Struct
6-
from kafka.protocol.types import Int16, Int32, String, Schema, Array
6+
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
77

88

99
class RequestHeader(Struct):
@@ -20,9 +20,40 @@ def __init__(self, request, correlation_id=0, client_id='kafka-python'):
2020
)
2121

2222

23+
class RequestHeaderV2(Struct):
24+
# Flexible response / request headers end in field buffer
25+
SCHEMA = Schema(
26+
('api_key', Int16),
27+
('api_version', Int16),
28+
('correlation_id', Int32),
29+
('client_id', String('utf-8')),
30+
('tags', TaggedFields),
31+
)
32+
33+
def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
34+
super(RequestHeaderV2, self).__init__(
35+
request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {}
36+
)
37+
38+
39+
class ResponseHeader(Struct):
40+
SCHEMA = Schema(
41+
('correlation_id', Int32),
42+
)
43+
44+
45+
class ResponseHeaderV2(Struct):
46+
SCHEMA = Schema(
47+
('correlation_id', Int32),
48+
('tags', TaggedFields),
49+
)
50+
51+
2352
class Request(Struct):
2453
__metaclass__ = abc.ABCMeta
2554

55+
FLEXIBLE_VERSION = False
56+
2657
@abc.abstractproperty
2758
def API_KEY(self):
2859
"""Integer identifier for api request"""
@@ -50,6 +81,16 @@ def expect_response(self):
5081
def to_object(self):
5182
return _to_object(self.SCHEMA, self)
5283

84+
def build_request_header(self, correlation_id, client_id):
85+
if self.FLEXIBLE_VERSION:
86+
return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id)
87+
return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)
88+
89+
def parse_response_header(self, read_buffer):
90+
if self.FLEXIBLE_VERSION:
91+
return ResponseHeaderV2.decode(read_buffer)
92+
return ResponseHeader.decode(read_buffer)
93+
5394

5495
class Response(Struct):
5596
__metaclass__ = abc.ABCMeta

kafka/protocol/parser.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
import logging
55

66
import kafka.errors as Errors
7-
from kafka.protocol.api import RequestHeader
87
from kafka.protocol.commit import GroupCoordinatorResponse
98
from kafka.protocol.frame import KafkaBytes
10-
from kafka.protocol.types import Int32
9+
from kafka.protocol.types import Int32, TaggedFields
1110
from kafka.version import __version__
1211

1312
log = logging.getLogger(__name__)
@@ -59,9 +58,8 @@ def send_request(self, request, correlation_id=None):
5958
log.debug('Sending request %s', request)
6059
if correlation_id is None:
6160
correlation_id = self._next_correlation_id()
62-
header = RequestHeader(request,
63-
correlation_id=correlation_id,
64-
client_id=self._client_id)
61+
62+
header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id)
6563
message = b''.join([header.encode(), request.encode()])
6664
size = Int32.encode(len(message))
6765
data = size + message
@@ -135,17 +133,12 @@ def receive_bytes(self, data):
135133
return responses
136134

137135
def _process_response(self, read_buffer):
138-
recv_correlation_id = Int32.decode(read_buffer)
139-
log.debug('Received correlation id: %d', recv_correlation_id)
140-
141136
if not self.in_flight_requests:
142-
raise Errors.CorrelationIdError(
143-
'No in-flight-request found for server response'
144-
' with correlation ID %d'
145-
% (recv_correlation_id,))
146-
137+
raise Errors.CorrelationIdError('No in-flight-request found for server response')
147138
(correlation_id, request) = self.in_flight_requests.popleft()
148-
139+
response_header = request.parse_response_header(read_buffer)
140+
recv_correlation_id = response_header.correlation_id
141+
log.debug('Received correlation id: %d', recv_correlation_id)
149142
# 0.8.2 quirk
150143
if (recv_correlation_id == 0 and
151144
correlation_id != 0 and

kafka/protocol/types.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,156 @@ def repr(self, list_of_items):
210210
if list_of_items is None:
211211
return 'NULL'
212212
return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
213+
214+
215+
class UnsignedVarInt32(AbstractType):
216+
@classmethod
217+
def decode(cls, data):
218+
value, i = 0, 0
219+
while True:
220+
b, = struct.unpack('B', data.read(1))
221+
if not (b & 0x80):
222+
break
223+
value |= (b & 0x7f) << i
224+
i += 7
225+
if i > 28:
226+
raise ValueError('Invalid value {}'.format(value))
227+
value |= b << i
228+
return value
229+
230+
@classmethod
231+
def encode(cls, value):
232+
value &= 0xffffffff
233+
ret = b''
234+
while (value & 0xffffff80) != 0:
235+
b = (value & 0x7f) | 0x80
236+
ret += struct.pack('B', b)
237+
value >>= 7
238+
ret += struct.pack('B', value)
239+
return ret
240+
241+
242+
class VarInt32(AbstractType):
243+
@classmethod
244+
def decode(cls, data):
245+
value = UnsignedVarInt32.decode(data)
246+
return (value >> 1) ^ -(value & 1)
247+
248+
@classmethod
249+
def encode(cls, value):
250+
# bring it in line with the java binary repr
251+
value &= 0xffffffff
252+
return UnsignedVarInt32.encode((value << 1) ^ (value >> 31))
253+
254+
255+
class VarInt64(AbstractType):
256+
@classmethod
257+
def decode(cls, data):
258+
value, i = 0, 0
259+
while True:
260+
b = data.read(1)
261+
if not (b & 0x80):
262+
break
263+
value |= (b & 0x7f) << i
264+
i += 7
265+
if i > 63:
266+
raise ValueError('Invalid value {}'.format(value))
267+
value |= b << i
268+
return (value >> 1) ^ -(value & 1)
269+
270+
@classmethod
271+
def encode(cls, value):
272+
# bring it in line with the java binary repr
273+
value &= 0xffffffffffffffff
274+
v = (value << 1) ^ (value >> 63)
275+
ret = b''
276+
while (v & 0xffffffffffffff80) != 0:
277+
b = (value & 0x7f) | 0x80
278+
ret += struct.pack('B', b)
279+
v >>= 7
280+
ret += struct.pack('B', v)
281+
return ret
282+
283+
284+
class CompactString(String):
285+
def decode(self, data):
286+
length = UnsignedVarInt32.decode(data) - 1
287+
if length < 0:
288+
return None
289+
value = data.read(length)
290+
if len(value) != length:
291+
raise ValueError('Buffer underrun decoding string')
292+
return value.decode(self.encoding)
293+
294+
def encode(self, value):
295+
if value is None:
296+
return UnsignedVarInt32.encode(0)
297+
value = str(value).encode(self.encoding)
298+
return UnsignedVarInt32.encode(len(value) + 1) + value
299+
300+
301+
class TaggedFields(AbstractType):
302+
@classmethod
303+
def decode(cls, data):
304+
num_fields = UnsignedVarInt32.decode(data)
305+
ret = {}
306+
if not num_fields:
307+
return ret
308+
prev_tag = -1
309+
for i in range(num_fields):
310+
tag = UnsignedVarInt32.decode(data)
311+
if tag <= prev_tag:
312+
raise ValueError('Invalid or out-of-order tag {}'.format(tag))
313+
prev_tag = tag
314+
size = UnsignedVarInt32.decode(data)
315+
val = data.read(size)
316+
ret[tag] = val
317+
return ret
318+
319+
@classmethod
320+
def encode(cls, value):
321+
ret = UnsignedVarInt32.encode(len(value))
322+
for k, v in value.items():
323+
# do we allow for other data types ?? It could get complicated really fast
324+
assert isinstance(v, bytes), 'Value {} is not a byte array'.format(v)
325+
assert isinstance(k, int) and k > 0, 'Key {} is not a positive integer'.format(k)
326+
ret += UnsignedVarInt32.encode(k)
327+
ret += v
328+
return ret
329+
330+
331+
class CompactBytes(AbstractType):
332+
@classmethod
333+
def decode(cls, data):
334+
length = UnsignedVarInt32.decode(data) - 1
335+
if length < 0:
336+
return None
337+
value = data.read(length)
338+
if len(value) != length:
339+
raise ValueError('Buffer underrun decoding Bytes')
340+
return value
341+
342+
@classmethod
343+
def encode(cls, value):
344+
if value is None:
345+
return UnsignedVarInt32.encode(0)
346+
else:
347+
return UnsignedVarInt32.encode(len(value) + 1) + value
348+
349+
350+
class CompactArray(Array):
351+
352+
def encode(self, items):
353+
if items is None:
354+
return UnsignedVarInt32.encode(0)
355+
return b''.join(
356+
[UnsignedVarInt32.encode(len(items) + 1)] +
357+
[self.array_of.encode(item) for item in items]
358+
)
359+
360+
def decode(self, data):
361+
length = UnsignedVarInt32.decode(data) - 1
362+
if length == -1:
363+
return None
364+
return [self.array_of.decode(data) for _ in range(length)]
365+

0 commit comments

Comments
 (0)