Skip to content

Commit

Permalink
Respect remote concurrency limit
Browse files Browse the repository at this point in the history
If the number of streams has reached the server setting MAX_CONCURRENT_STREAMS,
new requests result in a stream error too_many_streams. This allows the user
to handle the situation, e.g. to retry the request later or to send it on
a different connection.

Depends on ninenines/cowlib#123 which is included in:
https://github.com/Nordix/cowlib/releases/tag/2.13.0-nordix1

Merged PR:
ninenines#280
  • Loading branch information
zuiderkwast authored and bjosv committed Apr 2, 2024
1 parent 5dff085 commit a854678
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CT_OPTS += -ct_hooks gun_ct_hook [] # -boot start_sasl
LOCAL_DEPS = public_key ssl

DEPS = cowlib
dep_cowlib = git https://github.com/ninenines/cowlib 2.13.0
dep_cowlib = git https://github.com/Nordix/cowlib 2.13.0-nordix1

DOC_DEPS = asciideck

Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{deps, [
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.13.0"}}
{cowlib,".*",{git,"https://github.com/Nordix/cowlib","2.13.0-nordix1"}}
]}.
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard]}.
62 changes: 39 additions & 23 deletions src/gun_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,44 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
{[], CookieStore0, EvHandlerState0}
end.

request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState)
when is_reference(StreamRef) ->

case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of
true ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{stream_error, too_many_streams,
'Maximum concurrency limit has been reached.'}},
{[], CookieStore, EvHandlerState};
false ->
request1(State, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers, Body, InitialFlow, CookieStore,
EvHandler, EvHandlerState)
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
{Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{[], CookieStore0, EvHandlerState0};
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
{[], CookieStore0, EvHandlerState0}
end.

request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
Expand All @@ -1005,7 +1042,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
RequestEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
function => ?FUNCTION_NAME,
function => request,
method => Method,
authority => Authority,
path => Path,
Expand Down Expand Up @@ -1039,27 +1076,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
end;
Error={error, _} ->
{Error, CookieStore, EvHandlerState1}
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
{Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{[], CookieStore0, EvHandlerState0};
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
{[], CookieStore0, EvHandlerState0}
end.

initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
Expand Down
26 changes: 26 additions & 0 deletions test/rfc7540_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,32 @@ lingering_data_counts_toward_connection_window(_) ->
timer:sleep(300),
gun:close(ConnPid).

respect_max_concurrent_streams(_) ->
doc("The SETTINGS_MAX_CONCURRENT_STREAMS setting can be used to "
"restrict the number of concurrent streams. (RFC7540 5.1.2, RFC7540 6.5.2)"),
Ref = make_ref(),
Routes = [{'_', [{"/delayed", delayed_hello_h, 500}]}],
ProtoOpts = #{
env => #{dispatch => cowboy_router:compile(Routes)},
tcp => #{protocols => [http2]},
max_concurrent_streams => 1
},
[{ref, _}, {port, Port}] = gun_test:init_cowboy_tcp(Ref, ProtoOpts, []),
try
{ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}),
{ok, http2} = gun:await_up(ConnPid),
StreamRef1 = gun:get(ConnPid, "/delayed"),
timer:sleep(100),
StreamRef2 = gun:get(ConnPid, "/delayed"),
{error, {stream_error, Reason}} = gun:await(ConnPid, StreamRef2),
{stream_error, too_many_streams, _Human} = Reason,
{response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
{ok, _} = gun:await_body(ConnPid, StreamRef1),
gun:close(ConnPid)
after
cowboy:stop_listener(Ref)
end.

headers_priority_flag(_) ->
doc("HEADERS frames may include a PRIORITY flag indicating "
"that stream dependency information is attached. (RFC7540 6.2)"),
Expand Down

0 comments on commit a854678

Please sign in to comment.