Skip to content

Commit 9fbc27b

Browse files
committed
Set commit_async as true by default for Kafka
Provide addition configuration documentation for integrating with Symfony bundle Fixes #580
1 parent dfe055c commit 9fbc27b

File tree

2 files changed

+49
-9
lines changed

2 files changed

+49
-9
lines changed

docs/transport/kafka.md

+48-8
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ $connectionFactory = new RdKafkaConnectionFactory('kafka:');
4141
$connectionFactory = new RdKafkaConnectionFactory([]);
4242

4343
// connect to Kafka broker at example.com:1000 plus custom options
44-
$connectionFactory = new RdKafkaConnectionFactory([
44+
$connectionFactory = new RdKafkaConnectionFactory([
4545
'global' => [
4646
'group.id' => uniqid('', true),
4747
'metadata.broker.list' => 'example.com:1000',
@@ -54,11 +54,11 @@ $connectionFactory = new RdKafkaConnectionFactory([
5454

5555
$context = $connectionFactory->createContext();
5656

57-
// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
57+
// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
5858
$context = (new \Enqueue\ConnectionFactoryFactory())->create('kafka:')->createContext();
5959
```
6060

61-
## Send message to topic
61+
## Send message to topic
6262

6363
```php
6464
<?php
@@ -71,7 +71,7 @@ $fooTopic = $context->createTopic('foo');
7171
$context->createProducer()->send($fooTopic, $message);
7272
```
7373

74-
## Send message to queue
74+
## Send message to queue
7575

7676
```php
7777
<?php
@@ -94,7 +94,7 @@ $fooQueue = $context->createQueue('foo');
9494

9595
$consumer = $context->createConsumer($fooQueue);
9696

97-
// Enable async commit to gain better performance.
97+
// Enable async commit to gain better performance (true by default since version 0.9.9).
9898
//$consumer->setCommitAsync(true);
9999

100100
$message = $consumer->receive();
@@ -108,7 +108,7 @@ $consumer->acknowledge($message);
108108
## Serialize message
109109

110110
By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/).
111-
For that you have to implement Serializer interface and set it to the context, producer or consumer.
111+
For that you have to implement Serializer interface and set it to the context, producer or consumer.
112112
If a serializer set to context it will be injected to all consumers and producers created by the context.
113113

114114
```php
@@ -119,7 +119,7 @@ use Enqueue\RdKafka\RdKafkaMessage;
119119
class FooSerializer implements Serializer
120120
{
121121
public function toMessage($string) {}
122-
122+
123123
public function toString(RdKafkaMessage $message) {}
124124
}
125125

@@ -145,4 +145,44 @@ $consumer->setOffset(123);
145145
$message = $consumer->receive(2000);
146146
```
147147

148-
[back to index](index.md)
148+
## Usage with Symfony bundle
149+
150+
Set your enqueue to use rdkafka as your transport
151+
152+
```yaml
153+
# app/config/config.yml
154+
155+
enqueue:
156+
default:
157+
transport: "rdkafka:"
158+
client: ~
159+
```
160+
161+
You can also you extended configuration to pass additional options, if you don't want to pass them via DSN string or
162+
need to pass specific options. Since rdkafka uses librdkafka (being basically a wrapper around it) most configuration
163+
options are identical to those found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
164+
165+
```yaml
166+
# app/config/config.yml
167+
168+
enqueue:
169+
default:
170+
transport:
171+
dsn: "rdkafka://"
172+
global:
173+
### Make sure this is unique for each application / consumer group and does not change
174+
### Otherwise, Kafka won't be able to track your last offset and will always start according to
175+
### `auto.offset.reset` setting.
176+
### See Kafka documentation regarding `group.id` property if you want to know more
177+
group.id: 'foo-app'
178+
metadata.broker.list: 'example.com:1000'
179+
topic:
180+
auto.offset.reset: beginning
181+
### Commit async is true by default since version 0.9.9.
182+
### It is suggested to set it to true in earlier versions since otherwise consumers become extremely slow,
183+
### waiting for offset to be stored on Kafka before continuing.
184+
commit_async: true
185+
client: ~
186+
```
187+
188+
[back to index](index.md)

pkg/rdkafka/RdKafkaConsumer.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
5151
$this->context = $context;
5252
$this->topic = $topic;
5353
$this->subscribed = false;
54-
$this->commitAsync = false;
54+
$this->commitAsync = true;
5555

5656
$this->setSerializer($serializer);
5757
}

0 commit comments

Comments
 (0)