%%%------------------------------------------------------------------- %%% File : mod_offline_sql.erl %%% Author : Evgeny Khramtsov %%% Created : 15 Apr 2016 by Evgeny Khramtsov %%% %%% %%% ejabberd, Copyright (C) 2002-2024 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as %%% published by the Free Software Foundation; either version 2 of the %%% License, or (at your option) any later version. %%% %%% This program is distributed in the hope that it will be useful, %%% but WITHOUT ANY WARRANTY; without even the implied warranty of %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %%% General Public License for more details. %%% %%% You should have received a copy of the GNU General Public License along %%% with this program; if not, write to the Free Software Foundation, Inc., %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%---------------------------------------------------------------------- -module(mod_offline_sql). -behaviour(mod_offline). -export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1, remove_old_messages/2, remove_user/2, read_message_headers/2, read_message/3, remove_message/3, read_all_messages/2, remove_all_messages/2, count_messages/2, import/1, export/1, remove_old_messages_batch/3]). -export([sql_schemas/0]). -include_lib("xmpp/include/xmpp.hrl"). -include("mod_offline.hrl"). -include("logger.hrl"). -include("ejabberd_sql_pt.hrl"). %%%=================================================================== %%% API %%%=================================================================== init(Host, _Opts) -> ejabberd_sql_schema:update_schema(Host, ?MODULE, sql_schemas()), ok. sql_schemas() -> [#sql_schema{ version = 1, tables = [#sql_table{ name = <<"spool">>, columns = [#sql_column{name = <<"username">>, type = text}, #sql_column{name = <<"server_host">>, type = text}, #sql_column{name = <<"xml">>, type = {text, big}}, #sql_column{name = <<"seq">>, type = bigserial}, #sql_column{name = <<"created_at">>, type = timestamp, default = true}], indices = [#sql_index{ columns = [<<"server_host">>, <<"username">>]}, #sql_index{ columns = [<<"created_at">>]}]}]}]. store_message(#offline_msg{us = {LUser, LServer}} = M) -> From = M#offline_msg.from, To = M#offline_msg.to, Packet = xmpp:set_from_to(M#offline_msg.packet, From, To), NewPacket = misc:add_delay_info( Packet, jid:make(LServer), M#offline_msg.timestamp, <<"Offline Storage">>), XML = fxml:element_to_binary( xmpp:encode(NewPacket)), case ejabberd_sql:sql_query( LServer, ?SQL_INSERT( "spool", ["username=%(LUser)s", "server_host=%(LServer)s", "xml=%(XML)s"])) of {updated, _} -> ok; _ -> {error, db_failure} end. pop_messages(LUser, LServer) -> case get_and_del_spool_msg_t(LServer, LUser) of {atomic, {selected, Rs}} -> {ok, lists:flatmap( fun({_, XML}) -> case xml_to_offline_msg(XML) of {ok, Msg} -> [Msg]; _Err -> [] end end, Rs)}; Err -> {error, Err} end. remove_expired_messages(_LServer) -> %% TODO {atomic, ok}. remove_old_messages(Days, LServer) -> case ejabberd_sql:sql_query( LServer, fun(pgsql, _) -> ejabberd_sql:sql_query_t( ?SQL("DELETE FROM spool" " WHERE created_at <" " NOW() - %(Days)d * INTERVAL '1 DAY'")); (sqlite, _) -> ejabberd_sql:sql_query_t( ?SQL("DELETE FROM spool" " WHERE created_at <" " DATETIME('now', '-%(Days)d days')")); (_, _) -> ejabberd_sql:sql_query_t( ?SQL("DELETE FROM spool" " WHERE created_at < NOW() - INTERVAL %(Days)d DAY")) end) of {updated, N} -> ?INFO_MSG("~p message(s) deleted from offline spool", [N]); Error -> ?ERROR_MSG("Cannot delete message in offline spool: ~p", [Error]) end, {atomic, ok}. remove_old_messages_batch(LServer, Days, Batch) -> case ejabberd_sql:sql_query( LServer, fun(pgsql, _) -> ejabberd_sql:sql_query_t( ?SQL("DELETE FROM spool" " WHERE created_at <" " NOW() - %(Days)d * INTERVAL '1 DAY' LIMIT %(Batch)d")); (sqlite, _) -> ejabberd_sql:sql_query_t( ?SQL("DELETE FROM spool" " WHERE created_at <" " DATETIME('now', '-%(Days)d days') LIMIT %(Batch)d")); (_, _) -> ejabberd_sql:sql_query_t( ?SQL("DELETE FROM spool" " WHERE created_at < NOW() - INTERVAL %(Days)d DAY LIMIT %(Batch)d")) end) of {updated, N} -> {ok, N}; Error -> {error, Error} end. remove_user(LUser, LServer) -> ejabberd_sql:sql_query( LServer, ?SQL("delete from spool where username=%(LUser)s and %(LServer)H")). read_message_headers(LUser, LServer) -> case ejabberd_sql:sql_query( LServer, ?SQL("select @(xml)s, @(seq)d from spool" " where username=%(LUser)s and %(LServer)H order by seq")) of {selected, Rows} -> lists:flatmap( fun({XML, Seq}) -> case xml_to_offline_msg(XML) of {ok, #offline_msg{from = From, to = To, timestamp = TS, packet = El}} -> [{Seq, From, To, TS, El}]; _ -> [] end end, Rows); _Err -> error end. read_message(LUser, LServer, Seq) -> case ejabberd_sql:sql_query( LServer, ?SQL("select @(xml)s from spool where username=%(LUser)s" " and %(LServer)H" " and seq=%(Seq)d")) of {selected, [{RawXML}|_]} -> case xml_to_offline_msg(RawXML) of {ok, Msg} -> {ok, Msg}; _ -> error end; _ -> error end. remove_message(LUser, LServer, Seq) -> ejabberd_sql:sql_query( LServer, ?SQL("delete from spool where username=%(LUser)s and %(LServer)H" " and seq=%(Seq)d")), ok. read_all_messages(LUser, LServer) -> case ejabberd_sql:sql_query( LServer, ?SQL("select @(xml)s from spool where " "username=%(LUser)s and %(LServer)H order by seq")) of {selected, Rs} -> lists:flatmap( fun({XML}) -> case xml_to_offline_msg(XML) of {ok, Msg} -> [Msg]; _ -> [] end end, Rs); _ -> [] end. remove_all_messages(LUser, LServer) -> remove_user(LUser, LServer), {atomic, ok}. count_messages(LUser, LServer) -> case catch ejabberd_sql:sql_query( LServer, ?SQL("select @(count(*))d from spool " "where username=%(LUser)s and %(LServer)H")) of {selected, [{Res}]} -> {cache, Res}; {selected, []} -> {cache, 0}; _ -> {nocache, 0} end. export(_Server) -> [{offline_msg, fun(Host, #offline_msg{us = {LUser, LServer}}) when LServer == Host -> [?SQL("delete from spool where username=%(LUser)s" " and %(LServer)H;")]; (_Host, _R) -> [] end}, {offline_msg, fun(Host, #offline_msg{us = {LUser, LServer}, timestamp = TimeStamp, from = From, to = To, packet = El}) when LServer == Host -> try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of Packet -> Packet1 = xmpp:set_from_to(Packet, From, To), Packet2 = misc:add_delay_info( Packet1, jid:make(LServer), TimeStamp, <<"Offline Storage">>), XML = fxml:element_to_binary(xmpp:encode(Packet2)), [?SQL_INSERT( "spool", ["username=%(LUser)s", "server_host=%(LServer)s", "xml=%(XML)s"])] catch _:{xmpp_codec, Why} -> ?ERROR_MSG("Failed to decode packet ~p of user ~ts@~ts: ~ts", [El, LUser, LServer, xmpp:format_error(Why)]), [] end; (_Host, _R) -> [] end}]. import(_) -> ok. %%%=================================================================== %%% Internal functions %%%=================================================================== xml_to_offline_msg(XML) -> case fxml_stream:parse_element(XML) of #xmlel{} = El -> el_to_offline_msg(El); Err -> ?ERROR_MSG("Got ~p when parsing XML packet ~ts", [Err, XML]), Err end. el_to_offline_msg(El) -> To_s = fxml:get_tag_attr_s(<<"to">>, El), From_s = fxml:get_tag_attr_s(<<"from">>, El), try To = jid:decode(To_s), From = jid:decode(From_s), {ok, #offline_msg{us = {To#jid.luser, To#jid.lserver}, from = From, to = To, packet = El}} catch _:{bad_jid, To_s} -> ?ERROR_MSG("Failed to get 'to' JID from offline XML ~p", [El]), {error, bad_jid_to}; _:{bad_jid, From_s} -> ?ERROR_MSG("Failed to get 'from' JID from offline XML ~p", [El]), {error, bad_jid_from} end. get_and_del_spool_msg_t(LServer, LUser) -> F = fun () -> Result = ejabberd_sql:sql_query_t( ?SQL("select @(username)s, @(xml)s from spool where " "username=%(LUser)s and %(LServer)H order by seq;")), DResult = ejabberd_sql:sql_query_t( ?SQL("delete from spool where" " username=%(LUser)s and %(LServer)H;")), case {Result, DResult} of {{selected, Rs}, {updated, DC}} when length(Rs) /= DC -> ejabberd_sql:restart(concurent_insert); _ -> Result end end, ejabberd_sql:sql_transaction(LServer, F).