1111#include < memory>
1212#include < optional>
1313
14+ #include " scaler/io/ymq/error.h"
1415#include " scaler/io/ymq/event_loop_thread.h"
1516#include " scaler/io/ymq/event_manager.h"
1617#include " scaler/io/ymq/io_socket.h"
@@ -123,7 +124,45 @@ std::expected<void, int> MessageConnectionTCP::tryReadMessages(bool readOneMessa
123124 if (errno == EAGAIN || errno == EWOULDBLOCK) {
124125 return {}; // Expected, we read until exhuastion
125126 } else {
126- return std::unexpected {errno};
127+ const int myErrno = errno;
128+ switch (myErrno) {
129+ case EBADF:
130+ case EISDIR:
131+ case EINVAL:
132+ unrecoverableError ({
133+ Error::ErrorCode::CoreBug,
134+ " Originated from" ,
135+ " read(2)" ,
136+ " Errno is" ,
137+ strerror (myErrno),
138+ " _connfd" ,
139+ _connFd,
140+ " readTo" ,
141+ (void *)readTo,
142+ " remainingSize" ,
143+ remainingSize,
144+ });
145+
146+ case EINTR:
147+ unrecoverableError ({
148+ Error::ErrorCode::SignalNotSupported,
149+ " Originated from" ,
150+ " read(2)" ,
151+ " Errno is" ,
152+ strerror (myErrno),
153+ });
154+
155+ case EFAULT:
156+ case EIO:
157+ default :
158+ unrecoverableError ({
159+ Error::ErrorCode::ConfigurationError,
160+ " Originated from" ,
161+ " read(2)" ,
162+ " Errno is" ,
163+ strerror (myErrno),
164+ });
165+ }
127166 }
128167 } else {
129168 message._cursor += n;
@@ -163,9 +202,8 @@ void MessageConnectionTCP::onRead() {
163202 onClose ();
164203 return ;
165204 }
166- printf (" SOMETHING REALLY BAD HAPPENED\n " );
167- exit (1 );
168205 }
206+
169207 if (_receivedReadOperations.size () && isCompleteMessage (_receivedReadOperations.front ())) {
170208 auto id = std::move (_receivedReadOperations.front ());
171209 _remoteIOSocketIdentity.emplace ((char *)id._payload .data (), id._payload .len ());
@@ -181,8 +219,6 @@ void MessageConnectionTCP::onRead() {
181219 onClose ();
182220 return ;
183221 }
184- printf (" SOMETHING REALLY BAD HAPPENED\n " );
185- exit (1 );
186222 }
187223
188224 updateReadOperation ();
@@ -206,9 +242,6 @@ void MessageConnectionTCP::onWrite() {
206242 if (res.error () == ECONNRESET || res.error () == EPIPE) {
207243 onClose ();
208244 return ;
209- } else {
210- printf (" SOMETHING REALLY BAD\n " );
211- exit (1 );
212245 }
213246}
214247
@@ -268,8 +301,69 @@ std::expected<size_t, int> MessageConnectionTCP::trySendQueuedMessages() {
268301 if (bytesSent == -1 ) {
269302 if (errno == EAGAIN || errno == EWOULDBLOCK) {
270303 return 0 ;
271- } else
272- return std::unexpected {errno};
304+ } else {
305+ const int myErrno = errno;
306+ switch (myErrno) {
307+ case EAFNOSUPPORT:
308+ case EBADF:
309+ case EINVAL:
310+ case EMSGSIZE:
311+ case ENOTCONN:
312+ case ENOTSOCK:
313+ case EOPNOTSUPP:
314+ case ENAMETOOLONG:
315+ case ENOENT:
316+ case ENOTDIR:
317+ case ELOOP:
318+ case EDESTADDRREQ:
319+ case EHOSTUNREACH:
320+ case EISCONN:
321+ unrecoverableError ({
322+ Error::ErrorCode::CoreBug,
323+ " Originated from" ,
324+ " sendmsg(2)" ,
325+ " Errno is" ,
326+ strerror (myErrno),
327+ " _connfd" ,
328+ _connFd,
329+ " msg.msg_iovlen" ,
330+ msg.msg_iovlen ,
331+ });
332+ break ;
333+
334+ case ECONNRESET: break ; // NOTE: HANDLE BY CALLER
335+
336+ case EINTR:
337+ unrecoverableError ({
338+ Error::ErrorCode::SignalNotSupported,
339+ " Originated from" ,
340+ " sendmsg(2)" ,
341+ " Errno is" ,
342+ strerror (myErrno),
343+ });
344+ break ;
345+
346+ case EPIPE: break ; // NOTE: HANDLE BY CALLER
347+
348+ case EIO:
349+ case EACCES:
350+ case ENETDOWN:
351+ case ENETUNREACH:
352+ case ENOBUFS:
353+ case ENOMEM:
354+ default :
355+ unrecoverableError ({
356+ Error::ErrorCode::ConfigurationError,
357+ " Originated from" ,
358+ " sendmsg(2)" ,
359+ " Errno is" ,
360+ strerror (myErrno),
361+ });
362+ break ;
363+ }
364+
365+ return std::unexpected {myErrno};
366+ }
273367 }
274368
275369 return bytesSent;
0 commit comments