Skip to content

Commit ee909be

Browse files
committed
Use MessageBatch to de-batch payload
1 parent 832feda commit ee909be

File tree

1 file changed

+4
-10
lines changed

1 file changed

+4
-10
lines changed

tests/EncryptionTest.cc

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
#include <gtest/gtest.h>
2020
#include <pulsar/Client.h>
2121
#include <pulsar/ConsumerCryptoFailureAction.h>
22+
#include <pulsar/MessageBatch.h>
2223

2324
#include <optional>
2425
#include <stdexcept>
2526

26-
#include "PulsarApi.pb.h"
2727
#include "lib/CompressionCodec.h"
2828
#include "lib/MessageCrypto.h"
2929
#include "lib/SharedBuffer.h"
@@ -61,15 +61,9 @@ static std::vector<std::string> decryptValue(const char* data, size_t length,
6161

6262
std::vector<std::string> values;
6363
if (auto batchSize = context.value()->batchSize(); batchSize > 0) {
64-
for (decltype(batchSize) i = 0; i < batchSize; i++) {
65-
auto singleMetaSize = uncompressedPayload.readUnsignedInt();
66-
proto::SingleMessageMetadata singleMeta;
67-
singleMeta.ParseFromArray(uncompressedPayload.data(), singleMetaSize);
68-
uncompressedPayload.consume(singleMetaSize);
69-
70-
auto payload = uncompressedPayload.slice(0, singleMeta.payload_size());
71-
uncompressedPayload.consume(payload.readableBytes());
72-
values.emplace_back(payload.data(), payload.readableBytes());
64+
MessageBatch batch;
65+
for (auto&& msg : batch.parseFrom(uncompressedPayload, batchSize).messages()) {
66+
values.emplace_back(msg.getDataAsString());
7367
}
7468
} else {
7569
// non-batched message

0 commit comments

Comments
 (0)