Skip to content

Commit f82d8c0

Browse files
authored
Merge pull request #188 from php-enqueue/kafka-serializer
[rdkafka] Add abilito change the way a message is serialized.
2 parents 44a0d77 + 28b8d30 commit f82d8c0

12 files changed

+368
-39
lines changed

Diff for: docs/transport/kafka.md

+24
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ
77
* [Send message to topic](#send-message-to-topic)
88
* [Send message to queue](#send-message-to-queue)
99
* [Consume message](#consume-message)
10+
* [Serialize message](#serialize-message)
1011

1112
## Installation
1213

@@ -88,4 +89,27 @@ $consumer->acknowledge($message);
8889
// $consumer->reject($message);
8990
```
9091

92+
## Serialize message
93+
94+
By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/).
95+
For that you have to implement Serializer interface and set it to the context, producer or consumer.
96+
If a serializer set to context it will be injected to all consumers and producers created by the context.
97+
98+
```php
99+
<?php
100+
use Enqueue\RdKafka\Serializer;
101+
use Enqueue\RdKafka\RdKafkaMessage;
102+
103+
class FooSerializer implements Serializer
104+
{
105+
public function toMessage($string) {}
106+
107+
public function toString(RdKafkaMessage $message) {}
108+
}
109+
110+
/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */
111+
112+
$psrContext->setSerializer(new FooSerializer());
113+
```
114+
91115
[back to index](index.md)

Diff for: pkg/rdkafka/JsonSerializer.php

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
namespace Enqueue\RdKafka;
4+
5+
class JsonSerializer implements Serializer
6+
{
7+
/**
8+
* {@inheritdoc}
9+
*/
10+
public function toString(RdKafkaMessage $message)
11+
{
12+
$json = json_encode([
13+
'body' => $message->getBody(),
14+
'properties' => $message->getProperties(),
15+
'headers' => $message->getHeaders(),
16+
]);
17+
18+
if (JSON_ERROR_NONE !== json_last_error()) {
19+
throw new \InvalidArgumentException(sprintf(
20+
'The malformed json given. Error %s and message %s',
21+
json_last_error(),
22+
json_last_error_msg()
23+
));
24+
}
25+
26+
return $json;
27+
}
28+
29+
/**
30+
* {@inheritdoc}
31+
*/
32+
public function toMessage($string)
33+
{
34+
$data = json_decode($string, true);
35+
if (JSON_ERROR_NONE !== json_last_error()) {
36+
throw new \InvalidArgumentException(sprintf(
37+
'The malformed json given. Error %s and message %s',
38+
json_last_error(),
39+
json_last_error_msg()
40+
));
41+
}
42+
43+
return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']);
44+
}
45+
}

Diff for: pkg/rdkafka/RdKafkaConsumer.php

+7-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
class RdKafkaConsumer implements PsrConsumer
1111
{
12+
use SerializerAwareTrait;
13+
1214
/**
1315
* @var KafkaConsumer
1416
*/
@@ -38,14 +40,17 @@ class RdKafkaConsumer implements PsrConsumer
3840
* @param KafkaConsumer $consumer
3941
* @param RdKafkaContext $context
4042
* @param RdKafkaTopic $topic
43+
* @param Serializer $serializer
4144
*/
42-
public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic)
45+
public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic, Serializer $serializer)
4346
{
4447
$this->consumer = $consumer;
4548
$this->context = $context;
4649
$this->topic = $topic;
4750
$this->subscribed = false;
4851
$this->commitAsync = false;
52+
53+
$this->setSerializer($serializer);
4954
}
5055

5156
/**
@@ -151,7 +156,7 @@ private function doReceive($timeout)
151156
case RD_KAFKA_RESP_ERR__TIMED_OUT:
152157
break;
153158
case RD_KAFKA_RESP_ERR_NO_ERROR:
154-
$message = RdKafkaMessage::jsonUnserialize($kafkaMessage->payload);
159+
$message = $this->serializer->toMessage($kafkaMessage->payload);
155160
$message->setKey($kafkaMessage->key);
156161
$message->setPartition($kafkaMessage->partition);
157162
$message->setKafkaMessage($kafkaMessage);

Diff for: pkg/rdkafka/RdKafkaContext.php

+11-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
class RdKafkaContext implements PsrContext
1414
{
15+
use SerializerAwareTrait;
16+
1517
/**
1618
* @var array
1719
*/
@@ -33,6 +35,8 @@ class RdKafkaContext implements PsrContext
3335
public function __construct(array $config)
3436
{
3537
$this->config = $config;
38+
39+
$this->setSerializer(new JsonSerializer());
3640
}
3741

3842
/**
@@ -78,7 +82,7 @@ public function createTemporaryQueue()
7882
*/
7983
public function createProducer()
8084
{
81-
return new RdKafkaProducer($this->getProducer());
85+
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
8286
}
8387

8488
/**
@@ -90,7 +94,12 @@ public function createConsumer(PsrDestination $destination)
9094
{
9195
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
9296

93-
$consumer = new RdKafkaConsumer(new KafkaConsumer($this->getConf()), $this, $destination);
97+
$consumer = new RdKafkaConsumer(
98+
new KafkaConsumer($this->getConf()),
99+
$this,
100+
$destination,
101+
$this->getSerializer()
102+
);
94103

95104
if (isset($this->config['commit_async'])) {
96105
$consumer->setCommitAsync($this->config['commit_async']);

Diff for: pkg/rdkafka/RdKafkaMessage.php

+6-15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
use Interop\Queue\PsrMessage;
66
use RdKafka\Message;
77

8+
/**
9+
* TODO: \JsonSerializable will be removed in next version (probably 0.8.x)
10+
* The serialization logic was moved to JsonSerializer.
11+
*/
812
class RdKafkaMessage implements PsrMessage, \JsonSerializable
913
{
1014
/**
@@ -270,11 +274,7 @@ public function setKafkaMessage(Message $message)
270274
*/
271275
public function jsonSerialize()
272276
{
273-
return [
274-
'body' => $this->getBody(),
275-
'properties' => $this->getProperties(),
276-
'headers' => $this->getHeaders(),
277-
];
277+
return (new JsonSerializer())->toString($this);
278278
}
279279

280280
/**
@@ -284,15 +284,6 @@ public function jsonSerialize()
284284
*/
285285
public static function jsonUnserialize($json)
286286
{
287-
$data = json_decode($json, true);
288-
if (JSON_ERROR_NONE !== json_last_error()) {
289-
throw new \InvalidArgumentException(sprintf(
290-
'The malformed json given. Error %s and message %s',
291-
json_last_error(),
292-
json_last_error_msg()
293-
));
294-
}
295-
296-
return new self($data['body'], $data['properties'], $data['headers']);
287+
return (new JsonSerializer())->toMessage($json);
297288
}
298289
}

Diff for: pkg/rdkafka/RdKafkaProducer.php

+8-3
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,22 @@
1111

1212
class RdKafkaProducer implements PsrProducer
1313
{
14+
use SerializerAwareTrait;
15+
1416
/**
1517
* @var Producer
1618
*/
1719
private $producer;
1820

1921
/**
20-
* @param Producer $producer
22+
* @param Producer $producer
23+
* @param Serializer $serializer
2124
*/
22-
public function __construct(Producer $producer)
25+
public function __construct(Producer $producer, Serializer $serializer)
2326
{
2427
$this->producer = $producer;
28+
29+
$this->setSerializer($serializer);
2530
}
2631

2732
/**
@@ -37,7 +42,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
3742

3843
$partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
3944
$key = $message->getKey() ?: $destination->getKey() ?: null;
40-
$payload = json_encode($message);
45+
$payload = $this->serializer->toString($message);
4146

4247
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
4348
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);

Diff for: pkg/rdkafka/Serializer.php

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\RdKafka;
4+
5+
interface Serializer
6+
{
7+
/**
8+
* @param RdKafkaMessage $message
9+
*
10+
* @return string
11+
*/
12+
public function toString(RdKafkaMessage $message);
13+
14+
/**
15+
* @param string $string
16+
*
17+
* @return RdKafkaMessage
18+
*/
19+
public function toMessage($string);
20+
}

Diff for: pkg/rdkafka/SerializerAwareTrait.php

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Enqueue\RdKafka;
4+
5+
trait SerializerAwareTrait
6+
{
7+
/**
8+
* @var Serializer
9+
*/
10+
private $serializer;
11+
12+
/**
13+
* @param Serializer $serializer
14+
*/
15+
public function setSerializer(Serializer $serializer)
16+
{
17+
$this->serializer = $serializer;
18+
}
19+
20+
/**
21+
* @return Serializer
22+
*/
23+
public function getSerializer()
24+
{
25+
return $this->serializer;
26+
}
27+
}

Diff for: pkg/rdkafka/Tests/JsonSerializerTest.php

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
namespace Enqueue\RdKafka\Tests;
4+
5+
use Enqueue\RdKafka\JsonSerializer;
6+
use Enqueue\RdKafka\RdKafkaMessage;
7+
use Enqueue\RdKafka\Serializer;
8+
use Enqueue\Test\ClassExtensionTrait;
9+
use PHPUnit\Framework\TestCase;
10+
11+
class JsonSerializerTest extends TestCase
12+
{
13+
use ClassExtensionTrait;
14+
15+
public function testShouldImplementSerializerInterface()
16+
{
17+
$this->assertClassImplements(Serializer::class, JsonSerializer::class);
18+
}
19+
20+
public function testCouldBeConstructedWithoutAnyArguments()
21+
{
22+
new JsonSerializer();
23+
}
24+
25+
public function testShouldConvertMessageToJsonString()
26+
{
27+
$serializer = new JsonSerializer();
28+
29+
$message = new RdKafkaMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']);
30+
31+
$json = $serializer->toString($message);
32+
33+
$this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json);
34+
}
35+
36+
public function testThrowIfFailedToEncodeMessageToJson()
37+
{
38+
$serializer = new JsonSerializer();
39+
40+
$resource = fopen(__FILE__, 'r');
41+
42+
//guard
43+
$this->assertInternalType('resource', $resource);
44+
45+
$message = new RdKafkaMessage('theBody', ['aProp' => $resource]);
46+
47+
$this->expectException(\LogicException::class);
48+
$this->expectExceptionMessage('The malformed json given.');
49+
$serializer->toString($message);
50+
}
51+
52+
public function testShouldConvertJsonStringToMessage()
53+
{
54+
$serializer = new JsonSerializer();
55+
56+
$message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}');
57+
58+
$this->assertInstanceOf(RdKafkaMessage::class, $message);
59+
60+
$this->assertSame('theBody', $message->getBody());
61+
$this->assertSame(['aProp' => 'aPropVal'], $message->getProperties());
62+
$this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders());
63+
}
64+
65+
public function testThrowIfFailedToDecodeJsonToMessage()
66+
{
67+
$serializer = new JsonSerializer();
68+
69+
$this->expectException(\LogicException::class);
70+
$this->expectExceptionMessage('The malformed json given.');
71+
$serializer->toMessage('{]');
72+
}
73+
}

0 commit comments

Comments
 (0)