Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ endif()
# Define a library for the main project
add_library(${PROJECT_NAME}_lib STATIC ${sources})

include(FetchContent)

FetchContent_Declare(
readerwriterqueue
GIT_REPOSITORY https://github.com/cameron314/readerwriterqueue
GIT_TAG v1.0.7
)

FetchContent_MakeAvailable(readerwriterqueue)

target_link_libraries(${PROJECT_NAME}_lib PUBLIC readerwriterqueue)

# Create an executable that links to this library
add_executable(${PROJECT_NAME} main.cpp)
target_link_libraries(${PROJECT_NAME} PRIVATE ${PROJECT_NAME}_lib)
Expand Down
62 changes: 31 additions & 31 deletions include/can/CANDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,50 @@
#include <string>
#include "common.h"


typedef struct
{
int64_t bitrate;
int32_t timeSegment1;
int32_t timeSeqment2;
int32_t syncJumpWidth;
int32_t noSamplingPoints; //unused for can fd data params
int32_t noSamplingPoints; // unused for can fd data params
} CANParams;


const std::map<uint32_t, uint32_t> dlcToCANFDMsgLength = {
{0, 0},
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{8, 8},
{9, 12},
{10, 16},
{11, 20},
{12, 24},
{13, 32},
{14, 48},
{15, 64}
};

{0, 0},
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{8, 8},
{9, 12},
{10, 16},
{11, 20},
{12, 24},
{13, 32},
{14, 48},
{15, 64}};

class CANDriver;

typedef std::function<void(uint8_t canBusChannelId, uint32_t msgId, uint8_t *msgData, uint32_t msgDataLength, uint64_t time, CANDriver *driver)> canRecvCallback_t;

class CANDriver
{
protected:
std::function<void(uint8_t &, uint32_t &, uint8_t *, uint32_t &, uint64_t &, CANDriver *driver)> onRecvCallback;
std::function<void(std::string *)> onErrorCallback;
std::vector<uint32_t> canBusChannelIDs;
protected:
canRecvCallback_t onRecvCallback;
std::function<void(std::string *)> onErrorCallback;
std::vector<uint32_t> canBusChannelIDs;

public:
CANDriver(std::function<void(uint8_t &, uint32_t &, uint8_t *, uint32_t &, uint64_t &, CANDriver *driver)> onRecvCallback,
std::function<void(std::string *)> onErrorCallback);
virtual ~CANDriver();
public:
CANDriver(canRecvCallback_t onRecvCallback,
std::function<void(std::string *)> onErrorCallback);
virtual ~CANDriver();

virtual void SendCANMessage(uint32_t canBusChannelID, uint32_t canID, uint8_t *payload, uint32_t payloadLength, bool blocking);
virtual void SendCANMessage(uint32_t canBusChannelID, uint32_t canID, uint8_t *payload, uint32_t payloadLength, bool blocking);

virtual std::map<std::string, bool> GetCANStatusReadable(uint32_t canChannelID);
virtual std::map<std::string, bool> GetCANStatusReadable(uint32_t canChannelID);
};
10 changes: 8 additions & 2 deletions include/can/CANDriverKvaser.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
#include <functional>
#include <string>
#include <canlib.h>
#include "common.h"
#include <readerwriterqueue.h>
#include "utility/Config.h"
#include "CanKvaserReceiveThread.h"
#include "CommonKvaser.h"


class CANDriverKvaser : public CANDriver
{
Expand All @@ -21,14 +24,17 @@ class CANDriverKvaser : public CANDriver
std::map<uint32_t, CANParams> arbitrationParamsMap = std::map<uint32_t, CANParams>();
std::map<uint32_t, CANParams> dataParamsMap = std::map<uint32_t, CANParams>();

std::shared_ptr<::moodycamel::ReaderWriterQueue<std::unique_ptr<RawKvaserMessage>>> receivedMessagesQueue;
std::unique_ptr<CanKvaserReceiveThread> receiveThread;

uint64_t blockingTimeout;

static void OnCANCallback(int handle, void *driver, unsigned int event);
std::string CANError(canStatus status);
canStatus InitializeCANChannel(uint32_t canChannelID);

public:
CANDriverKvaser(std::function<void(uint8_t &, uint32_t &, uint8_t *, uint32_t &, uint64_t &, CANDriver *driver)> onRecvCallback,
CANDriverKvaser(canRecvCallback_t onRecvCallback,
std::function<void(std::string *)> onErrorCallback, std::vector<uint32_t> &canBusChannelIDs, Config &config);
~CANDriverKvaser();

Expand Down
2 changes: 1 addition & 1 deletion include/can/CANDriverSocketCAN.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class CANDriverSocketCAN : public CANDriver
int canSocket;

public:
CANDriverSocketCAN(std::function<void(uint8_t &, uint32_t &, uint8_t *, uint32_t &, uint64_t &, CANDriver *driver)> onRecvCallback,
CANDriverSocketCAN(canRecvCallback_t onRecvCallback,
std::function<void(std::string *)> onErrorCallback, Config &config);
~CANDriverSocketCAN();

Expand Down
2 changes: 1 addition & 1 deletion include/can/CANDriverUDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CANDriverUDP : public CANDriver
void AsyncListen();

public:
CANDriverUDP(std::function<void(uint8_t &, uint32_t &, uint8_t *, uint32_t &, uint64_t &, CANDriver *driver)> onRecvCallback,
CANDriverUDP(canRecvCallback_t onRecvCallback,
std::function<void(std::string *)> onErrorCallback, Config &config);
~CANDriverUDP();

Expand Down
38 changes: 38 additions & 0 deletions include/can/CanKvaserReceiveThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// Created by raffael on 24.08.25.
//

#ifndef CANRECEIVETHREAD_H
#define CANRECEIVETHREAD_H

#ifndef NO_CANLIB
#include "CommonKvaser.h"
#include <readerwriterqueue.h>
#include <functional>
#include <atomic>
#include "CANDriver.h"


class CanKvaserReceiveThread {
public:
explicit CanKvaserReceiveThread(
canRecvCallback_t onRecvCallback);

~CanKvaserReceiveThread();

void stop();
void pushMessage(std::unique_ptr<RawKvaserMessage> msg);
[[nodiscard]] bool isRunning() const;

private:
std::atomic_bool running{false};
std::atomic_int32_t messagesInQueue{0};
std::shared_ptr<moodycamel::ReaderWriterQueue<std::unique_ptr<RawKvaserMessage>> > queue;
canRecvCallback_t onRecvCallback;
void receiveLoop();
};


#endif

#endif //CANRECEIVETHREAD_H
20 changes: 20 additions & 0 deletions include/can/CommonKvaser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// Created by raffael on 28.08.25.
//

#ifndef COMMONKVASER_H
#define COMMONKVASER_H
#include <array>
#include <cstdint>

#include "CANDriver.h"
struct RawKvaserMessage {
uint32_t busChannelID;
uint32_t messageID;
uint8_t data[64];
uint8_t dlc;
uint64_t timestamp;
CANDriver *driver;
};

#endif //COMMONKVASER_H
84 changes: 48 additions & 36 deletions include/logging/InfluxDbWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,56 @@
#include <vector>

extern "C" {
#include "logging/influxDb.h"
#include "logging/influxDb.h"
}

class InfluxDbWriter {
public:
// Might want to make the buffer size and concurrency configurable (DB)
InfluxDbWriter(std::string hostname, unsigned port, std::string dbName, std::size_t buffer_size);
InfluxDbWriter(const InfluxDbWriter&) = delete;
~InfluxDbWriter();
void Init();
void setCredentials(std::string user, std::string password);
void setTimestampPrecision(timestamp_precision_t precision);

void setMeasurement(std::string measurement);
void startDataPoint();
void addTag(std::string key, std::string value);
void tagsDone();
void addField(std::string key, std::string value);
void addField(std::string key, std::size_t value);
void addField(std::string key, double value);
void addField(std::string key, bool value);
void endDataPoint(std::size_t timestamp);

void flush();
private:
const std::size_t buffer_size = 1024;
const std::size_t buffer_amount = 2;
char **buffer = nullptr;
influxDbContext cntxt;
uint8_t buffer_sel = 0;
std::size_t buffer_pos = 0;
std::size_t last_measurement = 0;
std::string host, portStr, db, measurement;
void pushBuffer();
std::vector<std::thread> threads;
void joinThreads();
void push();
void transferPartialWrite();
public:
// Might want to make the buffer size and concurrency configurable (DB)
InfluxDbWriter(std::string hostname, unsigned port, std::string dbName, std::size_t buffer_size);

InfluxDbWriter(const InfluxDbWriter &) = delete;

~InfluxDbWriter();

void Init();

void setCredentials(std::string user, std::string password);

void setTimestampPrecision(timestamp_precision_t precision);

void setMeasurement(std::string measurement);

void startDataPoint();

void addTag(std::string_view key, std::string_view value);

void tagsDone();

void addField(std::string_view key, std::string_view value);

void addField(std::string_view key, std::size_t value);

void addField(std::string_view key, double value);

void addField(std::string_view key, bool value);

void endDataPoint(std::size_t timestamp);

void flush();

private:
const std::size_t buffer_size_max = 1024;
// allocations that can be reused as buffers
std::vector<std::string> available_buffers;
std::mutex buffer_mutex;
// the current buffer being filled
std::string current_buffer;
influxDbContext cntxt;
std::string host, portStr, db, measurement;
std::vector<std::thread> threads;

void joinThreads();
};

#endif
#endif
2 changes: 2 additions & 0 deletions include/logging/influxDb.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ extern "C" {
int initDbContext(influxDbContext *cntxt, const char *hostname, const char *port, const char *database);
int deInitDbContext(influxDbContext *cntxt);

int createSocket(influxDbContext *cntxt);

int sendData(influxDbContext *cntxt, char *data, size_t length);
#ifdef __cplusplus
}
Expand Down
2 changes: 1 addition & 1 deletion src/can/CANDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <iostream>


CANDriver::CANDriver(std::function<void(uint8_t &, uint32_t &, uint8_t *, uint32_t &, uint64_t &, CANDriver *driver)> onRecvCallback,
CANDriver::CANDriver(canRecvCallback_t onRecvCallback,
std::function<void(std::string *)> onErrorCallback) :
onRecvCallback(std::move(onRecvCallback)),
onErrorCallback(std::move(onErrorCallback))
Expand Down
24 changes: 17 additions & 7 deletions src/can/CANDriverKvaser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#include "can_houbolt/can_cmds.h"
#include "utility/utils.h"

CANDriverKvaser::CANDriverKvaser(std::function<void(uint8_t &, uint32_t &, uint8_t *, uint32_t &, uint64_t &, CANDriver *driver)> onRecvCallback,
CANDriverKvaser::CANDriverKvaser(canRecvCallback_t onRecvCallback,
std::function<void(std::string *)> onErrorCallback, std::vector<uint32_t> &canBusChannelIDs, Config &config) :
CANDriver(onRecvCallback, onErrorCallback)
CANDriver(std::move(onRecvCallback), std::move(onErrorCallback))
{
//arbitration bus parameters
int32_t bitrate = config["/CAN/BUS/ARBITRATION/bitrate"];
Expand All @@ -29,6 +29,8 @@ CANDriverKvaser::CANDriverKvaser(std::function<void(uint8_t &, uint32_t &, uint8

nlohmann::json busExtra = config["/CAN/BUS_EXTRA"];

receiveThread = std::make_unique<CanKvaserReceiveThread>(onRecvCallback);

canStatus stat;
for (auto &channelID : canBusChannelIDs)
{
Expand Down Expand Up @@ -167,15 +169,15 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
std::string errorMsg = "canNOTIFY_ERROR: " + canDriver->CANError(stat);
canDriver->onErrorCallback(&errorMsg);
}
break;
break;
}
case canNOTIFY_STATUS:
{
uint64_t statFlags = 0;
stat = canReadStatus(handle, &statFlags);
Debug::print("canNOTIFY_STATUS changed");
if(statFlags & canSTAT_OVERRUN) {
Debug::print("canNOTIFY_STATUS: buffer overflow");
Debug::warning("canNOTIFY_STATUS: buffer overflow");
}
break;
}
Expand Down Expand Up @@ -207,7 +209,15 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
{
try
{
canDriver->onRecvCallback(canBusChannelID, (uint32_t &) id, data, dlc, softwareTime, canDriver);
// Copy the received data into a new buffer for the thread
std::unique_ptr<RawKvaserMessage> threadData = std::make_unique<RawKvaserMessage>();
std::ranges::copy(data, threadData->data);
threadData->dlc = dlc;
threadData->busChannelID = canBusChannelID;
threadData->messageID = id;
threadData->timestamp = softwareTime;
threadData->driver = canDriver;
canDriver->receiveThread->pushMessage(std::move(threadData));
}
catch(const std::exception& e)
{
Expand All @@ -231,7 +241,7 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
}
Debug::print("\t\tCAN Status Flags: 0x%016x", statFlags);
}

stat = canRead(handle, &id, data, &dlc, &flags, &timestamp);
}
// stat is either canERR_NOMSG or any different error code
Expand All @@ -241,7 +251,7 @@ void CANDriverKvaser::OnCANCallback(int handle, void *driver, unsigned int event
}
break;
}

default:
//TODO: MP since this thread is managed by the canlib, should we really throw exceptions?
throw std::runtime_error("Callback got called with neither ERROR nor RX, gigantic UFF");
Expand Down
Loading
Loading