Commit 4e39f6a

[KIP-848] Test changes required for KIP-848 Preview (#1967)
* Fixed the test_basic_operations integration test failing intermittently.
* Upgraded trivup to v0.12.10.
* Skipped a few tests that rely on session.timeout.ms for the 'consumer' protocol.
* Fixed the assign/unassign test for the incremental assignor used by the 'consumer' protocol.
* Updated the member id test for KIP-1082.
* Skipping the consumer error tests for the 'consumer' protocol for now, as they use session.timeout.ms.
* Using librdkafka v2.10.0-RC3 and Java 17 for the 'consumer' protocol integration tests.
* Disabled a test causing a fatal error on macOS.
1 parent 243a57c commit 4e39f6a

13 files changed: +130 -61 lines changed
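The changes below switch the test suite over to the KIP-848 'consumer' group protocol: librdkafka v2.10.0-RC3, Apache Kafka 4.0.0 test brokers, and Java 17 in CI. On the client side the new protocol is opted into with a single configuration key; the sketch below is not part of this commit (broker address and group name are placeholders) and only shows the minimal setup the tests rely on:

from confluent_kafka import Consumer

# Minimal sketch: selecting the KIP-848 group protocol on the client.
# The broker must advertise it as well (see the
# group.coordinator.rebalance.protocols=classic,consumer broker setting
# added in tests/common/__init__.py below).
consumer = Consumer({'bootstrap.servers': 'localhost:9092',   # placeholder
                     'group.id': 'kip848-demo',               # placeholder
                     'group.protocol': 'consumer'})

# Client-side session.timeout.ms, heartbeat.interval.ms and
# partition.assignment.strategy are rejected with this protocol; the test
# helpers below strip them from test configurations.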

.semaphore/semaphore.yml (+2 -1)

@@ -8,7 +8,7 @@ execution_time_limit:
 global_job_config:
   env_vars:
     - name: LIBRDKAFKA_VERSION
-      value: v2.8.0
+      value: v2.10.0-RC3
   prologue:
     commands:
       - checkout
@@ -208,6 +208,7 @@ blocks:
   - name: Build and Tests with 'consumer' group protocol
     commands:
       - sem-version python 3.9
+      - sem-version java 17
       # use a virtualenv
       - python3 -m venv _venv && source _venv/bin/activate
       - chmod u+r+x tools/source-package-verification.sh

requirements/requirements-tests-install.txt (+1 -1)

@@ -4,4 +4,4 @@
 -r requirements-avro.txt
 -r requirements-protobuf.txt
 -r requirements-json.txt
-tests/trivup/trivup-0.12.7.tar.gz
+tests/trivup/trivup-0.12.10.tar.gz

tests/common/__init__.py (+45 -1)

@@ -28,9 +28,26 @@ def _trivup_cluster_type_kraft():
 
 
 class TestUtils:
+    @staticmethod
+    def broker_version():
+        return '4.0.0' if TestUtils.use_group_protocol_consumer() else '3.9.0'
+
+    @staticmethod
+    def broker_conf():
+        broker_conf = ['transaction.state.log.replication.factor=1',
+                       'transaction.state.log.min.isr=1']
+        if TestUtils.use_group_protocol_consumer():
+            broker_conf.append('group.coordinator.rebalance.protocols=classic,consumer')
+        return broker_conf
+
+    @staticmethod
+    def _broker_major_version():
+        return int(TestUtils.broker_version().split('.')[0])
+
     @staticmethod
     def use_kraft():
-        return TestUtils.use_group_protocol_consumer() or _trivup_cluster_type_kraft()
+        return (TestUtils.use_group_protocol_consumer() or
+                _trivup_cluster_type_kraft())
 
     @staticmethod
     def use_group_protocol_consumer():
@@ -41,8 +58,35 @@ def update_conf_group_protocol(conf=None):
         if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
             conf['group.protocol'] = 'consumer'
 
+    @staticmethod
+    def remove_forbidden_conf_group_protocol_consumer(conf):
+        if conf is None:
+            return
+        if TestUtils.use_group_protocol_consumer():
+            forbidden_conf_properties = ["session.timeout.ms",
+                                         "partition.assignment.strategy",
+                                         "heartbeat.interval.ms",
+                                         "group.protocol.type"]
+            for prop in forbidden_conf_properties:
+                if prop in conf:
+                    print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
+                    del conf[prop]
+
 
 class TestConsumer(Consumer):
     def __init__(self, conf=None, **kwargs):
         TestUtils.update_conf_group_protocol(conf)
+        TestUtils.remove_forbidden_conf_group_protocol_consumer(conf)
         super(TestConsumer, self).__init__(conf, **kwargs)
+
+    def assign(self, partitions):
+        if TestUtils.use_group_protocol_consumer():
+            super(TestConsumer, self).incremental_assign(partitions)
+        else:
+            super(TestConsumer, self).assign(partitions)
+
+    def unassign(self, partitions):
+        if TestUtils.use_group_protocol_consumer():
+            super(TestConsumer, self).incremental_unassign(partitions)
+        else:
+            super(TestConsumer, self).unassign()
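With the 'consumer' protocol, partition ownership changes are applied incrementally, which is why the TestConsumer wrapper above routes assign()/unassign() onto incremental_assign()/incremental_unassign(). A standalone sketch of the same pattern against the plain client (broker, group and topic names are placeholders, not part of this commit):

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({'bootstrap.servers': 'localhost:9092',  # placeholder
                     'group.id': 'incremental-demo',         # placeholder
                     'group.protocol': 'consumer'})

partitions = [TopicPartition('demo-topic', 0)]  # placeholder topic

# Add partitions to the current assignment instead of replacing it...
consumer.incremental_assign(partitions)
# ... consume ...
# ...and later remove exactly those partitions again.
consumer.incremental_unassign(partitions)
consumer.close()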

tests/common/schema_registry/__init__.py (+2)

@@ -24,10 +24,12 @@
 class TestDeserializingConsumer(DeserializingConsumer):
     def __init__(self, conf=None, **kwargs):
         TestUtils.update_conf_group_protocol(conf)
+        TestUtils.remove_forbidden_conf_group_protocol_consumer(conf)
         super(TestDeserializingConsumer, self).__init__(conf, **kwargs)
 
 
 class TestAvroConsumer(AvroConsumer):
     def __init__(self, conf=None, **kwargs):
         TestUtils.update_conf_group_protocol(conf)
+        TestUtils.remove_forbidden_conf_group_protocol_consumer(conf)
         super(TestAvroConsumer, self).__init__(conf, **kwargs)
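The serializing and Avro test consumers get the same configuration stripping, so existing tests that still pass classic-protocol settings keep constructing. A short illustration of the helper's effect (configuration values are arbitrary examples, not from this commit):

from tests.common import TestConsumer

conf = {'bootstrap.servers': 'localhost:9092',   # placeholder
        'group.id': 'strip-demo',                # placeholder
        'session.timeout.ms': 6000,              # forbidden under 'consumer' protocol
        'heartbeat.interval.ms': 2000}           # forbidden under 'consumer' protocol

# When the 'consumer' protocol is selected, the wrapper deletes the
# forbidden keys before calling the underlying Consumer constructor;
# otherwise the conf dict is passed through unchanged.
consumer = TestConsumer(conf)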

tests/integration/admin/test_basic_operations.py (+2 -1)

@@ -277,7 +277,8 @@ def consume_messages(group_id, num_messages=None):
                 print('Read all the required messages: exiting')
                 break
         except ConsumeError as e:
-            if msg is not None and e.code == KafkaError._PARTITION_EOF:
+            if e.code == KafkaError._PARTITION_EOF:
+                msg = e.kafka_message
                 print('Reached end of %s [%d] at offset %d' % (
                       msg.topic(), msg.partition(), msg.offset()))
                 eof_reached[(msg.topic(), msg.partition())] = True
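The fix above takes the EOF message from the exception itself rather than relying on a previously consumed message, which is what made the test intermittent. A hedged sketch of the pattern with a DeserializingConsumer, which raises ConsumeError and attaches the failing message as kafka_message (assumes the consumer was created with 'enable.partition.eof': True; names are placeholders):

from confluent_kafka import KafkaError
from confluent_kafka.error import ConsumeError

def drain_until_eof(consumer):
    # `consumer` is assumed to be a DeserializingConsumer created with
    # 'enable.partition.eof': True so that end-of-partition surfaces as a
    # ConsumeError instead of a silent None.
    while True:
        try:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            yield msg
        except ConsumeError as e:
            if e.code == KafkaError._PARTITION_EOF:
                eof = e.kafka_message  # the message carrying the EOF event
                print('Reached end of %s [%d] at offset %d' %
                      (eof.topic(), eof.partition(), eof.offset()))
                return
            raise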

tests/integration/conftest.py (+4 -16)

@@ -26,38 +26,26 @@
 work_dir = os.path.dirname(os.path.realpath(__file__))
 
 
-def _broker_conf():
-    broker_conf = ['transaction.state.log.replication.factor=1',
-                   'transaction.state.log.min.isr=1']
-    if TestUtils.use_group_protocol_consumer():
-        broker_conf.append('group.coordinator.rebalance.protocols=classic,consumer')
-    return broker_conf
-
-
-def _broker_version():
-    return 'trunk@3a0efa2845e6a0d237772adfe6364579af50ce18' if TestUtils.use_group_protocol_consumer() else '3.8.0'
-
-
 def create_trivup_cluster(conf={}):
     trivup_fixture_conf = {'with_sr': True,
                            'debug': True,
                            'cp_version': '7.6.0',
                            'kraft': TestUtils.use_kraft(),
-                           'version': _broker_version(),
-                           'broker_conf': _broker_conf()}
+                           'version': TestUtils.broker_version(),
+                           'broker_conf': TestUtils.broker_conf()}
     trivup_fixture_conf.update(conf)
     return TrivupFixture(trivup_fixture_conf)
 
 
 def create_sasl_cluster(conf={}):
     trivup_fixture_conf = {'with_sr': False,
-                           'version': _broker_version(),
+                           'version': TestUtils.broker_version(),
                            'sasl_mechanism': "PLAIN",
                            'kraft': TestUtils.use_kraft(),
                            'sasl_users': 'sasl_user=sasl_user',
                            'debug': True,
                            'cp_version': 'latest',
-                           'broker_conf': _broker_conf()}
+                           'broker_conf': TestUtils.broker_conf()}
     trivup_fixture_conf.update(conf)
     return TrivupFixture(trivup_fixture_conf)
 
tests/integration/consumer/test_consumer_error.py (+19 -2)

@@ -21,6 +21,7 @@
 
 from confluent_kafka.error import ConsumeError
 from confluent_kafka.serialization import StringSerializer
+from tests.common import TestUtils
 
 
 def test_consume_error(kafka_cluster):
@@ -46,6 +47,14 @@ def test_consume_error(kafka_cluster):
         "Expected _PARTITION_EOF, not {}".format(exc_info)
 
 
+# Skipping the test for consumer protocol for now. Update the test to use
+# IncrementalAlterConfigs Admin operation to update
+# group.session.timeout.ms and enable the test again.
+@pytest.mark.skipif(TestUtils.use_group_protocol_consumer(),
+                    reason="session.timeout.ms is not supported on client side for "
+                    "consumer protocol. Update this test to use IncrementalAlterConfigs "
+                    "Admin operation to update group.session.timeout.ms and enable "
+                    "the test again.")
 def test_consume_error_commit(kafka_cluster):
     """
     Tests to ensure that we handle messages with errors when commiting.
@@ -63,13 +72,21 @@ def test_consume_error_commit(kafka_cluster):
     try:
         # Since the session timeout value is low, JoinGroupRequest will fail
        # and we get error in a message while polling.
-        m = consumer.poll(1)
+        m = consumer.poll(2)
         consumer.commit(m)
     except KafkaException as e:
         assert e.args[0].code() == KafkaError._INVALID_ARG, \
             "Expected INVALID_ARG, not {}".format(e)
 
 
+# Skipping the test for consumer protocol for now. Update the test to use
+# IncrementalAlterConfigs Admin operation to update
+# group.session.timeout.ms and enable the test again.
+@pytest.mark.skipif(TestUtils.use_group_protocol_consumer(),
+                    reason="session.timeout.ms is not supported on client side for "
+                    "consumer protocol. Update this test to use IncrementalAlterConfigs "
+                    "Admin operation to update group.session.timeout.ms and enable "
+                    "the test again.")
 def test_consume_error_store_offsets(kafka_cluster):
     """
     Tests to ensure that we handle messages with errors when storing offsets.
@@ -89,7 +106,7 @@ def test_consume_error_store_offsets(kafka_cluster):
     try:
         # Since the session timeout value is low, JoinGroupRequest will fail
         # and we get error in a message while polling.
-        m = consumer.poll(1)
+        m = consumer.poll(2)
         consumer.store_offsets(m)
     except KafkaException as e:
         assert e.args[0].code() == KafkaError._INVALID_ARG, \
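The skip reasons point at the intended follow-up: raising the timeout through the Admin API's IncrementalAlterConfigs on the group resource instead of the client-side session.timeout.ms. A hedged sketch of what that could look like (group id and broker address are placeholders, the exact group-level config name and accepted range depend on the broker, and this code is not part of the commit):

from confluent_kafka.admin import (AdminClient, AlterConfigOpType,
                                   ConfigEntry, ConfigResource, ResourceType)

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})  # placeholder

resource = ConfigResource(
    ResourceType.GROUP, 'test-group',  # placeholder group id
    incremental_configs=[
        # Config name taken from the skip reason above; the broker decides
        # which group-level configs it accepts.
        ConfigEntry('group.session.timeout.ms', '6000',
                    incremental_operation=AlterConfigOpType.SET)])

futures = admin.incremental_alter_configs([resource])
futures[resource].result()  # raises on failure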

tests/integration/consumer/test_consumer_memberid.py (+18 -5)

@@ -16,6 +16,7 @@
 # limit
 
 import pytest
+from tests.common import TestUtils
 
 
 def test_consumer_memberid(kafka_cluster):
@@ -32,17 +33,29 @@ def test_consumer_memberid(kafka_cluster):
     consumer = kafka_cluster.consumer(consumer_conf)
 
     assert consumer is not None
-    assert consumer.memberid() is None
+    before_memberid = consumer.memberid()
+
+    # With implementation of KIP-1082, member id is generated on the client
+    # side for ConsumerGroupHeartbeat used in the `consumer` protocol
+    # introduced in KIP-848
+    if TestUtils.use_group_protocol_consumer():
+        assert before_memberid is not None
+        assert isinstance(before_memberid, str)
+        assert len(before_memberid) > 0
+    else:
+        assert before_memberid is None
+
     kafka_cluster.seed_topic(topic, value_source=[b'memberid'])
 
     consumer.subscribe([topic])
     msg = consumer.poll(10)
     assert msg is not None
     assert msg.value() == b'memberid'
-    memberid = consumer.memberid()
-    print("Member Id is -----> " + memberid)
-    assert isinstance(memberid, str)
-    assert len(memberid) > 0
+    after_memberid = consumer.memberid()
+    assert isinstance(after_memberid, str)
+    assert len(after_memberid) > 0
+    if TestUtils.use_group_protocol_consumer():
+        assert before_memberid == after_memberid
     consumer.close()
 
     with pytest.raises(RuntimeError) as error_info:
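As the new assertions describe, KIP-1082 moves member-id generation to the client for the ConsumerGroupHeartbeat RPC used by the KIP-848 protocol, so memberid() is already populated before the group is joined and stays stable across the join. A small sketch of that behaviour outside the test fixture (broker, group and topic names are placeholders; assumes a librdkafka build with KIP-848 support):

from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers': 'localhost:9092',  # placeholder
                     'group.id': 'memberid-demo',            # placeholder
                     'group.protocol': 'consumer'})

# Client-generated member id (KIP-1082): non-empty before subscribing.
member_id_before = consumer.memberid()
assert member_id_before

consumer.subscribe(['memberid-demo-topic'])  # placeholder topic
consumer.poll(10)

# The id reported after joining is the same client-generated value.
assert consumer.memberid() == member_id_before
consumer.close()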

tests/integration/integration_test.py (+21 -21)

@@ -333,9 +333,9 @@ def verify_producer_performance(with_dr_cb=True):
     bar.finish()
 
     print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
-          (msgs_produced, bytecnt / (1024*1024), t_produce_spent,
+          (msgs_produced, bytecnt / (1024 * 1024), t_produce_spent,
           msgs_produced / t_produce_spent,
-          (bytecnt/t_produce_spent) / (1024*1024)))
+          (bytecnt / t_produce_spent) / (1024 * 1024)))
     print('# %d temporary produce() failures due to backpressure (local queue full)' % msgs_backpressure)
 
     print('waiting for %d/%d deliveries' % (len(p), msgs_produced))
@@ -344,9 +344,9 @@ def verify_producer_performance(with_dr_cb=True):
     t_delivery_spent = time.time() - t_produce_start
 
     print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
-          (msgs_produced, bytecnt / (1024*1024), t_produce_spent,
+          (msgs_produced, bytecnt / (1024 * 1024), t_produce_spent,
           msgs_produced / t_produce_spent,
-          (bytecnt/t_produce_spent) / (1024*1024)))
+          (bytecnt / t_produce_spent) / (1024 * 1024)))
 
     # Fake numbers if not using a dr_cb
     if not with_dr_cb:
@@ -355,9 +355,9 @@ def verify_producer_performance(with_dr_cb=True):
         dr.bytes_delivered = bytecnt
 
     print('# delivering %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
-          (dr.msgs_delivered, dr.bytes_delivered / (1024*1024), t_delivery_spent,
+          (dr.msgs_delivered, dr.bytes_delivered / (1024 * 1024), t_delivery_spent,
           dr.msgs_delivered / t_delivery_spent,
-          (dr.bytes_delivered/t_delivery_spent) / (1024*1024)))
+          (dr.bytes_delivered / t_delivery_spent) / (1024 * 1024)))
     print('# post-produce delivery wait took %.3fs' %
           (t_delivery_spent - t_produce_spent))
 
@@ -447,7 +447,7 @@ def print_wmark(consumer, topic_parts):
         elif (msg.offset() % 4) == 0:
             offsets = c.commit(msg, asynchronous=False)
             assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
-            assert offsets[0].offset == msg.offset()+1, \
+            assert offsets[0].offset == msg.offset() + 1, \
                 'expected offset %d to be committed, not %s' % \
                 (msg.offset(), offsets)
             print('Sync committed offset: %s' % offsets)
@@ -515,7 +515,7 @@ def my_on_revoke(consumer, partitions):
         print('on_revoke:', len(partitions), 'partitions:')
         for p in partitions:
             print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
-        consumer.unassign()
+        consumer.unassign(partitions)
 
     c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)
 
@@ -559,8 +559,8 @@ def my_on_revoke(consumer, partitions):
     if msgcnt > 0:
         t_spent = time.time() - t_first_msg
         print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
-              (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
-               (bytecnt / t_spent) / (1024*1024)))
+              (msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
+               (bytecnt / t_spent) / (1024 * 1024)))
 
     print('closing consumer')
     c.close()
@@ -590,11 +590,11 @@ def verify_consumer_seek(c, seek_to_msg):
         print('seek: message at offset %d (epoch %d)' %
               (msg.offset(), msg.leader_epoch()))
         assert msg.offset() == seek_to_msg.offset() and \
-            msg.leader_epoch() == seek_to_msg.leader_epoch(), \
-            ('expected message at offset %d (epoch %d), ' % (seek_to_msg.offset(),
-                                                             seek_to_msg.leader_epoch())) + \
-            ('not %d (epoch %d)' % (msg.offset(),
-                                    msg.leader_epoch()))
+               msg.leader_epoch() == seek_to_msg.leader_epoch(), \
+               ('expected message at offset %d (epoch %d), ' % (seek_to_msg.offset(),
+                                                                seek_to_msg.leader_epoch())) + \
+               ('not %d (epoch %d)' % (msg.offset(),
+                                       msg.leader_epoch()))
         break
 
 
@@ -643,7 +643,7 @@ def verify_batch_consumer():
         elif (msg.offset() % 4) == 0:
             offsets = c.commit(msg, asynchronous=False)
             assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
-            assert offsets[0].offset == msg.offset()+1, \
+            assert offsets[0].offset == msg.offset() + 1, \
                 'expected offset %d to be committed, not %s' % \
                 (msg.offset(), offsets)
             print('Sync committed offset: %s' % offsets)
@@ -697,7 +697,7 @@ def my_on_revoke(consumer, partitions):
         print('on_revoke:', len(partitions), 'partitions:')
         for p in partitions:
             print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
-        consumer.unassign()
+        consumer.unassign(partitions)
 
     c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)
 
@@ -738,8 +738,8 @@ def my_on_revoke(consumer, partitions):
     if msgcnt > 0:
         t_spent = time.time() - t_first_msg
         print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
-              (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
-               (bytecnt / t_spent) / (1024*1024)))
+              (msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
+               (bytecnt / t_spent) / (1024 * 1024)))
 
     print('closing consumer')
     c.close()
@@ -1035,8 +1035,8 @@ def stats_cb(stats_json_str):
     if msgcnt > 0:
         t_spent = time.time() - t_first_msg
         print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
-              (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
-               (bytecnt / t_spent) / (1024*1024)))
+              (msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
+               (bytecnt / t_spent) / (1024 * 1024)))
 
     print('closing consumer')
     c.close()
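The on_revoke callbacks above now pass the revoked partitions through, so the TestConsumer wrapper can translate the call into incremental_unassign() under the 'consumer' protocol. A minimal sketch of such a callback against the plain client (illustrative only, topic name is a placeholder):

def on_revoke(consumer, partitions):
    # Only the revoked partitions are handed back; release exactly those
    # instead of dropping the whole assignment, matching the incremental
    # semantics of the 'consumer' protocol.
    for p in partitions:
        print('revoking %s [%d]' % (p.topic, p.partition))
    consumer.incremental_unassign(partitions)

# consumer.subscribe(['demo-topic'], on_revoke=on_revoke)  # placeholder usage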
