Skip to content

Commit a21d213

Browse files
authored
Merge pull request #1101 from nick-zh/remove-set-default-topic-conf
[rdkafka] remove topic conf, deprecated
2 parents a3016c0 + a71fd09 commit a21d213

File tree

5 files changed

+13
-11
lines changed

5 files changed

+13
-11
lines changed

Diff for: .travis.yml

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ install:
6868
- echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini
6969
- php ./bin/fix-symfony-version.php "$SYMFONY_VERSION"
7070
- composer install
71+
- sed -i 's/525568/16777471/' vendor/kwn/php-rdkafka-stubs/stubs/constants.php
7172
- if [ "$PREPARE_CONTAINER" = true ]; then docker --version; fi
7273
- if [ "$PREPARE_CONTAINER" = true ]; then docker-compose --version; fi
7374
- if [ "$PREPARE_CONTAINER" = true ]; then bin/dev -b; fi

Diff for: pkg/rdkafka/RdKafkaConnectionFactory.php

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ class RdKafkaConnectionFactory implements ConnectionFactory
3939
*/
4040
public function __construct($config = 'kafka:')
4141
{
42+
if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '<')) {
43+
throw new \RuntimeException('You must install librdkafka:1.0.0 or higher');
44+
}
45+
4246
if (empty($config) || 'kafka:' === $config) {
4347
$config = [];
4448
} elseif (is_string($config)) {

Diff for: pkg/rdkafka/RdKafkaContext.php

+3-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
use RdKafka\Conf;
2020
use RdKafka\KafkaConsumer;
2121
use RdKafka\Producer as VendorProducer;
22-
use RdKafka\TopicConf;
2322

2423
class RdKafkaContext implements Context
2524
{
@@ -184,20 +183,18 @@ public static function getLibrdKafkaVersion(): string
184183
private function getConf(): Conf
185184
{
186185
if (null === $this->conf) {
187-
$topicConf = new TopicConf();
186+
$this->conf = new Conf();
188187

189188
if (isset($this->config['topic']) && is_array($this->config['topic'])) {
190189
foreach ($this->config['topic'] as $key => $value) {
191-
$topicConf->set($key, $value);
190+
$this->conf->set($key, $value);
192191
}
193192
}
194193

195194
if (isset($this->config['partitioner'])) {
196-
$topicConf->setPartitioner($this->config['partitioner']);
195+
$this->conf->set('partitioner', $this->config['partitioner']);
197196
}
198197

199-
$this->conf = new Conf();
200-
201198
if (isset($this->config['global']) && is_array($this->config['global'])) {
202199
foreach ($this->config['global'] as $key => $value) {
203200
$this->conf->set($key, $value);
@@ -219,8 +216,6 @@ private function getConf(): Conf
219216
if (isset($this->config['stats_cb'])) {
220217
$this->conf->setStatsCb($this->config['stats_cb']);
221218
}
222-
223-
$this->conf->setDefaultTopicConf($topicConf);
224219
}
225220

226221
return $this->conf;

Diff for: pkg/rdkafka/RdKafkaProducer.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public function send(Destination $destination, Message $message): void
4747
// Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
4848
if (method_exists($topic, 'producev')) {
4949
// Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka >= 1.0.0 causing segfault
50-
if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
50+
// Since we are forcing to use at least librdkafka:1.0.0, no need to check the lib version anymore
51+
if (false !== phpversion('rdkafka')
5152
&& version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
5253
trigger_error(
5354
'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. '.

Diff for: pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php

+3-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public function test()
3030

3131
$context->createProducer()->send($topic, $context->createMessage($expectedBody));
3232

33-
$message = $consumer->receive(10000); // 10 sec
33+
// Initial balancing can take some time, so we want to make sure the timeout is high enough
34+
$message = $consumer->receive(15000); // 15 sec
3435

3536
$this->assertInstanceOf(Message::class, $message);
3637
$consumer->acknowledge($message);
@@ -47,7 +48,7 @@ protected function createContext()
4748
'enable.auto.commit' => 'false',
4849
],
4950
'topic' => [
50-
'auto.offset.reset' => 'beginning',
51+
'auto.offset.reset' => 'earliest',
5152
],
5253
];
5354

0 commit comments

Comments
 (0)