From 901d2e0aed83d195a4d1cf2929114b07dcac0dd8 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 15 Apr 2016 13:44:33 +0300 Subject: [PATCH] Clean mod_offline.erl from DB specific code --- src/mod_offline.erl | 934 +++++++------------------------------ src/mod_offline_mnesia.erl | 232 +++++++++ src/mod_offline_riak.erl | 153 ++++++ src/mod_offline_sql.erl | 252 ++++++++++ 4 files changed, 808 insertions(+), 763 deletions(-) create mode 100644 src/mod_offline_mnesia.erl create mode 100644 src/mod_offline_riak.erl create mode 100644 src/mod_offline_sql.erl diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 54eda165c..2cdd82ae8 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -25,8 +25,6 @@ -module(mod_offline). --compile([{parse_transform, ejabberd_sql_pt}]). - -author('alexey@process-one.net'). -protocol({xep, 13, '1.2'}). @@ -61,6 +59,7 @@ get_queue_length/2, count_offline_messages/2, get_offline_els/2, + find_x_expire/2, webadmin_page/3, webadmin_user/4, webadmin_user_parse_query/5]). @@ -82,8 +81,6 @@ -include("mod_offline.hrl"). --include("ejabberd_sql_pt.hrl"). - -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). @@ -91,6 +88,25 @@ %% default value for the maximum number of user messages -define(MAX_USER_MESSAGES, infinity). +-type us() :: {binary(), binary()}. +-callback init(binary(), gen_mod:opts()) -> any(). +-callback import(binary(), #offline_msg{}) -> ok | pass. +-callback store_messages(binary(), us(), [#offline_msg{}], + non_neg_integer(), non_neg_integer()) -> + {atomic, any()}. +-callback pop_messages(binary(), binary()) -> + {atomic, [#offline_msg{}]} | {aborted, any()}. +-callback remove_expired_messages(binary()) -> {atomic, any()}. +-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}. +-callback remove_user(binary(), binary()) -> {atomic, any()}. +-callback read_message_headers(binary(), binary()) -> any(). +-callback read_message(binary(), binary(), non_neg_integer()) -> + {ok, #offline_msg{}} | error. +-callback remove_message(binary(), binary(), non_neg_integer()) -> ok. +-callback read_all_messages(binary(), binary()) -> [#offline_msg{}]. +-callback remove_all_messages(binary(), binary()) -> {atomic, any()}. +-callback count_messages(binary(), binary()) -> non_neg_integer(). + start_link(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), ?GEN_SERVER:start_link({local, Proc}, ?MODULE, @@ -115,14 +131,8 @@ stop(Host) -> %%==================================================================== init([Host, Opts]) -> - case gen_mod:db_type(Host, Opts) of - mnesia -> - mnesia:create_table(offline_msg, - [{disc_only_copies, [node()]}, {type, bag}, - {attributes, record_info(fields, offline_msg)}]), - update_table(); - _ -> ok - end, + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + Mod:init(Host, Opts), IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1, no_queue), ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, @@ -174,7 +184,7 @@ handle_info(#offline_msg{us = UserServer} = Msg, State) -> Len = length(Msgs), MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, UserServer, Host), - store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs, DBType), + store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs), {noreply, State}; handle_info(_Info, State) -> @@ -210,68 +220,12 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) -> - DBType = gen_mod:db_type(Host, ?MODULE), - store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs, DBType). - -store_offline_msg(_Host, US, Msgs, Len, MaxOfflineMsgs, - mnesia) -> - F = fun () -> - Count = if MaxOfflineMsgs =/= infinity -> - Len + count_mnesia_records(US); - true -> 0 - end, - if Count > MaxOfflineMsgs -> discard_warn_sender(Msgs); - true -> - if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) -> - mnesia:write_lock_table(offline_msg); - true -> ok - end, - lists:foreach(fun (M) -> mnesia:write(M) end, Msgs) - end - end, - mnesia:transaction(F); -store_offline_msg(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs, odbc) -> - Count = if MaxOfflineMsgs =/= infinity -> - Len + count_offline_messages(User, Host); - true -> 0 - end, - if Count > MaxOfflineMsgs -> discard_warn_sender(Msgs); - true -> - Query = lists:map(fun (M) -> - Username = - ejabberd_odbc:escape((M#offline_msg.to)#jid.luser), - From = M#offline_msg.from, - To = M#offline_msg.to, - Packet = - jlib:replace_from_to(From, To, - M#offline_msg.packet), - NewPacket = - jlib:add_delay_info(Packet, Host, - M#offline_msg.timestamp, - <<"Offline Storage">>), - XML = - ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)), - odbc_queries:add_spool_sql(Username, XML) - end, - Msgs), - odbc_queries:add_spool(Host, Query) - end; -store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs, - riak) -> - Count = if MaxOfflineMsgs =/= infinity -> - Len + count_offline_messages(User, Host); - true -> 0 - end, - if - Count > MaxOfflineMsgs -> - discard_warn_sender(Msgs); - true -> - lists:foreach( - fun(#offline_msg{us = US, - timestamp = TS} = M) -> - ejabberd_riak:put(M, offline_msg_schema(), - [{i, TS}, {'2i', [{<<"us">>, US}]}]) - end, Msgs) + Mod = gen_mod:db_mod(Host, ?MODULE), + case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of + {atomic, discard} -> + discard_warn_sender(Msgs); + _ -> + ok end. get_max_user_messages(AccessRule, {User, Server}, Host) -> @@ -330,11 +284,11 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID, BareJID = jid:to_string(jid:remove_resource(JID)), Pid ! dont_ask_offline, {result, lists:map( - fun({Node, From, _OfflineMsg}) -> + fun({Node, From, _To, _El}) -> #xmlel{name = <<"item">>, attrs = [{<<"jid">>, BareJID}, {<<"node">>, Node}, - {<<"name">>, From}]} + {<<"name">>, jid:to_string(From)}]} end, Hdrs)}; none -> {result, []} @@ -452,46 +406,31 @@ handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) -> Pid when is_pid(Pid) -> Pid ! dont_ask_offline, lists:foreach( - fun({Node, _, Msg}) -> - case offline_msg_to_route(S, Msg) of - {route, From, To, El} -> - NewEl = set_offline_tag(El, Node), - Pid ! {route, From, To, NewEl}; - error -> - ok - end + fun({Node, From, To, El}) -> + NewEl = set_offline_tag(El, Node), + Pid ! {route, From, To, NewEl} end, read_message_headers(U, S)) end. -fetch_msg_by_node(To, <>) -> - case jid:from_string(From_s) of - From = #jid{} -> - case gen_mod:db_type(To#jid.lserver, ?MODULE) of - odbc -> - read_message(From, To, Seq, odbc); - DBType -> - case binary_to_timestamp(Seq) of - undefined -> ok; - TS -> read_message(From, To, TS, DBType) - end - end; - error -> - ok +fetch_msg_by_node(To, Seq) -> + case catch binary_to_integer(Seq) of + I when is_integer(I), I >= 0 -> + LUser = To#jid.luser, + LServer = To#jid.lserver, + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:read_message(LUser, LServer, I); + _ -> + error end. -remove_msg_by_node(To, <>) -> - case jid:from_string(From_s) of - From = #jid{} -> - case gen_mod:db_type(To#jid.lserver, ?MODULE) of - odbc -> - remove_message(From, To, Seq, odbc); - DBType -> - case binary_to_timestamp(Seq) of - undefined -> ok; - TS -> remove_message(From, To, TS, DBType) - end - end; - error -> +remove_msg_by_node(To, Seq) -> + case catch binary_to_integer(Seq) of + I when is_integer(I), I>= 0 -> + LUser = To#jid.luser, + LServer = To#jid.lserver, + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:remove_message(LUser, LServer, I); + _ -> ok end. @@ -648,21 +587,11 @@ find_x_expire(TimeStamp, [El | Els]) -> resend_offline_messages(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), - US = {LUser, LServer}, - F = fun () -> - Rs = mnesia:wread({offline_msg, US}), - mnesia:delete({offline_msg, US}), - Rs - end, - case mnesia:transaction(F) of - {atomic, Rs} -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case Mod:pop_messages(LUser, LServer) of + {ok, Rs} -> lists:foreach(fun (R) -> - ejabberd_sm ! - {route, R#offline_msg.from, R#offline_msg.to, - jlib:add_delay_info(R#offline_msg.packet, - LServer, - R#offline_msg.timestamp, - <<"Offline Storage">>)} + ejabberd_sm ! offline_msg_to_route(LServer, R) end, lists:keysort(#offline_msg.timestamp, Rs)); _ -> ok @@ -671,190 +600,47 @@ resend_offline_messages(User, Server) -> pop_offline_messages(Ls, User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), - pop_offline_messages(Ls, LUser, LServer, - gen_mod:db_type(LServer, ?MODULE)). - -pop_offline_messages(Ls, LUser, LServer, mnesia) -> - US = {LUser, LServer}, - F = fun () -> - Rs = mnesia:wread({offline_msg, US}), - mnesia:delete({offline_msg, US}), - Rs - end, - case mnesia:transaction(F) of - {atomic, Rs} -> - TS = p1_time_compat:timestamp(), - Ls ++ - lists:map(fun (R) -> - offline_msg_to_route(LServer, R) - end, - lists:filter(fun (R) -> - case R#offline_msg.expire of - never -> true; - TimeStamp -> TS < TimeStamp - end - end, - lists:keysort(#offline_msg.timestamp, Rs))); - _ -> Ls - end; -pop_offline_messages(Ls, LUser, LServer, odbc) -> - case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of - {atomic, {selected, Rs}} -> - Ls ++ - lists:flatmap(fun ({_, XML}) -> - case fxml_stream:parse_element(XML) of - {error, _Reason} -> - []; - El -> - case offline_msg_to_route(LServer, El) of - error -> - []; - RouteMsg -> - [RouteMsg] - end - end + Mod = gen_mod:db_mod(LServer, ?MODULE), + case Mod:pop_messages(LUser, LServer) of + {ok, Rs} -> + TS = p1_time_compat:timestamp(), + Ls ++ + lists:map(fun (R) -> + offline_msg_to_route(LServer, R) end, - Rs); - _ -> Ls - end; -pop_offline_messages(Ls, LUser, LServer, riak) -> - case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(), - <<"us">>, {LUser, LServer}) of - {ok, Rs} -> - try - lists:foreach( - fun(#offline_msg{timestamp = T}) -> - ok = ejabberd_riak:delete(offline_msg, T) - end, Rs), - TS = p1_time_compat:timestamp(), - Ls ++ lists:map( - fun (R) -> - offline_msg_to_route(LServer, R) - end, - lists:filter( - fun(R) -> - case R#offline_msg.expire of - never -> true; - TimeStamp -> TS < TimeStamp - end - end, - lists:keysort(#offline_msg.timestamp, Rs))) - catch _:{badmatch, _} -> - Ls - end; + lists:filter( + fun(#offline_msg{packet = Pkt} = R) -> + #xmlel{children = Els} = Pkt, + Expire = case R#offline_msg.expire of + undefined -> + find_x_expire(TS, Els); + Exp -> + Exp + end, + case Expire of + never -> true; + TimeStamp -> TS < TimeStamp + end + end, Rs)); _ -> Ls end. remove_expired_messages(Server) -> LServer = jid:nameprep(Server), - remove_expired_messages(LServer, - gen_mod:db_type(LServer, ?MODULE)). - -remove_expired_messages(_LServer, mnesia) -> - TimeStamp = p1_time_compat:timestamp(), - F = fun () -> - mnesia:write_lock_table(offline_msg), - mnesia:foldl(fun (Rec, _Acc) -> - case Rec#offline_msg.expire of - never -> ok; - TS -> - if TS < TimeStamp -> - mnesia:delete_object(Rec); - true -> ok - end - end - end, - ok, offline_msg) - end, - mnesia:transaction(F); -remove_expired_messages(_LServer, odbc) -> {atomic, ok}; -remove_expired_messages(_LServer, riak) -> {atomic, ok}. + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:remove_expired_messages(LServer). remove_old_messages(Days, Server) -> LServer = jid:nameprep(Server), - remove_old_messages(Days, LServer, - gen_mod:db_type(LServer, ?MODULE)). - -remove_old_messages(Days, _LServer, mnesia) -> - S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days, - MegaSecs1 = S div 1000000, - Secs1 = S rem 1000000, - TimeStamp = {MegaSecs1, Secs1, 0}, - F = fun () -> - mnesia:write_lock_table(offline_msg), - mnesia:foldl(fun (#offline_msg{timestamp = TS} = Rec, - _Acc) - when TS < TimeStamp -> - mnesia:delete_object(Rec); - (_Rec, _Acc) -> ok - end, - ok, offline_msg) - end, - mnesia:transaction(F); - -remove_old_messages(Days, LServer, odbc) -> - case catch ejabberd_odbc:sql_query( - LServer, - [<<"DELETE FROM spool" - " WHERE created_at < " - "DATE_SUB(CURDATE(), INTERVAL ">>, - integer_to_list(Days), <<" DAY);">>]) of - {updated, N} -> - ?INFO_MSG("~p message(s) deleted from offline spool", [N]); - _Error -> - ?ERROR_MSG("Cannot delete message in offline spool: ~p", [_Error]) - end, - {atomic, ok}; -remove_old_messages(_Days, _LServer, riak) -> - {atomic, ok}. + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:remove_old_messages(Days, LServer). remove_user(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), - remove_user(LUser, LServer, - gen_mod:db_type(LServer, ?MODULE)). - -remove_user(LUser, LServer, mnesia) -> - US = {LUser, LServer}, - F = fun () -> mnesia:delete({offline_msg, US}) end, - mnesia:transaction(F); -remove_user(LUser, LServer, odbc) -> - odbc_queries:del_spool_msg(LServer, LUser); -remove_user(LUser, LServer, riak) -> - {atomic, ejabberd_riak:delete_by_index(offline_msg, - <<"us">>, {LUser, LServer})}. - -jid_to_binary(#jid{user = U, server = S, resource = R, - luser = LU, lserver = LS, lresource = LR}) -> - #jid{user = iolist_to_binary(U), - server = iolist_to_binary(S), - resource = iolist_to_binary(R), - luser = iolist_to_binary(LU), - lserver = iolist_to_binary(LS), - lresource = iolist_to_binary(LR)}. - -update_table() -> - Fields = record_info(fields, offline_msg), - case mnesia:table_info(offline_msg, attributes) of - Fields -> - ejabberd_config:convert_table_to_binary( - offline_msg, Fields, bag, - fun(#offline_msg{us = {U, _}}) -> U end, - fun(#offline_msg{us = {U, S}, - from = From, - to = To, - packet = El} = R) -> - R#offline_msg{us = {iolist_to_binary(U), - iolist_to_binary(S)}, - from = jid_to_binary(From), - to = jid_to_binary(To), - packet = fxml:to_xmlel(El)} - end); - _ -> - ?INFO_MSG("Recreating offline_msg table", []), - mnesia:transform_table(offline_msg, ignore, Fields) - end. + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:remove_user(LUser, LServer). %% Helper functions: @@ -880,255 +666,71 @@ webadmin_page(_, Host, webadmin_page(Acc, _, _) -> Acc. get_offline_els(LUser, LServer) -> - get_offline_els(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). - -get_offline_els(LUser, LServer, DBType) - when DBType == mnesia; DBType == riak -> - Msgs = read_all_msgs(LUser, LServer, DBType), + Mod = gen_mod:db_mod(LServer, ?MODULE), + Hdrs = Mod:read_message_headers(LUser, LServer), lists:map( - fun(Msg) -> - {route, From, To, Packet} = offline_msg_to_route(LServer, Msg), - jlib:replace_from_to(From, To, Packet) - end, Msgs); -get_offline_els(LUser, LServer, odbc) -> - case catch ejabberd_odbc:sql_query( - LServer, - ?SQL("select @(xml)s from spool where " - "username=%(LUser)s order by seq")) of - {selected, Rs} -> - lists:flatmap( - fun({XML}) -> - case fxml_stream:parse_element(XML) of - #xmlel{} = El -> - case offline_msg_to_route(LServer, El) of - {route, _, _, NewEl} -> - [NewEl]; - error -> - [] - end; - _ -> - [] - end - end, Rs); - _ -> - [] - end. + fun({_Seq, From, To, Packet}) -> + jlib:replace_from_to(From, To, Packet) + end, Hdrs). offline_msg_to_route(LServer, #offline_msg{} = R) -> - {route, R#offline_msg.from, R#offline_msg.to, - jlib:add_delay_info(R#offline_msg.packet, LServer, R#offline_msg.timestamp, - <<"Offline Storage">>)}; -offline_msg_to_route(_LServer, #xmlel{} = El) -> - To = jid:from_string(fxml:get_tag_attr_s(<<"to">>, El)), - From = jid:from_string(fxml:get_tag_attr_s(<<"from">>, El)), - if (To /= error) and (From /= error) -> - {route, From, To, El}; - true -> - error - end. - -binary_to_timestamp(TS) -> - case catch jlib:binary_to_integer(TS) of - Int when is_integer(Int) -> - Secs = Int div 1000000, - USec = Int rem 1000000, - MSec = Secs div 1000000, - Sec = Secs rem 1000000, - {MSec, Sec, USec}; - _ -> - undefined - end. - -timestamp_to_binary({MS, S, US}) -> - format_timestamp(integer_to_list((MS * 1000000 + S) * 1000000 + US)). - -format_timestamp(TS) -> - iolist_to_binary(io_lib:format("~20..0s", [TS])). - -offline_msg_to_header(#offline_msg{from = From, timestamp = Int} = Msg) -> - TS = timestamp_to_binary(Int), - From_s = jid:to_string(From), - {<>, From_s, Msg}. + El = case R#offline_msg.timestamp of + undefined -> + R#offline_msg.packet; + TS -> + jlib:add_delay_info(R#offline_msg.packet, LServer, TS, + <<"Offline Storage">>) + end, + {route, R#offline_msg.from, R#offline_msg.to, El}. read_message_headers(LUser, LServer) -> - DBType = gen_mod:db_type(LServer, ?MODULE), - read_message_headers(LUser, LServer, DBType). + Mod = gen_mod:db_mod(LServer, ?MODULE), + lists:map( + fun({Seq, From, To, El}) -> + Node = integer_to_binary(Seq), + {Node, From, To, El} + end, Mod:read_message_headers(LUser, LServer)). -read_message_headers(LUser, LServer, mnesia) -> - Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}), - Hdrs = lists:map(fun offline_msg_to_header/1, Msgs), - lists:keysort(1, Hdrs); -read_message_headers(LUser, LServer, riak) -> - case ejabberd_riak:get_by_index( - offline_msg, offline_msg_schema(), - <<"us">>, {LUser, LServer}) of - {ok, Rs} -> - Hdrs = lists:map(fun offline_msg_to_header/1, Rs), - lists:keysort(1, Hdrs); - _Err -> - [] - end; -read_message_headers(LUser, LServer, odbc) -> - Username = ejabberd_odbc:escape(LUser), - case catch ejabberd_odbc:sql_query( - LServer, [<<"select xml, seq from spool where username ='">>, - Username, <<"' order by seq;">>]) of - {selected, [<<"xml">>, <<"seq">>], Rows} -> - Hdrs = lists:flatmap( - fun([XML, Seq]) -> - try - #xmlel{} = El = fxml_stream:parse_element(XML), - From = fxml:get_tag_attr_s(<<"from">>, El), - #jid{} = jid:from_string(From), - TS = format_timestamp(Seq), - [{<>, From, El}] - catch _:_ -> [] - end - end, Rows), - lists:keysort(1, Hdrs); - _Err -> - [] - end. - -read_message(_From, To, TS, mnesia) -> - {U, S, _} = jid:tolower(To), - case mnesia:dirty_match_object( - offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}) of - [Msg|_] -> - {ok, Msg}; - _ -> - error - end; -read_message(_From, _To, TS, riak) -> - case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of - {ok, Msg} -> - {ok, Msg}; - _ -> - error - end; -read_message(_From, To, Seq, odbc) -> - {LUser, LServer, _} = jid:tolower(To), - Username = ejabberd_odbc:escape(LUser), - SSeq = ejabberd_odbc:escape(Seq), - case ejabberd_odbc:sql_query( - LServer, - [<<"select xml from spool where username='">>, Username, - <<"' and seq='">>, SSeq, <<"';">>]) of - {selected, [<<"xml">>], [[RawXML]|_]} -> - case fxml_stream:parse_element(RawXML) of - #xmlel{} = El -> {ok, El}; - {error, _} -> error - end; - _ -> - error - end. - -remove_message(_From, To, TS, mnesia) -> - {U, S, _} = jid:tolower(To), - Msgs = mnesia:dirty_match_object( - offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}), - lists:foreach( - fun(Msg) -> - mnesia:dirty_delete_object(Msg) - end, Msgs); -remove_message(_From, _To, TS, riak) -> - ejabberd_riak:delete(offline_msg, TS), - ok; -remove_message(_From, To, Seq, odbc) -> - {LUser, LServer, _} = jid:tolower(To), - Username = ejabberd_odbc:escape(LUser), - SSeq = ejabberd_odbc:escape(Seq), - ejabberd_odbc:sql_query( - LServer, - [<<"delete from spool where username='">>, Username, - <<"' and seq='">>, SSeq, <<"';">>]), - ok. - -read_all_msgs(LUser, LServer, mnesia) -> - US = {LUser, LServer}, - lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})); -read_all_msgs(LUser, LServer, riak) -> - case ejabberd_riak:get_by_index( - offline_msg, offline_msg_schema(), - <<"us">>, {LUser, LServer}) of - {ok, Rs} -> - lists:keysort(#offline_msg.timestamp, Rs); - _Err -> - [] - end; -read_all_msgs(LUser, LServer, odbc) -> - case catch ejabberd_odbc:sql_query( - LServer, - ?SQL("select @(xml)s from spool where " - "username=%(LUser)s order by seq")) of - {selected, Rs} -> - lists:flatmap( - fun({XML}) -> - case fxml_stream:parse_element(XML) of - {error, _Reason} -> []; - El -> [El] - end - end, - Rs); - _ -> [] - end. - -format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak -> - lists:map(fun (#offline_msg{timestamp = TimeStamp, - from = From, to = To, - packet = - #xmlel{name = Name, attrs = Attrs, - children = Els}} = - Msg) -> - ID = jlib:encode_base64((term_to_binary(Msg))), - {{Year, Month, Day}, {Hour, Minute, Second}} = - calendar:now_to_local_time(TimeStamp), - Time = - iolist_to_binary(io_lib:format("~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w", - [Year, Month, Day, - Hour, Minute, - Second])), - SFrom = jid:to_string(From), - STo = jid:to_string(To), - Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs), - Packet = #xmlel{name = Name, attrs = Attrs2, - children = Els}, - FPacket = ejabberd_web_admin:pretty_print_xml(Packet), - ?XE(<<"tr">>, - [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], - [?INPUT(<<"checkbox">>, <<"selected">>, ID)]), - ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], Time), - ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], SFrom), - ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], STo), - ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], - [?XC(<<"pre">>, FPacket)])]) - end, - Msgs); -format_user_queue(Msgs, odbc) -> - lists:map(fun (#xmlel{} = Msg) -> - ID = jlib:encode_base64((term_to_binary(Msg))), - Packet = Msg, - FPacket = ejabberd_web_admin:pretty_print_xml(Packet), - ?XE(<<"tr">>, - [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], - [?INPUT(<<"checkbox">>, <<"selected">>, ID)]), - ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], - [?XC(<<"pre">>, FPacket)])]) - end, - Msgs). +format_user_queue(Hdrs) -> + lists:map( + fun({Seq, From, To, El}) -> + ID = integer_to_binary(Seq), + FPacket = ejabberd_web_admin:pretty_print_xml(El), + SFrom = jid:to_string(From), + STo = jid:to_string(To), + Stamp = fxml:get_path_s(El, [{elem, <<"delay">>}, + {attr, <<"stamp">>}]), + Time = case jlib:datetime_string_to_timestamp(Stamp) of + {_, _, _} = Now -> + {{Year, Month, Day}, {Hour, Minute, Second}} = + calendar:now_to_local_time(Now), + iolist_to_binary( + io_lib:format( + "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w", + [Year, Month, Day, Hour, Minute, + Second])); + _ -> + <<"">> + end, + ?XE(<<"tr">>, + [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], + [?INPUT(<<"checkbox">>, <<"selected">>, ID)]), + ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], Time), + ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], SFrom), + ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], STo), + ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], + [?XC(<<"pre">>, FPacket)])]) + end, Hdrs). user_queue(User, Server, Query, Lang) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), US = {LUser, LServer}, - DBType = gen_mod:db_type(LServer, ?MODULE), - Res = user_queue_parse_query(LUser, LServer, Query, - DBType), - MsgsAll = read_all_msgs(LUser, LServer, DBType), - Msgs = get_messages_subset(US, Server, MsgsAll, - DBType), - FMsgs = format_user_queue(Msgs, DBType), + Mod = gen_mod:db_mod(LServer, ?MODULE), + Res = user_queue_parse_query(LUser, LServer, Query), + HdrsAll = Mod:read_message_headers(LUser, LServer), + Hdrs = get_messages_subset(US, Server, HdrsAll), + FMsgs = format_user_queue(Hdrs), [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"~s's Offline Messages Queue">>), [us_to_list(US)])))] @@ -1158,96 +760,24 @@ user_queue(User, Server, Query, Lang) -> ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>)])]. -user_queue_parse_query(LUser, LServer, Query, mnesia) -> - US = {LUser, LServer}, +user_queue_parse_query(LUser, LServer, Query) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), case lists:keysearch(<<"delete">>, 1, Query) of - {value, _} -> - Msgs = lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})), - F = fun () -> - lists:foreach(fun (Msg) -> - ID = - jlib:encode_base64((term_to_binary(Msg))), - case lists:member({<<"selected">>, - ID}, - Query) - of - true -> mnesia:delete_object(Msg); - false -> ok - end - end, - Msgs) - end, - mnesia:transaction(F), - ok; - false -> nothing - end; -user_queue_parse_query(LUser, LServer, Query, riak) -> - case lists:keysearch(<<"delete">>, 1, Query) of - {value, _} -> - Msgs = read_all_msgs(LUser, LServer, riak), - lists:foreach( - fun (Msg) -> - ID = jlib:encode_base64((term_to_binary(Msg))), - case lists:member({<<"selected">>, ID}, Query) of - true -> - ejabberd_riak:delete(offline_msg, - Msg#offline_msg.timestamp); - false -> - ok - end - end, - Msgs), - ok; - false -> - nothing - end; -user_queue_parse_query(LUser, LServer, Query, odbc) -> - Username = ejabberd_odbc:escape(LUser), - case lists:keysearch(<<"delete">>, 1, Query) of - {value, _} -> - Msgs = case catch ejabberd_odbc:sql_query(LServer, - [<<"select xml, seq from spool where username='">>, - Username, - <<"' order by seq;">>]) - of - {selected, [<<"xml">>, <<"seq">>], Rs} -> - lists:flatmap(fun ([XML, Seq]) -> - case fxml_stream:parse_element(XML) - of - {error, _Reason} -> []; - El -> [{El, Seq}] - end - end, - Rs); - _ -> [] - end, - F = fun () -> - lists:foreach(fun ({Msg, Seq}) -> - ID = - jlib:encode_base64((term_to_binary(Msg))), - case lists:member({<<"selected">>, - ID}, - Query) - of - true -> - SSeq = - ejabberd_odbc:escape(Seq), - catch - ejabberd_odbc:sql_query(LServer, - [<<"delete from spool where username='">>, - Username, - <<"' and seq='">>, - SSeq, - <<"';">>]); - false -> ok - end - end, - Msgs) - end, - mnesia:transaction(F), - ok; - false -> nothing + {value, _} -> + case lists:keyfind(<<"selected">>, 1, Query) of + {_, Seq} -> + case catch binary_to_integer(Seq) of + I when is_integer(I), I>=0 -> + Mod:remove_message(LUser, LServer, I), + ok; + _ -> + nothing + end; + false -> + nothing + end; + _ -> + nothing end. us_to_list({User, Server}) -> @@ -1256,7 +786,7 @@ us_to_list({User, Server}) -> get_queue_length(LUser, LServer) -> count_offline_messages(LUser, LServer). -get_messages_subset(User, Host, MsgsAll, DBType) -> +get_messages_subset(User, Host, MsgsAll) -> Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, fun(A) when is_atom(A) -> A end, max_user_offline_messages), @@ -1267,33 +797,20 @@ get_messages_subset(User, Host, MsgsAll, DBType) -> _ -> 100 end, Length = length(MsgsAll), - get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll, - DBType). + get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). -get_messages_subset2(Max, Length, MsgsAll, _DBType) - when Length =< Max * 2 -> +get_messages_subset2(Max, Length, MsgsAll) when Length =< Max * 2 -> MsgsAll; -get_messages_subset2(Max, Length, MsgsAll, DBType) - when DBType == mnesia; DBType == riak -> +get_messages_subset2(Max, Length, MsgsAll) -> FirstN = Max, {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), NoJID = jid:make(<<"...">>, <<"...">>, <<"">>), - IntermediateMsg = #offline_msg{timestamp = p1_time_compat:timestamp(), - from = NoJID, to = NoJID, - packet = - #xmlel{name = <<"...">>, attrs = [], - children = []}}, - MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN; -get_messages_subset2(Max, Length, MsgsAll, odbc) -> - FirstN = Max, - {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), - MsgsLastN = lists:nthtail(Length - FirstN - FirstN, - Msgs2), + Seq = <<"0">>, IntermediateMsg = #xmlel{name = <<"...">>, attrs = [], children = []}, - MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. + MsgsFirstN ++ [{Seq, NoJID, NoJID, IntermediateMsg}] ++ MsgsLastN. webadmin_user(Acc, User, Server, Lang) -> QueueLen = count_offline_messages(jid:nodeprep(User), @@ -1310,25 +827,8 @@ webadmin_user(Acc, User, Server, Lang) -> delete_all_msgs(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), - delete_all_msgs(LUser, LServer, - gen_mod:db_type(LServer, ?MODULE)). - -delete_all_msgs(LUser, LServer, mnesia) -> - US = {LUser, LServer}, - F = fun () -> - mnesia:write_lock_table(offline_msg), - lists:foreach(fun (Msg) -> mnesia:delete_object(Msg) - end, - mnesia:dirty_read({offline_msg, US})) - end, - mnesia:transaction(F); -delete_all_msgs(LUser, LServer, riak) -> - Res = ejabberd_riak:delete_by_index(offline_msg, - <<"us">>, {LUser, LServer}), - {atomic, Res}; -delete_all_msgs(LUser, LServer, odbc) -> - odbc_queries:del_spool_msg(LServer, LUser), - {atomic, ok}. + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:remove_all_messages(LUser, LServer). webadmin_user_parse_query(_, <<"removealloffline">>, User, Server, _Query) -> @@ -1350,112 +850,20 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server, count_offline_messages(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), - DBType = gen_mod:db_type(LServer, ?MODULE), - count_offline_messages(LUser, LServer, DBType). + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:count_messages(LUser, LServer). -count_offline_messages(LUser, LServer, mnesia) -> - US = {LUser, LServer}, - F = fun () -> - count_mnesia_records(US) - end, - case catch mnesia:async_dirty(F) of - I when is_integer(I) -> I; - _ -> 0 - end; -count_offline_messages(LUser, LServer, odbc) -> - case catch ejabberd_odbc:sql_query( - LServer, - ?SQL("select @(count(*))d from spool " - "where username=%(LUser)s")) of - {selected, [{Res}]} -> - Res; - _ -> 0 - end; -count_offline_messages(LUser, LServer, riak) -> - case ejabberd_riak:count_by_index( - offline_msg, <<"us">>, {LUser, LServer}) of - {ok, Res} -> - Res; - _ -> - 0 - end. - -%% Return the number of records matching a given match expression. -%% This function is intended to be used inside a Mnesia transaction. -%% The count has been written to use the fewest possible memory by -%% getting the record by small increment and by using continuation. --define(BATCHSIZE, 100). - -count_mnesia_records(US) -> - MatchExpression = #offline_msg{us = US, _ = '_'}, - case mnesia:select(offline_msg, [{MatchExpression, [], [[]]}], - ?BATCHSIZE, read) of - {Result, Cont} -> - Count = length(Result), - count_records_cont(Cont, Count); - '$end_of_table' -> - 0 - end. - -count_records_cont(Cont, Count) -> - case mnesia:select(Cont) of - {Result, Cont} -> - NewCount = Count + length(Result), - count_records_cont(Cont, NewCount); - '$end_of_table' -> - Count - end. - -offline_msg_schema() -> - {record_info(fields, offline_msg), #offline_msg{}}. - -export(_Server) -> - [{offline_msg, - fun(Host, #offline_msg{us = {LUser, LServer}, - timestamp = TimeStamp, from = From, to = To, - packet = Packet}) - when LServer == Host -> - Username = ejabberd_odbc:escape(LUser), - Packet1 = jlib:replace_from_to(From, To, Packet), - Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp, - <<"Offline Storage">>), - XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)), - [[<<"delete from spool where username='">>, Username, <<"';">>], - [<<"insert into spool(username, xml) values ('">>, - Username, <<"', '">>, XML, <<"');">>]]; - (_Host, _R) -> - [] - end}]. +export(LServer) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:export(LServer). import(LServer) -> - [{<<"select username, xml from spool;">>, - fun([LUser, XML]) -> - El = #xmlel{} = fxml_stream:parse_element(XML), - From = #jid{} = jid:from_string( - fxml:get_attr_s(<<"from">>, El#xmlel.attrs)), - To = #jid{} = jid:from_string( - fxml:get_attr_s(<<"to">>, El#xmlel.attrs)), - Stamp = fxml:get_path_s(El, [{elem, <<"delay">>}, - {attr, <<"stamp">>}]), - TS = case jlib:datetime_string_to_timestamp(Stamp) of - {_, _, _} = Now -> - Now; - undefined -> - p1_time_compat:timestamp() - end, - Expire = find_x_expire(TS, El#xmlel.children), - #offline_msg{us = {LUser, LServer}, - from = From, to = To, - timestamp = TS, expire = Expire} - end}]. + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:import(LServer). -import(_LServer, mnesia, #offline_msg{} = Msg) -> - mnesia:dirty_write(Msg); -import(_LServer, riak, #offline_msg{us = US, timestamp = TS} = M) -> - ejabberd_riak:put(M, offline_msg_schema(), - [{i, TS}, {'2i', [{<<"us">>, US}]}]); -import(_, _, _) -> - pass. +import(LServer, DBType, Data) -> + Mod = gen_mod:db_mod(DBType, ?MODULE), + Mod:import(LServer, Data). mod_opt_type(access_max_user_messages) -> fun (A) -> A end; diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl new file mode 100644 index 000000000..6a1d9e309 --- /dev/null +++ b/src/mod_offline_mnesia.erl @@ -0,0 +1,232 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 15 Apr 2016 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(mod_offline_mnesia). + +-behaviour(mod_offline). + +-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, + remove_old_messages/2, remove_user/2, read_message_headers/2, + read_message/3, remove_message/3, read_all_messages/2, + remove_all_messages/2, count_messages/2, import/2]). + +-include("jlib.hrl"). +-include("mod_offline.hrl"). +-include("logger.hrl"). + +-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + mnesia:create_table(offline_msg, + [{disc_only_copies, [node()]}, {type, bag}, + {attributes, record_info(fields, offline_msg)}]), + update_table(). + +store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) -> + F = fun () -> + Count = if MaxOfflineMsgs =/= infinity -> + Len + count_mnesia_records(US); + true -> 0 + end, + if Count > MaxOfflineMsgs -> discard; + true -> + if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) -> + mnesia:write_lock_table(offline_msg); + true -> ok + end, + lists:foreach(fun (M) -> mnesia:write(M) end, Msgs) + end + end, + mnesia:transaction(F). + +pop_messages(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> + Rs = mnesia:wread({offline_msg, US}), + mnesia:delete({offline_msg, US}), + Rs + end, + case mnesia:transaction(F) of + {atomic, L} -> + {ok, lists:keysort(#offline_msg.timestamp, L)}; + {aborted, Reason} -> + {error, Reason} + end. + +remove_expired_messages(_LServer) -> + TimeStamp = p1_time_compat:timestamp(), + F = fun () -> + mnesia:write_lock_table(offline_msg), + mnesia:foldl(fun (Rec, _Acc) -> + case Rec#offline_msg.expire of + never -> ok; + TS -> + if TS < TimeStamp -> + mnesia:delete_object(Rec); + true -> ok + end + end + end, + ok, offline_msg) + end, + mnesia:transaction(F). + +remove_old_messages(Days, _LServer) -> + S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days, + MegaSecs1 = S div 1000000, + Secs1 = S rem 1000000, + TimeStamp = {MegaSecs1, Secs1, 0}, + F = fun () -> + mnesia:write_lock_table(offline_msg), + mnesia:foldl(fun (#offline_msg{timestamp = TS} = Rec, + _Acc) + when TS < TimeStamp -> + mnesia:delete_object(Rec); + (_Rec, _Acc) -> ok + end, + ok, offline_msg) + end, + mnesia:transaction(F). + +remove_user(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> mnesia:delete({offline_msg, US}) end, + mnesia:transaction(F). + +read_message_headers(LUser, LServer) -> + Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}), + Hdrs = lists:map( + fun(#offline_msg{from = From, to = To, packet = Pkt, + timestamp = TS}) -> + Seq = now_to_integer(TS), + NewPkt = jlib:add_delay_info(Pkt, LServer, TS, + <<"Offline Storage">>), + {Seq, From, To, NewPkt} + end, Msgs), + lists:keysort(1, Hdrs). + +read_message(LUser, LServer, I) -> + US = {LUser, LServer}, + TS = integer_to_now(I), + case mnesia:dirty_match_object( + offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}) of + [Msg|_] -> + {ok, Msg}; + _ -> + error + end. + +remove_message(LUser, LServer, I) -> + US = {LUser, LServer}, + TS = integer_to_now(I), + Msgs = mnesia:dirty_match_object( + offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}), + lists:foreach( + fun(Msg) -> + mnesia:dirty_delete_object(Msg) + end, Msgs). + +read_all_messages(LUser, LServer) -> + US = {LUser, LServer}, + lists:keysort(#offline_msg.timestamp, + mnesia:dirty_read({offline_msg, US})). + +remove_all_messages(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> + mnesia:write_lock_table(offline_msg), + lists:foreach(fun (Msg) -> mnesia:delete_object(Msg) end, + mnesia:dirty_read({offline_msg, US})) + end, + mnesia:transaction(F). + +count_messages(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> + count_mnesia_records(US) + end, + case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end. + +import(_LServer, #offline_msg{} = Msg) -> + mnesia:dirty_write(Msg). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +%% Return the number of records matching a given match expression. +%% This function is intended to be used inside a Mnesia transaction. +%% The count has been written to use the fewest possible memory by +%% getting the record by small increment and by using continuation. +-define(BATCHSIZE, 100). + +count_mnesia_records(US) -> + MatchExpression = #offline_msg{us = US, _ = '_'}, + case mnesia:select(offline_msg, [{MatchExpression, [], [[]]}], + ?BATCHSIZE, read) of + {Result, Cont} -> + Count = length(Result), + count_records_cont(Cont, Count); + '$end_of_table' -> + 0 + end. + +count_records_cont(Cont, Count) -> + case mnesia:select(Cont) of + {Result, Cont} -> + NewCount = Count + length(Result), + count_records_cont(Cont, NewCount); + '$end_of_table' -> + Count + end. + +jid_to_binary(#jid{user = U, server = S, resource = R, + luser = LU, lserver = LS, lresource = LR}) -> + #jid{user = iolist_to_binary(U), + server = iolist_to_binary(S), + resource = iolist_to_binary(R), + luser = iolist_to_binary(LU), + lserver = iolist_to_binary(LS), + lresource = iolist_to_binary(LR)}. + +now_to_integer({MS, S, US}) -> + (MS * 1000000 + S) * 1000000 + US. + +integer_to_now(Int) -> + Secs = Int div 1000000, + USec = Int rem 1000000, + MSec = Secs div 1000000, + Sec = Secs rem 1000000, + {MSec, Sec, USec}. + +update_table() -> + Fields = record_info(fields, offline_msg), + case mnesia:table_info(offline_msg, attributes) of + Fields -> + ejabberd_config:convert_table_to_binary( + offline_msg, Fields, bag, + fun(#offline_msg{us = {U, _}}) -> U end, + fun(#offline_msg{us = {U, S}, + from = From, + to = To, + packet = El} = R) -> + R#offline_msg{us = {iolist_to_binary(U), + iolist_to_binary(S)}, + from = jid_to_binary(From), + to = jid_to_binary(To), + packet = fxml:to_xmlel(El)} + end); + _ -> + ?INFO_MSG("Recreating offline_msg table", []), + mnesia:transform_table(offline_msg, ignore, Fields) + end. diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl new file mode 100644 index 000000000..217e8f828 --- /dev/null +++ b/src/mod_offline_riak.erl @@ -0,0 +1,153 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 15 Apr 2016 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(mod_offline_riak). + +-behaviour(mod_offline). + +-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, + remove_old_messages/2, remove_user/2, read_message_headers/2, + read_message/3, remove_message/3, read_all_messages/2, + remove_all_messages/2, count_messages/2, import/2]). + +-include("jlib.hrl"). +-include("mod_offline.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + ok. + +store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) -> + Count = if MaxOfflineMsgs =/= infinity -> + Len + count_messages(User, Host); + true -> 0 + end, + if + Count > MaxOfflineMsgs -> + {atomic, discard}; + true -> + try + lists:foreach( + fun(#offline_msg{us = US, + timestamp = TS} = M) -> + ok = ejabberd_riak:put( + M, offline_msg_schema(), + [{i, TS}, {'2i', [{<<"us">>, US}]}]) + end, Msgs), + {atomic, ok} + catch _:{badmatch, Err} -> + {atomic, Err} + end + end. + +pop_messages(LUser, LServer) -> + case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(), + <<"us">>, {LUser, LServer}) of + {ok, Rs} -> + try + lists:foreach( + fun(#offline_msg{timestamp = T}) -> + ok = ejabberd_riak:delete(offline_msg, T) + end, Rs), + {ok, lists:keysort(#offline_msg.timestamp, Rs)} + catch _:{badmatch, Err} -> + Err + end; + Err -> + Err + end. + +remove_expired_messages(_LServer) -> + %% TODO + {atomic, ok}. + +remove_old_messages(_Days, _LServer) -> + %% TODO + {atomic, ok}. + +remove_user(LUser, LServer) -> + {atomic, ejabberd_riak:delete_by_index(offline_msg, + <<"us">>, {LUser, LServer})}. + +read_message_headers(LUser, LServer) -> + case ejabberd_riak:get_by_index( + offline_msg, offline_msg_schema(), + <<"us">>, {LUser, LServer}) of + {ok, Rs} -> + Hdrs = lists:map( + fun(#offline_msg{from = From, to = To, packet = Pkt, + timestamp = TS}) -> + Seq = now_to_integer(TS), + NewPkt = jlib:add_delay_info( + Pkt, LServer, TS, <<"Offline Storage">>), + {Seq, From, To, NewPkt} + end, Rs), + lists:keysort(1, Hdrs); + _Err -> + [] + end. + +read_message(_LUser, _LServer, I) -> + TS = integer_to_now(I), + case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of + {ok, Msg} -> + {ok, Msg}; + _ -> + error + end. + +remove_message(_LUser, _LServer, I) -> + TS = integer_to_now(I), + ejabberd_riak:delete(offline_msg, TS), + ok. + +read_all_messages(LUser, LServer) -> + case ejabberd_riak:get_by_index( + offline_msg, offline_msg_schema(), + <<"us">>, {LUser, LServer}) of + {ok, Rs} -> + lists:keysort(#offline_msg.timestamp, Rs); + _Err -> + [] + end. + +remove_all_messages(LUser, LServer) -> + Res = ejabberd_riak:delete_by_index(offline_msg, + <<"us">>, {LUser, LServer}), + {atomic, Res}. + +count_messages(LUser, LServer) -> + case ejabberd_riak:count_by_index( + offline_msg, <<"us">>, {LUser, LServer}) of + {ok, Res} -> + Res; + _ -> + 0 + end. + +import(_LServer, #offline_msg{us = US, timestamp = TS} = M) -> + ejabberd_riak:put(M, offline_msg_schema(), + [{i, TS}, {'2i', [{<<"us">>, US}]}]). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +offline_msg_schema() -> + {record_info(fields, offline_msg), #offline_msg{}}. + +now_to_integer({MS, S, US}) -> + (MS * 1000000 + S) * 1000000 + US. + +integer_to_now(Int) -> + Secs = Int div 1000000, + USec = Int rem 1000000, + MSec = Secs div 1000000, + Sec = Secs rem 1000000, + {MSec, Sec, USec}. diff --git a/src/mod_offline_sql.erl b/src/mod_offline_sql.erl new file mode 100644 index 000000000..37b90163d --- /dev/null +++ b/src/mod_offline_sql.erl @@ -0,0 +1,252 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 15 Apr 2016 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(mod_offline_sql). + +-compile([{parse_transform, ejabberd_sql_pt}]). + +-behaviour(mod_offline). + +-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, + remove_old_messages/2, remove_user/2, read_message_headers/2, + read_message/3, remove_message/3, read_all_messages/2, + remove_all_messages/2, count_messages/2, import/1, import/2, + export/1]). + +-include("jlib.hrl"). +-include("mod_offline.hrl"). +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + ok. + +store_messages(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs) -> + Count = if MaxOfflineMsgs =/= infinity -> + Len + count_messages(User, Host); + true -> 0 + end, + if Count > MaxOfflineMsgs -> {atomic, discard}; + true -> + Query = lists:map( + fun(M) -> + Username = + ejabberd_odbc:escape((M#offline_msg.to)#jid.luser), + From = M#offline_msg.from, + To = M#offline_msg.to, + Packet = + jlib:replace_from_to(From, To, + M#offline_msg.packet), + NewPacket = + jlib:add_delay_info(Packet, Host, + M#offline_msg.timestamp, + <<"Offline Storage">>), + XML = + ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)), + odbc_queries:add_spool_sql(Username, XML) + end, + Msgs), + odbc_queries:add_spool(Host, Query) + end. + +pop_messages(LUser, LServer) -> + case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of + {atomic, {selected, Rs}} -> + {ok, lists:flatmap( + fun({_, XML}) -> + case xml_to_offline_msg(XML) of + {ok, Msg} -> + [Msg]; + _Err -> + [] + end + end, Rs)}; + Err -> + {error, Err} + end. + +remove_expired_messages(_LServer) -> + %% TODO + {atomic, ok}. + +remove_old_messages(Days, LServer) -> + case catch ejabberd_odbc:sql_query( + LServer, + [<<"DELETE FROM spool" + " WHERE created_at < " + "DATE_SUB(CURDATE(), INTERVAL ">>, + integer_to_list(Days), <<" DAY);">>]) of + {updated, N} -> + ?INFO_MSG("~p message(s) deleted from offline spool", [N]); + _Error -> + ?ERROR_MSG("Cannot delete message in offline spool: ~p", [_Error]) + end, + {atomic, ok}. + +remove_user(LUser, LServer) -> + odbc_queries:del_spool_msg(LServer, LUser). + +read_message_headers(LUser, LServer) -> + Username = ejabberd_odbc:escape(LUser), + case catch ejabberd_odbc:sql_query( + LServer, [<<"select xml, seq from spool where username ='">>, + Username, <<"' order by seq;">>]) of + {selected, [<<"xml">>, <<"seq">>], Rows} -> + lists:flatmap( + fun([XML, Seq]) -> + case xml_to_offline_msg(XML) of + {ok, #offline_msg{from = From, + to = To, + packet = El}} -> + Seq0 = binary_to_integer(Seq), + [{Seq0, From, To, El}]; + _ -> + [] + end + end, Rows); + _Err -> + [] + end. + +read_message(LUser, LServer, Seq) -> + Username = ejabberd_odbc:escape(LUser), + SSeq = ejabberd_odbc:escape(integer_to_binary(Seq)), + case ejabberd_odbc:sql_query( + LServer, + [<<"select xml from spool where username='">>, Username, + <<"' and seq='">>, SSeq, <<"';">>]) of + {selected, [<<"xml">>], [[RawXML]|_]} -> + case xml_to_offline_msg(RawXML) of + {ok, Msg} -> + {ok, Msg}; + _ -> + error + end; + _ -> + error + end. + +remove_message(LUser, LServer, Seq) -> + Username = ejabberd_odbc:escape(LUser), + SSeq = ejabberd_odbc:escape(integer_to_binary(Seq)), + ejabberd_odbc:sql_query( + LServer, + [<<"delete from spool where username='">>, Username, + <<"' and seq='">>, SSeq, <<"';">>]), + ok. + +read_all_messages(LUser, LServer) -> + case catch ejabberd_odbc:sql_query( + LServer, + ?SQL("select @(xml)s from spool where " + "username=%(LUser)s order by seq")) of + {selected, Rs} -> + lists:flatmap( + fun({XML}) -> + case xml_to_offline_msg(XML) of + {ok, Msg} -> [Msg]; + _ -> [] + end + end, Rs); + _ -> + [] + end. + +remove_all_messages(LUser, LServer) -> + odbc_queries:del_spool_msg(LServer, LUser), + {atomic, ok}. + +count_messages(LUser, LServer) -> + case catch ejabberd_odbc:sql_query( + LServer, + ?SQL("select @(count(*))d from spool " + "where username=%(LUser)s")) of + {selected, [{Res}]} -> + Res; + _ -> 0 + end. + +export(_Server) -> + [{offline_msg, + fun(Host, #offline_msg{us = {LUser, LServer}, + timestamp = TimeStamp, from = From, to = To, + packet = Packet}) + when LServer == Host -> + Username = ejabberd_odbc:escape(LUser), + Packet1 = jlib:replace_from_to(From, To, Packet), + Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp, + <<"Offline Storage">>), + XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)), + [[<<"delete from spool where username='">>, Username, <<"';">>], + [<<"insert into spool(username, xml) values ('">>, + Username, <<"', '">>, XML, <<"');">>]]; + (_Host, _R) -> + [] + end}]. + +import(LServer) -> + [{<<"select username, xml from spool;">>, + fun([LUser, XML]) -> + El = #xmlel{} = fxml_stream:parse_element(XML), + From = #jid{} = jid:from_string( + fxml:get_attr_s(<<"from">>, El#xmlel.attrs)), + To = #jid{} = jid:from_string( + fxml:get_attr_s(<<"to">>, El#xmlel.attrs)), + Stamp = fxml:get_path_s(El, [{elem, <<"delay">>}, + {attr, <<"stamp">>}]), + TS = case jlib:datetime_string_to_timestamp(Stamp) of + {_, _, _} = Now -> + Now; + undefined -> + p1_time_compat:timestamp() + end, + Expire = mod_offline:find_x_expire(TS, El#xmlel.children), + #offline_msg{us = {LUser, LServer}, + from = From, to = To, + packet = El, + timestamp = TS, expire = Expire} + end}]. + +import(_, _) -> + pass. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +xml_to_offline_msg(XML) -> + case fxml_stream:parse_element(XML) of + #xmlel{} = El -> + el_to_offline_msg(El); + Err -> + ?ERROR_MSG("got ~p when parsing XML packet ~s", + [Err, XML]), + Err + end. + +el_to_offline_msg(El) -> + To_s = fxml:get_tag_attr_s(<<"to">>, El), + From_s = fxml:get_tag_attr_s(<<"from">>, El), + To = jid:from_string(To_s), + From = jid:from_string(From_s), + if To == error -> + ?ERROR_MSG("failed to get 'to' JID from offline XML ~p", [El]), + {error, bad_jid_to}; + From == error -> + ?ERROR_MSG("failed to get 'from' JID from offline XML ~p", [El]), + {error, bad_jid_from}; + true -> + {ok, #offline_msg{us = {To#jid.luser, To#jid.lserver}, + from = From, + to = To, + timestamp = undefined, + expire = undefined, + packet = El}} + end.