Skip to content

Commit 5cc386e

Browse files
committed
Messages are now cached in a queue before being processed. This reduces the execution time of the callback thread which will hopefully alleviate the weird sampling frequency drops.
Additionally the memory leak of in Node.cpp was fixed
1 parent 4db48cf commit 5cc386e

File tree

7 files changed

+137
-11
lines changed

7 files changed

+137
-11
lines changed

CMakeLists.txt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ endif()
2424
# Define a library for the main project
2525
add_library(${PROJECT_NAME}_lib STATIC ${sources})
2626

27+
include(FetchContent)
28+
29+
FetchContent_Declare(
30+
readerwriterqueue
31+
GIT_REPOSITORY https://github.com/cameron314/readerwriterqueue
32+
GIT_TAG v1.0.7
33+
)
34+
35+
FetchContent_MakeAvailable(readerwriterqueue)
36+
37+
target_link_libraries(${PROJECT_NAME}_lib PUBLIC readerwriterqueue)
38+
2739
# Create an executable that links to this library
2840
add_executable(${PROJECT_NAME} main.cpp)
2941
target_link_libraries(${PROJECT_NAME} PRIVATE ${PROJECT_NAME}_lib)

include/can/CANDriverKvaser.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010
#include <functional>
1111
#include <string>
1212
#include <canlib.h>
13-
#include "common.h"
13+
#include <readerwriterqueue.h>
1414
#include "utility/Config.h"
15+
#include "CanKvaserReceiveThread.h"
16+
#include "CommonKvaser.h"
17+
1518

1619
class CANDriverKvaser : public CANDriver
1720
{
@@ -21,6 +24,9 @@ class CANDriverKvaser : public CANDriver
2124
std::map<uint32_t, CANParams> arbitrationParamsMap = std::map<uint32_t, CANParams>();
2225
std::map<uint32_t, CANParams> dataParamsMap = std::map<uint32_t, CANParams>();
2326

27+
std::shared_ptr<::moodycamel::ReaderWriterQueue<std::unique_ptr<RawKvaserMessage>>> receivedMessagesQueue;
28+
std::unique_ptr<CanKvaserReceiveThread> receiveThread;
29+
2430
uint64_t blockingTimeout;
2531

2632
static void OnCANCallback(int handle, void *driver, unsigned int event);
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//
2+
// Created by raffael on 24.08.25.
3+
//
4+
5+
#ifndef CANRECEIVETHREAD_H
6+
#define CANRECEIVETHREAD_H
7+
8+
#ifndef NO_CANLIB
9+
#include "CommonKvaser.h"
10+
#include <readerwriterqueue.h>
11+
#include <functional>
12+
#include <atomic>
13+
#include "CANDriver.h"
14+
15+
16+
class CanKvaserReceiveThread {
17+
public:
18+
explicit CanKvaserReceiveThread(
19+
canRecvCallback_t onRecvCallback);
20+
21+
~CanKvaserReceiveThread();
22+
23+
void stop();
24+
void pushMessage(std::unique_ptr<RawKvaserMessage> msg);
25+
[[nodiscard]] bool isRunning() const;
26+
27+
private:
28+
std::atomic_bool running{false};
29+
std::atomic_int32_t messagesInQueue{0};
30+
std::shared_ptr<moodycamel::ReaderWriterQueue<std::unique_ptr<RawKvaserMessage>> > queue;
31+
canRecvCallback_t onRecvCallback;
32+
void receiveLoop();
33+
};
34+
35+
36+
#endif
37+
38+
#endif //CANRECEIVETHREAD_H

include/can/CommonKvaser.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
//
2+
// Created by raffael on 28.08.25.
3+
//
4+
5+
#ifndef COMMONKVASER_H
6+
#define COMMONKVASER_H
7+
#include <array>
8+
#include <cstdint>
9+
10+
#include "CANDriver.h"
11+
struct RawKvaserMessage {
12+
uint32_t busChannelID;
13+
uint32_t messageID;
14+
uint8_t data[64];
15+
uint8_t dlc;
16+
uint64_t timestamp;
17+
CANDriver *driver;
18+
};
19+
20+
#endif //COMMONKVASER_H

src/can/CANDriverKvaser.cpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
CANDriverKvaser::CANDriverKvaser(canRecvCallback_t onRecvCallback,
1010
std::function<void(std::string *)> onErrorCallback, std::vector<uint32_t> &canBusChannelIDs, Config &config) :
11-
CANDriver(onRecvCallback, onErrorCallback)
11+
CANDriver(std::move(onRecvCallback), std::move(onErrorCallback))
1212
{
1313
//arbitration bus parameters
1414
int32_t bitrate = config["/CAN/BUS/ARBITRATION/bitrate"];
@@ -29,6 +29,8 @@ CANDriverKvaser::CANDriverKvaser(canRecvCallback_t onRecvCallback,
2929

3030
nlohmann::json busExtra = config["/CAN/BUS_EXTRA"];
3131

32+
receiveThread = std::make_unique<CanKvaserReceiveThread>(onRecvCallback);
33+
3234
canStatus stat;
3335
for (auto &channelID : canBusChannelIDs)
3436
{
@@ -167,15 +169,15 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
167169
std::string errorMsg = "canNOTIFY_ERROR: " + canDriver->CANError(stat);
168170
canDriver->onErrorCallback(&errorMsg);
169171
}
170-
break;
172+
break;
171173
}
172174
case canNOTIFY_STATUS:
173175
{
174176
uint64_t statFlags = 0;
175177
stat = canReadStatus(handle, &statFlags);
176178
Debug::print("canNOTIFY_STATUS changed");
177179
if(statFlags & canSTAT_OVERRUN) {
178-
Debug::print("canNOTIFY_STATUS: buffer overflow");
180+
Debug::warning("canNOTIFY_STATUS: buffer overflow");
179181
}
180182
break;
181183
}
@@ -208,11 +210,14 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
208210
try
209211
{
210212
// Copy the received data into a new buffer for the thread
211-
auto threadData = std::make_unique<uint8_t[]>(dlc);
212-
std::copy_n(data, dlc, threadData.get());
213-
std::thread([canDriver, canBusChannelID, id, dlc, softwareTime, threadData = std::move(threadData)]() {
214-
canDriver->onRecvCallback(canBusChannelID, id, threadData.get(), dlc, softwareTime, canDriver);
215-
}).detach();
213+
std::unique_ptr<RawKvaserMessage> threadData = std::make_unique<RawKvaserMessage>();
214+
std::ranges::copy(data, threadData->data);
215+
threadData->dlc = dlc;
216+
threadData->busChannelID = canBusChannelID;
217+
threadData->messageID = id;
218+
threadData->timestamp = softwareTime;
219+
threadData->driver = canDriver;
220+
canDriver->receiveThread->pushMessage(std::move(threadData));
216221
}
217222
catch(const std::exception& e)
218223
{
@@ -236,7 +241,7 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
236241
}
237242
Debug::print("\t\tCAN Status Flags: 0x%016x", statFlags);
238243
}
239-
244+
240245
stat = canRead(handle, &id, data, &dlc, &flags, &timestamp);
241246
}
242247
// stat is either canERR_NOMSG or any different error code
@@ -246,7 +251,7 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
246251
}
247252
break;
248253
}
249-
254+
250255
default:
251256
//TODO: MP since this thread is managed by the canlib, should we really throw exceptions?
252257
throw std::runtime_error("Callback got called with neither ERROR nor RX, gigantic UFF");

src/can/CanKvaserReceiveThread.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//
2+
// Created by raffael on 24.08.25.
3+
//
4+
5+
#include "can/CanKvaserReceiveThread.h"
6+
7+
#include <utility>
8+
9+
CanKvaserReceiveThread::CanKvaserReceiveThread(canRecvCallback_t onRecvCallback): queue(std::make_shared<moodycamel::ReaderWriterQueue<std::unique_ptr<RawKvaserMessage>>>(100)),
10+
onRecvCallback(std::move(onRecvCallback)) {
11+
12+
std::thread([this]() { this->receiveLoop(); }).detach();
13+
}
14+
15+
CanKvaserReceiveThread::~CanKvaserReceiveThread() {
16+
stop();
17+
}
18+
19+
void CanKvaserReceiveThread::stop() {
20+
running.store(false);
21+
}
22+
23+
void CanKvaserReceiveThread::pushMessage(std::unique_ptr<RawKvaserMessage> msg) {
24+
queue->enqueue(std::move(msg));
25+
++messagesInQueue;
26+
}
27+
28+
bool CanKvaserReceiveThread::isRunning() const {
29+
return running.load();
30+
}
31+
32+
void CanKvaserReceiveThread::receiveLoop(){
33+
while (running.load()) {
34+
std::unique_ptr<RawKvaserMessage> msg;
35+
if (queue->try_dequeue(msg)) {
36+
onRecvCallback(msg->busChannelID, msg->messageID, msg->data, msg->dlc, msg->timestamp, msg->driver);
37+
}
38+
--messagesInQueue;
39+
if (messagesInQueue>40) {
40+
Debug::warning("CanKvaserReceiveThread::receiveLoop: High message load, messages in queue: %d", messagesInQueue.load());
41+
}
42+
}
43+
}
44+

src/can/Node.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ std::map<std::string, std::tuple<double, uint64_t>> Node::GetLatestSensorData()
249249
/* if (nodeID==8)
250250
Debug::print("NodeID %d, %zd sensor data transmissions", nodeID, uint64_t(count));*/
251251
count = 0;
252+
delete[] copy;
252253
return sensorData;
253254
}
254255

0 commit comments

Comments
 (0)