XEP-0013: Flexible Offline Message Retrieval support

This commit is contained in:
Evgeniy Khramtsov 2016-02-09 17:59:54 +03:00
parent 2bca8d5121
commit 4839ba5ae4
4 changed files with 353 additions and 23 deletions

View File

@ -131,6 +131,7 @@
-define(NS_FEATURE_COMPRESS,
<<"http://jabber.org/features/compress">>).
-define(NS_FEATURE_MSGOFFLINE, <<"msgoffline">>).
-define(NS_FLEX_OFFLINE, <<"http://jabber.org/protocol/offline">>).
-define(NS_COMPRESS,
<<"http://jabber.org/protocol/compress">>).
-define(NS_CAPS, <<"http://jabber.org/protocol/caps">>).

View File

@ -115,6 +115,7 @@
mgmt_resend,
mgmt_stanzas_in = 0,
mgmt_stanzas_out = 0,
ask_offline = true,
lang = <<"">>}).
%-define(DBGFSM, true).
@ -1737,6 +1738,8 @@ handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
From, jid:make(USR), Packet)
end, lists:usort(Recipients)),
fsm_next_state(StateName, StateData);
handle_info(dont_ask_offline, StateName, StateData) ->
fsm_next_state(StateName, StateData#state{ask_offline = false});
handle_info(Info, StateName, StateData) ->
?ERROR_MSG("Unexpected info: ~p", [Info]),
fsm_next_state(StateName, StateData).
@ -2310,7 +2313,7 @@ process_privacy_iq(From, To,
ejabberd_router:route(To, From, jlib:iq_to_xml(IQRes)),
NewStateData.
resend_offline_messages(StateData) ->
resend_offline_messages(#state{ask_offline = true} = StateData) ->
case ejabberd_hooks:run_fold(resend_offline_messages_hook,
StateData#state.server, [],
[StateData#state.user, StateData#state.server])
@ -2331,7 +2334,9 @@ resend_offline_messages(StateData) ->
end
end,
Rs)
end.
end;
resend_offline_messages(_StateData) ->
ok.
resend_subscription_requests(#state{user = User,
server = Server} = StateData) ->

View File

@ -382,6 +382,8 @@ process_sm_iq_info(From, To,
Identity = ejabberd_hooks:run_fold(disco_sm_identity,
Host, [],
[From, To, Node, Lang]),
Info = ejabberd_hooks:run_fold(disco_info, Host, [],
[From, To, Node, Lang]),
case ejabberd_hooks:run_fold(disco_sm_features, Host,
empty, [From, To, Node, Lang])
of
@ -397,7 +399,7 @@ process_sm_iq_info(From, To,
[{<<"xmlns">>, ?NS_DISCO_INFO}
| ANode],
children =
Identity ++
Identity ++ Info ++
features_to_xml(Features)}]};
{error, Error} ->
IQ#iq{type = error, sub_el = [SubEl, Error]}

View File

@ -47,6 +47,10 @@
resend_offline_messages/2,
pop_offline_messages/3,
get_sm_features/5,
get_sm_identity/5,
get_sm_items/5,
get_info/5,
handle_offline_query/3,
remove_expired_messages/1,
remove_old_messages/2,
remove_user/2,
@ -113,6 +117,8 @@ init([Host, Opts]) ->
update_table();
_ -> ok
end,
IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
no_queue),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
store_packet, 50),
ejabberd_hooks:add(resend_offline_messages_hook, Host,
@ -125,12 +131,19 @@ init([Host, Opts]) ->
?MODULE, get_sm_features, 50),
ejabberd_hooks:add(disco_local_features, Host,
?MODULE, get_sm_features, 50),
ejabberd_hooks:add(disco_sm_identity, Host,
?MODULE, get_sm_identity, 50),
ejabberd_hooks:add(disco_sm_items, Host,
?MODULE, get_sm_items, 50),
ejabberd_hooks:add(disco_info, Host, ?MODULE, get_info, 50),
ejabberd_hooks:add(webadmin_page_host, Host,
?MODULE, webadmin_page, 50),
ejabberd_hooks:add(webadmin_user, Host,
?MODULE, webadmin_user, 50),
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
?MODULE, handle_offline_query, IQDisc),
AccessMaxOfflineMsgs =
gen_mod:get_opt(access_max_user_messages, Opts,
fun(A) when is_atom(A) -> A end,
@ -175,12 +188,16 @@ terminate(_Reason, State) ->
?MODULE, remove_user, 50),
ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50),
ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50),
ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, get_sm_identity, 50),
ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, get_sm_items, 50),
ejabberd_hooks:delete(disco_info, Host, ?MODULE, get_info, 50),
ejabberd_hooks:delete(webadmin_page_host, Host,
?MODULE, webadmin_page, 50),
ejabberd_hooks:delete(webadmin_user, Host,
?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE),
ok.
@ -276,38 +293,223 @@ get_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
{result, I} -> I;
_ -> []
end,
{result, Feats ++ [?NS_FEATURE_MSGOFFLINE]};
{result, Feats ++ [?NS_FEATURE_MSGOFFLINE, ?NS_FLEX_OFFLINE]};
get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) ->
%% override all lesser features...
{result, []};
get_sm_features(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
?NS_FLEX_OFFLINE, _Lang) ->
{result, [?NS_FLEX_OFFLINE]};
get_sm_features(Acc, _From, _To, _Node, _Lang) ->
Acc.
get_sm_identity(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
?NS_FLEX_OFFLINE, _Lang) ->
Identity = #xmlel{name = <<"identity">>,
attrs = [{<<"category">>, <<"automation">>},
{<<"type">>, <<"message-list">>}]},
[Identity];
get_sm_identity(Acc, _From, _To, _Node, _Lang) ->
Acc.
get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID,
#jid{luser = U, lserver = S},
?NS_FLEX_OFFLINE, _Lang) ->
case ejabberd_sm:get_session_pid(U, S, R) of
Pid when is_pid(Pid) ->
Hdrs = read_message_headers(U, S),
BareJID = jid:to_string(jid:remove_resource(JID)),
Pid ! dont_ask_offline,
{result, lists:map(
fun({Node, From, _OfflineMsg}) ->
#xmlel{name = <<"item">>,
attrs = [{<<"jid">>, BareJID},
{<<"node">>, Node},
{<<"name">>, From}]}
end, Hdrs)};
none ->
{result, []}
end;
get_sm_items(Acc, _From, _To, _Node, _Lang) ->
Acc.
get_info(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
?NS_FLEX_OFFLINE, _Lang) ->
N = jlib:integer_to_binary(count_offline_messages(U, S)),
[#xmlel{name = <<"x">>,
attrs = [{<<"xmlns">>, ?NS_XDATA},
{<<"type">>, <<"result">>}],
children = [#xmlel{name = <<"field">>,
attrs = [{<<"var">>, <<"FORM_TYPE">>},
{<<"type">>, <<"hidden">>}],
children = [#xmlel{name = <<"value">>,
children = [{xmlcdata,
?NS_FLEX_OFFLINE}]}]},
#xmlel{name = <<"field">>,
attrs = [{<<"var">>, <<"number_of_messages">>}],
children = [#xmlel{name = <<"value">>,
children = [{xmlcdata, N}]}]}]}];
get_info(Acc, _From, _To, _Node, _Lang) ->
Acc.
handle_offline_query(#jid{luser = U, lserver = S} = From,
#jid{luser = U, lserver = S} = _To,
#iq{type = Type, sub_el = SubEl} = IQ) ->
case Type of
get ->
case fxml:get_subtag(SubEl, <<"fetch">>) of
#xmlel{} ->
handle_offline_fetch(From);
false ->
handle_offline_items_view(From, SubEl)
end;
set ->
case fxml:get_subtag(SubEl, <<"purge">>) of
#xmlel{} ->
delete_all_msgs(U, S);
false ->
handle_offline_items_remove(From, SubEl)
end
end,
IQ#iq{type = result, sub_el = []};
handle_offline_query(_From, _To, #iq{sub_el = SubEl} = IQ) ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}.
handle_offline_items_view(JID, #xmlel{children = Items}) ->
{U, S, R} = jid:tolower(JID),
lists:foreach(
fun(Node) ->
case fetch_msg_by_node(JID, Node) of
{ok, OfflineMsg} ->
case offline_msg_to_route(S, OfflineMsg) of
{route, From, To, El} ->
NewEl = set_offline_tag(El, Node),
case ejabberd_sm:get_session_pid(U, S, R) of
Pid when is_pid(Pid) ->
Pid ! {route, From, To, NewEl};
none ->
ok
end;
error ->
ok
end;
error ->
ok
end
end, get_nodes_from_items(Items, <<"view">>)).
handle_offline_items_remove(JID, #xmlel{children = Items}) ->
lists:foreach(
fun(Node) ->
remove_msg_by_node(JID, Node)
end, get_nodes_from_items(Items, <<"remove">>)).
get_nodes_from_items(Items, Action) ->
lists:flatmap(
fun(#xmlel{name = <<"item">>, attrs = Attrs}) ->
case fxml:get_attr_s(<<"action">>, Attrs) of
Action ->
case fxml:get_attr_s(<<"node">>, Attrs) of
<<"">> ->
[];
TS ->
[TS]
end;
_ ->
[]
end;
(_) ->
[]
end, Items).
set_offline_tag(#xmlel{children = Els} = El, Node) ->
OfflineEl = #xmlel{name = <<"offline">>,
attrs = [{<<"xmlns">>, ?NS_FLEX_OFFLINE}],
children = [#xmlel{name = <<"item">>,
attrs = [{<<"node">>, Node}]}]},
El#xmlel{children = [OfflineEl|Els]}.
handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) ->
case ejabberd_sm:get_session_pid(U, S, R) of
none ->
ok;
Pid when is_pid(Pid) ->
Pid ! dont_ask_offline,
lists:foreach(
fun({Node, _, Msg}) ->
case offline_msg_to_route(S, Msg) of
{route, From, To, El} ->
NewEl = set_offline_tag(El, Node),
Pid ! {route, From, To, NewEl};
error ->
ok
end
end, read_message_headers(U, S))
end.
fetch_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
case jid:from_string(From_s) of
From = #jid{} ->
case gen_mod:db_type(To#jid.lserver, ?MODULE) of
odbc ->
read_message(From, To, Seq, odbc);
DBType ->
case binary_to_timestamp(Seq) of
undefined -> ok;
TS -> read_message(From, To, TS, DBType)
end
end;
error ->
ok
end.
remove_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
case jid:from_string(From_s) of
From = #jid{} ->
case gen_mod:db_type(To#jid.lserver, ?MODULE) of
odbc ->
remove_message(From, To, Seq, odbc);
DBType ->
case binary_to_timestamp(Seq) of
undefined -> ok;
TS -> remove_message(From, To, TS, DBType)
end
end;
error ->
ok
end.
need_to_store(LServer, Packet) ->
Type = fxml:get_tag_attr_s(<<"type">>, Packet),
if (Type /= <<"error">>) and (Type /= <<"groupchat">>)
and (Type /= <<"headline">>) ->
case check_store_hint(Packet) of
store ->
true;
no_store ->
false;
none ->
case gen_mod:get_module_opt(
LServer, ?MODULE, store_empty_body,
fun(V) when is_boolean(V) -> V;
(unless_chat_state) -> unless_chat_state
end,
unless_chat_state) of
false ->
fxml:get_subtag(Packet, <<"body">>) /= false;
unless_chat_state ->
not jlib:is_standalone_chat_state(Packet);
true ->
true
end
case has_offline_tag(Packet) of
false ->
case check_store_hint(Packet) of
store ->
true;
no_store ->
false;
none ->
case gen_mod:get_module_opt(
LServer, ?MODULE, store_empty_body,
fun(V) when is_boolean(V) -> V;
(unless_chat_state) -> unless_chat_state
end,
unless_chat_state) of
false ->
fxml:get_subtag(Packet, <<"body">>) /= false;
unless_chat_state ->
not jlib:is_standalone_chat_state(Packet);
true ->
true
end
end;
true ->
false
end;
true ->
false
@ -353,6 +555,9 @@ has_no_store_hint(Packet) ->
orelse
fxml:get_subtag_with_xmlns(Packet, <<"no-storage">>, ?NS_HINTS) =/= false.
has_offline_tag(Packet) ->
fxml:get_subtag_with_xmlns(Packet, <<"offline">>, ?NS_FLEX_OFFLINE) =/= false.
%% Check if the packet has any content about XEP-0022
check_event(From, To, Packet) ->
#xmlel{name = Name, attrs = Attrs, children = Els} =
@ -713,6 +918,123 @@ offline_msg_to_route(_LServer, #xmlel{} = El) ->
error
end.
binary_to_timestamp(TS) ->
case catch jlib:binary_to_integer(TS) of
Int when is_integer(Int) ->
Secs = Int div 1000000,
USec = Int rem 1000000,
MSec = Secs div 1000000,
Sec = Secs rem 1000000,
{MSec, Sec, USec};
_ ->
undefined
end.
timestamp_to_binary({MS, S, US}) ->
format_timestamp(integer_to_list((MS * 1000000 + S) * 1000000 + US)).
format_timestamp(TS) ->
iolist_to_binary(io_lib:format("~20..0s", [TS])).
offline_msg_to_header(#offline_msg{from = From, timestamp = Int} = Msg) ->
TS = timestamp_to_binary(Int),
From_s = jid:to_string(From),
{<<TS/binary, "+", From_s/binary>>, From_s, Msg}.
read_message_headers(LUser, LServer) ->
DBType = gen_mod:db_type(LServer, ?MODULE),
read_message_headers(LUser, LServer, DBType).
read_message_headers(LUser, LServer, mnesia) ->
Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}),
Hdrs = lists:map(fun offline_msg_to_header/1, Msgs),
lists:keysort(1, Hdrs);
read_message_headers(LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(
offline_msg, offline_msg_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Rs} ->
Hdrs = lists:map(fun offline_msg_to_header/1, Rs),
lists:keysort(1, Hdrs);
_Err ->
[]
end;
read_message_headers(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch ejabberd_odbc:sql_query(
LServer, [<<"select xml, seq from spool where username ='">>,
Username, <<"' order by seq;">>]) of
{selected, [<<"xml">>, <<"seq">>], Rows} ->
Hdrs = lists:flatmap(
fun([XML, Seq]) ->
try
#xmlel{} = El = fxml_stream:parse_element(XML),
From = fxml:get_tag_attr_s(<<"from">>, El),
#jid{} = jid:from_string(From),
TS = format_timestamp(Seq),
[{<<TS/binary, "+", From/binary>>, From, El}]
catch _:_ -> []
end
end, Rows),
lists:keysort(1, Hdrs);
_Err ->
[]
end.
read_message(_From, To, TS, mnesia) ->
{U, S, _} = jid:tolower(To),
case mnesia:dirty_match_object(
offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}) of
[Msg|_] ->
{ok, Msg};
_ ->
error
end;
read_message(_From, _To, TS, riak) ->
case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
{ok, Msg} ->
{ok, Msg};
_ ->
error
end;
read_message(_From, To, Seq, odbc) ->
{LUser, LServer, _} = jid:tolower(To),
Username = ejabberd_odbc:escape(LUser),
SSeq = ejabberd_odbc:escape(Seq),
case ejabberd_odbc:sql_query(
LServer,
[<<"select xml from spool where username='">>, Username,
<<"' and seq='">>, SSeq, <<"';">>]) of
{selected, [<<"xml">>], [[RawXML]|_]} ->
case fxml_stream:parse_element(RawXML) of
#xmlel{} = El -> {ok, El};
{error, _} -> error
end;
_ ->
error
end.
remove_message(_From, To, TS, mnesia) ->
{U, S, _} = jid:tolower(To),
Msgs = mnesia:dirty_match_object(
offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}),
lists:foreach(
fun(Msg) ->
mnesia:dirty_delete_object(Msg)
end, Msgs);
remove_message(_From, _To, TS, riak) ->
ejabberd_riak:delete(offline_msg, TS),
ok;
remove_message(_From, To, Seq, odbc) ->
{LUser, LServer, _} = jid:tolower(To),
Username = ejabberd_odbc:escape(LUser),
SSeq = ejabberd_odbc:escape(Seq),
ejabberd_odbc:sql_query(
LServer,
[<<"delete from spool where username='">>, Username,
<<"' and seq='">>, SSeq, <<"';">>]),
ok.
read_all_msgs(LUser, LServer, mnesia) ->
US = {LUser, LServer},
lists:keysort(#offline_msg.timestamp,