Skip to content
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
8d073fd
Update to use g++-14 to compile instead of the defaulted g++-13 etc.
gxuu Jul 2, 2025
12e938c
Use CMAKE_BUILD_TYPE as the source of truth for what build we are on
gxuu Jul 2, 2025
6ade637
Update TODOs and DONEs in readme
gxuu Jul 2, 2025
20ad885
Implement C++ core of YMQ network library
gxuu Jul 2, 2025
f22c87e
Update pymod io_context to make sure the code compiles (TMP FIX!)
gxuu Jul 2, 2025
af10dcb
Provide basic examples on how to use YMQ in C++
gxuu Jul 2, 2025
d543515
Bump version number
gxuu Jul 2, 2025
9d2c2cd
Add qualifiers and attributes to functions (noexcept, nodiscard, etc.)
gxuu Jul 3, 2025
2e17a4e
Change calls to move_only_functions so functions are not copied
gxuu Jul 3, 2025
0623285
Extract TCP Operations to their own header
gxuu Jul 7, 2025
f3eeb9f
Implement IOSocketType behavior in YMQ
gxuu Jul 8, 2025
b5395d8
Even Unicast and Multicast needs to handle reconnect
gxuu Jul 9, 2025
c114aad
Add pub-sub examples to demonstrate socket types
gxuu Jul 9, 2025
5ed6284
upd readme (need rebase later)
gxuu Jul 9, 2025
d96e506
Improve the examples
gxuu Jul 9, 2025
fa51f60
Change to new socket types Binder and Connector
gxuu Jul 9, 2025
d9dcb9b
Refactor: MessageConnectionTCP is easier to construct correctly
gxuu Jul 9, 2025
734cc7b
upd readme (need rebase later)
gxuu Jul 9, 2025
5c48a19
Implement ISO 8601 conforming timestamp output
gxuu Jul 10, 2025
aab4c0f
Reference implementation of the error type
gxuu Jul 10, 2025
d12def6
Example usage to the error type
gxuu Jul 10, 2025
4ca3b66
upd readme (need rebase later)
gxuu Jul 10, 2025
a1b8926
Make comments better looking
gxuu Jul 11, 2025
83cb911
fixup! Reference implementation of the error type
gxuu Jul 11, 2025
849b952
fixup! Example usage to the error type
gxuu Jul 11, 2025
413673b
Merge branch 'main' into ymq
gxuu Jul 11, 2025
37206f8
Use switch on enum to handle error->explanation translation
gxuu Jul 12, 2025
c8c51cb
Merge branch 'ymq' of github.com:gxuu/scaler into ymq
gxuu Jul 12, 2025
5488320
use std::string_view instead of const char*
gxuu Jul 12, 2025
5226fd6
Fix off by one error when getting format string
gxuu Jul 14, 2025
b13ce76
Refinements on Error implementation
gxuu Jul 14, 2025
168e1b2
Refactor: Put network utils in their own file
gxuu Jul 14, 2025
c956839
Refactor: createIOSocket takes a callback and returns nothing
gxuu Jul 15, 2025
155aef1
Remove unneeded comments
gxuu Jul 15, 2025
fb2d15d
Fix pymod ymq build due to namespace change
gxuu Jul 15, 2025
97c6291
Add namespace scaler::ymq for every file
gxuu Jul 15, 2025
9145405
Refactor: Extract common part of examples to header file
gxuu Jul 15, 2025
2f5e94a
check epoll_wait calls return value
gxuu Jul 16, 2025
e658389
Update automated examples
gxuu Jul 16, 2025
e32ac2a
Fixup typo found in logging.h
gxuu Jul 16, 2025
b009c17
Fix: send/recv were implemented incorrectly but not shows up in low tp.
gxuu Jul 16, 2025
ceecfc2
Fix bugs for not adding the correct offset (Thanks magniloquency!)
gxuu Jul 18, 2025
adb685c
Update the exmaple to mimic high throughput scenario
gxuu Jul 18, 2025
fd33915
Make sure onRead returns early when socket is closed and set to 0
gxuu Jul 18, 2025
18f6cf3
Update automated example to not use hard coded address in syncBindTo
gxuu Jul 18, 2025
bc4f33f
Update error's implementation
gxuu Jul 18, 2025
dc7ecad
Implement error abort for syscalls
gxuu Jul 18, 2025
cc483a6
Fix: Did not return when receiving EAGAIN
gxuu Jul 18, 2025
75e7938
Add notice with respect to thread safety of methods
gxuu Jul 21, 2025
8c4f494
Remove unneeded IOSocketType(s)
gxuu Jul 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ jobs:
sudo apt update -y
sudo apt upgrade -y
sudo DEBIAN_FRONTEND=noninteractive sudo apt install -y tzdata
sudo apt install cmake clang curl pkg-config -y
sudo apt install cmake clang curl pkg-config g++-14 -y

- name: Make scripts executable
run: |
sudo chmod 755 ./scripts/download_install_dependencies.sh
sudo chmod 755 ./scripts/build.sh

- name: Cache Boost
id: cache-boost
Expand Down Expand Up @@ -72,9 +73,9 @@ jobs:
run: |
sudo ./scripts/download_install_dependencies.sh capnp install

- name: Build Object Storage Component
- name: Build and test C++ Components
run: |
CXX=$(which g++) ./scripts/build.sh
CXX=$(which g++-14) ./scripts/build.sh

- name: Install Python Dependent Packages
run: |
Expand Down
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ message(STATUS "Found Capnp in ${CAPNP_INCLUDE_DIRS}")
include_directories(${CAPNP_INCLUDE_DIRS})
include_directories(${PROJECT_SOURCE_DIR})


add_subdirectory(scaler)

if(NOT SKBUILD_STATE)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
add_subdirectory(tests)
endif()
12 changes: 10 additions & 2 deletions scaler/io/ymq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ target_sources(cc_ymq PRIVATE
bytes.h
common.h
configuration.h
main.h

epoll_context.h
epoll_context.cpp

event_loop_backend.h
event_loop.h

event_loop_thread.h
Expand All @@ -20,6 +18,7 @@ target_sources(cc_ymq PRIVATE

message_connection.h
message_connection_tcp.h
message_connection_tcp.cpp

third_party/concurrentqueue.h
interruptive_concurrent_queue.h
Expand All @@ -38,8 +37,17 @@ target_sources(cc_ymq PRIVATE
tcp_client.h
tcp_client.cpp

tcp_operations.h

timestamp.h

timed_queue.h

network_utils.h

error.h

logging.h
)

set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/scaler/io/ymq)
Expand Down
122 changes: 51 additions & 71 deletions scaler/io/ymq/bytes.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#pragma once

// C
#include <string.h> // memcmp

#include <algorithm>
#include <compare>
#include <cstddef>
#include <cstdint>
#include <cstring>
Expand All @@ -10,64 +14,68 @@

// First-party
#include "scaler/io/ymq/common.h"
#include "scaler/io/ymq/typedefs.h"

class Bytes {
uint8_t* _data;
size_t _len;
Ownership _tag;

void free() {
if (_tag != Owned)
return;

if (is_empty())
return;

delete[] _data;
this->_data = NULL;
}

Bytes(uint8_t* m_data, size_t m_len, Ownership tag): _data(m_data), _len(m_len), _tag(tag) {}
explicit Bytes(uint8_t* m_data, size_t m_len): _data(m_data), _len(m_len) {}

public:
// move-only
// TODO: make copyable
Bytes(const Bytes&) = delete;
Bytes& operator=(const Bytes&) = delete;
Bytes(Bytes&& other) noexcept: _data(other._data), _len(other._len), _tag(other._tag) {
other._data = NULL;
Bytes(char* data, size_t len): _data(datadup((uint8_t*)data, len)), _len(len) {}

Bytes(): _data {}, _len {} {}

Bytes(const Bytes& other) noexcept {
this->_data = datadup(other._data, other._len);
this->_len = other._len;
}

Bytes& operator=(const Bytes& other) noexcept {
Bytes tmp(other);
swap(*this, tmp);
return *this;
}

friend void swap(Bytes& x, Bytes& y) noexcept {
using std::swap;
swap(x._len, y._len);
swap(x._data, y._data);
}

Bytes(Bytes&& other) noexcept: _data(other._data), _len(other._len) {
other._data = nullptr;
other._len = 0;
}

friend std::strong_ordering operator<=>(const Bytes& x, const Bytes& y) noexcept {
return std::lexicographical_compare_three_way(x._data, x._data + x._len, y._data, y._data + y._len);
}

Bytes& operator=(Bytes&& other) noexcept {
if (this != &other) {
this->free(); // free current data

_data = other._data;
_len = other._len;
_tag = other._tag;

other._data = NULL;
other._data = nullptr;
other._len = 0;
}
return *this;
}

~Bytes() { this->free(); }

bool operator==(const Bytes& other) const {
if (_len != other._len)
return false;
[[nodiscard]] constexpr bool operator!() const noexcept { return is_empty(); }

if (_data == other._data)
return true;

return std::memcmp(_data, other._data, _len) == 0;
}

bool operator!() const { return is_empty(); }

bool is_empty() const { return this->_data == NULL; }
[[nodiscard]] constexpr bool is_empty() const noexcept { return !this->_data; }

// debugging utility
std::string as_string() const {
Expand All @@ -77,51 +85,23 @@ class Bytes {
return std::string((char*)_data, _len);
}

Bytes ref() { return Bytes {this->_data, this->_len, Borrowed}; }

static Bytes alloc(size_t m_len) {
if (m_len == 0)
return empty();

return Bytes {new uint8_t[m_len], m_len, Owned};
[[nodiscard("Allocated Bytes is not used, likely causing memory leak")]]
static Bytes alloc(size_t m_len) noexcept {
auto ptr = new uint8_t[m_len]; // we just assume the allocation will succeed
return Bytes {ptr, m_len};
}

static Bytes empty() { return Bytes {NULL, 0, Owned}; }

static Bytes copy(const uint8_t* m_data, size_t m_len) {
if (m_len == 0)
return empty();

return Bytes {datadup(m_data, m_len), m_len, Owned};
// NOTE: Below two functions are not used by the core but appears
// to be used by pymod YMQ. - gxu
[[nodiscard]] static Bytes empty() { return Bytes {(uint8_t*)nullptr, 0}; }
[[nodiscard]] static Bytes copy(const uint8_t* m_data, size_t m_len) {
Bytes result;
result._data = datadup(m_data, m_len);
result._len = m_len;
return result;
}

static Bytes clone(const Bytes& bytes) {
if (bytes.is_empty())
panic("tried to clone empty bytes");

return Bytes {datadup(bytes._data, bytes._len), bytes._len, Owned};
}

// static Bytes from_buffer(Buffer& buffer) { return buffer.into_bytes(); }

// // consume this Bytes and return a Buffer object
// Buffer into_buffer() {
// if (tag != Owned) {
// // if the m_data is borrowed, we need to copy it
// auto new_m_data = new uint8_t[m_len];
// std::memcpy(new_m_data, m_data, m_len);
// m_data = new_m_data;
// tag = Owned; // now we own the m_data
// }

// Buffer buffer {m_data, m_len, m_len};
// m_data = NULL; // prevent double free
// m_len = 0; // prevent double free
// return buffer;
// }

size_t len() const { return _len; }
const uint8_t* data() const { return _data; }

friend class Buffer;
[[nodiscard]] constexpr size_t len() const { return _len; }
[[nodiscard]] constexpr const uint8_t* data() const { return _data; }
[[nodiscard]] constexpr uint8_t* data() { return _data; }
};
17 changes: 3 additions & 14 deletions scaler/io/ymq/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include <source_location>
#include <string>

const size_t HEADER_SIZE = 4; // size of the message header in bytes

using Errno = int;

inline void print_trace(void) {
Expand Down Expand Up @@ -45,18 +43,9 @@ inline void print_trace(void) {
std::abort();
}

[[noreturn]] inline void todo(
std::optional<std::string> message = std::nullopt,
const std::source_location& location = std::source_location::current()) {
if (message) {
panic("TODO: " + *message, location);
} else {
panic("TODO", location);
}
}

inline uint8_t* datadup(const uint8_t* data, size_t len) {
uint8_t* dup = new uint8_t[len];
[[nodiscard("Memory is allocated but not bind, likely causing memory leak")]]
constexpr inline uint8_t* datadup(const uint8_t* data, size_t len) noexcept {
uint8_t* dup = new uint8_t[len]; // we just assume allocation will succeed
std::memcpy(dup, data, len);
return dup;
}
Expand Down
22 changes: 20 additions & 2 deletions scaler/io/ymq/configuration.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
#pragma once

// C++
#include <functional>
#include <memory>
#include <string>

namespace scaler {
namespace ymq {

class EpollContext;
class Message;
class IOSocket;

struct Configuration {
using PollingContext = EpollContext;
using Identity = std::string;
using PollingContext = EpollContext;
using IOSocketIdentity = std::string;
using SendMessageCallback = std::move_only_function<void(int)>;
using RecvMessageCallback = std::move_only_function<void(Message)>;
using ConnectReturnCallback = std::move_only_function<void(int)>;
using BindReturnCallback = std::move_only_function<void(int)>;
using CreateIOSocketCallback = std::move_only_function<void(std::shared_ptr<IOSocket>)>;
using TimedQueueCallback = std::move_only_function<void()>;
using ExecutionFunction = std::move_only_function<void()>;
using ExecutionCancellationIdentifier = size_t;
};

} // namespace ymq
} // namespace scaler
Loading
Loading