Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ xref:

.PHONY: eunit
eunit: compile
$(REBAR) eunit verbose=truen
$(REBAR) eunit --verbose

.PHONY: ct
ct: compile
Expand Down
2 changes: 1 addition & 1 deletion examples/async_recv/async_recv_echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ start_link(Transport, Sock) ->
init([Transport, Sock]) ->
case Transport:wait(Sock) of
{ok, NewSock} ->
Transport:async_recv(Sock, 0, infinity),
Transport:async_recv(NewSock, 0, infinity),
State = #state{transport = Transport, socket = NewSock},
gen_server:enter_loop(?MODULE, [], State);
Error -> Error
Expand Down
2 changes: 1 addition & 1 deletion include/esockd.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
| {pp2_ssl, list(pp2_additional_ssl_field())}).

-record(proxy_socket, {inet :: inet4 | inet6 | 'unix' | 'unspec',
socket :: inet:socket() | #ssl_socket{},
socket :: inet:socket() | #ssl_socket{} | socket:socket(),
src_addr :: inet:ip_address() | undefined,
dst_addr :: inet:ip_address() | undefined,
src_port :: inet:port_number() | undefined,
Expand Down
14 changes: 14 additions & 0 deletions src/esockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
-export([ open/3
, open_udp/3
, open_dtls/3
, open_tcpsocket/3
, close/2
, close/1
%% Legacy API
Expand All @@ -37,6 +38,7 @@
]).

-export([ child_spec/3
, tcpsocket_child_spec/3
, udp_child_spec/3
, dtls_child_spec/3
%% Legacy API
Expand Down Expand Up @@ -163,6 +165,12 @@ tcp_options(Opts) ->
open(Proto, Port, Opts, MFA) ->
open(Proto, Port, merge_mfargs(Opts, MFA)).

%% @doc Open a Socket API based TCP listener
-spec open_tcpsocket(atom(), listen_on(), options())
-> {ok, pid()} | {error, term()}.
open_tcpsocket(Proto, ListenOn, Opts) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, need a -type for Proto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, but this is unrelated to the PR in question I believe.

esockd_sup:start_child(tcpsocket_child_spec(Proto, ListenOn, Opts)).

%% @doc Open a UDP listener
-spec open_udp(atom(), listen_on(), [option()])
-> {ok, pid()} | {error, term()}.
Expand Down Expand Up @@ -221,6 +229,12 @@ child_spec(Proto, ListenOn, Opts) when is_atom(Proto) ->
child_spec(Proto, ListenOn, Opts, MFA) when is_atom(Proto) ->
child_spec(Proto, ListenOn, merge_mfargs(Opts, MFA)).

%% @doc Create a Child spec for a Socket API based TCP listener.
-spec tcpsocket_child_spec(atom(), listen_on(), options())
-> supervisor:child_spec().
tcpsocket_child_spec(Proto, ListenOn, Opts) when is_atom(Proto) ->
esockd_sup:tcpsocket_child_spec(Proto, fixaddr(ListenOn), Opts).

%% @doc Create a Child spec for a UDP Listener.
-spec udp_child_spec(atom(), listen_on(), options())
-> supervisor:child_spec().
Expand Down
113 changes: 113 additions & 0 deletions src/esockd_accept_inet.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_accept_inet).

-export([
init/2,
async_accept/2,
async_accept_result/3,
post_accept/2,
fast_close/1
]).

-export([
mk_tune_socket_fun/1,
tune_socket/2
]).

-type ctx() :: {module(), tune_socket_fun()}.
-type socket() :: inet:socket().
-type async_ref() :: reference().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

%%

-spec init(socket(), _Opts) -> ctx().
init(LSock, TuneFun) ->
{ok, SockMod} = inet_db:lookup_socket(LSock),
{SockMod, TuneFun}.

-spec async_accept(socket(), ctx()) ->
{async, async_ref()} | {error, atom()}.
async_accept(LSock, _Ctx) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{async, Ref};
{error, Reason} ->
{error, Reason}
end.

-spec async_accept_result(Message, async_ref(), ctx()) ->
{ok, socket()} | {error, atom()} | Message.
async_accept_result({inet_async, _LSock, Ref, {ok, Sock}}, Ref, _Ctx) ->
{ok, Sock};
async_accept_result({inet_async, _LSock, Ref, {error, Reason}}, Ref, _Ctx) ->
{error, Reason};
async_accept_result(Info, _Ref, _Ctx) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}.
post_accept(Sock, {SockMod, TuneFun}) ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),
eval_tune_socket_fun(TuneFun, Sock).

eval_tune_socket_fun({Fun, Opts}, Sock) ->
Fun(Sock, Opts).

-spec mk_tune_socket_fun([esockd:option()]) -> tune_socket_fun().
mk_tune_socket_fun(Opts) ->
TuneOpts = [{Name, Val} || {Name, Val} <- Opts,
Name =:= tune_buffer orelse
Name =:= tune_fun],
{fun ?MODULE:tune_socket/2, TuneOpts}.

tune_socket(Sock, []) ->
{ok, Sock};
tune_socket(Sock, [{tune_buffer, true}|More]) ->
case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} ->
BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
case esockd_transport:setopts(Sock, [{buffer, BufSz}]) of
ok ->
tune_socket(Sock, More);
Error ->
Error
end;
Error ->
Error
end;
tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) ->
case apply(M, F, A) of
ok ->
tune_socket(Sock, More);
Error ->
Error
end.

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
try
%% NOTE
%% Port-close leads to a TCP reset which cuts out TCP graceful close overheads.
_ = port_close(Sock),
receive {'EXIT', Sock, _} -> ok after 1 -> ok end
catch
error:_ -> ok
end.
149 changes: 149 additions & 0 deletions src/esockd_accept_socket.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_accept_socket).

-export([
init/2,
async_accept/2,
async_accept_result/3,
post_accept/2,
fast_close/1
]).

-export([
mk_tune_socket_fun/1,
tune_socket/2
]).

-type ctx() :: tune_socket_fun().
-type socket() :: socket:socket().
-type async_ref() :: socket:select_info().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

-define(DEFAULT_SOCK_OPTIONS, [{nodelay, true}]).

%%

-spec init(socket(), _Opts) -> ctx().
init(_LSock, TuneFun) ->
TuneFun.

-spec async_accept(socket(), _Opts) ->
{ok, socket()} | {async, async_ref()} | {error, atom()}.
async_accept(LSock, _Opts) ->
case socket:accept(LSock, nowait) of
{ok, Sock} ->
{ok, Sock};
{error, Reason} ->
{error, Reason};
{select, {_Info, _Tag, Handle}} ->
{async, Handle}
end.

-spec async_accept_result(Info, async_ref(), _Opts) ->
{ok, socket()} | {error, atom()} | {async, async_ref()} | Info.
async_accept_result({'$socket', LSock, select, Handle}, Handle, _Opts) ->
case socket:accept(LSock, Handle) of
{ok, Sock} ->
{ok, Sock};
{error, Reason} ->
{error, Reason};
{select, {_Info, _Tag, NHandle}} ->
{async, NHandle}
end;
async_accept_result({'$socket', _LSock, abort, {Handle, Reason}}, Handle, _Opts) ->
{error, Reason};
async_accept_result(Info, _Handle, _Opts) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}.
post_accept(Sock, TuneFun) ->
eval_tune_socket_fun(Sock, TuneFun).

eval_tune_socket_fun(Sock, {Fun, Opts}) ->
Fun(Sock, Opts).

-spec mk_tune_socket_fun([esockd:option()]) -> tune_socket_fun().
mk_tune_socket_fun(Opts) ->
TcpOpts = proplists:get_value(tcp_options, Opts, []),
SockOpts = lists:flatten([sock_opt(O) || O <- merge_sock_defaults(TcpOpts)]),
TuneOpts = [{Name, Val} || {Name, Val} <- Opts, Name =:= tune_buffer],
{fun ?MODULE:tune_socket/2, [{setopts, SockOpts} | TuneOpts]}.

tune_socket(Sock, [{setopts, SockOpts} | Rest]) ->
case setopts(Sock, SockOpts) of
ok ->
tune_socket(Sock, Rest);
Error ->
Error
end;
tune_socket(Sock, [{tune_buffer, true} | _]) ->
try
BufRecv = ensure(socket:getopt(Sock, {socket, rcvbuf})),
Buffer = ensure(socket:getopt(Sock, {otp, rcvbuf})),
Max = max(Buffer, BufRecv),
ok = ensure(socket:setopt(Sock, {otp, rcvbuf}, Max)),
{ok, Sock}
catch
Error -> Error
end;
tune_socket(Sock, _) ->
{ok, Sock}.

ensure(ok) -> ok;
ensure({ok, Result}) -> Result;
ensure(Error) -> throw(Error).

merge_sock_defaults(Opts) ->
esockd:merge_opts(?DEFAULT_SOCK_OPTIONS, Opts).

sock_opt(binary) ->
%% Meaningless.
[];
sock_opt({nodelay, Flag}) ->
{{tcp, nodelay}, Flag};
sock_opt({linger, {Flag, N}}) ->
{{socket, linger}, #{onoff => Flag, linger => N}};
sock_opt({recbuf, Size}) ->
{{socket, rcvbuf}, Size};
sock_opt({sndbuf, Size}) ->
{{socket, sndbuf}, Size};
sock_opt({buffer, Size}) ->
Copy link
Contributor

@qzhuyan qzhuyan Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to my own notes, double check.

{{otp, rcvbuf}, Size};
sock_opt({reuseaddr, _}) ->
%% Listener option.
[];
sock_opt({backlog, _}) ->
%% Listener option.
[];
sock_opt(_Opt) ->
%% TODO: Ignored, need to notify user.
[].

setopts(Sock, [{Opt, Value} | Rest]) ->
case socket:setopt(Sock, Opt, Value) of
ok -> setopts(Sock, Rest);
Error -> Error
end;
setopts(_Sock, []) ->
ok.

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
esockd_socket:fast_close(Sock).
Loading