diff --git a/sql/lite.sql b/sql/lite.sql index 1cc0c4dc5..0789ca45a 100644 --- a/sql/lite.sql +++ b/sql/lite.sql @@ -375,3 +375,14 @@ CREATE TABLE proxy65 ( CREATE UNIQUE INDEX i_proxy65_sid ON proxy65 (sid); CREATE INDEX i_proxy65_jid ON proxy65 (jid_i); + +CREATE TABLE push_session ( + username text NOT NULL, + timestamp bigint NOT NULL, + service text NOT NULL, + node text NOT NULL, + xml text NOT NULL +); + +CREATE UNIQUE INDEX i_push_usn ON push_session (username, service, node); +CREATE UNIQUE INDEX i_push_ut ON push_session (username, timestamp); diff --git a/sql/mssql.sql b/sql/mssql.sql index 607acae8f..6f309487a 100644 --- a/sql/mssql.sql +++ b/sql/mssql.sql @@ -541,3 +541,17 @@ WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW CREATE INDEX [carboncopy_user] ON [carboncopy] (username) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON); + +CREATE TABLE [dbo].[push_session] ( + [username] [varchar] (255) NOT NULL, + [timestamp] [bigint] NOT NULL, + [service] [varchar] (255) NOT NULL, + [node] [varchar] (255) NOT NULL, + [xml] [varchar] (255) NOT NULL +); + +CREATE UNIQUE CLUSTERED INDEX [i_push_usn] ON [push_session] (username, service, node) +WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON); + +CREATE UNIQUE CLUSTERED INDEX [i_push_ut] ON [push_session] (username, timestamp) +WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON); diff --git a/sql/mysql.sql b/sql/mysql.sql index e2586853b..4fd70f382 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -391,3 +391,14 @@ CREATE TABLE proxy65 ( CREATE UNIQUE INDEX i_proxy65_sid ON proxy65 (sid(191)); CREATE INDEX i_proxy65_jid ON proxy65 (jid_i(191)); + +CREATE TABLE push_session ( + username text NOT NULL, + timestamp bigint NOT NULL, + service text NOT NULL, + node text NOT NULL, + xml text NOT NULL +); + +CREATE UNIQUE INDEX i_push_usn ON push_session (username(191), service(191), node(191)); +CREATE UNIQUE INDEX i_push_ut ON push_session (username(191), timestamp); diff --git a/sql/pg.sql b/sql/pg.sql index f761e68da..e26b1111f 100644 --- a/sql/pg.sql +++ b/sql/pg.sql @@ -395,3 +395,14 @@ CREATE TABLE proxy65 ( CREATE UNIQUE INDEX i_proxy65_sid ON proxy65 USING btree (sid); CREATE INDEX i_proxy65_jid ON proxy65 USING btree (jid_i); + +CREATE TABLE push_session ( + username text NOT NULL, + timestamp bigint NOT NULL, + service text NOT NULL, + node text NOT NULL, + xml text NOT NULL +); + +CREATE UNIQUE INDEX i_push_usn ON push_session USING btree (username, service, node); +CREATE UNIQUE INDEX i_push_ut ON push_session USING btree (username, timestamp); diff --git a/src/mod_push.erl b/src/mod_push.erl index 2ca0bf525..a57062eb9 100644 --- a/src/mod_push.erl +++ b/src/mod_push.erl @@ -61,21 +61,21 @@ -> any(). -callback store_session(binary(), binary(), timestamp(), jid(), binary(), xdata()) - -> {ok, push_session()} | error. + -> {ok, push_session()} | {error, any()}. -callback lookup_session(binary(), binary(), jid(), binary()) - -> {ok, push_session()} | error. + -> {ok, push_session()} | error | {error, any()}. -callback lookup_session(binary(), binary(), timestamp()) - -> {ok, push_session()} | error. + -> {ok, push_session()} | error | {error, any()}. -callback lookup_sessions(binary(), binary(), jid()) - -> {ok, [push_session()]} | error. + -> {ok, [push_session()]} | {error, any()}. -callback lookup_sessions(binary(), binary()) - -> {ok, [push_session()]} | error. + -> {ok, [push_session()]} | {error, any()}. -callback lookup_sessions(binary()) - -> {ok, [push_session()]} | error. + -> {ok, [push_session()]} | {error, any()}. -callback delete_session(binary(), binary(), timestamp()) - -> ok | error. + -> ok | {error, any()}. -callback delete_old_sessions(binary() | global, erlang:timestamp()) - -> any(). + -> ok | {error, any()}. -callback use_cache(binary()) -> boolean(). -callback cache_nodes(binary()) @@ -253,29 +253,39 @@ process_iq(#iq{lang = Lang, sub_els = [#push_enable{node = <<>>}]} = IQ) -> xmpp:make_error(IQ, xmpp:err_feature_not_implemented(Txt, Lang)); process_iq(#iq{from = #jid{lserver = LServer} = JID, to = #jid{lserver = LServer}, + lang = Lang, sub_els = [#push_enable{jid = PushJID, node = Node, xdata = XData}]} = IQ) -> case enable(JID, PushJID, Node, XData) of ok -> xmpp:make_iq_result(IQ); - error -> - xmpp:make_error(IQ, xmpp:err_internal_server_error()) + {error, db_failure} -> + Txt = <<"Database, failure">>, + xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang)); + {error, notfound} -> + Txt = <<"User session not found">>, + xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang)) end; process_iq(#iq{from = #jid{lserver = LServer} = JID, to = #jid{lserver = LServer}, + lang = Lang, sub_els = [#push_disable{jid = PushJID, node = Node}]} = IQ) -> case disable(JID, PushJID, Node) of ok -> xmpp:make_iq_result(IQ); - error -> - xmpp:make_error(IQ, xmpp:err_item_not_found()) + {error, db_failure} -> + Txt = <<"Database, failure">>, + xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang)); + {error, notfound} -> + Txt = <<"Push record not found">>, + xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang)) end; process_iq(IQ) -> xmpp:make_error(IQ, xmpp:err_not_allowed()). --spec enable(jid(), jid(), binary(), xdata()) -> ok | error. +-spec enable(jid(), jid(), binary(), xdata()) -> ok | {error, notfound | db_failure}. enable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, PushJID, Node, XData) -> case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of @@ -285,18 +295,18 @@ enable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, ?INFO_MSG("Enabling push notifications for ~s", [jid:encode(JID)]), ejabberd_c2s:cast(PID, push_enable); - error -> + {error, _} -> ?ERROR_MSG("Cannot enable push for ~s: database error", [jid:encode(JID)]), - error + {error, db_failure} end; none -> ?WARNING_MSG("Cannot enable push for ~s: session not found", [jid:encode(JID)]), - error + {error, notfound} end. --spec disable(jid(), jid(), binary() | undefined) -> ok | error. +-spec disable(jid(), jid(), binary() | undefined) -> ok | {error, notfound | db_failure}. disable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, PushJID, Node) -> case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of @@ -308,7 +318,7 @@ disable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, ?WARNING_MSG("Session not found while disabling push for ~s", [jid:encode(JID)]) end, - if Node /= undefined -> + if Node /= <<>> -> delete_session(LUser, LServer, PushJID, Node); true -> delete_sessions(LUser, LServer, PushJID) @@ -388,7 +398,7 @@ c2s_handle_cast(State, push_disable) -> c2s_handle_cast(State, _Msg) -> State. --spec remove_user(binary(), binary()) -> ok | error. +-spec remove_user(binary(), binary()) -> ok | {error, any()}. remove_user(LUser, LServer) -> ?INFO_MSG("Removing any push sessions of ~s@~s", [LUser, LServer]), Mod = gen_mod:db_mod(LServer, ?MODULE), @@ -403,7 +413,7 @@ notify(#{jid := #jid{luser = LUser, lserver = LServer}, sid := {TS, _}}) -> case lookup_session(LUser, LServer, TS) of {ok, Client} -> notify(LUser, LServer, [Client]); - error -> + _Err -> ok end. @@ -440,7 +450,7 @@ notify(LServer, PushLJID, Node, XData, HandleResponse) -> %% Internal functions. %%-------------------------------------------------------------------- -spec store_session(binary(), binary(), timestamp(), jid(), binary(), xdata()) - -> {ok, push_session()} | error. + -> {ok, push_session()} | {error, any()}. store_session(LUser, LServer, TS, PushJID, Node, XData) -> Mod = gen_mod:db_mod(LServer, ?MODULE), delete_session(LUser, LServer, PushJID, Node), @@ -460,7 +470,7 @@ store_session(LUser, LServer, TS, PushJID, Node, XData) -> end. -spec lookup_session(binary(), binary(), timestamp()) - -> {ok, push_session()} | error. + -> {ok, push_session()} | error | {error, any()}. lookup_session(LUser, LServer, TS) -> Mod = gen_mod:db_mod(LServer, ?MODULE), case use_cache(Mod, LServer) of @@ -472,7 +482,7 @@ lookup_session(LUser, LServer, TS) -> Mod:lookup_session(LUser, LServer, TS) end. --spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | error. +-spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | {error, any()}. lookup_sessions(LUser, LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), case use_cache(Mod, LServer) of @@ -484,40 +494,48 @@ lookup_sessions(LUser, LServer) -> Mod:lookup_sessions(LUser, LServer) end. --spec delete_session(binary(), binary(), timestamp()) -> ok | error. +-spec delete_session(binary(), binary(), timestamp()) -> ok | {error, db_failure}. delete_session(LUser, LServer, TS) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - ok = Mod:delete_session(LUser, LServer, TS), - case use_cache(Mod, LServer) of - true -> - ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, - cache_nodes(Mod, LServer)), - ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS}, - cache_nodes(Mod, LServer)); - false -> - ok + case Mod:delete_session(LUser, LServer, TS) of + ok -> + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)), + ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS}, + cache_nodes(Mod, LServer)); + false -> + ok + end; + {error, _} -> + {error, db_failure} end. --spec delete_session(binary(), binary(), jid(), binary()) -> ok | error. +-spec delete_session(binary(), binary(), jid(), binary()) -> ok | {error, notfound | db_failure}. delete_session(LUser, LServer, PushJID, Node) -> Mod = gen_mod:db_mod(LServer, ?MODULE), case Mod:lookup_session(LUser, LServer, PushJID, Node) of {ok, {TS, _, _, _}} -> delete_session(LUser, LServer, TS); error -> - error + {error, notfound}; + {error, _} -> + {error, db_failure} end. --spec delete_sessions(binary(), binary(), jid()) -> ok | error. +-spec delete_sessions(binary(), binary(), jid()) -> ok | {error, notfound | db_failure}. delete_sessions(LUser, LServer, PushJID) -> Mod = gen_mod:db_mod(LServer, ?MODULE), LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer, PushJID) end, delete_sessions(LUser, LServer, LookupFun, Mod). --spec delete_sessions(binary(), binary(), fun(() -> ok | error), module()) - -> ok | error. +-spec delete_sessions(binary(), binary(), fun(() -> any()), module()) + -> ok | {error, _}. delete_sessions(LUser, LServer, LookupFun, Mod) -> case LookupFun() of + {ok, []} -> + {error, notfound}; {ok, Clients} -> case use_cache(Mod, LServer) of true -> @@ -538,8 +556,8 @@ delete_sessions(LUser, LServer, LookupFun, Mod) -> ok end end, Clients); - error -> - error + {error, _} -> + {error, db_failure} end. -spec drop_online_sessions(binary(), binary(), [push_session()]) diff --git a/src/mod_push_mnesia.erl b/src/mod_push_mnesia.erl index 309ff80e3..ea707dbf6 100644 --- a/src/mod_push_mnesia.erl +++ b/src/mod_push_mnesia.erl @@ -75,7 +75,7 @@ store_session(LUser, LServer, TS, PushJID, Node, XData) -> {aborted, E} -> ?ERROR_MSG("Cannot store push session for ~s@~s: ~p", [LUser, LServer, E]), - error + {error, db_failure} end. lookup_session(LUser, LServer, PushJID, Node) -> @@ -91,7 +91,7 @@ lookup_session(LUser, LServer, PushJID, Node) -> case mnesia:dirty_select(push_session, MatchSpec) of [#push_session{timestamp = TS, xdata = XData}] -> {ok, {TS, PushLJID, Node, XData}}; - _ -> + [] -> ?DEBUG("No push session found for ~s@~s (~p, ~s)", [LUser, LServer, PushJID, Node]), error @@ -108,7 +108,7 @@ lookup_session(LUser, LServer, TS) -> case mnesia:dirty_select(push_session, MatchSpec) of [#push_session{service = PushLJID, node = Node, xdata = XData}] -> {ok, {TS, PushLJID, Node, XData}}; - _ -> + [] -> ?DEBUG("No push session found for ~s@~s (~p)", [LUser, LServer, TS]), error @@ -164,7 +164,7 @@ delete_session(LUser, LServer, TS) -> {aborted, E} -> ?ERROR_MSG("Cannot delete push session of ~s@~s: ~p", [LUser, LServer, E]), - error + {error, db_failure} end. delete_old_sessions(_LServer, Time) -> @@ -181,7 +181,7 @@ delete_old_sessions(_LServer, Time) -> ok; {aborted, E} -> ?ERROR_MSG("Cannot delete old push sessions: ~p", [E]), - error + {error, db_failure} end. %%-------------------------------------------------------------------- diff --git a/src/mod_push_sql.erl b/src/mod_push_sql.erl new file mode 100644 index 000000000..866d51ed4 --- /dev/null +++ b/src/mod_push_sql.erl @@ -0,0 +1,196 @@ +%%%---------------------------------------------------------------------- +%%% ejabberd, Copyright (C) 2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_push_sql). +-behaviour(mod_push). +-compile([{parse_transform, ejabberd_sql_pt}]). + +%% API +-export([init/2, store_session/6, lookup_session/4, lookup_session/3, + lookup_sessions/3, lookup_sessions/2, lookup_sessions/1, + delete_session/3, delete_old_sessions/2]). + +-include("xmpp.hrl"). +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + ok. + +store_session(LUser, LServer, NowTS, PushJID, Node, XData) -> + XML = case XData of + undefined -> <<>>; + _ -> fxml:element_to_binary(xmpp:encode(XData)) + end, + TS = misc:now_to_usec(NowTS), + PushLJID = jid:tolower(PushJID), + Service = jid:encode(PushLJID), + case ?SQL_UPSERT(LServer, "push_session", + ["!username=%(LUser)s", + "!timestamp=%(TS)d", + "!service=%(Service)s", + "!node=%(Node)s", + "xml=%(XML)s"]) of + ok -> + {ok, {NowTS, PushLJID, Node, XData}}; + Err -> + ?ERROR_MSG("Failed to update 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +lookup_session(LUser, LServer, PushJID, Node) -> + PushLJID = jid:tolower(PushJID), + Service = jid:encode(PushLJID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(timestamp)d, @(xml)s from push_session " + "where username=%(LUser)s and service=%(Service)s " + "and node=%(Node)s")) of + {selected, [{TS, XML}]} -> + NowTS = misc:usec_to_now(TS), + XData = decode_xdata(XML, LUser, LServer), + {ok, {NowTS, PushLJID, Node, XData}}; + {selected, []} -> + error; + Err -> + ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +lookup_session(LUser, LServer, NowTS) -> + TS = misc:now_to_usec(NowTS), + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(service)s, @(node)s, @(xml)s " + "from push_session where username=%(LUser)s " + "and timestamp=%(TS)d")) of + {selected, [{Service, Node, XML}]} -> + PushLJID = jid:tolower(jid:decode(Service)), + XData = decode_xdata(XML, LUser, LServer), + {ok, {NowTS, PushLJID, Node, XData}}; + {selected, []} -> + error; + Err -> + ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +lookup_sessions(LUser, LServer, PushJID) -> + PushLJID = jid:tolower(PushJID), + Service = jid:encode(PushLJID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(timestamp)d, @(xml)s, @(node)s from push_session " + "where username=%(LUser)s and service=%(Service)s")) of + {selected, Rows} -> + {ok, lists:map( + fun({TS, XML, Node}) -> + NowTS = misc:usec_to_now(TS), + XData = decode_xdata(XML, LUser, LServer), + {NowTS, PushLJID, Node, XData} + end, Rows)}; + Err -> + ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +lookup_sessions(LUser, LServer) -> + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(timestamp)d, @(xml)s, @(node)s, @(service)s " + "from push_session where username=%(LUser)s")) of + {selected, Rows} -> + {ok, lists:map( + fun({TS, XML, Node, Service}) -> + NowTS = misc:usec_to_now(TS), + XData = decode_xdata(XML, LUser, LServer), + PushLJID = jid:tolower(jid:decode(Service)), + {NowTS, PushLJID,Node, XData} + end, Rows)}; + Err -> + ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +lookup_sessions(LServer) -> + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(username)s, @(timestamp)d, @(xml)s, " + "@(node)s, @(service)s from push_session")) of + {selected, Rows} -> + {ok, lists:map( + fun({LUser, TS, XML, Node, Service}) -> + NowTS = misc:usec_to_now(TS), + XData = decode_xdata(XML, LUser, LServer), + PushLJID = jid:tolower(jid:decode(Service)), + {NowTS, PushLJID, Node, XData} + end, Rows)}; + Err -> + ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +delete_session(LUser, LServer, NowTS) -> + TS = misc:now_to_usec(NowTS), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from push_session where " + "username=%(LUser)s and timestamp=%(TS)d")) of + {updated, _} -> + ok; + Err -> + ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +delete_old_sessions(LServer, Time) -> + TS = misc:now_to_usec(Time), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from push_session where timestamp<%(TS)d")) of + {updated, _} -> + ok; + Err -> + ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]), + {error, db_failure} + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +decode_xdata(<<>>, _LUser, _LServer) -> + undefined; +decode_xdata(XML, LUser, LServer) -> + case fxml_stream:parse_element(XML) of + #xmlel{} = El -> + try xmpp:decode(El) + catch _:{xmpp_codec, Why} -> + ?ERROR_MSG("Failed to decode ~s for user ~s@~s " + "from table 'push_session': ~s", + [XML, LUser, LServer, xmpp:format_error(Why)]), + undefined + end; + Err -> + ?ERROR_MSG("Failed to decode ~s for user ~s@~s from " + "table 'push_session': ~p", + [XML, LUser, LServer, Err]), + undefined + end. diff --git a/test/ejabberd_SUITE.erl b/test/ejabberd_SUITE.erl index 97c56159a..0479c593f 100644 --- a/test/ejabberd_SUITE.erl +++ b/test/ejabberd_SUITE.erl @@ -460,6 +460,7 @@ db_tests(_) -> muc_tests:single_cases(), offline_tests:single_cases(), mam_tests:single_cases(), + push_tests:single_cases(), test_unregister]}, muc_tests:master_slave_cases(), privacy_tests:master_slave_cases(), @@ -469,7 +470,8 @@ db_tests(_) -> mam_tests:master_slave_cases(), vcard_tests:master_slave_cases(), announce_tests:master_slave_cases(), - carbons_tests:master_slave_cases()]. + carbons_tests:master_slave_cases(), + push_tests:master_slave_cases()]. ldap_tests() -> [{ldap_tests, [sequence], diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml index a648cb422..b73d6a67f 100644 --- a/test/ejabberd_SUITE_data/ejabberd.yml +++ b/test/ejabberd_SUITE_data/ejabberd.yml @@ -52,7 +52,12 @@ host_config: mod_disco: [] mod_ping: [] mod_proxy65: [] + mod_push: + db_type: sql + mod_push_keepalive: [] mod_s2s_dialback: [] + mod_stream_mgmt: + resume_timeout: 3 mod_legacy_auth: [] mod_register: welcome_message: @@ -110,7 +115,12 @@ Welcome to this XMPP server." mod_disco: [] mod_ping: [] mod_proxy65: [] + mod_push: + db_type: sql + mod_push_keepalive: [] mod_s2s_dialback: [] + mod_stream_mgmt: + resume_timeout: 3 mod_legacy_auth: [] mod_register: welcome_message: @@ -173,7 +183,12 @@ Welcome to this XMPP server." mod_disco: [] mod_ping: [] mod_proxy65: [] + mod_push: + db_type: sql + mod_push_keepalive: [] mod_s2s_dialback: [] + mod_stream_mgmt: + resume_timeout: 3 mod_legacy_auth: [] mod_register: welcome_message: