Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
CreateProducerCallback callback, ProducerImplBasePtr producer) {
if (result == ResultOk) {
auto pair = producers_.emplace(producer.get(), producer);
if (!pair.second) {
auto existingProducer = pair.first->second.lock();
auto address = producer.get();
auto existingProducer = producers_.create(address, producer);
if (existingProducer) {
auto producer = existingProducer.value().lock();
LOG_ERROR("Unexpected existing producer at the same address: "
<< pair.first->first << ", producer: "
<< (existingProducer ? existingProducer->getProducerName() : "(null)"));
<< address << ", producer: " << (producer ? producer->getProducerName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
Expand Down Expand Up @@ -311,12 +311,12 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
auto consumer = weakConsumerPtr.lock();
if (consumer) {
auto pair = consumers_.emplace(consumer.get(), consumer);
if (!pair.second) {
auto existingConsumer = pair.first->second.lock();
auto address = consumer.get();
auto existingConsumer = consumers_.create(address, consumer);
if (existingConsumer) {
consumer = existingConsumer.value().lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
<< pair.first->first
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
<< address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
}
} else {
LOG_ERROR("Unexpected case: the consumer is somehow expired");
Expand Down Expand Up @@ -512,12 +512,12 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
if (result == ResultOk) {
auto pair = consumers_.emplace(consumer.get(), consumer);
if (!pair.second) {
auto existingConsumer = pair.first->second.lock();
auto address = consumer.get();
auto existingConsumer = consumers_.create(address, consumer);
if (existingConsumer) {
auto consumer = existingConsumer.value().lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
<< pair.first->first
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
<< address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
Expand Down
4 changes: 2 additions & 2 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
return;
}
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
possibleSendToDeadLetterTopicMessages_.update(m.getMessageId(), std::vector<Message>{m});
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages({m.getMessageId()});
increaseAvailablePermits(cnx);
Expand Down Expand Up @@ -786,7 +786,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
}

if (!possibleToDeadLetter.empty()) {
possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
possibleSendToDeadLetterTopicMessages_.update(batchedMessage.getMessageId(), possibleToDeadLetter);
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
}
Expand Down
6 changes: 3 additions & 3 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumers_.emplace(topicName->toString(), consumer);
consumers_.update(topicName->toString(), consumer);
LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
consumer->start();

Expand All @@ -287,7 +287,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumer->setPartitionIndex(i);
consumers_.emplace(topicPartitionName, consumer);
consumers_.update(topicPartitionName, consumer);
LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
consumer->start();
}
Expand Down Expand Up @@ -1063,7 +1063,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
});
consumer->setPartitionIndex(partitionIndex);
consumer->start();
consumers_.emplace(topicPartitionName, consumer);
consumers_.update(topicPartitionName, consumer);
LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
<< " consumerSize: " << consumers_.size());
}
Expand Down
18 changes: 15 additions & 3 deletions lib/SynchronizedHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,22 @@ class SynchronizedHashMap {
}
}

template <typename... Args>
std::pair<Iterator, bool> emplace(Args&&... args) {
// Create a new key-value pair if the key does not exist.
// Return boost::none if the key already exists or the existing value.
OptValue create(const K& key, const V& value) {
Lock lock(mutex_);
return data_.emplace(std::forward<Args>(args)...);
auto pair = data_.emplace(key, value);
if (pair.second) {
return boost::none;
} else {
return pair.first->second;
}
}

// Update the key with a new value no matter if the key exists.
void update(const K& key, const V& value) {
Lock lock(mutex_);
data_[key] = value;
}

void forEach(std::function<void(const K&, const V&)> f) const {
Expand Down
4 changes: 2 additions & 2 deletions lib/TableViewImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void TableViewImpl::handleMessage(const Message& msg) {
if (msg.getLength() == 0) {
data_.remove(msg.getPartitionKey());
} else {
data_.emplace(msg.getPartitionKey(), value);
data_.update(msg.getPartitionKey(), value);
}

Lock lock(listenersMutex_);
Expand Down Expand Up @@ -167,4 +167,4 @@ void TableViewImpl::readTailMessage() {
});
}

} // namespace pulsar
} // namespace pulsar
16 changes: 13 additions & 3 deletions tests/SynchronizedHashMapTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <algorithm>
#include <atomic>
#include <boost/optional/optional_io.hpp>
#include <chrono>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -100,18 +101,27 @@ TEST(SynchronizedHashMapTest, testForEach) {
ASSERT_TRUE(values.empty());
ASSERT_EQ(result, 1);

m.emplace(1, 100);
ASSERT_EQ(m.create(1, 100), boost::none);
ASSERT_EQ(m.create(1, 101), boost::optional<int>(100));
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
[&result] { result = 2; });
ASSERT_EQ(values, (std::vector<int>({100})));
ASSERT_EQ(result, 1);

m.update(1, 102);
values.clear();
m.emplace(2, 200);
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
[&result] { result = 2; });
ASSERT_EQ(values, (std::vector<int>({102})));
ASSERT_EQ(result, 1);

values.clear();
ASSERT_EQ(m.create(2, 200), boost::none);
ASSERT_EQ(m.create(2, 201), boost::optional<int>(200));
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
[&result] { result = 2; });
std::sort(values.begin(), values.end());
ASSERT_EQ(values, (std::vector<int>({100, 200})));
ASSERT_EQ(values, (std::vector<int>({102, 200})));
ASSERT_EQ(result, 1);
}

Expand Down
14 changes: 13 additions & 1 deletion tests/TableViewTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,22 @@ TEST(TableViewTest, testSimpleTableView) {

// assert interfaces.
std::string value;
ASSERT_TRUE(tableView.containsKey("key1"));
ASSERT_TRUE(tableView.getValue("key1", value));
ASSERT_EQ(value, "value1");

// Test value update
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-update").build()));
ASSERT_TRUE(waitUntil(std::chrono::seconds(2), [&tableView]() {
std::string value;
tableView.getValue("key1", value);
return value == "value1-update";
}));

// retrieveValue will remove the key/value from the table view.
ASSERT_TRUE(tableView.retrieveValue("key1", value));
ASSERT_EQ(value, "value1");
ASSERT_EQ(value, "value1-update");
ASSERT_FALSE(tableView.containsKey("key1"));
ASSERT_EQ(tableView.snapshot().size(), count * 2 - 1);
ASSERT_EQ(tableView.size(), 0);
Expand Down
4 changes: 2 additions & 2 deletions tests/extensibleLM/ExtensibleLoadManagerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
ASSERT_EQ(sendResult, ResultOk);
ASSERT_TRUE(elapsed < maxWaitTimeMs);

producedMsgs.emplace(i, i);
producedMsgs.update(i, i);
i++;
}
LOG_INFO("producer finished");
Expand All @@ -143,7 +143,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
LOG_INFO("acked i:" << i << " " << elapsed << " ms");
ASSERT_TRUE(elapsed < maxWaitTimeMs);
ASSERT_EQ(ackResult, ResultOk);
consumedMsgs.emplace(i, i);
consumedMsgs.update(i, i);
}
LOG_INFO("consumer finished");
};
Expand Down