From 297f3587f2522264cdd9a16e3bee9370eb4ee2b7 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 13:04:43 +0200 Subject: [PATCH 01/11] chore: refine typespecs --- src/esockd_connection_sup.erl | 36 ++++++++++++++++++++--------------- src/esockd_socket.erl | 7 +++++-- src/esockd_transport.erl | 1 + 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/esockd_connection_sup.erl b/src/esockd_connection_sup.erl index fa3a668..57968cb 100644 --- a/src/esockd_connection_sup.erl +++ b/src/esockd_connection_sup.erl @@ -49,7 +49,7 @@ -include("esockd.hrl"). --type(shutdown() :: brutal_kill | infinity | pos_integer()). +-type shutdown() :: brutal_kill | infinity | pos_integer(). -type option() :: {shutdown, shutdown()} | {max_connections, pos_integer()} @@ -100,12 +100,17 @@ set_options(Pid, Opts) -> %%-------------------------------------------------------------------- %% @doc Start connection. -start_connection(Sup, Transport, Sock, UpgradeFuns) -> - case call(Sup, {start_connection, Transport, Sock}) of +-spec start_connection + (pid(), esockd_transport, esockd_transport:socket(), [esockd:sock_fun()]) -> + {ok, Connection :: pid()} | ignore | {error, term()}; + (pid(), esockd_socket, esockd_socket:socket(), [esockd:sock_fun()]) -> + {ok, Connection :: pid()} | ignore | {error, term()}. +start_connection(Sup, TransportMod, Sock, UpgradeFuns) -> + case call(Sup, {start_connection, TransportMod, Sock}) of {ok, ConnPid} -> %% Transfer controlling from acceptor to connection - _ = Transport:controlling_process(Sock, ConnPid), - _ = Transport:ready(ConnPid, Sock, UpgradeFuns), + _ = TransportMod:controlling_process(Sock, ConnPid), + _ = TransportMod:ready(ConnPid, Sock, UpgradeFuns), {ok, ConnPid}; ignore -> ignore; @@ -114,12 +119,13 @@ start_connection(Sup, Transport, Sock, UpgradeFuns) -> end. %% @doc Start the connection process. --spec start_connection_proc(esockd:mfargs(), - esockd_transport | esockd_socket, - esockd_transport:socket() | socket:socket()) - -> {ok, pid()} | ignore | {error, term()}. -start_connection_proc(MFA, Transport, Sock) -> - esockd:start_mfargs(MFA, Transport, Sock). +-spec start_connection_proc + (esockd:mfargs(), esockd_transport, esockd_transport:socket()) -> + {ok, pid()} | ignore | {error, term()}; + (esockd:mfargs(), esockd_socket, esockd_socket:socket()) -> + {ok, pid()} | ignore | {error, term()}. +start_connection_proc(MFA, TransportMod, Sock) -> + esockd:start_mfargs(MFA, TransportMod, Sock). -spec(count_connections(pid()) -> integer()). count_connections(Sup) -> @@ -163,18 +169,18 @@ init(Opts0) -> shutdown = Shutdown, mfargs = MFA}}. -handle_call({start_connection, _Transport, _Sock}, _From, +handle_call({start_connection, _TransportMod, _Sock}, _From, State = #state{curr_connections = Conns, max_connections = MaxConns}) when map_size(Conns) >= MaxConns -> {reply, {error, ?ERROR_MAXLIMIT}, State}; -handle_call({start_connection, Transport, Sock}, _From, +handle_call({start_connection, TransportMod, Sock}, _From, State = #state{curr_connections = Conns, access_rules = Rules, mfargs = MFA}) -> - case Transport:peername(Sock) of + case TransportMod:peername(Sock) of {ok, {Addr, _Port}} -> case allowed(Addr, Rules) of true -> - try start_connection_proc(MFA, Transport, Sock) of + try start_connection_proc(MFA, TransportMod, Sock) of {ok, Pid} when is_pid(Pid) -> NState = State#state{curr_connections = maps:put(Pid, true, Conns)}, {reply, {ok, Pid}, NState}; diff --git a/src/esockd_socket.erl b/src/esockd_socket.erl index 82ee869..b66d471 100644 --- a/src/esockd_socket.erl +++ b/src/esockd_socket.erl @@ -28,6 +28,8 @@ %% Internal callbacks -export([proxy_upgrade_fun/1, proxy_upgrade/2]). +-export_type([socket/0]). + -type socket() :: socket:socket(). -spec type(socket()) -> tcp | proxy | {error, closed}. @@ -56,6 +58,7 @@ wait(Sock) -> upgrade(Sock, UpgradeFuns) end. +-spec upgrade(socket(), [esockd:sock_fun()]) -> {ok, socket()} | {error, term()}. upgrade(Sock, []) -> {ok, Sock}; upgrade(Sock, [{Fun, Args} | More]) -> @@ -72,7 +75,7 @@ fast_close(Sock) -> %% @doc Sockname -spec sockname(socket()) -> {ok, {inet:ip_address(), inet:port_number()}} - | {error, inet:posix()}. + | {error, inet:posix() | closed}. sockname(Sock) -> case socket:getopt(Sock, {otp, meta}) of {ok, #{proxy_dst_addr := DstAddr, proxy_dst_port := DstPort}} -> @@ -90,7 +93,7 @@ sockname(Sock) -> %% @doc Peername -spec peername(socket()) -> {ok, {inet:ip_address(), inet:port_number()}} - | {error, inet:posix()}. + | {error, inet:posix() | closed}. peername(Sock) -> case socket:getopt(Sock, {otp, meta}) of {ok, #{proxy_src_addr := SrcAddr, proxy_src_port := SrcPort}} -> diff --git a/src/esockd_transport.erl b/src/esockd_transport.erl index cc17eb1..3452fc7 100644 --- a/src/esockd_transport.erl +++ b/src/esockd_transport.erl @@ -71,6 +71,7 @@ wait(Sock) -> upgrade(Sock, UpgradeFuns) end. +-spec(upgrade(socket(), [esockd:sock_fun()]) -> {ok, socket()} | {error, term()}). upgrade(Sock, []) -> {ok, Sock}; upgrade(Sock, [{Fun, Args}|More]) -> From eb0ffd1282c7e1c9a725d337f78f29d708b36c0f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 13:05:40 +0200 Subject: [PATCH 02/11] chore: refine naming and add more documentation --- src/esockd_acceptor_fsm.erl | 6 +++--- src/esockd_socket.erl | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/esockd_acceptor_fsm.erl b/src/esockd_acceptor_fsm.erl index 558d099..f34fbec 100644 --- a/src/esockd_acceptor_fsm.erl +++ b/src/esockd_acceptor_fsm.erl @@ -124,7 +124,7 @@ handle_event(internal, accept, State, D) when State =:= waiting orelse {stop, Reason, D} end; handle_event(info, Message, State = {accepting, Ref, InState}, D) -> - case unwrap_accept_result(Message, Ref, D) of + case async_accept_result(Message, Ref, D) of {ok, Sock} -> handle_accepted(Sock, InState, D); {error, Reason} -> @@ -307,7 +307,7 @@ async_accept(CurrentState, #d{lsock = LSock, modopts = {Mod, Opts}}) -> Other end. -unwrap_accept_result({?MODULE, Ref, {error, Reason}}, Ref, _) -> +async_accept_result({?MODULE, Ref, {error, Reason}}, Ref, _) -> {error, Reason}; -unwrap_accept_result(Message, Ref, #d{modopts = {Mod, Opts}}) -> +async_accept_result(Message, Ref, #d{modopts = {Mod, Opts}}) -> Mod:async_accept_result(Message, Ref, Opts). diff --git a/src/esockd_socket.erl b/src/esockd_socket.erl index b66d471..06a32a4 100644 --- a/src/esockd_socket.erl +++ b/src/esockd_socket.erl @@ -74,6 +74,8 @@ fast_close(Sock) -> ok. %% @doc Sockname +%% Returns original destination address and port if proxy protocol is enabled. +%% Otherwise, returns the local address and port. -spec sockname(socket()) -> {ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix() | closed}. sockname(Sock) -> @@ -92,6 +94,8 @@ sockname(Sock) -> end. %% @doc Peername +%% Returns original source address and port if proxy protocol is enabled. +%% Otherwise, returns the local address and port. -spec peername(socket()) -> {ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix() | closed}. peername(Sock) -> @@ -110,6 +114,9 @@ peername(Sock) -> end. %% @doc Socket peercert +%% Returns the peer certificate if proxy protocol is enabled, and the proxy +%% middleware passed it as part of PPv2 exchange. +%% See also `esockd_transport:peercert/1'. -spec peercert(socket()) -> nossl | list(pp2_additional_ssl_field()) | {error, term()}. @@ -124,12 +131,18 @@ peercert(Sock) -> end. %% @doc Peercert subject +%% Returns the common name of the peer certificate if proxy protocol is enabled, +%% and the proxy middleware passed it as part of PPv2 exchange. +%% See also `esockd_transport:peer_cert_subject/1'. -spec peer_cert_subject(socket()) -> undefined | binary(). peer_cert_subject(Sock) -> %% Common Name? Haproxy PP2 will not pass subject. peer_cert_common_name(Sock). %% @doc Peercert common name +%% Returns the common name of the peer certificate if proxy protocol is enabled, +%% and the proxy middleware passed it as part of PPv2 exchange. +%% See also `esockd_transport:peer_cert_common_name/1'. -spec peer_cert_common_name(socket()) -> undefined | binary(). peer_cert_common_name(Sock) -> case socket:getopt(Sock, {otp, meta}) of @@ -142,6 +155,10 @@ peer_cert_common_name(Sock) -> Error end. +%% @doc Peersni +%% Returns the SNI of the peer TLS connection if proxy protocol is enabled, +%% and the proxy middleware passed it as part of PPv2 exchange. +%% See also `esockd_transport:peersni/1'. -spec peersni(socket()) -> undefined | binary(). peersni(Sock) -> case socket:getopt(Sock, {otp, meta}) of From 679fc47e14baa27ccca76e82485956474e3ffab6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 13:36:38 +0200 Subject: [PATCH 03/11] chore(acceptor-fsm): abstract listener socket into callback impl --- src/esockd_accept_inet.erl | 38 ++++++++++++++++++------ src/esockd_accept_socket.erl | 36 +++++++++++++++++------ src/esockd_acceptor_fsm.erl | 46 ++++++++++++------------------ src/esockd_acceptor_sup.erl | 4 +-- src/esockd_dtls_acceptor.erl | 5 ++-- src/esockd_transport.erl | 5 ++-- test/esockd_acceptor_fsm_SUITE.erl | 5 +--- 7 files changed, 83 insertions(+), 56 deletions(-) diff --git a/src/esockd_accept_inet.erl b/src/esockd_accept_inet.erl index e42a194..7613472 100644 --- a/src/esockd_accept_inet.erl +++ b/src/esockd_accept_inet.erl @@ -18,9 +18,10 @@ -export([ init/2, - async_accept/2, + async_accept/1, async_accept_result/3, post_accept/2, + sockname/1, fast_close/1 ]). @@ -29,23 +30,34 @@ tune_socket/2 ]). --type ctx() :: {module(), tune_socket_fun()}. --type socket() :: inet:socket(). +-type socket() :: esockd_transport:listen_socket(). -type async_ref() :: reference(). -type tune_socket_fun() :: {fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}. +-record(ctx, { + lsock :: socket(), + sock_mod :: module(), + tune_fun :: tune_socket_fun() +}). + +-type ctx() :: #ctx{}. + %% -spec init(socket(), _Opts) -> ctx(). init(LSock, TuneFun) -> {ok, SockMod} = inet_db:lookup_socket(LSock), - {SockMod, TuneFun}. + #ctx{ + lsock = LSock, + sock_mod = SockMod, + tune_fun = TuneFun + }. --spec async_accept(socket(), ctx()) -> +-spec async_accept(ctx()) -> {async, async_ref()} | {error, atom()}. -async_accept(LSock, _Ctx) -> +async_accept(#ctx{lsock = LSock}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {async, Ref}; @@ -62,12 +74,15 @@ async_accept_result({inet_async, _LSock, Ref, {error, Reason}}, Ref, _Ctx) -> async_accept_result(Info, _Ref, _Ctx) -> Info. --spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}. -post_accept(Sock, {SockMod, TuneFun}) -> +-spec post_accept(socket(), ctx()) -> {ok, esockd_transport, socket()} | {error, atom()}. +post_accept(Sock, #ctx{sock_mod = SockMod, tune_fun = TuneFun}) -> %% make it look like gen_tcp:accept inet_db:register_socket(Sock, SockMod), eval_tune_socket_fun(TuneFun, Sock). +return_socket(Sock) -> + {ok, esockd_transport, Sock}. + eval_tune_socket_fun({Fun, Opts}, Sock) -> Fun(Sock, Opts). @@ -79,7 +94,7 @@ mk_tune_socket_fun(Opts) -> {fun ?MODULE:tune_socket/2, TuneOpts}. tune_socket(Sock, []) -> - {ok, Sock}; + return_socket(Sock); tune_socket(Sock, [{tune_buffer, true}|More]) -> case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of {ok, BufSizes} -> @@ -102,6 +117,11 @@ tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) -> Error end. +-spec sockname(ctx()) -> + {ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix()}. +sockname(#ctx{lsock = LSock}) -> + esockd_transport:sockname(LSock). + -spec fast_close(socket()) -> ok. fast_close(Sock) -> try diff --git a/src/esockd_accept_socket.erl b/src/esockd_accept_socket.erl index 8101be9..173cf00 100644 --- a/src/esockd_accept_socket.erl +++ b/src/esockd_accept_socket.erl @@ -18,9 +18,10 @@ -export([ init/2, - async_accept/2, + async_accept/1, async_accept_result/3, post_accept/2, + sockname/1, fast_close/1 ]). @@ -29,24 +30,33 @@ tune_socket/2 ]). --type ctx() :: tune_socket_fun(). -type socket() :: socket:socket(). -type async_ref() :: socket:select_info(). -type tune_socket_fun() :: {fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}. +-record(ctx, { + lsock :: socket(), + tune_fun :: tune_socket_fun() +}). + +-type ctx() :: #ctx{}. + -define(DEFAULT_SOCK_OPTIONS, [{nodelay, true}]). %% -spec init(socket(), _Opts) -> ctx(). -init(_LSock, TuneFun) -> - TuneFun. +init(LSock, TuneFun) -> + #ctx{ + lsock = LSock, + tune_fun = TuneFun + }. --spec async_accept(socket(), _Opts) -> +-spec async_accept(ctx()) -> {ok, socket()} | {async, async_ref()} | {error, atom()}. -async_accept(LSock, _Opts) -> +async_accept(#ctx{lsock = LSock}) -> case socket:accept(LSock, nowait) of {ok, Sock} -> {ok, Sock}; @@ -72,10 +82,13 @@ async_accept_result({'$socket', _LSock, abort, {Handle, Reason}}, Handle, _Opts) async_accept_result(Info, _Handle, _Opts) -> Info. --spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}. -post_accept(Sock, TuneFun) -> +-spec post_accept(socket(), ctx()) -> {ok, esockd_socket, socket()} | {error, atom()}. +post_accept(Sock, #ctx{tune_fun = TuneFun}) -> eval_tune_socket_fun(Sock, TuneFun). +return_socket(Sock) -> + {ok, esockd_socket, Sock}. + eval_tune_socket_fun(Sock, {Fun, Opts}) -> Fun(Sock, Opts). @@ -114,7 +127,7 @@ tune_socket(Sock, [{tune_fun, {M, F, A}} | Rest]) -> Error end; tune_socket(Sock, _) -> - {ok, Sock}. + return_socket(Sock). ensure(ok) -> ok; ensure({ok, Result}) -> Result; @@ -154,6 +167,11 @@ setopts(Sock, [{Opt, Value} | Rest]) -> setopts(_Sock, []) -> ok. +-spec sockname(ctx()) -> + {ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix() | closed}. +sockname(#ctx{lsock = LSock}) -> + esockd_socket:sockname(LSock). + -spec fast_close(socket()) -> ok. fast_close(Sock) -> esockd_socket:fast_close(Sock). diff --git a/src/esockd_acceptor_fsm.erl b/src/esockd_acceptor_fsm.erl index f34fbec..8eeadef 100644 --- a/src/esockd_acceptor_fsm.erl +++ b/src/esockd_acceptor_fsm.erl @@ -20,9 +20,7 @@ -include("esockd.hrl"). --export([ - start_link/7 -]). +-export([start_link/6]). %% state callbacks -export([handle_event/4]). @@ -59,9 +57,7 @@ -record(d, { listener_ref :: esockd:listener_ref(), - lsock :: _ListenerSocket, - transport :: module(), - modopts :: {module(), _Ctx}, + modctx :: {module(), _Ctx}, upgrade_funs :: [esockd:sock_fun()], conn_limiter :: undefined | esockd_generic_limiter:limiter(), conn_sup :: pid() @@ -72,18 +68,17 @@ %% @doc Start an acceptor -spec start_link( esockd:listener_ref(), - module(), pid(), {module(), _Options}, [esockd:sock_fun()], esockd_generic_limiter:limiter(), - _ListenerSocket + _ListenSocket ) -> {ok, pid()} | {error, term()}. -start_link(ListenerRef, Transport, ConnSup, ModOpts, UpgradeFuns, Limiter, LSock) -> +start_link(ListenerRef, ConnSup, ModOpts, UpgradeFuns, Limiter, LSock) -> gen_statem:start_link( ?MODULE, - [ListenerRef, Transport, ConnSup, ModOpts, UpgradeFuns, Limiter, LSock], + [ListenerRef, ConnSup, ModOpts, UpgradeFuns, Limiter, LSock], [] ). @@ -94,14 +89,12 @@ start_link(ListenerRef, Transport, ConnSup, ModOpts, UpgradeFuns, Limiter, LSock callback_mode() -> handle_event_function. -init([ListenerRef, Transport, ConnSup, {Mod, Opts}, UpgradeFuns, Limiter, LSock]) -> +init([ListenerRef, ConnSup, {Mod, Opts}, UpgradeFuns, Limiter, LSock]) -> _ = erlang:process_flag(trap_exit, true), Ctx = Mod:init(LSock, Opts), D = #d{ listener_ref = ListenerRef, - lsock = LSock, - transport = Transport, - modopts = {Mod, Ctx}, + modctx = {Mod, Ctx}, upgrade_funs = UpgradeFuns, conn_limiter = Limiter, conn_sup = ConnSup @@ -178,15 +171,14 @@ enter_suspending(D, Timeout) -> handle_socket( Sock, D = #d{ - transport = Transport, - modopts = {Mod, Opts}, + modctx = {Mod, Ctx}, upgrade_funs = UpgradeFuns, conn_sup = ConnSup } ) -> - case Mod:post_accept(Sock, Opts) of - {ok, NSock} -> - case start_connection(ConnSup, Transport, NSock, UpgradeFuns) of + case Mod:post_accept(Sock, Ctx) of + {ok, TransportMod, NSock} -> + case start_connection(ConnSup, TransportMod, NSock, UpgradeFuns) of {ok, _Pid} -> %% Inc accepted stats. inc_stats(D, accepted); @@ -219,7 +211,7 @@ code_change(_OldVsn, State, D, _Extra) -> %% Internal funcs %%-------------------------------------------------------------------- -close(#d{modopts = {Mod, _}}, Sock) -> +close(#d{modctx = {Mod, _}}, Sock) -> Mod:fast_close(Sock). handle_start_error(econnreset, _) -> @@ -260,8 +252,8 @@ log_system_limit(Reason, D) -> listener => format_sockname(D), cause => explain_posix(Reason)}). -format_sockname(#d{lsock = LSock, transport = Transport}) -> - case Transport:sockname(LSock) of +format_sockname(#d{modctx = {Mod, Ctx}}) -> + case Mod:sockname(Ctx) of {ok, SockName} -> esockd:format(SockName); {error, Reason} -> @@ -292,14 +284,14 @@ start_connection({F, A}, _Transport, Sock, UpgradeFuns) when is_function(F) -> %% The first delayed message should cause acceptor to enter suspending state. %% Then it should continue to accept 10 more sockets (which are all likely %% to result in emfile error anyway) during suspending state. -async_accept(CurrentState, #d{lsock = LSock, modopts = {Mod, Opts}}) -> - case Mod:async_accept(LSock, Opts) of +async_accept(CurrentState, #d{modctx = {Mod, Ctx}}) -> + case Mod:async_accept(Ctx) of {error, Reason} when Reason =:= emfile orelse Reason =:= enfile -> Delay = case CurrentState of suspending -> ?SYS_LIMIT_SUSPEND_MS div 10; _Waiting -> 0 end, - Ref = Reason, + Ref = make_ref(), Msg = {?MODULE, Ref, {error, Reason}}, _ = erlang:send_after(Delay, self(), Msg), {async, Ref}; @@ -309,5 +301,5 @@ async_accept(CurrentState, #d{lsock = LSock, modopts = {Mod, Opts}}) -> async_accept_result({?MODULE, Ref, {error, Reason}}, Ref, _) -> {error, Reason}; -async_accept_result(Message, Ref, #d{modopts = {Mod, Opts}}) -> - Mod:async_accept_result(Message, Ref, Opts). +async_accept_result(Message, Ref, #d{modctx = {Mod, Ctx}}) -> + Mod:async_accept_result(Message, Ref, Ctx). diff --git a/src/esockd_acceptor_sup.erl b/src/esockd_acceptor_sup.erl index 77032b9..69d2572 100644 --- a/src/esockd_acceptor_sup.erl +++ b/src/esockd_acceptor_sup.erl @@ -52,12 +52,12 @@ start_supervised(ListenerRef = {Proto, ListenOn}) -> TuneFun = esockd_accept_inet:mk_tune_socket_fun(Opts), AcceptCb = {esockd_accept_inet, TuneFun}, Mod = esockd_acceptor_fsm, - Args = [ListenerRef, esockd_transport, ConnSup, AcceptCb, UpgradeFuns, Limiter]; + Args = [ListenerRef, ConnSup, AcceptCb, UpgradeFuns, Limiter]; tcpsocket -> TuneFun = esockd_accept_socket:mk_tune_socket_fun(Opts), AcceptCb = {esockd_accept_socket, TuneFun}, Mod = esockd_acceptor_fsm, - Args = [ListenerRef, esockd_socket, ConnSup, AcceptCb, UpgradeFuns, Limiter]; + Args = [ListenerRef, ConnSup, AcceptCb, UpgradeFuns, Limiter]; dtls -> TuneFun = esockd_accept_inet:mk_tune_socket_fun(Opts), Mod = esockd_dtls_acceptor, diff --git a/src/esockd_dtls_acceptor.erl b/src/esockd_dtls_acceptor.erl index e3eb05a..fbdf2ea 100644 --- a/src/esockd_dtls_acceptor.erl +++ b/src/esockd_dtls_acceptor.erl @@ -84,9 +84,8 @@ accepting(internal, accept, %% Inc accepted stats. _ = esockd_server:inc_stats(ListenerRef, accepted, 1), _ = case eval_tune_socket_fun(TuneFun, Sock) of - {ok, Sock} -> - Transport = esockd_transport, - case esockd_connection_sup:start_connection(ConnSup, Transport, Sock, UpgradeFuns) of + {ok, TransportMod, Sock} -> + case esockd_connection_sup:start_connection(ConnSup, TransportMod, Sock, UpgradeFuns) of {ok, _Pid} -> ok; {error, enotconn} -> close(Sock); %% quiet...issue #10 diff --git a/src/esockd_transport.erl b/src/esockd_transport.erl index 3452fc7..3eb0d5a 100644 --- a/src/esockd_transport.erl +++ b/src/esockd_transport.erl @@ -38,10 +38,11 @@ , proxy_upgrade/2 ]). --export_type([socket/0]). +-export_type([socket/0, listen_socket/0]). -type(ssl_socket() :: #ssl_socket{}). -type(proxy_socket() :: #proxy_socket{}). +-type(listen_socket() :: inet:socket()). -type(socket() :: inet:socket() | ssl_socket() | proxy_socket() | #sslsocket{}). -spec(type(socket()) -> tcp | ssl | proxy). @@ -81,7 +82,7 @@ upgrade(Sock, [{Fun, Args}|More]) -> end. -spec(listen(inet:port_number(), [gen_tcp:listen_option()]) - -> {ok, inet:socket()} | {error, system_limit | inet:posix()}). + -> {ok, listen_socket()} | {error, system_limit | inet:posix()}). listen(Port, Opts) -> gen_tcp:listen(Port, Opts). diff --git a/test/esockd_acceptor_fsm_SUITE.erl b/test/esockd_acceptor_fsm_SUITE.erl index 8b2ee5c..7077b2f 100644 --- a/test/esockd_acceptor_fsm_SUITE.erl +++ b/test/esockd_acceptor_fsm_SUITE.erl @@ -86,7 +86,6 @@ start(PortNumber, Limiter, Opts, Config) -> AcceptMod = proplists:get_value(accept_mod, Config), case AcceptMod of esockd_accept_inet -> - Transport = esockd_transport, SockOpts = [binary, {active, false}, {reuseaddr, true}, @@ -94,7 +93,6 @@ start(PortNumber, Limiter, Opts, Config) -> {backlog, maps:get(backlog, Opts, 1024)}], {ok, LSock} = gen_tcp:listen(PortNumber, SockOpts); esockd_accept_socket -> - Transport = esockd_socket, Backlog = maps:get(backlog, Opts, 1024), {ok, LSock} = socket:open(inet, stream, tcp), ok = socket:setopt(LSock, {socket, reuseaddr}, true), @@ -109,7 +107,6 @@ start(PortNumber, Limiter, Opts, Config) -> AcceptCb = {AcceptMod, TuneFun}, {ok, AccPid} = esockd_acceptor_fsm:start_link( ListenerRef, - Transport, StartConn, AcceptCb, _UpgradeFuns = [], @@ -206,7 +203,7 @@ t_einval(Config) -> t_sys_limit(Config) -> AcceptMod = proplists:get_value(accept_mod, Config), meck:new(AcceptMod, [passthrough, no_history]), - meck:expect(AcceptMod, async_accept, fun(_, _) -> {error, emfile} end), + meck:expect(AcceptMod, async_accept, fun(_) -> {error, emfile} end), Port = ?PORT, Server = start(Port, no_rate_limit(), #{}, Config), try From 99a4b68e1204d265ad99d6bbff2dc301c2106424 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 15:23:25 +0200 Subject: [PATCH 04/11] chore: update copyright year --- src/esockd_acceptor_fsm.erl | 2 +- src/esockd_socket_listener.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/esockd_acceptor_fsm.erl b/src/esockd_acceptor_fsm.erl index 8eeadef..0e45f42 100644 --- a/src/esockd_acceptor_fsm.erl +++ b/src/esockd_acceptor_fsm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2025 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/esockd_socket_listener.erl b/src/esockd_socket_listener.erl index 75fb48b..cd70ac6 100644 --- a/src/esockd_socket_listener.erl +++ b/src/esockd_socket_listener.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. From aa40618a45fccb0a1aff7940b9e8d0e78f3d41d1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 15:23:47 +0200 Subject: [PATCH 05/11] fix(socket): handle error condition gracefully --- src/esockd_socket_listener.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/esockd_socket_listener.erl b/src/esockd_socket_listener.erl index cd70ac6..9c3898a 100644 --- a/src/esockd_socket_listener.erl +++ b/src/esockd_socket_listener.erl @@ -98,9 +98,15 @@ init({Proto, ListenOn, Opts}) -> case listen(ListenOn, TcpOpts) of {ok, LSock, SockOpts} -> _MRef = socket:monitor(LSock), - {ok, #{addr := LAddr, port := LPort}} = socket:sockname(LSock), - {ok, #state{listener_ref = ListenerRef, lsock = LSock, - laddr = LAddr, lport = LPort, sockopts = SockOpts}}; + case esockd_socket:sockname(LSock) of + {ok, {LAddr, LPort}} -> + {ok, #state{listener_ref = ListenerRef, lsock = LSock, + laddr = LAddr, lport = LPort, sockopts = SockOpts}}; + {error, Reason} -> + error_logger:error_msg("~s failed to get sockname: ~p (~s)", + [Proto, Reason, inet:format_error(Reason)]), + {stop, Reason} + end; {error, Reason = {invalid, What}} -> error_logger:error_msg("~s failed to listen on ~p - invalid option: ~0p", [Proto, Port, What]), From 746845366e9c2e9b7b6dbf36e3719c8b71327046 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 16:41:59 +0200 Subject: [PATCH 06/11] chore: refine naming --- src/esockd_acceptor_fsm.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/esockd_acceptor_fsm.erl b/src/esockd_acceptor_fsm.erl index 0e45f42..bb52f71 100644 --- a/src/esockd_acceptor_fsm.erl +++ b/src/esockd_acceptor_fsm.erl @@ -26,10 +26,10 @@ -export([handle_event/4]). %% The state diagram: -%% -%% +---------------------+ -%% | | -%% +-----v-----+ +--------+----------+ +%% init +%% | +---------------------+ +%% | | | +%% +-V---v-----+ +--------+----------+ %% | waiting +----->+ accepting-waiting | %% +-+----^----+ +--------+----------+ %% | | | @@ -136,7 +136,7 @@ handle_event(Type, Content, State, D) -> handle_info(Type, Content, State, D). handle_info(Type, Content, State, _D) -> - logger:log(warning, #{msg => "esockd_acceptor_unhandled_event", + logger:log(warning, #{msg => "esockd_acceptor_fsm_unhandled_event", state => State, event_type => Type, event_content => Content}), From a0ffc5c7258b99992c9c6a38f4d283924389a592 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 16:42:22 +0200 Subject: [PATCH 07/11] fix(acceptor-fsm): do not attempt to close already closed socket --- src/esockd_acceptor_fsm.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/esockd_acceptor_fsm.erl b/src/esockd_acceptor_fsm.erl index bb52f71..8b629d2 100644 --- a/src/esockd_acceptor_fsm.erl +++ b/src/esockd_acceptor_fsm.erl @@ -184,13 +184,13 @@ handle_socket( inc_stats(D, accepted); {error, Reason} -> handle_start_error(Reason, D), - close(D, NSock), + maybe_close(D, NSock, Reason), inc_stats(D, Reason) end; {error, Reason} -> %% the socket became invalid before %% starting the owner process - close(D, Sock), + maybe_close(D, Sock, Reason), inc_stats(D, Reason) end. @@ -211,6 +211,11 @@ code_change(_OldVsn, State, D, _Extra) -> %% Internal funcs %%-------------------------------------------------------------------- +maybe_close(_D, _Sock, closed) -> + ok; +maybe_close(D, Sock, _Reason) -> + close(D, Sock). + close(#d{modctx = {Mod, _}}, Sock) -> Mod:fast_close(Sock). From 8d6e138fae0c2441d2bd0d32a16e6d5fc6ec086e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 16:48:14 +0200 Subject: [PATCH 08/11] fix(acceptor-fsm): handle ignore case on connection start --- src/esockd_acceptor_fsm.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/esockd_acceptor_fsm.erl b/src/esockd_acceptor_fsm.erl index 8b629d2..e9e0c4d 100644 --- a/src/esockd_acceptor_fsm.erl +++ b/src/esockd_acceptor_fsm.erl @@ -182,6 +182,9 @@ handle_socket( {ok, _Pid} -> %% Inc accepted stats. inc_stats(D, accepted); + ignore -> + close(D, NSock), + inc_stats(D, ignore); {error, Reason} -> handle_start_error(Reason, D), maybe_close(D, NSock, Reason), From c62ecd14efe987f792f60ed786002413903034e1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 16:51:46 +0200 Subject: [PATCH 09/11] chore: refine naming --- src/esockd_acceptor_fsm.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/esockd_acceptor_fsm.erl b/src/esockd_acceptor_fsm.erl index e9e0c4d..76078ca 100644 --- a/src/esockd_acceptor_fsm.erl +++ b/src/esockd_acceptor_fsm.erl @@ -186,7 +186,7 @@ handle_socket( close(D, NSock), inc_stats(D, ignore); {error, Reason} -> - handle_start_error(Reason, D), + maybe_log_start_error(Reason, D), maybe_close(D, NSock, Reason), inc_stats(D, Reason) end; @@ -222,15 +222,15 @@ maybe_close(D, Sock, _Reason) -> close(#d{modctx = {Mod, _}}, Sock) -> Mod:fast_close(Sock). -handle_start_error(econnreset, _) -> +maybe_log_start_error(econnreset, _) -> ok; -handle_start_error(enotconn, _) -> +maybe_log_start_error(enotconn, _) -> ok; -handle_start_error(einval, _) -> +maybe_log_start_error(einval, _) -> ok; -handle_start_error(overloaded, _) -> +maybe_log_start_error(overloaded, _) -> ok; -handle_start_error(Reason, D) -> +maybe_log_start_error(Reason, D) -> logger:log(error, #{msg => "failed_to_start_connection_process", listener => format_sockname(D), cause => Reason}). From 382beff460fc8ce12d1d83f933037b042d399ad9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 17:16:35 +0200 Subject: [PATCH 10/11] fix(proxy-protocol): properly mark function as test-only --- src/esockd_proxy_protocol.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/esockd_proxy_protocol.erl b/src/esockd_proxy_protocol.erl index fee37c4..375ad08 100644 --- a/src/esockd_proxy_protocol.erl +++ b/src/esockd_proxy_protocol.erl @@ -20,9 +20,9 @@ -include("esockd.hrl"). -export([recv/3]). --export([get_proxy_attrs/1]). -ifdef(TEST). +-export([get_proxy_attrs/1]). -export([parse_v1/2, parse_v2/4, parse_pp2_tlv/2, parse_pp2_ssl/1]). -endif. @@ -174,7 +174,7 @@ map_tcpsocket_error(Reason) -> {error, {recv_proxy_info_error, Reason}}. set_socket_meta(ProxySocket = #proxy_socket{socket = Sock}) -> - socket:setopt(Sock, {otp, meta}, get_proxy_attrs(ProxySocket)). + socket:setopt(Sock, {otp, meta}, mk_proxy_attrs(ProxySocket)). recv_v2(Transport, Sock, Deadline) -> with_remaining_timeout(Deadline, fun(HeaderTimeout) -> @@ -208,6 +208,8 @@ mk_proxy_attrs(#proxy_socket{inet = Protocol, proxy_src_port => SrcPort, proxy_dst_port => DstPort, proxy_pp2_info => PP2Info}. +-ifdef(TEST). + -spec get_proxy_attrs(maybe_proxy_socket() | socket:socket()) -> map(). get_proxy_attrs(ProxySocket = #proxy_socket{}) -> mk_proxy_attrs(ProxySocket); @@ -221,6 +223,8 @@ get_proxy_attrs(Socket) when element(1, Socket) =:= '$socket' -> get_proxy_attrs(_Socket) -> #{}. +-endif. + parse_v1(ProxyInfo, ProxySock) -> [SrcAddrBin, DstAddrBin, SrcPortBin, DstPortBin] = binary:split(ProxyInfo, [<<" ">>, <<"\r\n">>], [global, trim]), From 7bdd258c33ccf49dcc5be936680492750f02b992 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Jul 2025 17:24:22 +0200 Subject: [PATCH 11/11] feat(socket): document and provide `getopts/2` / `setopts/2` adapters --- src/esockd_accept_socket.erl | 10 +------- src/esockd_socket.erl | 44 ++++++++++++++++++++++++++++++++++ src/esockd_socket_listener.erl | 10 +------- 3 files changed, 46 insertions(+), 18 deletions(-) diff --git a/src/esockd_accept_socket.erl b/src/esockd_accept_socket.erl index 173cf00..3581d94 100644 --- a/src/esockd_accept_socket.erl +++ b/src/esockd_accept_socket.erl @@ -102,7 +102,7 @@ mk_tune_socket_fun(Opts) -> {fun ?MODULE:tune_socket/2, [{setopts, SockOpts} | TuneOpts]}. tune_socket(Sock, [{setopts, SockOpts} | Rest]) -> - case setopts(Sock, SockOpts) of + case esockd_socket:setopts(Sock, SockOpts) of ok -> tune_socket(Sock, Rest); Error -> @@ -159,14 +159,6 @@ sock_opt(_Opt) -> %% TODO: Ignored, need to notify user. []. -setopts(Sock, [{Opt, Value} | Rest]) -> - case socket:setopt(Sock, Opt, Value) of - ok -> setopts(Sock, Rest); - Error -> Error - end; -setopts(_Sock, []) -> - ok. - -spec sockname(ctx()) -> {ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix() | closed}. sockname(#ctx{lsock = LSock}) -> diff --git a/src/esockd_socket.erl b/src/esockd_socket.erl index 06a32a4..0faf13e 100644 --- a/src/esockd_socket.erl +++ b/src/esockd_socket.erl @@ -14,12 +14,22 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% @doc This module deals with sockets coming from `esockd:open_tcpsocket/3` +%% listeners, where each socket is essentially `socket:socket()`. +%% +%% Compared to `esockd_transport`, this module *doesn't* provide adapters for +%% sending and receiving TCP stream data, closing, shutting down sockets. +%% This is done on purpose, users are expected to use `socket` APIs directly, +%% largely because `socket`-based connection loop is significantly different +%% from `esockd_transport`-based one, and not worth the effort to adapt due +%% to inevitable performance penalty. -module(esockd_socket). -include("esockd.hrl"). -export([type/1]). -export([controlling_process/2]). +-export([getopts/2, setopts/2]). -export([ready/3, wait/1]). -export([fast_close/1]). -export([sockname/1, peername/1]). @@ -45,6 +55,40 @@ type(Sock) -> controlling_process(Sock, NewOwner) -> socket:setopt(Sock, {otp, controlling_process}, NewOwner). +%% @doc Get socket options. +-spec getopts(socket(), [socket:socket_option()]) -> + {ok, [{socket:socket_option(), any()}]} | {error, inet:posix() | {invalid, _} | closed}. +getopts(Sock, Opts) -> + getopts(Sock, Opts, []). + +getopts(_Sock, [Opt = {otp, meta} | _], _Acc) -> + {error, {invalid, Opt}}; +getopts(Sock, [Opt | Rest], Acc) -> + case socket:getopt(Sock, Opt) of + {ok, Value} -> + getopts(Sock, Rest, [{Opt, Value} | Acc]); + Error -> + Error + end; +getopts(_Sock, [], Acc) -> + {ok, lists:reverse(Acc)}. + +%% @doc Set socket options. +%% Note this operation is not atomic, and may fail mid-way. +-spec setopts(socket(), [{socket:socket_option(), any()}]) -> + ok | {error, inet:posix() | {invalid, _} | closed}. +setopts(_Sock, [{Opt = {otp, OptName}, _Value} | _]) when OptName =:= meta; + OptName =:= controlling_process -> + %% Disallow changing those options explicitly, to avoid conflicts. + {error, {invalid, Opt}}; +setopts(Sock, [{Opt, Value} | Rest]) -> + case socket:setopt(Sock, Opt, Value) of + ok -> setopts(Sock, Rest); + Error -> Error + end; +setopts(_Sock, []) -> + ok. + -spec ready(pid(), socket(), [esockd:sock_fun()]) -> any(). ready(Pid, Sock, UpgradeFuns) -> %% NOTE: See `esockd_transport:ready/3'. diff --git a/src/esockd_socket_listener.erl b/src/esockd_socket_listener.erl index 9c3898a..bcfa46a 100644 --- a/src/esockd_socket_listener.erl +++ b/src/esockd_socket_listener.erl @@ -127,7 +127,7 @@ listen(ListenOn, TcpOpts) -> Backlog = proplists:get_value(backlog, TcpOpts, 128), try LSock = ensure(socket:open(SockDomain, stream, tcp)), - ok = ensure(sock_setopts(LSock, SockOpts)), + ok = ensure(esockd_socket:setopts(LSock, SockOpts)), ok = ensure(socket:bind(LSock, SockAddr#{family => SockDomain})), ok = ensure(socket:listen(LSock, Backlog)), {ok, LSock, SockOpts} @@ -144,14 +144,6 @@ sock_addr({Host, Port}) when tuple_size(Host) =:= 4 -> sock_addr({Host, Port}) when tuple_size(Host) =:= 8 -> #{family => inet6, addr => Host, port => Port}. -sock_setopts(LSock, [{Opt, Value} | Rest]) -> - case socket:setopt(LSock, Opt, Value) of - ok -> sock_setopts(LSock, Rest); - Error -> Error - end; -sock_setopts(_LSock, []) -> - ok. - sock_listen_opt({reuseaddr, Flag}) -> {{socket, reuseaddr}, Flag}; sock_listen_opt(_) ->