Skip to content

Commit

Permalink
Merge pull request #267 from esl/feature/ws_stream_management
Browse files Browse the repository at this point in the history
Feature/ws stream management
  • Loading branch information
chrzaszcz authored Jul 24, 2024
2 parents 6f6e688 + 089ef3f commit 7a6b272
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 98 deletions.
36 changes: 35 additions & 1 deletion src/escalus_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
upgrade_to_tls/1,
start_stream/1]).

-export([stanza_msg/2]).
-export([stanza_msg/2, separate_ack_requests/2]).

%% Behaviour helpers
-export([maybe_forward_to_owner/5]).
Expand Down Expand Up @@ -373,12 +373,16 @@ get_stream_end(#client{rcv_pid = Pid, jid = Jid}, Timeout) ->
-spec get_sm_h(client()) -> non_neg_integer().
get_sm_h(#client{module = escalus_tcp, rcv_pid = Pid}) ->
escalus_tcp:get_sm_h(Pid);
get_sm_h(#client{module = escalus_ws, rcv_pid = Pid}) ->
escalus_ws:get_sm_h(Pid);
get_sm_h(#client{module = Mod}) ->
error({get_sm_h, {undefined_for_escalus_module, Mod}}).

-spec set_sm_h(client(), non_neg_integer()) -> {ok, non_neg_integer()}.
set_sm_h(#client{module = escalus_tcp, rcv_pid = Pid}, H) ->
escalus_tcp:set_sm_h(Pid, H);
set_sm_h(#client{module = escalus_ws, rcv_pid = Pid}, H) ->
escalus_ws:set_sm_h(Pid, H);
set_sm_h(#client{module = Mod}, _) ->
error({set_sm_h, {undefined_for_escalus_module, Mod}}).

Expand Down Expand Up @@ -470,10 +474,40 @@ maybe_forward_to_owner(_, State, Stanzas, Fun, Timestamp) ->
stanza_msg(Stanza, Metadata) ->
{stanza, self(), Stanza, Metadata}.

separate_ack_requests({false, H0, A}, Stanzas) ->
%% Don't keep track of H
{{false, H0, A}, [], Stanzas};
separate_ack_requests({true, H0, inactive}, Stanzas) ->
Enabled = [ S || S <- Stanzas, escalus_pred:is_sm_enabled(S)],
Resumed = [ S || S <- Stanzas, escalus_pred:is_sm_resumed(S)],

case {length(Enabled), length(Resumed)} of
%% Enabled SM: set the H param to 0 and activate counter.
{1,0} -> {{true, 0, active}, [], Stanzas};

%% Resumed SM: keep the H param and activate counter.
{0,1} -> {{true, H0, active}, [], Stanzas};

%% No new SM state: continue as usual
{0,0} -> {{true, H0, inactive}, [], Stanzas}
end;
separate_ack_requests({true, H0, active}, Stanzas) ->
%% Count H and construct appropriate acks
F = fun(Stanza, {H, Acks, NonAckRequests}) ->
case escalus_pred:is_sm_ack_request(Stanza) of
true -> {H, [make_ack(H)|Acks], NonAckRequests};
false -> {H+1, Acks, [Stanza|NonAckRequests]}
end
end,
{H, Acks, Others} = lists:foldl(F, {H0, [], []}, Stanzas),
{{true, H, active}, lists:reverse(Acks), lists:reverse(Others)}.

%%%===================================================================
%%% Helpers
%%%===================================================================

make_ack(H) -> {escalus_stanza:sm_ack(H), H}.

get_connection_steps(UserSpec) ->
case lists:keyfind(connection_steps, 1, UserSpec) of
false -> default_connection_steps();
Expand Down
40 changes: 4 additions & 36 deletions src/escalus_tcp.erl
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,10 @@ handle_data(Socket, Data, #state{parser = Parser,
_ -> NewState
end.


forward_to_owner(Stanzas0, #state{owner = Owner,
sm_state = SM0,
event_client = EventClient} = State, Timestamp) ->
{SM1, AckRequests, StanzasNoRs} = separate_ack_requests(SM0, Stanzas0),
{SM1, AckRequests, StanzasNoRs} = escalus_connection:separate_ack_requests(SM0, Stanzas0),
reply_to_ack_requests(SM1, AckRequests, State),

lists:foreach(fun(Stanza) ->
Expand All @@ -419,43 +418,12 @@ forward_to_owner(Stanzas0, #state{owner = Owner,

State#state{sm_state = SM1, sent_stanzas = StanzasNoRs}.


separate_ack_requests({false, H0, A}, Stanzas) ->
%% Don't keep track of H
{{false, H0, A}, [], Stanzas};
separate_ack_requests({true, H0, inactive}, Stanzas) ->
Enabled = [ S || S <- Stanzas, escalus_pred:is_sm_enabled(S)],
Resumed = [ S || S <- Stanzas, escalus_pred:is_sm_resumed(S)],

case {length(Enabled),length(Resumed)} of
%% Enabled SM: set the H param to 0 and activate counter.
{1,0} -> {{true, 0, active}, [], Stanzas};

%% Resumed SM: keep the H param and activate counter.
{0,1} -> {{true, H0, active}, [], Stanzas};

%% No new SM state: continue as usual
{0,0} -> {{true, H0, inactive}, [], Stanzas}
end;
separate_ack_requests({true, H0, active}, Stanzas) ->
%% Count H and construct appropriate acks
F = fun(Stanza, {H, Acks, NonAckRequests}) ->
case escalus_pred:is_sm_ack_request(Stanza) of
true -> {H, [make_ack(H)|Acks], NonAckRequests};
false -> {H+1, Acks, [Stanza|NonAckRequests]}
end
end,
{H, Acks, Others} = lists:foldl(F, {H0, [], []}, Stanzas),
{{true, H, active}, lists:reverse(Acks), lists:reverse(Others)}.

make_ack(H) -> {escalus_stanza:sm_ack(H), H}.

reply_to_ack_requests({false,H,A}, _, _) -> {false, H, A};
reply_to_ack_requests({true,H,inactive}, _, _) -> {true, H, inactive};
reply_to_ack_requests({false, H, A}, _, _) -> {false, H, A};
reply_to_ack_requests({true, H, inactive}, _, _) -> {true, H, inactive};
reply_to_ack_requests({true, H0, active}, Acks, State) ->
{true,
% TODO: Maybe compress here?
lists:foldl(fun({Ack,H}, _) -> raw_send(exml:to_iolist(Ack), State), H end,
lists:foldl(fun({Ack, H}, _) -> raw_send(exml:to_iolist(Ack), State), H end,
H0, Acks),
active}.

Expand Down
Loading

0 comments on commit 7a6b272

Please sign in to comment.