Skip to content

Commit dd1a09d

Browse files
committed
Respect remote concurrency limit for headers/connect/ws_upgrade
In order to simplify the implementation the CookieStore is given to the connect function now, even though it's not currently used.
1 parent 04c3436 commit dd1a09d

File tree

6 files changed

+144
-124
lines changed

6 files changed

+144
-124
lines changed

src/gun.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,13 +1401,14 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi
14011401
event_handler_state=EvHandlerState});
14021402
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
14031403
State=#state{origin_host=Host, origin_port=Port,
1404-
protocol=Protocol, protocol_state=ProtoState,
1404+
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
14051405
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
1406-
{Commands, EvHandlerState} = Protocol:connect(ProtoState,
1406+
{Commands, CookieStore, EvHandlerState} = Protocol:connect(ProtoState,
14071407
dereference_stream_ref(StreamRef, State), ReplyTo,
14081408
Destination, #{host => Host, port => Port},
1409-
Headers, InitialFlow, EvHandler, EvHandlerState0),
1410-
commands(Commands, State#state{event_handler_state=EvHandlerState});
1409+
Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
1410+
commands(Commands, State#state{cookie_store=CookieStore,
1411+
event_handler_state=EvHandlerState});
14111412
%% Public Websocket interface.
14121413
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
14131414
WsOpts = maps:get(ws_opts, Opts, #{}),

src/gun_http.erl

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
-export([headers/12]).
3030
-export([request/13]).
3131
-export([data/7]).
32-
-export([connect/9]).
32+
-export([connect/10]).
3333
-export([cancel/5]).
3434
-export([stream_info/2]).
3535
-export([down/1]).
@@ -759,19 +759,20 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
759759
{[], EvHandlerState0}
760760
end.
761761

762-
connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
762+
connect(State, StreamRef, ReplyTo, _, _, _, _, CookieStore, _, EvHandlerState)
763763
when is_list(StreamRef) ->
764764
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
765765
{badstate, "The stream is not a tunnel."}}),
766-
{[], EvHandlerState};
767-
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
766+
{[], CookieStore, EvHandlerState};
767+
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _,
768+
CookieStore, _, EvHandlerState)
768769
when Streams =/= [] ->
769770
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
770771
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}}),
771-
{[], EvHandlerState};
772+
{[], CookieStore, EvHandlerState};
772773
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
773774
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
774-
EvHandler, EvHandlerState0) ->
775+
CookieStore, EvHandler, EvHandlerState0) ->
775776
Host = case Host0 of
776777
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
777778
_ -> Host0
@@ -817,9 +818,9 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
817818
InitialFlow = initial_flow(InitialFlow0, Opts),
818819
{{state, new_stream(State, {connect, StreamRef, Destination},
819820
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)},
820-
EvHandlerState};
821+
CookieStore, EvHandlerState};
821822
Error={error, _} ->
822-
{Error, EvHandlerState1}
823+
{Error, CookieStore, EvHandlerState1}
823824
end.
824825

825826
%% We can't cancel anything, we can just stop forwarding messages to the owner.

src/gun_http2.erl

Lines changed: 103 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
-export([headers/12]).
3131
-export([request/13]).
3232
-export([data/7]).
33-
-export([connect/9]).
33+
-export([connect/10]).
3434
-export([cancel/5]).
3535
-export([timeout/3]).
3636
-export([stream_info/2]).
@@ -948,10 +948,24 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport, pings_unack=Pin
948948
{Error, EvHandlerState}
949949
end.
950950

951-
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
951+
headers(State, StreamRef, ReplyTo, Method, Host, Port, Path,
952+
Headers, InitialFlow, CookieStore, EvHandler, EvHandlerState) ->
953+
request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
954+
fun() ->
955+
headers1(State, StreamRef, ReplyTo,
956+
Method, Host, Port, Path, Headers,
957+
InitialFlow, CookieStore, EvHandler, EvHandlerState)
958+
end,
959+
fun(#tunnel{protocol=Proto, protocol_state=ProtoState0,
960+
info=#{origin_host := OriginHost, origin_port := OriginPort}}) ->
961+
Proto:headers(ProtoState0, StreamRef, ReplyTo,
962+
Method, OriginHost, OriginPort, Path, Headers,
963+
InitialFlow, CookieStore, EvHandler, EvHandlerState)
964+
end).
965+
966+
headers1(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
952967
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
953-
Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
954-
when is_reference(StreamRef) ->
968+
Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
955969
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
956970
iolist_to_binary(Method), HTTP2Machine0),
957971
{ok, PseudoHeaders, Headers, CookieStore} = prepare_headers(
@@ -960,7 +974,7 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
960974
RequestEvent = #{
961975
stream_ref => stream_ref(State, StreamRef),
962976
reply_to => ReplyTo,
963-
function => ?FUNCTION_NAME,
977+
function => headers,
964978
method => Method,
965979
authority => Authority,
966980
path => Path,
@@ -981,69 +995,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
981995
EvHandlerState};
982996
Error={error, _} ->
983997
{Error, CookieStore, EvHandlerState1}
984-
end;
985-
%% Tunneled request.
986-
headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
987-
Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
988-
case get_stream_by_ref(State, StreamRef) of
989-
%% @todo We should send an error to the user if the stream isn't ready.
990-
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
991-
origin_host := OriginHost, origin_port := OriginPort}}} ->
992-
{Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef,
993-
ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
994-
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
995-
{ResCommands, EvHandlerState} = tunnel_commands(Commands, Stream,
996-
State, EvHandler, EvHandlerState1),
997-
{ResCommands, CookieStore, EvHandlerState};
998-
#stream{tunnel=undefined} ->
999-
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
1000-
"The stream is not a tunnel."}}),
1001-
{[], CookieStore0, EvHandlerState0};
1002-
error ->
1003-
error_stream_not_found(State, StreamRef, ReplyTo),
1004-
{[], CookieStore0, EvHandlerState0}
1005998
end.
1006999

1007-
request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port,
1008-
Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState)
1009-
when is_reference(StreamRef) ->
1010-
case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of
1011-
true ->
1012-
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
1013-
{stream_error, too_many_streams,
1014-
'Maximum concurrency limit has been reached.'}}),
1015-
{[], CookieStore, EvHandlerState};
1016-
false ->
1017-
request1(State, StreamRef, ReplyTo, Method, Host, Port,
1018-
Path, Headers, Body, InitialFlow, CookieStore,
1019-
EvHandler, EvHandlerState)
1020-
end;
1021-
%% Tunneled request.
1022-
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
1023-
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
1024-
case get_stream_by_ref(State, StreamRef) of
1025-
%% @todo We should send an error to the user if the stream isn't ready.
1026-
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
1027-
origin_host := OriginHost, origin_port := OriginPort}}} ->
1028-
{Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
1029-
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
1030-
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
1031-
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
1032-
Stream, State, EvHandler, EvHandlerState1),
1033-
{ResCommands, CookieStore, EvHandlerState};
1034-
#stream{tunnel=undefined} ->
1035-
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
1036-
"The stream is not a tunnel."}}),
1037-
{[], CookieStore0, EvHandlerState0};
1038-
error ->
1039-
error_stream_not_found(State, StreamRef, ReplyTo),
1040-
{[], CookieStore0, EvHandlerState0}
1041-
end.
1000+
request(State, StreamRef, ReplyTo, Method, Host, Port, Path,
1001+
Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState) ->
1002+
request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
1003+
fun() ->
1004+
request1(State, StreamRef, ReplyTo,
1005+
Method, Host, Port, Path, Headers, Body,
1006+
InitialFlow, CookieStore, EvHandler, EvHandlerState)
1007+
end,
1008+
fun(#tunnel{protocol=Proto, protocol_state=ProtoState0,
1009+
info=#{origin_host := OriginHost, origin_port := OriginPort}}) ->
1010+
Proto:request(ProtoState0, StreamRef, ReplyTo,
1011+
Method, OriginHost, OriginPort, Path, Headers, Body,
1012+
InitialFlow, CookieStore, EvHandler, EvHandlerState)
1013+
end).
10421014

10431015
request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
10441016
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
1045-
Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
1046-
when is_reference(StreamRef) ->
1017+
Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
10471018
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
10481019
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
10491020
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
@@ -1091,6 +1062,39 @@ request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
10911062
{Error, CookieStore, EvHandlerState1}
10921063
end.
10931064

1065+
%% Normal request.
1066+
request_common(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef,
1067+
ReplyTo, CookieStore, _, EvHandlerState, OnRequest, _)
1068+
when is_reference(StreamRef) ->
1069+
case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of
1070+
true ->
1071+
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
1072+
{stream_error, too_many_streams,
1073+
'Maximum concurrency limit has been reached.'}}),
1074+
{[], CookieStore, EvHandlerState};
1075+
false ->
1076+
OnRequest()
1077+
end;
1078+
%% Tunneled request.
1079+
request_common(State, [StreamRef|_], ReplyTo,
1080+
CookieStore0, EvHandler, EvHandlerState0, _, OnTunnel)
1081+
when is_reference(StreamRef) ->
1082+
case get_stream_by_ref(State, StreamRef) of
1083+
%% @todo We should send an error to the user if the stream isn't ready.
1084+
Stream=#stream{tunnel=Tunnel=#tunnel{}} ->
1085+
{Commands, CookieStore, EvHandlerState1} = OnTunnel(Tunnel),
1086+
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
1087+
Stream, State, EvHandler, EvHandlerState1),
1088+
{ResCommands, CookieStore, EvHandlerState};
1089+
#stream{tunnel=undefined} ->
1090+
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
1091+
"The stream is not a tunnel."}}),
1092+
{[], CookieStore0, EvHandlerState0};
1093+
error ->
1094+
error_stream_not_found(State, StreamRef, ReplyTo),
1095+
{[], CookieStore0, EvHandlerState0}
1096+
end.
1097+
10941098
initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
10951099
initial_flow(InitialFlow, _) -> InitialFlow.
10961100

@@ -1264,10 +1268,24 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
12641268
Error
12651269
end.
12661270

1267-
connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
1271+
connect(State, StreamRef, ReplyTo, Destination, TunnelInfo, Headers,
1272+
InitialFlow, CookieStore, EvHandler, EvHandlerState) ->
1273+
request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
1274+
fun() ->
1275+
connect1(State, StreamRef, ReplyTo,
1276+
Destination, TunnelInfo, Headers,
1277+
InitialFlow, CookieStore, EvHandler, EvHandlerState)
1278+
end,
1279+
fun(#tunnel{protocol=Proto, protocol_state=ProtoState0}) ->
1280+
Proto:connect(ProtoState0, StreamRef, ReplyTo,
1281+
Destination, TunnelInfo, Headers,
1282+
InitialFlow, CookieStore, EvHandler, EvHandlerState)
1283+
end).
1284+
1285+
connect1(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
12681286
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
12691287
Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0,
1270-
EvHandler, EvHandlerState0)
1288+
CookieStore, EvHandler, EvHandlerState0)
12711289
when is_reference(StreamRef) ->
12721290
Host = case Host0 of
12731291
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
@@ -1318,27 +1336,9 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
13181336
flow=InitialFlow, authority=Authority, path= <<>>,
13191337
tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
13201338
{{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
1321-
EvHandlerState};
1339+
CookieStore, EvHandlerState};
13221340
Error={error, _} ->
1323-
{Error, EvHandlerState1}
1324-
end;
1325-
%% Tunneled request.
1326-
connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
1327-
EvHandler, EvHandlerState0) ->
1328-
case get_stream_by_ref(State, StreamRef) of
1329-
%% @todo Should we send an error to the user if the stream isn't ready.
1330-
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
1331-
{Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef,
1332-
ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
1333-
EvHandler, EvHandlerState0),
1334-
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
1335-
#stream{tunnel=undefined} ->
1336-
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
1337-
"The stream is not a tunnel."}}),
1338-
{[], EvHandlerState0};
1339-
error ->
1340-
error_stream_not_found(State, StreamRef, ReplyTo),
1341-
{[], EvHandlerState0}
1341+
{Error, CookieStore, EvHandlerState1}
13421342
end.
13431343

13441344
cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
@@ -1459,11 +1459,25 @@ stream_info(State, RealStreamRef=[StreamRef|_]) ->
14591459
down(#http2_state{stream_refs=Refs}) ->
14601460
maps:keys(Refs).
14611461

1462-
ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
1462+
ws_upgrade(State, StreamRef, ReplyTo, Host, Port, Path,
1463+
Headers, WsOpts, CookieStore, EvHandler, EvHandlerState) ->
1464+
request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
1465+
fun() ->
1466+
ws_upgrade1(State, StreamRef, ReplyTo,
1467+
Host, Port, Path, Headers, WsOpts,
1468+
CookieStore, EvHandler, EvHandlerState)
1469+
end,
1470+
fun(#tunnel{protocol=Proto, protocol_state=ProtoState0,
1471+
info=#{origin_host := OriginHost, origin_port := OriginPort}}) ->
1472+
Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo,
1473+
OriginHost, OriginPort, Path, Headers, WsOpts,
1474+
CookieStore, EvHandler, EvHandlerState)
1475+
end).
1476+
1477+
ws_upgrade1(State=#http2_state{socket=Socket, transport=Transport,
14631478
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
14641479
Host, Port, Path, Headers0, WsOpts,
1465-
CookieStore0, EvHandler, EvHandlerState0)
1466-
when is_reference(StreamRef) ->
1480+
CookieStore0, EvHandler, EvHandlerState0) ->
14671481
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
14681482
<<"CONNECT">>, HTTP2Machine0),
14691483
{ok, PseudoHeaders, Headers1, CookieStore} = prepare_headers(State,
@@ -1489,7 +1503,7 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
14891503
RequestEvent = #{
14901504
stream_ref => RealStreamRef,
14911505
reply_to => ReplyTo,
1492-
function => ?FUNCTION_NAME,
1506+
function => ws_upgrade,
14931507
method => <<"CONNECT">>,
14941508
authority => Authority,
14951509
path => Path,
@@ -1517,19 +1531,6 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
15171531
Stream)}, CookieStore, EvHandlerState};
15181532
Error={error, _} ->
15191533
{Error, EvHandlerState1}
1520-
end;
1521-
ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
1522-
Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
1523-
case get_stream_by_ref(State, StreamRef) of
1524-
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
1525-
{Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade(
1526-
ProtoState0, RealStreamRef, ReplyTo,
1527-
Host, Port, Path, Headers, WsOpts,
1528-
CookieStore0, EvHandler, EvHandlerState0),
1529-
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
1530-
Stream, State, EvHandler, EvHandlerState1),
1531-
{ResCommands, CookieStore, EvHandlerState}
1532-
%% @todo Error conditions?
15331534
end.
15341535

15351536
ws_send(Frames, State, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) ->

src/gun_http3.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
-export([headers/12]).
3232
-export([request/13]).
3333
-export([data/7]).
34-
-export([connect/9]).
34+
-export([connect/10]).
3535
-export([cancel/5]).
3636
-export([timeout/3]).
3737
-export([stream_info/2]).
@@ -637,10 +637,11 @@ data(State=#http3_state{conn=Conn, transport=Transport}, StreamRef, _ReplyTo, Is
637637
% {[], EvHandlerState}
638638
end.
639639

640-
-spec connect(_, _, _, _, _, _, _, _, _) -> no_return().
640+
-spec connect(_, _, _, _, _, _, _, _, _, _) -> no_return().
641641

642642
connect(_State, StreamRef, _ReplyTo, _Destination, _TunnelInfo, _Headers0,
643-
_InitialFlow0, _EvHandler, _EvHandlerState0) when is_reference(StreamRef) ->
643+
_InitialFlow0, _CookieStore, _EvHandler, _EvHandlerState0)
644+
when is_reference(StreamRef) ->
644645
error(unimplemented).
645646

646647
-spec cancel(_, _, _, _, _) -> no_return().

0 commit comments

Comments
 (0)