Skip to content

Commit c093610

Browse files
committed
Fix accessing destroyed objects in the callback of async_wait
Fixes #358 Fixes #359 ### Motivation `async_wait` is not used correctly in some places. A callback that captures the `this` pointer or reference to `this` is passed to `async_wait`, if this object is destroyed when the callback is called, an invalid memory access will happen. ### Modifications Use the following pattern in all `async_wait` calls. ```c++ std::weak_ptr<T> weakSelf{shared_from_this()}; timer_->async_wait([weakSelf](/* ... */) { if (auto self = weakSelf.lock()) { self->foo(); } }); ```
1 parent 0bbc155 commit c093610

9 files changed

+55
-28
lines changed

lib/ConsumerImpl.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
8686
consumerName_(config_.getConsumerName()),
8787
consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "),
8888
messageListenerRunning_(true),
89-
negativeAcksTracker_(client, *this, conf),
89+
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this, conf)),
9090
readCompacted_(conf.isReadCompacted()),
9191
startMessageId_(startMessageId),
9292
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
@@ -105,6 +105,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
105105
} else {
106106
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
107107
}
108+
unAckedMessageTrackerPtr_->start();
108109

109110
// Setup stats reporter.
110111
unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
@@ -1228,7 +1229,7 @@ std::pair<MessageId, bool> ConsumerImpl::prepareCumulativeAck(const MessageId& m
12281229

12291230
void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
12301231
unAckedMessageTrackerPtr_->remove(messageId);
1231-
negativeAcksTracker_.add(messageId);
1232+
negativeAcksTracker_->add(messageId);
12321233
}
12331234

12341235
void ConsumerImpl::disconnectConsumer() {
@@ -1266,7 +1267,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
12661267
if (ackGroupingTrackerPtr_) {
12671268
ackGroupingTrackerPtr_->close();
12681269
}
1269-
negativeAcksTracker_.close();
1270+
negativeAcksTracker_->close();
12701271

12711272
ClientConnectionPtr cnx = getCnx().lock();
12721273
if (!cnx) {
@@ -1304,7 +1305,7 @@ void ConsumerImpl::shutdown() {
13041305
if (client) {
13051306
client->cleanupConsumer(this);
13061307
}
1307-
negativeAcksTracker_.close();
1308+
negativeAcksTracker_->close();
13081309
cancelTimers();
13091310
consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
13101311
failPendingReceiveCallback();
@@ -1609,7 +1610,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
16091610
}
16101611

16111612
void ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
1612-
negativeAcksTracker_.setEnabledForTesting(enabled);
1613+
negativeAcksTracker_->setEnabledForTesting(enabled);
16131614
}
16141615

16151616
void ConsumerImpl::trackMessage(const MessageId& messageId) {
@@ -1696,6 +1697,7 @@ void ConsumerImpl::cancelTimers() noexcept {
16961697
boost::system::error_code ec;
16971698
batchReceiveTimer_->cancel(ec);
16981699
checkExpiredChunkedTimer_->cancel(ec);
1700+
unAckedMessageTrackerPtr_->stop();
16991701
}
17001702

17011703
void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {

lib/ConsumerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ class ConsumerImpl : public ConsumerImplBase {
224224
CompressionCodecProvider compressionCodecProvider_;
225225
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
226226
BrokerConsumerStatsImpl brokerConsumerStats_;
227-
NegativeAcksTracker negativeAcksTracker_;
227+
std::shared_ptr<NegativeAcksTracker> negativeAcksTracker_;
228228
AckGroupingTrackerPtr ackGroupingTrackerPtr_;
229229

230230
MessageCryptoPtr msgCrypto_;

lib/NegativeAcksTracker.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,13 @@ void NegativeAcksTracker::scheduleTimer() {
4949
if (closed_) {
5050
return;
5151
}
52+
std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
5253
timer_->expires_from_now(timerInterval_);
53-
timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1));
54+
timer_->async_wait([weakSelf](const boost::system::error_code &ec) {
55+
if (auto self = weakSelf.lock()) {
56+
self->handleTimer(ec);
57+
}
58+
});
5459
}
5560

5661
void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {

lib/NegativeAcksTracker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
4040
class ExecutorService;
4141
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
4242

43-
class NegativeAcksTracker {
43+
class NegativeAcksTracker : public std::enable_shared_from_this<NegativeAcksTracker> {
4444
public:
4545
NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer, const ConsumerConfiguration &conf);
4646

lib/PatternMultiTopicsConsumerImpl.cc

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,13 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern()
4747
void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
4848
autoDiscoveryRunning_ = false;
4949
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
50-
autoDiscoveryTimer_->async_wait(
51-
std::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, std::placeholders::_1));
50+
51+
auto weakSelf = weak_from_this();
52+
autoDiscoveryTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
53+
if (auto self = weakSelf.lock()) {
54+
self->autoDiscoveryTimerTask(err);
55+
}
56+
});
5257
}
5358

5459
void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) {
@@ -222,8 +227,12 @@ void PatternMultiTopicsConsumerImpl::start() {
222227

223228
if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
224229
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
225-
autoDiscoveryTimer_->async_wait(
226-
std::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, std::placeholders::_1));
230+
auto weakSelf = weak_from_this();
231+
autoDiscoveryTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
232+
if (auto self = weakSelf.lock()) {
233+
self->autoDiscoveryTimerTask(err);
234+
}
235+
});
227236
}
228237
}
229238

lib/PatternMultiTopicsConsumerImpl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
8686
void onTopicsRemoved(NamespaceTopicsPtr removedTopics, ResultCallback callback);
8787
void handleOneTopicAdded(const Result result, const std::string& topic,
8888
std::shared_ptr<std::atomic<int>> topicsNeedCreate, ResultCallback callback);
89+
90+
std::weak_ptr<PatternMultiTopicsConsumerImpl> weak_from_this() noexcept {
91+
return std::static_pointer_cast<PatternMultiTopicsConsumerImpl>(shared_from_this());
92+
}
8993
};
9094

9195
} // namespace pulsar

lib/UnAckedMessageTrackerEnabled.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
3535
ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
3636
timer_ = executorService->createDeadlineTimer();
3737
timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
38-
timer_->async_wait([&](const boost::system::error_code& ec) {
39-
if (ec) {
40-
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
41-
} else {
42-
timeoutHandler();
38+
std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
39+
timer_->async_wait([weakSelf](const boost::system::error_code& ec) {
40+
auto self = weakSelf.lock();
41+
if (self && !ec) {
42+
self->timeoutHandler();
4343
}
4444
});
4545
}
@@ -91,10 +91,10 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
9191
std::set<MessageId> msgIds;
9292
timePartitions.push_back(msgIds);
9393
}
94-
95-
timeoutHandler();
9694
}
9795

96+
void UnAckedMessageTrackerEnabled::start() { timeoutHandler(); }
97+
9898
bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) {
9999
std::lock_guard<std::recursive_mutex> acquire(lock_);
100100
auto id = discardBatch(msgId);
@@ -172,9 +172,10 @@ void UnAckedMessageTrackerEnabled::clear() {
172172
}
173173
}
174174

175-
UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() {
175+
void UnAckedMessageTrackerEnabled::stop() {
176+
boost::system::error_code ec;
176177
if (timer_) {
177-
timer_->cancel();
178+
timer_->cancel(ec);
178179
}
179180
}
180181
} /* namespace pulsar */

lib/UnAckedMessageTrackerEnabled.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <boost/asio/deadline_timer.hpp>
2222
#include <deque>
2323
#include <map>
24+
#include <memory>
2425
#include <mutex>
2526
#include <set>
2627

@@ -34,19 +35,22 @@ class ConsumerImplBase;
3435
using ClientImplPtr = std::shared_ptr<ClientImpl>;
3536
using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
3637

37-
class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
38+
class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this<UnAckedMessageTrackerEnabled>,
39+
public UnAckedMessageTrackerInterface {
3840
public:
3941
~UnAckedMessageTrackerEnabled();
4042
UnAckedMessageTrackerEnabled(long timeoutMs, ClientImplPtr, ConsumerImplBase&);
4143
UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, ClientImplPtr, ConsumerImplBase&);
42-
bool add(const MessageId& msgId);
43-
bool remove(const MessageId& msgId);
44-
void remove(const MessageIdList& msgIds);
45-
void removeMessagesTill(const MessageId& msgId);
46-
void removeTopicMessage(const std::string& topic);
44+
void start() override;
45+
void stop() override;
46+
bool add(const MessageId& msgId) override;
47+
bool remove(const MessageId& msgId) override;
48+
void remove(const MessageIdList& msgIds) override;
49+
void removeMessagesTill(const MessageId& msgId) override;
50+
void removeTopicMessage(const std::string& topic) override;
4751
void timeoutHandler();
4852

49-
void clear();
53+
void clear() override;
5054

5155
protected:
5256
void timeoutHandlerHelper();

lib/UnAckedMessageTrackerInterface.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class UnAckedMessageTrackerInterface {
2828
public:
2929
virtual ~UnAckedMessageTrackerInterface() {}
3030
UnAckedMessageTrackerInterface() {}
31+
virtual void start() {}
32+
virtual void stop() {}
3133
virtual bool add(const MessageId& m) = 0;
3234
virtual bool remove(const MessageId& m) = 0;
3335
virtual void remove(const MessageIdList& msgIds) = 0;

0 commit comments

Comments
 (0)