Skip to content

Commit 712803a

Browse files
committed
feat: add SharedMemoryQueue for FIFO message handling and enhance shared memory functionality
1 parent 8ffe5de commit 712803a

File tree

2 files changed

+633
-21
lines changed

2 files changed

+633
-21
lines changed

include/libsharedmemory/libsharedmemory.hpp

Lines changed: 261 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
#pragma once
22

33
#define LIBSHAREDMEMORY_VERSION_MAJOR 1
4-
#define LIBSHAREDMEMORY_VERSION_MINOR 4
4+
#define LIBSHAREDMEMORY_VERSION_MINOR 5
55
#define LIBSHAREDMEMORY_VERSION_PATCH 0
66

77
#include <ostream>
88
#include <cstring>
99
#include <string>
1010
#include <string_view>
1111
#include <cstddef> // nullptr_t, ptrdiff_t, std::size_t
12-
#include <cstdint> // intptr_t, uint8_t, etc.
1312
#include <limits>
1413
#include <span>
15-
#include <concepts>
14+
#include <thread>
1615

1716
#if defined(__APPLE__) || defined(__linux__) || defined(__unix__) || defined(_POSIX_VERSION) || defined(__ANDROID__)
1817
#include <fcntl.h> // O_* constants
@@ -434,6 +433,18 @@ class SharedMemoryReadStream
434433
return memory[0];
435434
}
436435

436+
[[nodiscard]] bool hasNewData() const noexcept
437+
{
438+
const char flags = readFlags();
439+
return !!(flags & kMemoryChanged);
440+
}
441+
442+
void markAsRead() const noexcept
443+
{
444+
auto memory = static_cast<char*>(_memory.data());
445+
memory[0] &= ~kMemoryChanged;
446+
}
447+
437448
void close()
438449
{
439450
_memory.close();
@@ -536,6 +547,21 @@ class SharedMemoryWriteStream
536547
_memory.close();
537548
}
538549

550+
[[nodiscard]] bool isMessageRead() const noexcept
551+
{
552+
const auto memory = static_cast<const char*>(_memory.data());
553+
const char flags = memory[0];
554+
return !(flags & kMemoryChanged);
555+
}
556+
557+
void waitForRead() const noexcept
558+
{
559+
while (!isMessageRead())
560+
{
561+
std::this_thread::yield();
562+
}
563+
}
564+
539565
// https://stackoverflow.com/questions/18591924/how-to-use-bitmask
540566
[[nodiscard]] static constexpr char getWriteFlags(const char type, const char currentFlags) noexcept
541567
{
@@ -628,4 +654,236 @@ class SharedMemoryWriteStream
628654
Memory _memory;
629655
};
630656

657+
/**
658+
* @brief Queue structure for shared memory
659+
* Layout: [writeIndex(4)][readIndex(4)][capacity(4)][count(4)][maxMessageSize(4)][messages...]
660+
*/
661+
class SharedMemoryQueue
662+
{
663+
private:
664+
static constexpr std::size_t kWriteIndexOffset = 0;
665+
static constexpr std::size_t kReadIndexOffset = 4;
666+
static constexpr std::size_t kCapacityOffset = 8;
667+
static constexpr std::size_t kCountOffset = 12;
668+
static constexpr std::size_t kMaxMessageSizeOffset = 16;
669+
static constexpr std::size_t kHeaderSize = 20;
670+
671+
Memory _memory;
672+
std::uint32_t _capacity;
673+
std::uint32_t _maxMessageSize;
674+
bool _isWriter;
675+
676+
[[nodiscard]] std::uint32_t readUInt32(std::size_t offset) const noexcept
677+
{
678+
const auto memory = static_cast<const char*>(_memory.data());
679+
std::uint32_t value = 0;
680+
std::memcpy(&value, &memory[offset], sizeof(std::uint32_t));
681+
return value;
682+
}
683+
684+
void writeUInt32(std::size_t offset, std::uint32_t value) const noexcept
685+
{
686+
auto memory = static_cast<char*>(_memory.data());
687+
std::memcpy(&memory[offset], &value, sizeof(std::uint32_t));
688+
}
689+
690+
[[nodiscard]] std::size_t getMessageOffset(std::uint32_t index) const noexcept
691+
{
692+
// Each slot contains: [length(4)][data(maxMessageSize)]
693+
return kHeaderSize + index * (_maxMessageSize + sizeof(std::uint32_t));
694+
}
695+
696+
public:
697+
/**
698+
* @brief Create or open a shared memory queue
699+
* @param name Queue name
700+
* @param capacity Maximum number of messages in queue
701+
* @param maxMessageSize Maximum size of each message in bytes
702+
* @param isPersistent Whether the queue persists after process exit
703+
* @param isWriter True to create/write, false to open/read
704+
*/
705+
SharedMemoryQueue(const std::string& name, std::uint32_t capacity,
706+
std::uint32_t maxMessageSize, bool isPersistent, bool isWriter)
707+
: _memory(name, kHeaderSize + capacity * (maxMessageSize + sizeof(std::uint32_t)), isPersistent)
708+
, _capacity(capacity)
709+
, _maxMessageSize(maxMessageSize)
710+
, _isWriter(isWriter)
711+
{
712+
if (isWriter)
713+
{
714+
if (_memory.create() != Error::OK)
715+
{
716+
throw "Shared memory queue could not be created.";
717+
}
718+
719+
// Initialize queue metadata
720+
writeUInt32(kWriteIndexOffset, 0);
721+
writeUInt32(kReadIndexOffset, 0);
722+
writeUInt32(kCapacityOffset, capacity);
723+
writeUInt32(kCountOffset, 0);
724+
writeUInt32(kMaxMessageSizeOffset, maxMessageSize);
725+
}
726+
else
727+
{
728+
if (_memory.open() != Error::OK)
729+
{
730+
throw "Shared memory queue could not be opened.";
731+
}
732+
733+
// Read queue metadata
734+
_capacity = readUInt32(kCapacityOffset);
735+
_maxMessageSize = readUInt32(kMaxMessageSizeOffset);
736+
}
737+
}
738+
739+
[[nodiscard]] bool isEmpty() const noexcept
740+
{
741+
return readUInt32(kCountOffset) == 0;
742+
}
743+
744+
[[nodiscard]] bool isFull() const noexcept
745+
{
746+
return readUInt32(kCountOffset) >= _capacity;
747+
}
748+
749+
[[nodiscard]] std::uint32_t size() const noexcept
750+
{
751+
return readUInt32(kCountOffset);
752+
}
753+
754+
[[nodiscard]] std::uint32_t capacity() const noexcept
755+
{
756+
return _capacity;
757+
}
758+
759+
/**
760+
* @brief Enqueue a message (writer only)
761+
* @param message Message to enqueue
762+
* @return true if message was enqueued, false if queue is full
763+
*/
764+
bool enqueue(std::string_view message)
765+
{
766+
if (!_isWriter)
767+
{
768+
throw "Cannot enqueue from a reader queue instance.";
769+
}
770+
771+
if (message.size() > _maxMessageSize)
772+
{
773+
throw "Message exceeds maximum message size.";
774+
}
775+
776+
if (isFull())
777+
{
778+
return false;
779+
}
780+
781+
const std::uint32_t writeIndex = readUInt32(kWriteIndexOffset);
782+
const std::size_t offset = getMessageOffset(writeIndex);
783+
784+
auto memory = static_cast<char*>(_memory.data());
785+
786+
// Write message length
787+
const auto messageLength = static_cast<std::uint32_t>(message.size());
788+
std::memcpy(&memory[offset], &messageLength, sizeof(std::uint32_t));
789+
790+
// Write message data
791+
std::memcpy(&memory[offset + sizeof(std::uint32_t)], message.data(), messageLength);
792+
793+
// Update write index (circular)
794+
const std::uint32_t newWriteIndex = (writeIndex + 1) % _capacity;
795+
writeUInt32(kWriteIndexOffset, newWriteIndex);
796+
797+
// Increment count
798+
const std::uint32_t count = readUInt32(kCountOffset);
799+
writeUInt32(kCountOffset, count + 1);
800+
801+
return true;
802+
}
803+
804+
/**
805+
* @brief Dequeue a message (reader only)
806+
* @param message Output parameter for dequeued message
807+
* @return true if message was dequeued, false if queue is empty
808+
*/
809+
bool dequeue(std::string& message)
810+
{
811+
if (_isWriter)
812+
{
813+
throw "Cannot dequeue from a writer queue instance.";
814+
}
815+
816+
if (isEmpty())
817+
{
818+
return false;
819+
}
820+
821+
const std::uint32_t readIndex = readUInt32(kReadIndexOffset);
822+
const std::size_t offset = getMessageOffset(readIndex);
823+
824+
const auto memory = static_cast<const char*>(_memory.data());
825+
826+
// Read message length
827+
std::uint32_t messageLength = 0;
828+
std::memcpy(&messageLength, &memory[offset], sizeof(std::uint32_t));
829+
830+
// Read message data
831+
message.resize(messageLength);
832+
std::memcpy(&message[0], &memory[offset + sizeof(std::uint32_t)], messageLength);
833+
834+
// Update read index (circular)
835+
const std::uint32_t newReadIndex = (readIndex + 1) % _capacity;
836+
writeUInt32(kReadIndexOffset, newReadIndex);
837+
838+
// Decrement count
839+
const std::uint32_t count = readUInt32(kCountOffset);
840+
writeUInt32(kCountOffset, count - 1);
841+
842+
return true;
843+
}
844+
845+
/**
846+
* @brief Peek at the next message without dequeuing (reader only)
847+
* @param message Output parameter for peeked message
848+
* @return true if message was peeked, false if queue is empty
849+
*/
850+
bool peek(std::string& message) const
851+
{
852+
if (_isWriter)
853+
{
854+
throw "Cannot peek from a writer queue instance.";
855+
}
856+
857+
if (isEmpty())
858+
{
859+
return false;
860+
}
861+
862+
const std::uint32_t readIndex = readUInt32(kReadIndexOffset);
863+
const std::size_t offset = getMessageOffset(readIndex);
864+
865+
const auto memory = static_cast<const char*>(_memory.data());
866+
867+
// Read message length
868+
std::uint32_t messageLength = 0;
869+
std::memcpy(&messageLength, &memory[offset], sizeof(std::uint32_t));
870+
871+
// Read message data
872+
message.resize(messageLength);
873+
std::memcpy(&message[0], &memory[offset + sizeof(std::uint32_t)], messageLength);
874+
875+
return true;
876+
}
877+
878+
void close()
879+
{
880+
_memory.close();
881+
}
882+
883+
void destroy() const
884+
{
885+
_memory.destroy();
886+
}
887+
};
888+
631889
}; // namespace lsm

0 commit comments

Comments
 (0)