-
Notifications
You must be signed in to change notification settings - Fork 1.4k
KIP-482: add support for flexible versions / tagged fields #2151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The way the request / response headers are handled needs an update, as apparently it's possible to send requests that have an api version with flexible versions enabled with both flexible versions and without. So if someone ends up defining a new requests object but using the old collections, introducing the empty tag buffer will mess up the entire payload. The java code base uses the json files to generate the classes so maybe we should look into that as well? Also, I'll look into why the coverage is so low... but i assume it has to do with the 2 utility methods i have added to the admin client and the utility function for extracting the flexible versions EDIT: Also, the latest supported kafka version should be updated, but in order to do that , flexible versions are required. I can add the code here, but it would bloat the PR |
4550703
to
5330820
Compare
Is there a KIP associated with this change? I switched teams and only work infrequently with Kafka now, so haven't kept up with the latest news. |
@jeffwidman But i guess the second link is the one with the more meat on it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way the request / response headers are handled needs an update, as apparently it's possible to send requests that have an api version with flexible versions enabled with both flexible versions and without.
Can you explain this in more detail? Do you mean that it is possible to send a "flexible version" api request but not include any tags in the header (as I understand it this takes up 1 byte)? Or do you mean not even include the tag structure in the header (0 bytes)? I would assume that any "flexible version" request would require at least a 1-byte empty tag buffer and would generate a response that also has at least a 1-byte empty tag buffer?
kafka/protocol/__init__.py
Outdated
} | ||
|
||
# generated using the util function | ||
FLEXIBLE_VERSIONS = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way I read the KIP and protocol definitions, what this does is tell us when to add/expect a tag buffer in our request and response headers. Personally I think it would be better to do this explicitly via a new constant like HEADER_VERSION
or HAS_TAGGED_FIELDS
that we could manage on the protocol struct definitions. If we do want to use this lookup table as a shortcut, I would probably put it in api.py along side the request/response header code, since I think that is what it relates to specifically. (because if a response struct includes TaggedFields they will be decoded normally via the parser and dont need a flexible versions lookup at that stage).
kafka/protocol/api.py
Outdated
('_tag_buffer', TaggedFields) | ||
) | ||
|
||
def __init__(self, request, correlation_id=0, client_id='kafka-python'): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the standard interface for setting tagged fields? Should it be added to the constructor args?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm adding constructor support for the tags, but for now they will still be initialized to empty i guess
kafka/protocol/parser.py
Outdated
header = RequestHeader(request, | ||
correlation_id=correlation_id, | ||
client_id=self._client_id) | ||
if not self.is_flexible_version_request(request): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move this to the Request class? Something like this:
header= request.build_header(correlation_id=correlation_id, client_id=self._client_id)
and let build_header call self.is_flexible_version_request()
(or expect_tagged_fields()
) internally to decide whether to return v1 or v2 header type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this and the response header parsing to the request class
kafka/protocol/parser.py
Outdated
@@ -162,6 +168,9 @@ def _process_response(self, read_buffer): | |||
'Correlation IDs do not match: sent %d, recv %d' | |||
% (correlation_id, recv_correlation_id)) | |||
|
|||
# Flexible response / request headers end in field buffer | |||
if self.is_flexible_version_request(request): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps something like request.expect_tagged_fields()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just took your hint and added a class property to indicate this
return ret | ||
prev_tag = -1 | ||
for i in range(num_fields): | ||
tag = UnsignedVarInt32.decode(data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it expected that tag numbers are globally unique? Is the apache kafka project planning to publish a list of supported / known tags + numbers, and value serialization formats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation leads me to believe that, at least initially, there will not be any structure to these tags, other than what the client implementation may wish to impart on them (their example of using a trace id comes to mind)
5330820
to
359116e
Compare
6e211d4
to
6f932ba
Compare
Varint code lifted from the java kafka repo, with the caveat that we need to specify a byte size by bitmasking due to how python integers handle the sign bit Serialization tests lifted from java repo as well - Add support for varints - Add support for compact collections (byte array, string, array) - Add support for new request and response headers, supporting flexible versions - Add List / Alter partition reassignments apis
@dpkp this is ready for another look, whenever you have the time |
Thanks a lot. I really like this design. We will probably have to convert the default for FLEXIBLE_VERSIONS from false -> true eventually, but I think we can wait for this feature to bake upstream a bit before we do that. |
Wow, this is awesome work! Good job! 👍 |
versions
This change is