mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-26 17:38:45 +01:00
Add mod_offline option for fetching data from mam instead of from spool table
This commit introduces `use_mam_for_storage` option that take boolean argument. Enabling it will make mod_offline not use spool table for storing offline message, but instead will use mam archive to retrieve messages stored when offline. Enabling this option have couple drawback currently, only messages that were stored in mam will be available, most of flexible message retrieval queries don't work (those that allow retrieval/deletion of messages by id).
This commit is contained in:
parent
bcfe50f817
commit
b76f90fe39
@ -42,7 +42,7 @@
|
|||||||
get_room_config/4, set_room_option/3, offline_message/1, export/1,
|
get_room_config/4, set_room_option/3, offline_message/1, export/1,
|
||||||
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
|
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
|
||||||
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
|
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
|
||||||
process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2]).
|
process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]).
|
||||||
|
|
||||||
-include("xmpp.hrl").
|
-include("xmpp.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
@ -112,7 +112,7 @@ start(Host, Opts) ->
|
|||||||
ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
|
ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
|
||||||
user_send_packet_strip_tag, 500),
|
user_send_packet_strip_tag, 500),
|
||||||
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
|
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
|
||||||
offline_message, 50),
|
offline_message, 49),
|
||||||
ejabberd_hooks:add(muc_filter_message, Host, ?MODULE,
|
ejabberd_hooks:add(muc_filter_message, Host, ?MODULE,
|
||||||
muc_filter_message, 50),
|
muc_filter_message, 50),
|
||||||
ejabberd_hooks:add(muc_process_iq, Host, ?MODULE,
|
ejabberd_hooks:add(muc_process_iq, Host, ?MODULE,
|
||||||
@ -188,7 +188,7 @@ stop(Host) ->
|
|||||||
ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
|
ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
|
||||||
user_send_packet_strip_tag, 500),
|
user_send_packet_strip_tag, 500),
|
||||||
ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE,
|
ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE,
|
||||||
offline_message, 50),
|
offline_message, 49),
|
||||||
ejabberd_hooks:delete(muc_filter_message, Host, ?MODULE,
|
ejabberd_hooks:delete(muc_filter_message, Host, ?MODULE,
|
||||||
muc_filter_message, 50),
|
muc_filter_message, 50),
|
||||||
ejabberd_hooks:delete(muc_process_iq, Host, ?MODULE,
|
ejabberd_hooks:delete(muc_process_iq, Host, ?MODULE,
|
||||||
|
@ -61,7 +61,8 @@
|
|||||||
c2s_copy_session/2,
|
c2s_copy_session/2,
|
||||||
webadmin_page/3,
|
webadmin_page/3,
|
||||||
webadmin_user/4,
|
webadmin_user/4,
|
||||||
webadmin_user_parse_query/5]).
|
webadmin_user_parse_query/5,
|
||||||
|
user_unset_presence/4]).
|
||||||
|
|
||||||
-export([mod_opt_type/1, mod_options/1, depends/2]).
|
-export([mod_opt_type/1, mod_options/1, depends/2]).
|
||||||
|
|
||||||
@ -131,6 +132,8 @@ start(Host, Opts) ->
|
|||||||
?MODULE, webadmin_user, 50),
|
?MODULE, webadmin_user, 50),
|
||||||
ejabberd_hooks:add(webadmin_user_parse_query, Host,
|
ejabberd_hooks:add(webadmin_user_parse_query, Host,
|
||||||
?MODULE, webadmin_user_parse_query, 50),
|
?MODULE, webadmin_user_parse_query, 50),
|
||||||
|
ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE,
|
||||||
|
user_unset_presence, 50),
|
||||||
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
|
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
|
||||||
?MODULE, handle_offline_query).
|
?MODULE, handle_offline_query).
|
||||||
|
|
||||||
@ -153,6 +156,8 @@ stop(Host) ->
|
|||||||
?MODULE, webadmin_user, 50),
|
?MODULE, webadmin_user, 50),
|
||||||
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
|
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
|
||||||
?MODULE, webadmin_user_parse_query, 50),
|
?MODULE, webadmin_user_parse_query, 50),
|
||||||
|
ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE,
|
||||||
|
user_unset_presence, 50),
|
||||||
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
|
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
|
||||||
|
|
||||||
reload(Host, NewOpts, OldOpts) ->
|
reload(Host, NewOpts, OldOpts) ->
|
||||||
@ -165,7 +170,17 @@ reload(Host, NewOpts, OldOpts) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
|
-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
|
||||||
store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
|
store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
|
||||||
|
case (not xmpp:get_meta(Pkt, activity_marker, false)) andalso
|
||||||
|
use_mam_for_user(User, Server) of
|
||||||
|
true ->
|
||||||
|
case xmpp:get_meta(Pkt, first_from_queue, false) of
|
||||||
|
true ->
|
||||||
|
store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id));
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
Mod = gen_mod:db_mod(Server, ?MODULE),
|
Mod = gen_mod:db_mod(Server, ?MODULE),
|
||||||
case get_max_user_messages(User, Server) of
|
case get_max_user_messages(User, Server) of
|
||||||
infinity ->
|
infinity ->
|
||||||
@ -177,6 +192,7 @@ store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
|
|||||||
true ->
|
true ->
|
||||||
{error, full}
|
{error, full}
|
||||||
end
|
end
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_max_user_messages(User, Server) ->
|
get_max_user_messages(User, Server) ->
|
||||||
@ -298,6 +314,10 @@ handle_offline_query(#iq{lang = Lang} = IQ) ->
|
|||||||
-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
|
-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
|
||||||
handle_offline_items_view(JID, Items) ->
|
handle_offline_items_view(JID, Items) ->
|
||||||
{U, S, R} = jid:tolower(JID),
|
{U, S, R} = jid:tolower(JID),
|
||||||
|
case use_mam_for_user(U, S) of
|
||||||
|
true ->
|
||||||
|
false;
|
||||||
|
_ ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#offline_item{node = Node, action = view}, Acc) ->
|
fun(#offline_item{node = Node, action = view}, Acc) ->
|
||||||
case fetch_msg_by_node(JID, Node) of
|
case fetch_msg_by_node(JID, Node) of
|
||||||
@ -318,14 +338,20 @@ handle_offline_items_view(JID, Items) ->
|
|||||||
error ->
|
error ->
|
||||||
Acc or false
|
Acc or false
|
||||||
end
|
end
|
||||||
end, false, Items).
|
end, false, Items) end.
|
||||||
|
|
||||||
-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean().
|
-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean().
|
||||||
handle_offline_items_remove(JID, Items) ->
|
handle_offline_items_remove(JID, Items) ->
|
||||||
|
{U, S, _R} = jid:tolower(JID),
|
||||||
|
case use_mam_for_user(U, S) of
|
||||||
|
true ->
|
||||||
|
false;
|
||||||
|
_ ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#offline_item{node = Node, action = remove}, Acc) ->
|
fun(#offline_item{node = Node, action = remove}, Acc) ->
|
||||||
Acc or remove_msg_by_node(JID, Node)
|
Acc or remove_msg_by_node(JID, Node)
|
||||||
end, false, Items).
|
end, false, Items)
|
||||||
|
end.
|
||||||
|
|
||||||
-spec set_offline_tag(message(), binary()) -> message().
|
-spec set_offline_tag(message(), binary()) -> message().
|
||||||
set_offline_tag(Msg, Node) ->
|
set_offline_tag(Msg, Node) ->
|
||||||
@ -508,15 +534,26 @@ c2s_self_presence(Acc) ->
|
|||||||
-spec route_offline_messages(c2s_state()) -> ok.
|
-spec route_offline_messages(c2s_state()) -> ok.
|
||||||
route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) ->
|
route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) ->
|
||||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
case Mod:pop_messages(LUser, LServer) of
|
Msgs = case Mod:pop_messages(LUser, LServer) of
|
||||||
{ok, OffMsgs} ->
|
{ok, OffMsgs} ->
|
||||||
|
case use_mam_for_user(LUser, LServer) of
|
||||||
|
true ->
|
||||||
|
lists:map(
|
||||||
|
fun({_, #message{from = From, to = To} = Msg}) ->
|
||||||
|
#offline_msg{from = From, to = To,
|
||||||
|
us = {LUser, LServer},
|
||||||
|
packet = Msg}
|
||||||
|
end, read_mam_messages(LUser, LServer, OffMsgs));
|
||||||
|
_ ->
|
||||||
|
OffMsgs
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(OffMsg) ->
|
fun(OffMsg) ->
|
||||||
route_offline_message(State, OffMsg)
|
route_offline_message(State, OffMsg)
|
||||||
end, OffMsgs);
|
end, Msgs).
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec route_offline_message(c2s_state(), #offline_msg{}) -> ok.
|
-spec route_offline_message(c2s_state(), #offline_msg{}) -> ok.
|
||||||
route_offline_message(#{lserver := LServer} = State,
|
route_offline_message(#{lserver := LServer} = State,
|
||||||
@ -574,6 +611,31 @@ remove_user(User, Server) ->
|
|||||||
Mod:remove_user(LUser, LServer),
|
Mod:remove_user(LUser, LServer),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
-spec user_unset_presence(binary(), binary(), binary(), binary()) -> any().
|
||||||
|
user_unset_presence(User, Server, _Resource, _Status) ->
|
||||||
|
case use_mam_for_user(User, Server) of
|
||||||
|
true ->
|
||||||
|
case ejabberd_sm:get_user_present_resources(User, Server) of
|
||||||
|
[] ->
|
||||||
|
TimeStamp = erlang:system_time(microsecond),
|
||||||
|
store_last_activity_marker(User, Server, TimeStamp);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
store_last_activity_marker(User, Server, Timestamp) ->
|
||||||
|
Jid = jid:make(User, Server, <<>>),
|
||||||
|
Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error},
|
||||||
|
activity_marker, true),
|
||||||
|
|
||||||
|
Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid,
|
||||||
|
timestamp = misc:usec_to_now(Timestamp),
|
||||||
|
packet = Pkt},
|
||||||
|
store_offline_msg(Msg).
|
||||||
|
|
||||||
%% Helper functions:
|
%% Helper functions:
|
||||||
|
|
||||||
-spec check_if_message_should_be_bounced(message()) -> boolean().
|
-spec check_if_message_should_be_bounced(message()) -> boolean().
|
||||||
@ -641,6 +703,15 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
|
|||||||
|
|
||||||
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
|
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
|
||||||
read_messages(LUser, LServer) ->
|
read_messages(LUser, LServer) ->
|
||||||
|
Res = read_db_messages(LUser, LServer),
|
||||||
|
case use_mam_for_user(LUser, LServer) of
|
||||||
|
true ->
|
||||||
|
read_mam_messages(LUser, LServer, Res);
|
||||||
|
_ ->
|
||||||
|
Res
|
||||||
|
end.
|
||||||
|
|
||||||
|
read_db_messages(LUser, LServer) ->
|
||||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
CodecOpts = ejabberd_config:codec_options(LServer),
|
CodecOpts = ejabberd_config:codec_options(LServer),
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
@ -661,6 +732,95 @@ read_messages(LUser, LServer) ->
|
|||||||
end
|
end
|
||||||
end, Mod:read_message_headers(LUser, LServer)).
|
end, Mod:read_message_headers(LUser, LServer)).
|
||||||
|
|
||||||
|
read_mam_messages(LUser, LServer, ReadMsgs) ->
|
||||||
|
{Timestamp, ExtraMsgs} = lists:foldl(
|
||||||
|
fun({_Node, #message{id = <<"ActivityMarker">>,
|
||||||
|
body = [], type = error} = Msg}, {T, E}) ->
|
||||||
|
case xmpp:get_subtag(Msg, #delay{}) of
|
||||||
|
#delay{stamp = Time} ->
|
||||||
|
if T == none orelse T > Time ->
|
||||||
|
{Time, E};
|
||||||
|
true ->
|
||||||
|
{T, E}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
(#offline_msg{from = From, to = To, timestamp = TS, packet = Pkt},
|
||||||
|
{T, E}) ->
|
||||||
|
try xmpp:decode(Pkt) of
|
||||||
|
#message{id = <<"ActivityMarker">>,
|
||||||
|
body = [], type = error} = Msg ->
|
||||||
|
TS2 = case TS of
|
||||||
|
undefined ->
|
||||||
|
case xmpp:get_subtag(Msg, #delay{}) of
|
||||||
|
#delay{stamp = TS0} ->
|
||||||
|
TS0;
|
||||||
|
_ ->
|
||||||
|
erlang:timestamp()
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
if T == none orelse T > TS2 ->
|
||||||
|
{TS2, E};
|
||||||
|
true ->
|
||||||
|
{T, E}
|
||||||
|
end;
|
||||||
|
Decoded ->
|
||||||
|
Pkt1 = add_delay_info(Decoded, LServer, TS),
|
||||||
|
{T, [xmpp:set_from_to(Pkt1, From, To) | E]}
|
||||||
|
catch _:{xmpp_codec, _Why} ->
|
||||||
|
{T, E}
|
||||||
|
end;
|
||||||
|
({_Node, Msg}, {T, E}) ->
|
||||||
|
{T, [Msg | E]}
|
||||||
|
end, {none, []}, ReadMsgs),
|
||||||
|
Start = case {Timestamp, ExtraMsgs} of
|
||||||
|
{none, [First|_]} ->
|
||||||
|
case xmpp:get_subtag(First, #delay{}) of
|
||||||
|
#delay{stamp = {Mega, Sec, Micro}} ->
|
||||||
|
{Mega, Sec, Micro+1};
|
||||||
|
_ ->
|
||||||
|
none
|
||||||
|
end;
|
||||||
|
{none, _} ->
|
||||||
|
none;
|
||||||
|
_ ->
|
||||||
|
Timestamp
|
||||||
|
end,
|
||||||
|
AllMsgs = case Start of
|
||||||
|
none ->
|
||||||
|
ExtraMsgs;
|
||||||
|
_ ->
|
||||||
|
MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
|
||||||
|
Number when is_integer(Number) -> Number;
|
||||||
|
_ -> 100
|
||||||
|
end,
|
||||||
|
JID = jid:make(LUser, LServer, <<>>),
|
||||||
|
{MamMsgs, _, _} = mod_mam:select(LServer, JID, JID,
|
||||||
|
[{start, Start}],
|
||||||
|
#rsm_set{max = MaxOfflineMsgs,
|
||||||
|
before = <<"9999999999999999">>},
|
||||||
|
chat),
|
||||||
|
MamMsgs2 = lists:map(
|
||||||
|
fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
|
||||||
|
add_delay_info(MM, LServer, MMT)
|
||||||
|
end, MamMsgs),
|
||||||
|
|
||||||
|
ExtraMsgs ++ MamMsgs2
|
||||||
|
end,
|
||||||
|
AllMsgs2 = lists:sort(
|
||||||
|
fun(A, B) ->
|
||||||
|
case {xmpp:get_subtag(A, #delay{}), xmpp:get_subtag(B, #delay{})} of
|
||||||
|
{#delay{stamp = TA}, #delay{stamp = TB}} ->
|
||||||
|
TA < TB;
|
||||||
|
_ ->
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end, AllMsgs),
|
||||||
|
{AllMsgs3, _} = lists:mapfoldl(
|
||||||
|
fun(Msg, Counter) ->
|
||||||
|
{{Counter, Msg}, Counter + 1}
|
||||||
|
end, 1, AllMsgs2),
|
||||||
|
AllMsgs3.
|
||||||
|
|
||||||
format_user_queue(Hdrs) ->
|
format_user_queue(Hdrs) ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun({Seq, From, To, TS, El}) ->
|
fun({Seq, From, To, TS, El}) ->
|
||||||
@ -873,6 +1033,9 @@ import(LServer, {sql, _}, DBType, <<"spool">>,
|
|||||||
Mod = gen_mod:db_mod(DBType, ?MODULE),
|
Mod = gen_mod:db_mod(DBType, ?MODULE),
|
||||||
Mod:import(OffMsg).
|
Mod:import(OffMsg).
|
||||||
|
|
||||||
|
use_mam_for_user(_User, Server) ->
|
||||||
|
gen_mod:get_module_opt(Server, ?MODULE, use_mam_for_storage).
|
||||||
|
|
||||||
mod_opt_type(access_max_user_messages) ->
|
mod_opt_type(access_max_user_messages) ->
|
||||||
fun acl:shaper_rules_validator/1;
|
fun acl:shaper_rules_validator/1;
|
||||||
mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
|
mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
|
||||||
@ -880,6 +1043,8 @@ mod_opt_type(store_groupchat) ->
|
|||||||
fun(V) when is_boolean(V) -> V end;
|
fun(V) when is_boolean(V) -> V end;
|
||||||
mod_opt_type(bounce_groupchat) ->
|
mod_opt_type(bounce_groupchat) ->
|
||||||
fun(V) when is_boolean(V) -> V end;
|
fun(V) when is_boolean(V) -> V end;
|
||||||
|
mod_opt_type(use_mam_for_storage) ->
|
||||||
|
fun(V) when is_boolean(V) -> V end;
|
||||||
mod_opt_type(store_empty_body) ->
|
mod_opt_type(store_empty_body) ->
|
||||||
fun (V) when is_boolean(V) -> V;
|
fun (V) when is_boolean(V) -> V;
|
||||||
(unless_chat_state) -> unless_chat_state
|
(unless_chat_state) -> unless_chat_state
|
||||||
@ -889,5 +1054,6 @@ mod_options(Host) ->
|
|||||||
[{db_type, ejabberd_config:default_db(Host, ?MODULE)},
|
[{db_type, ejabberd_config:default_db(Host, ?MODULE)},
|
||||||
{access_max_user_messages, max_user_offline_messages},
|
{access_max_user_messages, max_user_offline_messages},
|
||||||
{store_empty_body, unless_chat_state},
|
{store_empty_body, unless_chat_state},
|
||||||
|
{use_mam_for_storage, false},
|
||||||
{bounce_groupchat, false},
|
{bounce_groupchat, false},
|
||||||
{store_groupchat, false}].
|
{store_groupchat, false}].
|
||||||
|
@ -591,22 +591,25 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
|
|||||||
end,
|
end,
|
||||||
?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s",
|
?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s",
|
||||||
[p1_queue:len(Queue), jid:encode(JID)]),
|
[p1_queue:len(Queue), jid:encode(JID)]),
|
||||||
p1_queue:foreach(
|
p1_queue:foldl(
|
||||||
fun({_, _Time, #presence{from = From}}) ->
|
fun({_, _Time, #presence{from = From}}, Acc) ->
|
||||||
?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]);
|
?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]),
|
||||||
({_, _Time, #iq{} = El}) ->
|
Acc;
|
||||||
|
({_, _Time, #iq{} = El}, Acc) ->
|
||||||
Txt = <<"User session terminated">>,
|
Txt = <<"User session terminated">>,
|
||||||
ejabberd_router:route_error(
|
ejabberd_router:route_error(
|
||||||
El, xmpp:err_service_unavailable(Txt, Lang));
|
El, xmpp:err_service_unavailable(Txt, Lang)),
|
||||||
({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}) ->
|
Acc;
|
||||||
|
({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}, Acc) ->
|
||||||
%% XEP-0280 says: "When a receiving server attempts to deliver a
|
%% XEP-0280 says: "When a receiving server attempts to deliver a
|
||||||
%% forked message, and that message bounces with an error for
|
%% forked message, and that message bounces with an error for
|
||||||
%% any reason, the receiving server MUST NOT forward that error
|
%% any reason, the receiving server MUST NOT forward that error
|
||||||
%% back to the original sender." Resending such a stanza could
|
%% back to the original sender." Resending such a stanza could
|
||||||
%% easily lead to unexpected results as well.
|
%% easily lead to unexpected results as well.
|
||||||
?DEBUG("Dropping forwarded message stanza from ~s",
|
?DEBUG("Dropping forwarded message stanza from ~s",
|
||||||
[jid:encode(From)]);
|
[jid:encode(From)]),
|
||||||
({_, Time, #message{} = Msg}) ->
|
Acc;
|
||||||
|
({_, Time, #message{} = Msg}, Acc) ->
|
||||||
case ejabberd_hooks:run_fold(message_is_archived,
|
case ejabberd_hooks:run_fold(message_is_archived,
|
||||||
LServer, false,
|
LServer, false,
|
||||||
[State, Msg]) of
|
[State, Msg]) of
|
||||||
@ -615,17 +618,26 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
|
|||||||
[jid:encode(xmpp:get_from(Msg))]);
|
[jid:encode(xmpp:get_from(Msg))]);
|
||||||
false when ResendOnTimeout ->
|
false when ResendOnTimeout ->
|
||||||
NewEl = add_resent_delay_info(State, Msg, Time),
|
NewEl = add_resent_delay_info(State, Msg, Time),
|
||||||
ejabberd_router:route(NewEl);
|
NewEl2 = case Acc of
|
||||||
|
first_resend ->
|
||||||
|
xmpp:put_meta(NewEl, first_from_queue, true);
|
||||||
|
_ ->
|
||||||
|
NewEl
|
||||||
|
end,
|
||||||
|
ejabberd_router:route(NewEl2),
|
||||||
|
false;
|
||||||
false ->
|
false ->
|
||||||
Txt = <<"User session terminated">>,
|
Txt = <<"User session terminated">>,
|
||||||
ejabberd_router:route_error(
|
ejabberd_router:route_error(
|
||||||
Msg, xmpp:err_service_unavailable(Txt, Lang))
|
Msg, xmpp:err_service_unavailable(Txt, Lang)),
|
||||||
|
Acc
|
||||||
end;
|
end;
|
||||||
({_, _Time, El}) ->
|
({_, _Time, El}, Acc) ->
|
||||||
%% Raw element of type 'error' resulting from a validation error
|
%% Raw element of type 'error' resulting from a validation error
|
||||||
%% We cannot pass it to the router, it will generate an error
|
%% We cannot pass it to the router, it will generate an error
|
||||||
?DEBUG("Do not route raw element from ack queue: ~p", [El])
|
?DEBUG("Do not route raw element from ack queue: ~p", [El]),
|
||||||
end, Queue);
|
Acc
|
||||||
|
end, first_resend, Queue);
|
||||||
route_unacked_stanzas(_State) ->
|
route_unacked_stanzas(_State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user