Skip to content

Commit 413673b

Browse files
authored
Merge branch 'main' into ymq
2 parents 849b952 + 8254f95 commit 413673b

File tree

11 files changed

+199
-85
lines changed

11 files changed

+199
-85
lines changed

pyproject.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ classifiers = [
1818
]
1919
dynamic = ["version"]
2020
dependencies = [
21-
"bidict",
21+
"bidict",
2222
"cloudpickle",
2323
"psutil==7.0.0",
2424
"pycapnp==2.0.0",
@@ -45,8 +45,8 @@ scaler_object_storage_server = "scaler.entry_points.object_storage_server:main"
4545
[project.optional-dependencies]
4646
uvloop = ["uvloop"]
4747
graphblas = ["python-graphblas", "numpy==2.3.1"]
48-
gui = ["nicegui[plotly]==2.20.0"]
49-
all = ["python-graphblas", "numpy==2.3.1", "uvloop", "nicegui[plotly]==2.20.0"]
48+
gui = ["nicegui[plotly]==2.21.1"]
49+
all = ["python-graphblas", "numpy==2.3.1", "uvloop", "nicegui[plotly]==2.21.1"]
5050

5151
[tool.scikit-build]
5252
wheel.packages = ["scaler"]
@@ -62,7 +62,7 @@ exclude = [
6262
"^venv.*$",
6363
"migrations",
6464
"__pycache__",
65-
"^build.*$",
65+
"^build_*_*.*$",
6666
"^capnproto.*$",
6767
"^boost.*$",
6868
"^scripts.*$",
@@ -77,7 +77,7 @@ extend-ignore = ["E203"]
7777
exclude = [
7878
"migrations",
7979
"__pycache__",
80-
"./build",
80+
"./build_*_*",
8181
"./capnproto*",
8282
"./boost*",
8383
"./scripts",
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import multiprocessing
2-
import os
32

4-
from scaler.object_storage.object_storage_server import run_object_storage_server
3+
from scaler.object_storage.object_storage_server import ObjectStorageServer
54
from scaler.utility.object_storage_config import ObjectStorageConfig
65

76

@@ -10,11 +9,12 @@ def __init__(self, storage_address: ObjectStorageConfig):
109
multiprocessing.Process.__init__(self, name="ObjectStorageServer")
1110

1211
self._storage_address = storage_address
13-
self._on_server_ready_fd = os.eventfd(0, os.EFD_SEMAPHORE)
12+
13+
self._server = ObjectStorageServer()
1414

1515
def wait_until_ready(self) -> None:
1616
"""Blocks until the object storage server is available to server requests."""
17-
os.eventfd_read(self._on_server_ready_fd)
17+
self._server.wait_until_ready()
1818

1919
def run(self) -> None:
20-
run_object_storage_server(self._storage_address.host, self._storage_address.port, self._on_server_ready_fd)
20+
self._server.run(self._storage_address.host, self._storage_address.port)

scaler/entry_points/object_storage_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import argparse
22

3-
from scaler.object_storage.object_storage_server import run_object_storage_server
3+
from scaler.object_storage.object_storage_server import ObjectStorageServer
44
from scaler.utility.object_storage_config import ObjectStorageConfig
55

66

@@ -19,4 +19,4 @@ def get_args():
1919

2020
def main():
2121
args = get_args()
22-
run_object_storage_server(args.address.host, args.address.port)
22+
ObjectStorageServer().run(args.address.host, args.address.port)

scaler/io/async_object_storage_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async def routine(self):
8585
if header.response_type != ObjectResponseHeader.ObjectResponseType.GetOK:
8686
return
8787

88-
pending_get_future = self._pending_get_requests.get(header.object_id)
88+
pending_get_future = self._pending_get_requests.pop(header.object_id, None)
8989

9090
if pending_get_future is None:
9191
logging.warning(f"unknown get-ok response for unrequested object_id={repr(header.object_id)}.")

scaler/object_storage/CMakeLists.txt

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,28 @@ target_link_libraries(cc_object_storage_server PUBLIC CapnProto::capnp)
1919
target_include_directories(cc_object_storage_server PUBLIC ${CMAKE_BINARY_DIR})
2020

2121
install(TARGETS cc_object_storage_server
22-
LIBRARY DESTINATION scaler/object_storage/)
22+
LIBRARY DESTINATION scaler/object_storage/
23+
COMPONENT cc_object_storage_server)
2324

2425
find_package(Python3 COMPONENTS Development.Module REQUIRED)
2526

2627
add_library(object_storage_server SHARED)
2728

2829
set_target_properties(object_storage_server PROPERTIES PREFIX "")
2930

31+
# Forces the .so suffix, as Python only recognizes these (macOS uses .dylib by default)
32+
set_target_properties(object_storage_server PROPERTIES SUFFIX ".so")
33+
3034
target_sources(object_storage_server PRIVATE pymod_object_storage_server.cpp)
31-
target_include_directories(object_storage_server PRIVATE ${Python3_INCLUDE_DIRS})
3235
target_link_libraries(object_storage_server PRIVATE cc_object_storage_server
33-
PRIVATE ${Python3_LIBRARIES}
34-
)
36+
PRIVATE Python3::Module)
3537

36-
target_link_options(object_storage_server PRIVATE "-Wl,-rpath,$ORIGIN")
38+
if(APPLE)
39+
target_link_options(object_storage_server PRIVATE "-Wl,-rpath,@loader_path")
40+
else()
41+
target_link_options(object_storage_server PRIVATE "-Wl,-rpath,$ORIGIN")
42+
endif()
3743

3844
install(TARGETS object_storage_server
39-
LIBRARY DESTINATION scaler/object_storage/)
45+
LIBRARY DESTINATION scaler/object_storage/
46+
COMPONENT object_storage_server)

scaler/object_storage/io_helper.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
#include "io_helper.h"
22

3+
#include <cstdint>
4+
#include <exception>
5+
#include <iostream>
6+
37
#include <capnp/message.h>
48
#include <capnp/serialize.h>
59

@@ -15,8 +19,6 @@
1519
#include <boost/asio/use_awaitable.hpp>
1620
#include <boost/asio/write.hpp>
1721
#include <boost/system/system_error.hpp>
18-
#include <exception>
19-
#include <iostream>
2022

2123
#include "protocol/object_storage.capnp.h"
2224
#include "scaler/object_storage/constants.h"
@@ -65,12 +67,12 @@ awaitable<void> read_request_header(tcp::socket& socket, ObjectRequestHeader& he
6567
if (e.code() == boost::asio::error::eof) {
6668
std::cerr << "Remote end closed, nothing to read.\n";
6769
} else {
68-
std::cerr << "exception throwned, read error e.what() = " << e.what() << '\n';
70+
std::cerr << "exception thrown, read error e.what() = " << e.what() << '\n';
6971
}
7072
throw e;
7173
} catch (std::exception& e) {
7274
// TODO: make this a log, capnp header corruption is an err.
73-
std::cerr << "exception throwned, header not a capnp e.what() = " << e.what() << '\n';
75+
std::cerr << "exception thrown, header not a capnp e.what() = " << e.what() << '\n';
7476

7577
throw e;
7678
}
@@ -93,6 +95,11 @@ awaitable<void> read_request_payload(tcp::socket& socket, ObjectRequestHeader& h
9395
co_return;
9496
}
9597

98+
if (header.payloadLength > SIZE_MAX) {
99+
std::cerr << "payload length is larger than SIZE_MAX = " << SIZE_MAX << '\n';
100+
std::terminate();
101+
}
102+
96103
payload.resize(header.payloadLength);
97104
try {
98105
std::size_t n = co_await boost::asio::async_read(socket, boost::asio::buffer(payload), use_awaitable);

scaler/object_storage/io_helper.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,5 @@ boost::asio::awaitable<void> read_request_payload(
2222
boost::asio::awaitable<void> write_response(
2323
boost::asio::ip::tcp::socket& socket, ObjectResponseHeader& header, std::span<const unsigned char> payload_view);
2424

25-
26-
2725
}; // namespace object_storage
2826
}; // namespace scaler

scaler/object_storage/object_storage_server.h

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
#pragma once
22

3+
#include <algorithm>
4+
#include <iostream>
5+
#include <map>
6+
#include <unistd.h>
7+
#include <utility>
8+
39
#include <boost/asio/awaitable.hpp>
410
#include <boost/asio/co_spawn.hpp>
511
#include <boost/asio/detached.hpp>
612
#include <boost/asio/io_context.hpp>
713
#include <boost/asio/ip/tcp.hpp>
814
#include <boost/asio/signal_set.hpp>
915
#include <boost/system/system_error.hpp>
10-
#include <iostream>
11-
#include <map>
1216

1317
#include "protocol/object_storage.capnp.h"
1418
#include "scaler/object_storage/defs.h"
@@ -81,10 +85,10 @@ class ObjectStorageServer {
8185

8286
case reqType::GET_OBJECT: {
8387
responseHeader.respType = respType::GET_O_K;
84-
if (objectIDToMeta[requestHeader.objectID].object)
85-
responseHeader.payloadLength =
86-
std::min(objectIDToMeta[requestHeader.objectID].object->size(), requestHeader.payloadLength);
87-
else
88+
if (objectIDToMeta[requestHeader.objectID].object) {
89+
uint64_t objectSize = static_cast<uint64_t>(objectIDToMeta[requestHeader.objectID].object->size());
90+
responseHeader.payloadLength = std::min(objectSize, requestHeader.payloadLength);
91+
} else
8892
return false;
8993
break;
9094
}
@@ -106,8 +110,8 @@ class ObjectStorageServer {
106110
private:
107111
awaitable<void> write_once(Meta meta) {
108112
if (meta.requestHeader.reqType == reqType::GET_OBJECT) {
109-
meta.responseHeader.payloadLength =
110-
std::min(objectIDToMeta[meta.responseHeader.objectID].object->size(), meta.requestHeader.payloadLength);
113+
uint64_t objectSize = static_cast<uint64_t>(objectIDToMeta[meta.responseHeader.objectID].object->size());
114+
meta.responseHeader.payloadLength = std::min(objectSize, meta.requestHeader.payloadLength);
111115
}
112116

113117
auto payload_view = getMemoryViewForResponsePayload(meta.responseHeader);
@@ -131,7 +135,8 @@ class ObjectStorageServer {
131135
#ifndef NDEBUG
132136
public:
133137
#endif
134-
int on_server_ready_fd;
138+
int _onServerReadyReader;
139+
int _onServerReadyWriter;
135140

136141
std::map<scaler::object_storage::object_id_t, ObjectWithMeta> objectIDToMeta;
137142
std::map<std::size_t, shared_object_t> objectHashToObject;
@@ -165,31 +170,46 @@ class ObjectStorageServer {
165170
}
166171
}
167172

168-
static int createServerReadyEventfd() {
169-
int on_server_ready_fd = eventfd(0, EFD_SEMAPHORE);
170-
if (on_server_ready_fd == -1) {
171-
std::cerr << "create on_server_ready_fd failed: errno=" << errno << std::endl;
173+
void initServerReadyFds() {
174+
int pipeFds[2];
175+
int ret = pipe(pipeFds);
176+
177+
if (ret != 0) {
178+
std::cerr << "create on server ready FDs failed: errno=" << errno << std::endl;
172179
std::terminate();
173180
}
174181

175-
return on_server_ready_fd;
182+
this->_onServerReadyReader = pipeFds[0];
183+
this->_onServerReadyWriter = pipeFds[1];
176184
}
177185

178-
void setServerReadyEventfd() {
186+
void setServerReadyFd() {
179187
uint64_t value = 1;
180-
ssize_t ret = write(this->on_server_ready_fd, &value, sizeof (uint64_t));
188+
ssize_t ret = write(this->_onServerReadyWriter, &value, sizeof (uint64_t));
181189

182190
if (ret != sizeof (uint64_t)) {
183-
std::cerr << "write to on_server_ready_fd failed: errno=" << errno << std::endl;
191+
std::cerr << "write to _onServerReadyWriter failed: errno=" << errno << std::endl;
184192
std::terminate();
185193
}
186194
}
187195

196+
void closeServerReadyFds() {
197+
std::array<int, 2> fds { this->_onServerReadyReader, this->_onServerReadyWriter };
198+
199+
for (auto fd : fds) {
200+
if (close(fd) != 0) {
201+
std::cerr << "close failed: errno=" << errno << std::endl;
202+
std::terminate();
203+
}
204+
}
205+
}
206+
188207
awaitable<void> listener(boost::asio::ip::tcp::endpoint endpoint) {
208+
189209
auto executor = co_await boost::asio::this_coro::executor;
190210
tcp::acceptor acceptor(executor, endpoint);
191211

192-
setServerReadyEventfd();
212+
setServerReadyFd();
193213

194214
for (;;) {
195215
auto shared_socket = std::make_shared<tcp::socket>(executor);
@@ -201,13 +221,12 @@ class ObjectStorageServer {
201221
}
202222

203223
public:
204-
ObjectStorageServer(int on_server_ready_fd = -1)
205-
{
206-
if (on_server_ready_fd < 0) {
207-
this->on_server_ready_fd = createServerReadyEventfd();
208-
} else {
209-
this->on_server_ready_fd = on_server_ready_fd;
210-
}
224+
ObjectStorageServer() {
225+
this->initServerReadyFds();
226+
}
227+
228+
~ObjectStorageServer() {
229+
this->closeServerReadyFds();
211230
}
212231

213232
void run(std::string name, std::string port) {
@@ -220,7 +239,6 @@ class ObjectStorageServer {
220239
signals.async_wait([&](auto, auto) { io_context.stop(); });
221240

222241
co_spawn(io_context, listener(res.begin()->endpoint()), detached);
223-
224242
io_context.run();
225243
} catch (std::exception& e) {
226244
std::cerr << "Exception: " << e.what() << std::endl;
@@ -230,10 +248,10 @@ class ObjectStorageServer {
230248

231249
void waitUntilReady() {
232250
uint64_t value;
233-
ssize_t ret = read(this->on_server_ready_fd, &value, sizeof (uint64_t));
251+
ssize_t ret = read(this->_onServerReadyReader, &value, sizeof (uint64_t));
234252

235253
if (ret != sizeof (uint64_t)) {
236-
std::cerr << "read from on_server_ready_fd failed: errno=" << errno << std::endl;
254+
std::cerr << "read from _onServerReadyReader failed: errno=" << errno << std::endl;
237255
std::terminate();
238256
}
239257
}

0 commit comments

Comments
 (0)