Skip to content

Commit c75b85b

Browse files
authored
Fix MetadataRequest for no topics (#2573)
1 parent e21fe99 commit c75b85b

File tree

3 files changed

+7
-3
lines changed

3 files changed

+7
-3
lines changed

kafka/client_async.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -978,8 +978,10 @@ def _maybe_refresh_metadata(self, wakeup=False):
978978
topics = list(self.config['bootstrap_topics_filter'])
979979

980980
api_version = self.api_version(MetadataRequest, max_version=7)
981-
if self.cluster.need_all_topic_metadata or not topics:
981+
if self.cluster.need_all_topic_metadata:
982982
topics = MetadataRequest[api_version].ALL_TOPICS
983+
elif not topics:
984+
topics = MetadataRequest[api_version].NO_TOPICS
983985
if api_version >= 4:
984986
request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'])
985987
else:

kafka/protocol/metadata.py

+1
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ class MetadataRequest_v0(Request):
172172
('topics', Array(String('utf-8')))
173173
)
174174
ALL_TOPICS = [] # Empty Array (len 0) for topics returns all topics
175+
NO_TOPICS = [] # v0 does not support a 'no topics' request, so we'll just ask for ALL
175176

176177

177178
class MetadataRequest_v1(Request):

test/test_client_async.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def cli(mocker, conn):
3232

3333
def test_bootstrap(mocker, conn):
3434
conn.state = ConnectionStates.CONNECTED
35-
cli = KafkaClient(api_version=(0, 9))
35+
cli = KafkaClient(api_version=(2, 1))
3636
mocker.patch.object(cli, '_selector')
3737
future = cli.cluster.request_update()
3838
cli.poll(future=future)
@@ -43,7 +43,7 @@ def test_bootstrap(mocker, conn):
4343
kwargs.pop('state_change_callback')
4444
kwargs.pop('node_id')
4545
assert kwargs == cli.config
46-
conn.send.assert_called_once_with(MetadataRequest[0]([]), blocking=False, request_timeout_ms=None)
46+
conn.send.assert_called_once_with(MetadataRequest[7]([], True), blocking=False, request_timeout_ms=None)
4747
assert cli._bootstrap_fails == 0
4848
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
4949
BrokerMetadata(1, 'bar', 34, None)])
@@ -330,6 +330,7 @@ def test_maybe_refresh_metadata_update(mocker, client):
330330
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
331331
mocker.patch.object(client, '_can_send_request', return_value=True)
332332
send = mocker.patch.object(client, 'send')
333+
client.cluster.need_all_topic_metadata = True
333334

334335
client.poll(timeout_ms=12345678)
335336
client._poll.assert_called_with(9999.999) # request_timeout_ms

0 commit comments

Comments
 (0)