Skip to content

Commit 1b392ae

Browse files
author
Luís Venâncio
committed
Merge remote-tracking branch 'LSTS/fix/udp_filter'
2 parents f869f03 + 3811b3d commit 1b392ae

File tree

3 files changed

+52
-9
lines changed

3 files changed

+52
-9
lines changed

src/Transports/UDP/ContactTable.hpp

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,34 @@ namespace Transports
6161
list.push_back(itr->second);
6262
}
6363

64-
void
65-
update(unsigned id, const Address& addr)
64+
bool
65+
addContact(unsigned id, const Address& addr)
6666
{
6767
Table::iterator itr = m_table.find(id);
6868

69-
if (itr == m_table.end())
69+
if (itr != m_table.end())
70+
return false;
71+
72+
std::pair<Table::iterator, bool> rv = m_table.insert(Entry(id, Contact(id, addr)));
73+
if (rv.second)
7074
{
71-
std::pair<Table::iterator, bool> rv = m_table.insert(Entry(id, Contact(id, addr)));
7275
itr = rv.first;
73-
itr->second.setTimeout(m_tout);
76+
// Set invalid timeout.
77+
itr->second.setTimeout(-1);
78+
return true;
7479
}
7580

81+
return false;
82+
}
83+
84+
bool
85+
update(unsigned id, const Address& addr)
86+
{
87+
Table::iterator itr = m_table.find(id);
88+
89+
if (itr == m_table.end())
90+
return false;
91+
7692
// Address has changed... update it.
7793
if (itr->second.getAddress() != addr)
7894
{
@@ -81,6 +97,8 @@ namespace Transports
8197
}
8298

8399
itr->second.update();
100+
101+
return true;
84102
}
85103

86104
private:

src/Transports/UDP/Listener.hpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ namespace Transports
5151
{
5252
public:
5353
Listener(Tasks::Task& task, UDPSocket& sock, LimitedComms* lcomms,
54-
float contact_timeout, bool trace = false):
54+
float contact_timeout, bool force = false, bool trace = false):
5555
m_task(task),
5656
m_sock(sock),
5757
m_trace(trace),
58+
force_send(force),
5859
m_contacts(contact_timeout),
5960
m_lcomms(lcomms)
6061
{ }
@@ -83,6 +84,14 @@ namespace Transports
8384
m_contacts_lock.unlock();
8485
}
8586

87+
void
88+
addContact(unsigned id)
89+
{
90+
m_contacts_lock.lockWrite();
91+
m_contacts.addContact(id, Address::Any);
92+
m_contacts_lock.unlock();
93+
}
94+
8695
private:
8796
// Buffer capacity.
8897
static const int c_bfr_size = 65535;
@@ -94,6 +103,8 @@ namespace Transports
94103
UDPSocket& m_sock;
95104
// True to print incoming messages.
96105
bool m_trace;
106+
// Flag to force sending messages.
107+
bool force_send;
97108
// Table of contacts.
98109
ContactTable m_contacts;
99110
// Lock to serialize access to m_contacts.
@@ -133,10 +144,13 @@ namespace Transports
133144
}
134145

135146
m_contacts_lock.lockWrite();
136-
m_contacts.update(msg->getSource(), addr);
147+
bool onTable = m_contacts.update(msg->getSource(), addr);
137148
m_contacts_lock.unlock();
138149

139-
m_task.dispatch(msg, DF_KEEP_TIME | DF_KEEP_SRC_EID);
150+
if (force_send)
151+
m_task.dispatch(msg, DF_KEEP_TIME | DF_KEEP_SRC_EID);
152+
else if (onTable)
153+
m_task.dispatch(msg, DF_KEEP_TIME | DF_KEEP_SRC_EID);
140154

141155
if (m_trace)
142156
DUNE_MSG(m_task.getName(), "incoming: " + std::string(msg->getName()));

src/Transports/UDP/Task.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ namespace Transports
8484
bool only_local;
8585
// Optional custom service type
8686
std::string custom_service;
87+
// Ignore the UDP source message filter
88+
bool ign_filter;
8789
};
8890

8991
// Internal buffer size.
@@ -184,6 +186,10 @@ namespace Transports
184186
.defaultValue("")
185187
.description("Optional custom service type (imc+udp+<Custom Service Type>), empty entry gives default service (imc+udp)");
186188

189+
param("Ignore Filter", m_args.ign_filter)
190+
.defaultValue("false")
191+
.description("Ignore the UDP source message filter.");
192+
187193
// Allocate space for internal buffer.
188194
m_bfr = new uint8_t[c_bfr_size];
189195

@@ -302,7 +308,7 @@ namespace Transports
302308

303309
// Start listener thread.
304310
m_listener = new Listener(*this, m_sock, m_lcomms,
305-
m_args.contact_timeout, m_args.trace_in);
311+
m_args.contact_timeout, m_args.ign_filter, m_args.trace_in);
306312
m_listener->start();
307313

308314
setEntityState(IMC::EntityState::ESTA_NORMAL, Status::CODE_ACTIVE);
@@ -384,7 +390,12 @@ namespace Transports
384390
return;
385391
}
386392

393+
// Check if the message is from this system.
394+
if (msg->getSource() == getSystemId())
395+
return;
396+
387397
m_node_table.addNode(msg->getSource(), msg->sys_name, msg->services);
398+
m_listener->addContact(msg->getSource());
388399
m_lcomms->setAnnounce(msg);
389400
}
390401

0 commit comments

Comments
 (0)