Skip to content

Commit de70d42

Browse files
Make on_xxx_matched thread-safe (#6371)
* Refs #21355. Added regression test. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #21355. Improve readability in EDP.cpp. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #21355. Protect `InnerDataWriterListener::on_writer_matched`. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> * Refs #21355. Protect `InnerDataReaderListener::on_reader_matched`. Signed-off-by: Miguel Company <miguelcompany@eprosima.com> --------- Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
1 parent fed20e6 commit de70d42

6 files changed

Lines changed: 187 additions & 56 deletions

File tree

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,8 @@ void DataWriterImpl::InnerDataWriterListener::on_writer_matched(
13481348
RTPSWriter* /*writer*/,
13491349
const MatchingInfo& info)
13501350
{
1351+
std::lock_guard<std::mutex> scoped_lock(matching_info_mutex_);
1352+
13511353
data_writer_->update_publication_matched_status(info);
13521354

13531355
StatusMask notify_status = StatusMask::publication_matched();

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,10 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
531531
const uint32_t& status_id);
532532
#endif //FASTDDS_STATISTICS
533533

534+
private:
535+
534536
DataWriterImpl* data_writer_;
537+
std::mutex matching_info_mutex_;
535538
}
536539
writer_listener_;
537540

src/cpp/fastdds/subscriber/DataReaderImpl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,6 +941,8 @@ void DataReaderImpl::InnerDataReaderListener::on_reader_matched(
941941
RTPSReader* /*reader*/,
942942
const MatchingInfo& info)
943943
{
944+
std::lock_guard<std::mutex> scoped_lock(matching_info_mutex_);
945+
944946
data_reader_->update_subscription_matched_status(info);
945947

946948
StatusMask notify_status = StatusMask::subscription_matched();

src/cpp/fastdds/subscriber/DataReaderImpl.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,10 @@ class DataReaderImpl
511511
const uint32_t& status_id);
512512
#endif //FASTDDS_STATISTICS
513513

514+
private:
515+
514516
DataReaderImpl* data_reader_;
517+
std::mutex matching_info_mutex_;
515518

516519
}
517520
reader_listener_;

src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -801,8 +801,8 @@ bool EDP::unpairWriterProxy(
801801
#endif // if HAVE_SECURITY
802802

803803
//MATCHED AND ADDED CORRECTLY:
804-
ReaderListener* listener = nullptr;
805-
if (nullptr != (listener = r.get_listener()))
804+
ReaderListener* listener = r.get_listener();
805+
if (nullptr != listener)
806806
{
807807
MatchingInfo info;
808808
info.status = REMOVED_MATCHING;
@@ -836,8 +836,8 @@ bool EDP::unpairReaderProxy(
836836
participant_guid, reader_guid);
837837
#endif // if HAVE_SECURITY
838838
//MATCHED AND ADDED CORRECTLY:
839-
WriterListener* listener = nullptr;
840-
if (nullptr != (listener = w.get_listener()))
839+
WriterListener* listener = w.get_listener();
840+
if (nullptr != listener)
841841
{
842842
MatchingInfo info;
843843
info.status = REMOVED_MATCHING;
@@ -1167,40 +1167,39 @@ bool EDP::pairingReader(
11671167
"WP:" << wdatait->guid << " match R:" << reader_guid << ". RLoc:"
11681168
<< wdatait->remote_locators);
11691169
//MATCHED AND ADDED CORRECTLY:
1170-
if (reader->get_listener() != nullptr)
1170+
ReaderListener* listener = reader->get_listener();
1171+
if (nullptr != listener)
11711172
{
11721173
MatchingInfo info;
11731174
info.status = MATCHED_MATCHING;
11741175
info.remoteEndpointGuid = writer_guid;
1175-
reader->get_listener()->on_reader_matched(reader, info);
1176+
listener->on_reader_matched(reader, info);
11761177
}
11771178
}
11781179
#endif // if HAVE_SECURITY
11791180
}
11801181
else
11811182
{
1182-
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && reader->get_listener() != nullptr)
1183+
ReaderListener* listener = reader->get_listener();
1184+
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && (nullptr != listener))
11831185
{
1184-
reader->get_listener()->on_requested_incompatible_qos(reader, incompatible_qos);
1186+
listener->on_requested_incompatible_qos(reader, incompatible_qos);
11851187
mp_PDP->notify_incompatible_qos_matching(R->getGuid(), wdatait->guid, incompatible_qos);
11861188
}
11871189

1188-
//EPROSIMA_LOG_INFO(RTPS_EDP,RTPS_CYAN<<"Valid Matching to writerProxy: "<<wdatait->guid<<RTPS_DEF<<endl);
1189-
if (reader->matched_writer_is_matched(wdatait->guid)
1190-
&& reader->matched_writer_remove(wdatait->guid))
1190+
if (reader->matched_writer_is_matched(wdatait->guid) && reader->matched_writer_remove(wdatait->guid))
11911191
{
11921192
#if HAVE_SECURITY
1193-
mp_RTPSParticipant->security_manager().remove_writer(reader_guid, participant_guid,
1194-
wdatait->guid);
1193+
mp_RTPSParticipant->security_manager().remove_writer(reader_guid, participant_guid, wdatait->guid);
11951194
#endif // if HAVE_SECURITY
11961195

11971196
//MATCHED AND ADDED CORRECTLY:
1198-
if (reader->get_listener() != nullptr)
1197+
if (nullptr != listener)
11991198
{
12001199
MatchingInfo info;
12011200
info.status = REMOVED_MATCHING;
12021201
info.remoteEndpointGuid = writer_guid;
1203-
reader->get_listener()->on_reader_matched(reader, info);
1202+
listener->on_reader_matched(reader, info);
12041203
}
12051204
}
12061205
}
@@ -1261,21 +1260,23 @@ bool EDP::pairingWriter(
12611260
"RP:" << rdatait->guid << " match W:" << writer_guid << ". WLoc:"
12621261
<< rdatait->remote_locators);
12631262
//MATCHED AND ADDED CORRECTLY:
1264-
if (writer->get_listener() != nullptr)
1263+
WriterListener* listener = writer->get_listener();
1264+
if (nullptr != listener)
12651265
{
12661266
MatchingInfo info;
12671267
info.status = MATCHED_MATCHING;
12681268
info.remoteEndpointGuid = reader_guid;
1269-
writer->get_listener()->on_writer_matched(writer, info);
1269+
listener->on_writer_matched(writer, info);
12701270
}
12711271
}
12721272
#endif // if HAVE_SECURITY
12731273
}
12741274
else
12751275
{
1276-
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && writer->get_listener() != nullptr)
1276+
WriterListener* listener = writer->get_listener();
1277+
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && (nullptr != listener))
12771278
{
1278-
writer->get_listener()->on_offered_incompatible_qos(writer, incompatible_qos);
1279+
listener->on_offered_incompatible_qos(writer, incompatible_qos);
12791280
mp_PDP->notify_incompatible_qos_matching(W->getGuid(), rdatait->guid, incompatible_qos);
12801281
}
12811282

@@ -1286,12 +1287,12 @@ bool EDP::pairingWriter(
12861287
mp_RTPSParticipant->security_manager().remove_reader(writer_guid, participant_guid, reader_guid);
12871288
#endif // if HAVE_SECURITY
12881289
//MATCHED AND ADDED CORRECTLY:
1289-
if (writer->get_listener() != nullptr)
1290+
if (nullptr != listener)
12901291
{
12911292
MatchingInfo info;
12921293
info.status = REMOVED_MATCHING;
12931294
info.remoteEndpointGuid = reader_guid;
1294-
writer->get_listener()->on_writer_matched(writer, info);
1295+
listener->on_writer_matched(writer, info);
12951296
}
12961297
}
12971298
}
@@ -1338,38 +1339,39 @@ bool EDP::pairing_reader_proxy_with_any_local_writer(
13381339
"RP:" << rdata->guid << " match W:" << w.getGuid() << ". RLoc:"
13391340
<< rdata->remote_locators);
13401341
//MATCHED AND ADDED CORRECTLY:
1341-
if (w.get_listener() != nullptr)
1342+
WriterListener* listener = w.get_listener();
1343+
if (nullptr != listener)
13421344
{
13431345
MatchingInfo info;
13441346
info.status = MATCHED_MATCHING;
13451347
info.remoteEndpointGuid = reader_guid;
1346-
w.get_listener()->on_writer_matched(&w, info);
1348+
listener->on_writer_matched(&w, info);
13471349
}
13481350
}
13491351
#endif // if HAVE_SECURITY
13501352
}
13511353
else
13521354
{
1353-
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && w.get_listener() != nullptr)
1355+
WriterListener* listener = w.get_listener();
1356+
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && (nullptr != listener))
13541357
{
1355-
w.get_listener()->on_offered_incompatible_qos(&w, incompatible_qos);
1358+
listener->on_offered_incompatible_qos(&w, incompatible_qos);
13561359
mp_PDP->notify_incompatible_qos_matching(w.getGuid(), rdata->guid, incompatible_qos);
13571360
}
13581361

1359-
if (w.matched_reader_is_matched(reader_guid)
1360-
&& w.matched_reader_remove(reader_guid))
1362+
if (w.matched_reader_is_matched(reader_guid) && w.matched_reader_remove(reader_guid))
13611363
{
13621364
#if HAVE_SECURITY
13631365
mp_RTPSParticipant->security_manager().remove_reader(
13641366
w.getGuid(), participant_guid, reader_guid);
13651367
#endif // if HAVE_SECURITY
13661368
//MATCHED AND ADDED CORRECTLY:
1367-
if (w.get_listener() != nullptr)
1369+
if (nullptr != listener)
13681370
{
13691371
MatchingInfo info;
13701372
info.status = REMOVED_MATCHING;
13711373
info.remoteEndpointGuid = reader_guid;
1372-
w.get_listener()->on_writer_matched(&w, info);
1374+
listener->on_writer_matched(&w, info);
13731375
}
13741376
}
13751377
}
@@ -1419,25 +1421,24 @@ bool EDP::pairing_reader_proxy_with_local_writer(
14191421
}
14201422
else
14211423
{
1422-
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) &&
1423-
w.get_listener() != nullptr)
1424+
WriterListener* listener = w.get_listener();
1425+
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && (nullptr != listener))
14241426
{
1425-
w.get_listener()->on_offered_incompatible_qos(&w, incompatible_qos);
1427+
listener->on_offered_incompatible_qos(&w, incompatible_qos);
14261428
mp_PDP->notify_incompatible_qos_matching(local_writer, rdata.guid, incompatible_qos);
14271429
}
14281430

1429-
if (w.matched_reader_is_matched(reader_guid)
1430-
&& w.matched_reader_remove(reader_guid))
1431+
if (w.matched_reader_is_matched(reader_guid) && w.matched_reader_remove(reader_guid))
14311432
{
14321433
mp_RTPSParticipant->security_manager().remove_reader(w.getGuid(),
14331434
remote_participant_guid, reader_guid);
14341435
//MATCHED AND ADDED CORRECTLY:
1435-
if (w.get_listener() != nullptr)
1436+
if (nullptr != listener)
14361437
{
14371438
MatchingInfo info;
14381439
info.status = REMOVED_MATCHING;
14391440
info.remoteEndpointGuid = reader_guid;
1440-
w.get_listener()->on_writer_matched(&w, info);
1441+
listener->on_writer_matched(&w, info);
14411442
}
14421443
}
14431444
}
@@ -1474,12 +1475,13 @@ bool EDP::pairing_remote_reader_with_local_writer_after_security(
14741475
matched = true;
14751476

14761477
//MATCHED AND ADDED CORRECTLY:
1477-
if (w.get_listener() != nullptr)
1478+
WriterListener* listener = w.get_listener();
1479+
if (nullptr != listener)
14781480
{
14791481
MatchingInfo info;
14801482
info.status = MATCHED_MATCHING;
14811483
info.remoteEndpointGuid = reader_guid;
1482-
w.get_listener()->on_writer_matched(&w, info);
1484+
listener->on_writer_matched(&w, info);
14831485
}
14841486
}
14851487
// don't look anymore
@@ -1533,38 +1535,39 @@ bool EDP::pairing_writer_proxy_with_any_local_reader(
15331535
"WP:" << wdata->guid << " match R:" << r.getGuid() << ". WLoc:"
15341536
<< wdata->remote_locators);
15351537
//MATCHED AND ADDED CORRECTLY:
1536-
if (r.get_listener() != nullptr)
1538+
ReaderListener* listener = r.get_listener();
1539+
if (nullptr != listener)
15371540
{
15381541
MatchingInfo info;
15391542
info.status = MATCHED_MATCHING;
15401543
info.remoteEndpointGuid = writer_guid;
1541-
r.get_listener()->on_reader_matched(&r, info);
1544+
listener->on_reader_matched(&r, info);
15421545
}
15431546
}
15441547
#endif // if HAVE_SECURITY
15451548
}
15461549
else
15471550
{
1548-
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && r.get_listener() != nullptr)
1551+
ReaderListener* listener = r.get_listener();
1552+
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && (nullptr != listener))
15491553
{
1550-
r.get_listener()->on_requested_incompatible_qos(&r, incompatible_qos);
1554+
listener->on_requested_incompatible_qos(&r, incompatible_qos);
15511555
mp_PDP->notify_incompatible_qos_matching(r.getGuid(), wdata->guid, incompatible_qos);
15521556
}
15531557

1554-
if (r.matched_writer_is_matched(writer_guid)
1555-
&& r.matched_writer_remove(writer_guid))
1558+
if (r.matched_writer_is_matched(writer_guid) && r.matched_writer_remove(writer_guid))
15561559
{
15571560
#if HAVE_SECURITY
15581561
mp_RTPSParticipant->security_manager().remove_writer(readerGUID, participant_guid,
15591562
writer_guid);
15601563
#endif // if HAVE_SECURITY
15611564
//MATCHED AND ADDED CORRECTLY:
1562-
if (r.get_listener() != nullptr)
1565+
if (nullptr != listener)
15631566
{
15641567
MatchingInfo info;
15651568
info.status = REMOVED_MATCHING;
15661569
info.remoteEndpointGuid = writer_guid;
1567-
r.get_listener()->on_reader_matched(&r, info);
1570+
listener->on_reader_matched(&r, info);
15681571
}
15691572
}
15701573
}
@@ -1614,25 +1617,24 @@ bool EDP::pairing_writer_proxy_with_local_reader(
16141617
}
16151618
else
16161619
{
1617-
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) &&
1618-
r.get_listener() != nullptr)
1620+
ReaderListener* listener = r.get_listener();
1621+
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && (nullptr != listener))
16191622
{
1620-
r.get_listener()->on_requested_incompatible_qos(&r, incompatible_qos);
1623+
listener->on_requested_incompatible_qos(&r, incompatible_qos);
16211624
mp_PDP->notify_incompatible_qos_matching(local_reader, wdata.guid, incompatible_qos);
16221625
}
16231626

1624-
if (r.matched_writer_is_matched(writer_guid)
1625-
&& r.matched_writer_remove(writer_guid))
1627+
if (r.matched_writer_is_matched(writer_guid) && r.matched_writer_remove(writer_guid))
16261628
{
16271629
mp_RTPSParticipant->security_manager().remove_writer(readerGUID,
16281630
remote_participant_guid, writer_guid);
16291631
//MATCHED AND ADDED CORRECTLY:
1630-
if (r.get_listener() != nullptr)
1632+
if (nullptr != listener)
16311633
{
16321634
MatchingInfo info;
16331635
info.status = REMOVED_MATCHING;
16341636
info.remoteEndpointGuid = writer_guid;
1635-
r.get_listener()->on_reader_matched(&r, info);
1637+
listener->on_reader_matched(&r, info);
16361638
}
16371639
}
16381640
}
@@ -1671,13 +1673,13 @@ bool EDP::pairing_remote_writer_with_local_reader_after_security(
16711673
matched = true;
16721674

16731675
//MATCHED AND ADDED CORRECTLY:
1674-
if (r.get_listener() != nullptr)
1676+
ReaderListener* listener = r.get_listener();
1677+
if (nullptr != listener)
16751678
{
16761679
MatchingInfo info;
16771680
info.status = MATCHED_MATCHING;
16781681
info.remoteEndpointGuid = writer_guid;
1679-
r.get_listener()->on_reader_matched(&r, info);
1680-
1682+
listener->on_reader_matched(&r, info);
16811683
}
16821684
}
16831685
// dont' look anymore

0 commit comments

Comments
 (0)