Skip to content
Merged
38 changes: 29 additions & 9 deletions src/esockd_accept_inet.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

-export([
init/2,
async_accept/2,
async_accept/1,
async_accept_result/3,
post_accept/2,
sockname/1,
fast_close/1
]).

Expand All @@ -29,23 +30,34 @@
tune_socket/2
]).

-type ctx() :: {module(), tune_socket_fun()}.
-type socket() :: inet:socket().
-type socket() :: esockd_transport:listen_socket().
-type async_ref() :: reference().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

-record(ctx, {
lsock :: socket(),
sock_mod :: module(),
tune_fun :: tune_socket_fun()
}).

-type ctx() :: #ctx{}.

%%

-spec init(socket(), _Opts) -> ctx().
init(LSock, TuneFun) ->
{ok, SockMod} = inet_db:lookup_socket(LSock),
{SockMod, TuneFun}.
#ctx{
lsock = LSock,
sock_mod = SockMod,
tune_fun = TuneFun
}.

-spec async_accept(socket(), ctx()) ->
-spec async_accept(ctx()) ->
{async, async_ref()} | {error, atom()}.
async_accept(LSock, _Ctx) ->
async_accept(#ctx{lsock = LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{async, Ref};
Expand All @@ -62,12 +74,15 @@ async_accept_result({inet_async, _LSock, Ref, {error, Reason}}, Ref, _Ctx) ->
async_accept_result(Info, _Ref, _Ctx) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}.
post_accept(Sock, {SockMod, TuneFun}) ->
-spec post_accept(socket(), ctx()) -> {ok, esockd_transport, socket()} | {error, atom()}.
post_accept(Sock, #ctx{sock_mod = SockMod, tune_fun = TuneFun}) ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),
eval_tune_socket_fun(TuneFun, Sock).

return_socket(Sock) ->
{ok, esockd_transport, Sock}.

eval_tune_socket_fun({Fun, Opts}, Sock) ->
Fun(Sock, Opts).

Expand All @@ -79,7 +94,7 @@ mk_tune_socket_fun(Opts) ->
{fun ?MODULE:tune_socket/2, TuneOpts}.

tune_socket(Sock, []) ->
{ok, Sock};
return_socket(Sock);
tune_socket(Sock, [{tune_buffer, true}|More]) ->
case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} ->
Expand All @@ -102,6 +117,11 @@ tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) ->
Error
end.

-spec sockname(ctx()) ->
{ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix()}.
sockname(#ctx{lsock = LSock}) ->
esockd_transport:sockname(LSock).

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
try
Expand Down
44 changes: 27 additions & 17 deletions src/esockd_accept_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

-export([
init/2,
async_accept/2,
async_accept/1,
async_accept_result/3,
post_accept/2,
sockname/1,
fast_close/1
]).

Expand All @@ -29,24 +30,33 @@
tune_socket/2
]).

-type ctx() :: tune_socket_fun().
-type socket() :: socket:socket().
-type async_ref() :: socket:select_info().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

-record(ctx, {
lsock :: socket(),
tune_fun :: tune_socket_fun()
}).

-type ctx() :: #ctx{}.

-define(DEFAULT_SOCK_OPTIONS, [{nodelay, true}]).

%%

-spec init(socket(), _Opts) -> ctx().
init(_LSock, TuneFun) ->
TuneFun.
init(LSock, TuneFun) ->
#ctx{
lsock = LSock,
tune_fun = TuneFun
}.

-spec async_accept(socket(), _Opts) ->
-spec async_accept(ctx()) ->
{ok, socket()} | {async, async_ref()} | {error, atom()}.
async_accept(LSock, _Opts) ->
async_accept(#ctx{lsock = LSock}) ->
case socket:accept(LSock, nowait) of
{ok, Sock} ->
{ok, Sock};
Expand All @@ -72,10 +82,13 @@ async_accept_result({'$socket', _LSock, abort, {Handle, Reason}}, Handle, _Opts)
async_accept_result(Info, _Handle, _Opts) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, socket()} | {error, atom()}.
post_accept(Sock, TuneFun) ->
-spec post_accept(socket(), ctx()) -> {ok, esockd_socket, socket()} | {error, atom()}.
post_accept(Sock, #ctx{tune_fun = TuneFun}) ->
eval_tune_socket_fun(Sock, TuneFun).

return_socket(Sock) ->
{ok, esockd_socket, Sock}.

eval_tune_socket_fun(Sock, {Fun, Opts}) ->
Fun(Sock, Opts).

Expand All @@ -89,7 +102,7 @@ mk_tune_socket_fun(Opts) ->
{fun ?MODULE:tune_socket/2, [{setopts, SockOpts} | TuneOpts]}.

tune_socket(Sock, [{setopts, SockOpts} | Rest]) ->
case setopts(Sock, SockOpts) of
case esockd_socket:setopts(Sock, SockOpts) of
ok ->
tune_socket(Sock, Rest);
Error ->
Expand All @@ -114,7 +127,7 @@ tune_socket(Sock, [{tune_fun, {M, F, A}} | Rest]) ->
Error
end;
tune_socket(Sock, _) ->
{ok, Sock}.
return_socket(Sock).

ensure(ok) -> ok;
ensure({ok, Result}) -> Result;
Expand Down Expand Up @@ -146,13 +159,10 @@ sock_opt(_Opt) ->
%% TODO: Ignored, need to notify user.
[].

setopts(Sock, [{Opt, Value} | Rest]) ->
case socket:setopt(Sock, Opt, Value) of
ok -> setopts(Sock, Rest);
Error -> Error
end;
setopts(_Sock, []) ->
ok.
-spec sockname(ctx()) ->
{ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix() | closed}.
sockname(#ctx{lsock = LSock}) ->
esockd_socket:sockname(LSock).

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
Expand Down
Loading