Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ xref:

.PHONY: eunit
eunit: compile
$(REBAR) eunit verbose=truen
$(REBAR) eunit --verbose

.PHONY: ct
ct: compile
Expand Down
2 changes: 1 addition & 1 deletion examples/async_recv/async_recv_echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ start_link(Transport, Sock) ->
init([Transport, Sock]) ->
case Transport:wait(Sock) of
{ok, NewSock} ->
Transport:async_recv(Sock, 0, infinity),
Transport:async_recv(NewSock, 0, infinity),
State = #state{transport = Transport, socket = NewSock},
gen_server:enter_loop(?MODULE, [], State);
Error -> Error
Expand Down
2 changes: 1 addition & 1 deletion include/esockd.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
| {pp2_ssl, list(pp2_additional_ssl_field())}).

-record(proxy_socket, {inet :: inet4 | inet6 | 'unix' | 'unspec',
socket :: inet:socket() | #ssl_socket{},
socket :: inet:socket() | #ssl_socket{} | socket:socket(),
src_addr :: inet:ip_address() | undefined,
dst_addr :: inet:ip_address() | undefined,
src_port :: inet:port_number() | undefined,
Expand Down
14 changes: 14 additions & 0 deletions src/esockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
-export([ open/3
, open_udp/3
, open_dtls/3
, open_tcpsocket/3
, close/2
, close/1
%% Legacy API
Expand All @@ -37,6 +38,7 @@
]).

-export([ child_spec/3
, tcpsocket_child_spec/3
, udp_child_spec/3
, dtls_child_spec/3
%% Legacy API
Expand Down Expand Up @@ -163,6 +165,12 @@ tcp_options(Opts) ->
open(Proto, Port, Opts, MFA) ->
open(Proto, Port, merge_mfargs(Opts, MFA)).

%% @doc Open a Socket API based TCP listener
-spec open_tcpsocket(atom(), listen_on(), options())
-> {ok, pid()} | {error, term()}.
open_tcpsocket(Proto, ListenOn, Opts) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, need a -type for Proto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, but this is unrelated to the PR in question I believe.

esockd_sup:start_child(tcpsocket_child_spec(Proto, ListenOn, Opts)).

%% @doc Open a UDP listener
-spec open_udp(atom(), listen_on(), [option()])
-> {ok, pid()} | {error, term()}.
Expand Down Expand Up @@ -221,6 +229,12 @@ child_spec(Proto, ListenOn, Opts) when is_atom(Proto) ->
child_spec(Proto, ListenOn, Opts, MFA) when is_atom(Proto) ->
child_spec(Proto, ListenOn, merge_mfargs(Opts, MFA)).

%% @doc Create a Child spec for a Socket API based TCP listener.
-spec tcpsocket_child_spec(atom(), listen_on(), options())
-> supervisor:child_spec().
tcpsocket_child_spec(Proto, ListenOn, Opts) when is_atom(Proto) ->
esockd_sup:tcpsocket_child_spec(Proto, fixaddr(ListenOn), Opts).

%% @doc Create a Child spec for a UDP Listener.
-spec udp_child_spec(atom(), listen_on(), options())
-> supervisor:child_spec().
Expand Down
114 changes: 114 additions & 0 deletions src/esockd_accept_inet.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_accept_inet).

-export([
init/2,
async_accept/2,
async_accept_result/3,
post_accept/2,
fast_close/1
]).

-export([
mk_tune_socket_fun/1,
tune_socket/2
]).

-type ctx() :: {module(), tune_socket_fun()}.
-type socket() :: inet:socket().
-type async_ref() :: reference().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

%%

-spec init(socket(), _Opts) -> ctx().
init(LSock, TuneFun) ->
{ok, SockMod} = inet_db:lookup_socket(LSock),
{SockMod, TuneFun}.

-spec async_accept(socket(), ctx()) ->
{async, async_ref()} | {error, atom()}.
async_accept(LSock, _Ctx) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{async, Ref};
{error, Reason} ->
{error, Reason}
end.

-spec async_accept_result(Message, async_ref(), ctx()) ->
{ok, socket()} | {error, atom()} | Message.
async_accept_result({inet_async, _LSock, Ref, {ok, Sock}}, Ref, _Ctx) ->
{ok, Sock};
async_accept_result({inet_async, _LSock, Ref, {error, Reason}}, Ref, _Ctx) ->
{error, Reason};
async_accept_result(Info, _Ref, _Ctx) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}.
post_accept(Sock, {SockMod, TuneFun}) ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),
eval_tune_socket_fun(TuneFun, Sock).

eval_tune_socket_fun({Fun, Opts}, Sock) ->
Fun(Sock, Opts).

-spec mk_tune_socket_fun([esockd:option()]) -> tune_socket_fun().
mk_tune_socket_fun(Opts) ->
TuneOpts = [{Name, Val} || {Name, Val} <- Opts,
Name =:= tune_buffer orelse
Name =:= tune_fun],
{fun ?MODULE:tune_socket/2, TuneOpts}.

tune_socket(Sock, []) ->
{ok, Sock};
tune_socket(Sock, [{tune_buffer, true}|More]) ->
case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} ->
BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
case esockd_transport:setopts(Sock, [{buffer, BufSz}]) of
ok ->
tune_socket(Sock, More);
Error ->
Error
end;
Error ->
Error
end;
tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) ->
%% NOTE: Socket is not part of the argument list, backward compatibility.
case apply(M, F, A) of
ok ->
tune_socket(Sock, More);
Error ->
Error
end.

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
try
%% NOTE
%% Port-close leads to a TCP reset which cuts out TCP graceful close overheads.
_ = port_close(Sock),
receive {'EXIT', Sock, _} -> ok after 1 -> ok end
catch
error:_ -> ok
end.
159 changes: 159 additions & 0 deletions src/esockd_accept_socket.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_accept_socket).

-export([
init/2,
async_accept/2,
async_accept_result/3,
post_accept/2,
fast_close/1
]).

-export([
mk_tune_socket_fun/1,
tune_socket/2
]).

-type ctx() :: tune_socket_fun().
-type socket() :: socket:socket().
-type async_ref() :: socket:select_info().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

-define(DEFAULT_SOCK_OPTIONS, [{nodelay, true}]).

%%

-spec init(socket(), _Opts) -> ctx().
init(_LSock, TuneFun) ->
TuneFun.

-spec async_accept(socket(), _Opts) ->
{ok, socket()} | {async, async_ref()} | {error, atom()}.
async_accept(LSock, _Opts) ->
case socket:accept(LSock, nowait) of
{ok, Sock} ->
{ok, Sock};
{error, Reason} ->
{error, Reason};
{select, {_Info, _Tag, Handle}} ->
{async, Handle}
end.

-spec async_accept_result(Info, async_ref(), _Opts) ->
{ok, socket()} | {error, atom()} | {async, async_ref()} | Info.
async_accept_result({'$socket', LSock, select, Handle}, Handle, _Opts) ->
case socket:accept(LSock, Handle) of
{ok, Sock} ->
{ok, Sock};
{error, Reason} ->
{error, Reason};
{select, {_Info, _Tag, NHandle}} ->
{async, NHandle}
end;
async_accept_result({'$socket', _LSock, abort, {Handle, Reason}}, Handle, _Opts) ->
{error, Reason};
async_accept_result(Info, _Handle, _Opts) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}.
post_accept(Sock, TuneFun) ->
eval_tune_socket_fun(Sock, TuneFun).

eval_tune_socket_fun(Sock, {Fun, Opts}) ->
Fun(Sock, Opts).

-spec mk_tune_socket_fun([esockd:option()]) -> tune_socket_fun().
mk_tune_socket_fun(Opts) ->
TcpOpts = proplists:get_value(tcp_options, Opts, []),
SockOpts = lists:flatten([sock_opt(O) || O <- merge_sock_defaults(TcpOpts)]),
TuneOpts = [{Name, Val} || {Name, Val} <- Opts,
Name =:= tune_buffer orelse
Name =:= tune_fun],
{fun ?MODULE:tune_socket/2, [{setopts, SockOpts} | TuneOpts]}.

tune_socket(Sock, [{setopts, SockOpts} | Rest]) ->
case setopts(Sock, SockOpts) of
ok ->
tune_socket(Sock, Rest);
Error ->
Error
end;
tune_socket(Sock, [{tune_buffer, true} | Rest]) ->
try
BufRecv = ensure(socket:getopt(Sock, {socket, rcvbuf})),
Buffer = ensure(socket:getopt(Sock, {otp, rcvbuf})),
Max = max(Buffer, BufRecv),
ok = ensure(socket:setopt(Sock, {otp, rcvbuf}, Max)),
tune_socket(Sock, Rest)
catch
Error -> Error
end;
tune_socket(Sock, [{tune_fun, {M, F, A}} | Rest]) ->
%% NOTE: Socket is not part of the argument list, backward compatibility.
case apply(M, F, A) of
ok ->
tune_socket(Sock, Rest);
Error ->
Error
end;
tune_socket(Sock, _) ->
{ok, Sock}.

ensure(ok) -> ok;
ensure({ok, Result}) -> Result;
ensure(Error) -> throw(Error).

merge_sock_defaults(Opts) ->
esockd:merge_opts(?DEFAULT_SOCK_OPTIONS, Opts).

sock_opt(binary) ->
%% Meaningless.
[];
sock_opt({nodelay, Flag}) ->
{{tcp, nodelay}, Flag};
sock_opt({linger, {Flag, N}}) ->
{{socket, linger}, #{onoff => Flag, linger => N}};
sock_opt({recbuf, Size}) ->
{{socket, rcvbuf}, Size};
sock_opt({sndbuf, Size}) ->
{{socket, sndbuf}, Size};
sock_opt({buffer, Size}) ->
Copy link
Contributor

@qzhuyan qzhuyan Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to my own notes, double check.

{{otp, rcvbuf}, Size};
sock_opt({reuseaddr, _}) ->
%% Listener option.
[];
sock_opt({backlog, _}) ->
%% Listener option.
[];
sock_opt(_Opt) ->
%% TODO: Ignored, need to notify user.
[].

setopts(Sock, [{Opt, Value} | Rest]) ->
case socket:setopt(Sock, Opt, Value) of
ok -> setopts(Sock, Rest);
Error -> Error
end;
setopts(_Sock, []) ->
ok.

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
esockd_socket:fast_close(Sock).
Loading