From d5e4da54cfde1cf5b7876a192d9fe3b952eb345f Mon Sep 17 00:00:00 2001 From: Evgeny Khramtsov Date: Wed, 5 Dec 2018 13:14:29 +0300 Subject: [PATCH] Update MIX code to reflect newest specification Note that support for older specification is completely dropped, i.e. no backward compatibility is provided since the XEPs are still very experimental and being changed drastically --- rebar.config | 2 +- sql/lite.new.sql | 54 +++ sql/lite.sql | 53 +++ sql/mysql.new.sql | 54 +++ sql/mysql.sql | 53 +++ sql/pg.new.sql | 54 +++ sql/pg.sql | 53 +++ src/ejabberd_sm.erl | 52 ++- src/mod_mam.erl | 60 +-- src/mod_mam_sql.erl | 2 +- src/mod_mix.erl | 801 ++++++++++++++++++++++++++----------- src/mod_mix_mnesia.erl | 189 +++++++++ src/mod_mix_pam.erl | 365 +++++++++++++++++ src/mod_mix_pam_mnesia.erl | 91 +++++ src/mod_mix_pam_sql.erl | 114 ++++++ src/mod_mix_sql.erl | 236 +++++++++++ src/mod_push.erl | 8 +- 17 files changed, 1949 insertions(+), 292 deletions(-) create mode 100644 src/mod_mix_mnesia.erl create mode 100644 src/mod_mix_pam.erl create mode 100644 src/mod_mix_pam_mnesia.erl create mode 100644 src/mod_mix_pam_sql.erl create mode 100644 src/mod_mix_sql.erl diff --git a/rebar.config b/rebar.config index b03a6bba7..06ad1e6db 100644 --- a/rebar.config +++ b/rebar.config @@ -24,7 +24,7 @@ {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.26"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.14"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.34"}}}, - {xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.2.6"}}}, + {xmpp, ".*", {git, "https://github.com/processone/xmpp", "8bc04ba"}}, {fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.17"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "0.14.8"}}}, {p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.3"}}}, diff --git a/sql/lite.new.sql b/sql/lite.new.sql index de62cd169..6ec7ea876 100644 --- a/sql/lite.new.sql +++ b/sql/lite.new.sql @@ -410,3 +410,57 @@ CREATE TABLE push_session ( ); CREATE UNIQUE INDEX i_push_session_susn ON push_session (server_host, username, service, node); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + server_host text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, server_host, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username, server_host); diff --git a/sql/lite.sql b/sql/lite.sql index 50bfec3bd..177454c7b 100644 --- a/sql/lite.sql +++ b/sql/lite.sql @@ -379,3 +379,56 @@ CREATE TABLE push_session ( CREATE UNIQUE INDEX i_push_usn ON push_session (username, service, node); CREATE UNIQUE INDEX i_push_ut ON push_session (username, timestamp); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username); diff --git a/sql/mysql.new.sql b/sql/mysql.new.sql index 88769ddae..832910d2c 100644 --- a/sql/mysql.new.sql +++ b/sql/mysql.new.sql @@ -426,3 +426,57 @@ CREATE TABLE push_session ( ); CREATE UNIQUE INDEX i_push_session_susn ON push_session (server_host(191), username(191), service(191), node(191)); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel(191), service(191)); +CREATE INDEX i_mix_channel_serv ON mix_channel (service(191)); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel(191), service(191)); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel(153), service(153), username(153), domain(153), node(153)); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel(191), service(191), node(191)); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel(191), service(191)); + +CREATE TABLE mix_pam ( + username text NOT NULL, + server_host text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username(191), server_host(191), channel(191), service(191)); +CREATE INDEX i_mix_pam_us ON mix_pam (username(191), server_host(191)); diff --git a/sql/mysql.sql b/sql/mysql.sql index 2fcea38f5..568d2b41c 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -395,3 +395,56 @@ CREATE TABLE push_session ( CREATE UNIQUE INDEX i_push_usn ON push_session (username(191), service(191), node(191)); CREATE UNIQUE INDEX i_push_ut ON push_session (username(191), timestamp); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel(191), service(191)); +CREATE INDEX i_mix_channel_serv ON mix_channel (service(191)); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel(191), service(191)); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel(153), service(153), username(153), domain(153), node(153)); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel(191), service(191), username(191), domain(191)); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel(191), service(191), node(191)); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel(191), service(191)); + +CREATE TABLE mix_pam ( + username text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username(191), channel(191), service(191)); +CREATE INDEX i_mix_pam_u ON mix_pam (username(191)); diff --git a/sql/pg.new.sql b/sql/pg.new.sql index 6244eeea9..721ce1ae9 100644 --- a/sql/pg.new.sql +++ b/sql/pg.new.sql @@ -571,3 +571,57 @@ CREATE TABLE push_session ( ); CREATE UNIQUE INDEX i_push_session_susn ON push_session USING btree (server_host, username, service, node); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + server_host text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, server_host, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username, server_host); diff --git a/sql/pg.sql b/sql/pg.sql index ad1e4b9c2..b9797b820 100644 --- a/sql/pg.sql +++ b/sql/pg.sql @@ -399,3 +399,56 @@ CREATE TABLE push_session ( CREATE UNIQUE INDEX i_push_usn ON push_session USING btree (username, service, node); CREATE UNIQUE INDEX i_push_ut ON push_session USING btree (username, timestamp); + +CREATE TABLE mix_channel ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + hidden boolean NOT NULL, + hmac_key text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service); +CREATE INDEX i_mix_channel_serv ON mix_channel (service); + +CREATE TABLE mix_participant ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + jid text NOT NULL, + id text NOT NULL, + nick text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain); +CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service); + +CREATE TABLE mix_subscription ( + channel text NOT NULL, + service text NOT NULL, + username text NOT NULL, + domain text NOT NULL, + node text NOT NULL, + jid text NOT NULL +); + +CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node); +CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain); +CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node); +CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service); + +CREATE TABLE mix_pam ( + username text NOT NULL, + channel text NOT NULL, + service text NOT NULL, + id text NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, channel, service); +CREATE INDEX i_mix_pam_us ON mix_pam (username); diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index ab1251715..119a70939 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -44,6 +44,7 @@ close_session/4, check_in_subscription/2, bounce_offline_message/1, + bounce_sm_packet/1, disconnect_removed_user/2, get_user_resources/2, get_user_present_resources/2, @@ -191,14 +192,22 @@ check_in_subscription(Acc, #presence{to = To}) -> -spec bounce_offline_message({bounce, message()} | any()) -> any(). -bounce_offline_message({bounce, #message{type = T} = Packet} = Acc) - when T == chat; T == groupchat; T == normal -> +bounce_offline_message({bounce, #message{type = T}} = Acc) + when T == chat; T == groupchat; T == normal -> + bounce_sm_packet(Acc); +bounce_offline_message(Acc) -> + Acc. + +-spec bounce_sm_packet({bounce | term(), stanza()}) -> any(). +bounce_sm_packet({bounce, Packet} = Acc) -> Lang = xmpp:get_lang(Packet), Txt = <<"User session not found">>, Err = xmpp:err_service_unavailable(Txt, Lang), ejabberd_router:route_error(Packet, Err), {stop, Acc}; -bounce_offline_message(Acc) -> +bounce_sm_packet({_, Packet} = Acc) -> + ?DEBUG("dropping packet to unavailable resource:~n~s", + [xmpp:pp(Packet)]), Acc. -spec disconnect_removed_user(binary(), binary()) -> ok. @@ -486,6 +495,8 @@ host_up(Host) -> ejabberd_sm, check_in_subscription, 20), ejabberd_hooks:add(offline_message_hook, Host, ejabberd_sm, bounce_offline_message, 100), + ejabberd_hooks:add(bounce_sm_packet, Host, + ejabberd_sm, bounce_sm_packet, 100), ejabberd_hooks:add(remove_user, Host, ejabberd_sm, disconnect_removed_user, 100), ejabberd_c2s:host_up(Host). @@ -510,6 +521,8 @@ host_down(Host) -> ejabberd_sm, check_in_subscription, 20), ejabberd_hooks:delete(offline_message_hook, Host, ejabberd_sm, bounce_offline_message, 100), + ejabberd_hooks:delete(bounce_sm_packet, Host, + ejabberd_sm, bounce_sm_packet, 100), ejabberd_hooks:delete(remove_user, Host, ejabberd_sm, disconnect_removed_user, 100), ejabberd_c2s:host_down(Host). @@ -645,19 +658,22 @@ do_route(#presence{to = #jid{lresource = <<"">>} = To} = Packet) -> fun({_, R}) -> do_route(Packet#presence{to = jid:replace_resource(To, R)}) end, get_user_present_resources(LUser, LServer)); -do_route(#message{to = #jid{lresource = <<"">>}, type = T} = Packet) -> +do_route(#message{to = #jid{lresource = <<"">>} = To, type = T} = Packet) -> ?DEBUG("processing message to bare JID:~n~s", [xmpp:pp(Packet)]), if T == chat; T == headline; T == normal -> route_message(Packet); true -> - Lang = xmpp:get_lang(Packet), - ErrTxt = <<"User session not found">>, - Err = xmpp:err_service_unavailable(ErrTxt, Lang), - ejabberd_router:route_error(Packet, Err) + ejabberd_hooks:run_fold(bounce_sm_packet, + To#jid.lserver, {bounce, Packet}, []) + end; +do_route(#iq{to = #jid{lresource = <<"">>} = To, type = T} = Packet) -> + if T == set; T == get -> + ?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]), + gen_iq_handler:handle(?MODULE, Packet); + true -> + ejabberd_hooks:run_fold(bounce_sm_packet, + To#jid.lserver, {pass, Packet}, []) end; -do_route(#iq{to = #jid{lresource = <<"">>}} = Packet) -> - ?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]), - gen_iq_handler:handle(?MODULE, Packet); do_route(Packet) -> ?DEBUG("processing packet to full JID:~n~s", [xmpp:pp(Packet)]), To = xmpp:get_to(Packet), @@ -669,16 +685,14 @@ do_route(Packet) -> #message{type = T} when T == chat; T == normal -> route_message(Packet); #message{type = T} when T == headline -> - ?DEBUG("dropping headline to unavailable resource:~n~s", - [xmpp:pp(Packet)]); + ejabberd_hooks:run_fold(bounce_sm_packet, + LServer, {pass, Packet}, []); #presence{} -> - ?DEBUG("dropping presence to unavailable resource:~n~s", - [xmpp:pp(Packet)]); + ejabberd_hooks:run_fold(bounce_sm_packet, + LServer, {pass, Packet}, []); _ -> - Lang = xmpp:get_lang(Packet), - ErrTxt = <<"User session not found">>, - Err = xmpp:err_service_unavailable(ErrTxt, Lang), - ejabberd_router:route_error(Packet, Err) + ejabberd_hooks:run_fold(bounce_sm_packet, + LServer, {bounce, Packet}, []) end; Ss -> Session = lists:max(Ss), diff --git a/src/mod_mam.erl b/src/mod_mam.erl index 58108fb1e..32c13c875 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -40,7 +40,8 @@ muc_process_iq/2, muc_filter_message/3, message_is_archived/3, delete_old_messages/2, get_commands_spec/0, msg_to_el/4, get_room_config/4, set_room_option/3, offline_message/1, export/1, - mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2]). + mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2, + process_iq/3, store_mam_message/7, make_id/0]). -include("xmpp.hrl"). -include("logger.hrl"). @@ -116,6 +117,8 @@ start(Host, Opts) -> get_room_config, 50), ejabberd_hooks:add(set_room_option, Host, ?MODULE, set_room_option, 50), + ejabberd_hooks:add(store_mam_message, Host, ?MODULE, + store_mam_message, 100), case gen_mod:get_opt(assume_mam_usage, Opts) of true -> ejabberd_hooks:add(message_is_archived, Host, ?MODULE, @@ -181,6 +184,8 @@ stop(Host) -> get_room_config, 50), ejabberd_hooks:delete(set_room_option, Host, ?MODULE, set_room_option, 50), + ejabberd_hooks:delete(store_mam_message, Host, ?MODULE, + store_mam_message, 100), case gen_mod:get_module_opt(Host, ?MODULE, assume_mam_usage) of true -> ejabberd_hooks:delete(message_is_archived, Host, ?MODULE, @@ -412,6 +417,10 @@ muc_filter_message(#message{from = From} = Pkt, muc_filter_message(Acc, _MUCState, _FromNick) -> Acc. +-spec make_id() -> binary(). +make_id() -> + p1_time_compat:system_time(micro_seconds). + -spec get_stanza_id(stanza()) -> integer(). get_stanza_id(#message{meta = #{stanza_id := ID}}) -> ID. @@ -422,7 +431,7 @@ init_stanza_id(#message{meta = #{stanza_id := _ID}} = Pkt, _LServer) -> init_stanza_id(#message{meta = #{from_offline := true}} = Pkt, _LServer) -> Pkt; init_stanza_id(Pkt, LServer) -> - ID = p1_time_compat:system_time(micro_seconds), + ID = make_id(), Pkt1 = strip_my_stanza_id(Pkt, LServer), xmpp:put_meta(Pkt1, stanza_id, ID). @@ -526,7 +535,7 @@ message_is_archived(false, #{lserver := LServer}, Pkt) -> delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>; TypeBin == <<"groupchat">>; TypeBin == <<"all">> -> - CurrentTime = p1_time_compat:system_time(micro_seconds), + CurrentTime = make_id(), Diff = Days * 24 * 60 * 60 * 1000000, TimeStamp = misc:usec_to_now(CurrentTime - Diff), Type = misc:binary_to_atom(TypeBin), @@ -610,7 +619,7 @@ process_iq(LServer, #iq{from = #jid{luser = LUser}, lang = Lang, case MsgType of chat -> maybe_activate_mam(LUser, LServer); - {groupchat, _Role, _MUCState} -> + _ -> ok end, case SubEl of @@ -824,15 +833,9 @@ store_msg(Pkt, LUser, LServer, Peer, Dir) -> ok; % Already stored. {true, _} -> case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt, - [LUser, LServer, Peer, chat, Dir]) of - drop -> - pass; - Pkt1 -> - US = {LUser, LServer}, - ID = get_stanza_id(Pkt1), - El = xmpp:encode(Pkt1), - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir, ID) + [LUser, LServer, Peer, <<"">>, chat, Dir]) of + #message{} -> ok; + _ -> pass end; {false, _} -> pass @@ -846,20 +849,23 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) -> {U, S, _} = jid:tolower(RoomJID), LServer = MUCState#state.server_host, case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt, - [U, S, Peer, groupchat, recv]) of - drop -> - pass; - Pkt1 -> - US = {U, S}, - ID = get_stanza_id(Pkt1), - El = xmpp:encode(Pkt1), - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:store(El, LServer, US, groupchat, Peer, Nick, recv, ID) + [U, S, Peer, Nick, groupchat, recv]) of + #message{} -> ok; + _ -> pass end; false -> pass end. +store_mam_message(Pkt, U, S, Peer, Nick, Type, Dir) -> + LServer = ejabberd_router:host_of_route(S), + US = {U, S}, + ID = get_stanza_id(Pkt), + El = xmpp:encode(Pkt), + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:store(El, LServer, US, Type, Peer, Nick, Dir, ID), + Pkt. + write_prefs(LUser, LServer, Host, Default, Always, Never) -> Prefs = #archive_prefs{us = {LUser, LServer}, default = Default, @@ -944,7 +950,7 @@ select_and_send(LServer, Query, RSM, #iq{from = From, to = To} = IQ, MsgType) -> case MsgType of chat -> select(LServer, From, From, Query, RSM, MsgType); - {groupchat, _Role, _MUCState} -> + _ -> select(LServer, From, To, Query, RSM, MsgType) end, SortedMsgs = lists:keysort(2, Msgs), @@ -1006,7 +1012,11 @@ msg_to_el(#archive_msg{timestamp = TS, packet = El, nick = Nick, CodecOpts = ejabberd_config:codec_options(LServer), try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of Pkt1 -> - Pkt2 = set_stanza_id(Pkt1, JidArchive, ID), + Pkt2 = case MsgType of + chat -> set_stanza_id(Pkt1, JidArchive, ID); + {groupchat, _, _} -> set_stanza_id(Pkt1, JidArchive, ID); + _ -> Pkt1 + end, Pkt3 = maybe_update_from_to( Pkt2, JidRequestor, JidArchive, Peer, MsgType, Nick), Delay = #delay{stamp = TS, from = jid:make(LServer)}, @@ -1041,7 +1051,7 @@ maybe_update_from_to(#message{sub_els = Els} = Pkt, JidRequestor, JidArchive, Pkt#message{from = jid:replace_resource(JidArchive, Nick), to = undefined, sub_els = Items ++ Els}; -maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, chat, _Nick) -> +maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, _MsgType, _Nick) -> Pkt. -spec send([{binary(), integer(), xmlel()}], diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl index 1c9b7cea2..4fac259bc 100644 --- a/src/mod_mam_sql.erl +++ b/src/mod_mam_sql.erl @@ -175,7 +175,7 @@ select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, MAMQuery, RSM, MsgType) -> User = case MsgType of chat -> LUser; - {groupchat, _Role, _MUCState} -> jid:encode(JidArchive) + _ -> jid:encode(JidArchive) end, {Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM), % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a diff --git a/src/mod_mix.erl b/src/mod_mix.erl index 78e5d0251..e9b91202b 100644 --- a/src/mod_mix.erl +++ b/src/mod_mix.erl @@ -21,32 +21,45 @@ %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%---------------------------------------------------------------------- - -module(mod_mix). - --behaviour(gen_server). -behaviour(gen_mod). +-behaviour(gen_server). +-protocol({xep, 369, '0.13.0'}). %% API --export([start/2, stop/1, process_iq/1, - disco_items/5, disco_identity/5, disco_info/5, - disco_features/5, mod_opt_type/1, mod_options/1, depends/2]). - +-export([route/1]). +%% gen_mod callbacks +-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, format_status/2]). +%% Hooks +-export([process_disco_info/1, + process_disco_items/1, + process_mix_core/1, + process_mam_query/1, + process_pubsub_query/1]). --include("logger.hrl"). -include("xmpp.hrl"). +-include("logger.hrl"). +-include("translate.hrl"). --define(NODES, [?NS_MIX_NODES_MESSAGES, - ?NS_MIX_NODES_PRESENCE, - ?NS_MIX_NODES_PARTICIPANTS, - ?NS_MIX_NODES_SUBJECT, - ?NS_MIX_NODES_CONFIG]). +-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}. +-callback set_channel(binary(), binary(), binary(), + binary(), boolean(), binary()) -> + ok | {error, db_failure}. +-callback get_channels(binary(), binary()) -> + {ok, [binary()]} | {error, db_failure}. +-callback get_channel(binary(), binary(), binary()) -> + {ok, {jid(), boolean(), binary()}} | + {error, notfound | db_failure}. +-callback set_participant(binary(), binary(), binary(), jid(), binary(), binary()) -> + ok | {error, db_failure}. +-callback get_participant(binary(), binary(), binary(), jid()) -> + {ok, {binary(), binary()}} | {error, notfound | db_failure}. --record(state, {server_host :: binary(), - hosts :: [binary()]}). +-record(state, {hosts :: [binary()], + server_host :: binary()}). %%%=================================================================== %%% API @@ -57,267 +70,571 @@ start(Host, Opts) -> stop(Host) -> gen_mod:stop_child(?MODULE, Host). --spec disco_features({error, stanza_error()} | {result, [binary()]} | empty, - jid(), jid(), binary(), binary()) -> {result, [binary()]}. -disco_features(_Acc, _From, _To, _Node, _Lang) -> - {result, [?NS_MIX_0]}. +reload(Host, NewOpts, OldOpts) -> + Proc = gen_mod:get_module_proc(Host, ?MODULE), + gen_server:cast(Proc, {reload, Host, NewOpts, OldOpts}). -disco_items(_Acc, _From, To, _Node, _Lang) when To#jid.luser /= <<"">> -> - BareTo = jid:remove_resource(To), - {result, [#disco_item{jid = BareTo, node = Node} || Node <- ?NODES]}; -disco_items(_Acc, _From, _To, _Node, _Lang) -> - {result, []}. +depends(_Host, _Opts) -> + [{mod_mam, hard}]. -disco_identity(Acc, _From, To, _Node, _Lang) when To#jid.luser == <<"">> -> - Acc ++ [#identity{category = <<"conference">>, - name = <<"MIX service">>, - type = <<"text">>}]; -disco_identity(Acc, _From, _To, _Node, _Lang) -> - Acc ++ [#identity{category = <<"conference">>, - type = <<"mix">>}]. +mod_opt_type(access_create) -> fun acl:access_rules_validator/1; +mod_opt_type(name) -> fun iolist_to_binary/1; +mod_opt_type(host) -> fun ejabberd_config:v_host/1; +mod_opt_type(hosts) -> fun ejabberd_config:v_hosts/1; +mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end. --spec disco_info([xdata()], binary(), module(), binary(), binary()) -> [xdata()]; - ([xdata()], jid(), jid(), binary(), binary()) -> [xdata()]. -disco_info(_Acc, _From, To, _Node, _Lang) when is_atom(To) -> - [#xdata{type = result, - fields = [#xdata_field{var = <<"FORM_TYPE">>, - type = hidden, - values = [?NS_MIX_SERVICEINFO_0]}]}]; -disco_info(Acc, _From, _To, _Node, _Lang) -> - Acc. +mod_options(Host) -> + [{access_create, all}, + {host, <<"mix.@HOST@">>}, + {hosts, []}, + {name, ?T("Channels")}, + {db_type, ejabberd_config:default_db(Host, ?MODULE)}]. -process_iq(#iq{type = set, from = From, to = To, - sub_els = [#mix_join{subscribe = SubNodes}]} = IQ) -> - Nodes = [Node || Node <- SubNodes, lists:member(Node, ?NODES)], - case subscribe_nodes(From, To, Nodes) of - {result, _} -> - case publish_participant(From, To) of - {result, _} -> - BareFrom = jid:remove_resource(From), - xmpp:make_iq_result( - IQ, #mix_join{jid = BareFrom, subscribe = Nodes}); - {error, Err} -> - xmpp:make_error(IQ, Err) - end; - {error, Err} -> - xmpp:make_error(IQ, Err) +-spec route(stanza()) -> ok. +route(#iq{} = IQ) -> + ejabberd_router:process_iq(IQ); +route(#message{type = groupchat, id = ID, lang = Lang, + to = #jid{luser = <<_, _/binary>>}} = Msg) -> + case ID of + <<>> -> + Txt = <<"Attribute 'id' is mandatory for MIX messages">>, + Err = xmpp:err_bad_request(Txt, Lang), + ejabberd_router:route_error(Msg, Err); + _ -> + process_mix_message(Msg) end; -process_iq(#iq{type = set, from = From, to = To, - sub_els = [#mix_leave{}]} = IQ) -> - case delete_participant(From, To) of - {result, _} -> - case unsubscribe_nodes(From, To, ?NODES) of - {result, _} -> - xmpp:make_iq_result(IQ); - {error, Err} -> - xmpp:make_error(IQ, Err) - end; - {error, Err} -> - xmpp:make_error(IQ, Err) +route(Pkt) -> + ?DEBUG("Dropping packet:~n~s", [xmpp:pp(Pkt)]). + +-spec process_disco_info(iq()) -> iq(). +process_disco_info(#iq{type = set, lang = Lang} = IQ) -> + Txt = <<"Value 'set' of 'type' attribute is not allowed">>, + xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)); +process_disco_info(#iq{type = get, to = #jid{luser = <<>>} = To, + from = _From, lang = Lang, + sub_els = [#disco_info{node = <<>>}]} = IQ) -> + ServerHost = ejabberd_router:host_of_route(To#jid.lserver), + X = ejabberd_hooks:run_fold(disco_info, ServerHost, [], + [ServerHost, ?MODULE, <<"">>, Lang]), + Name = gen_mod:get_module_opt(ServerHost, ?MODULE, name), + Identity = #identity{category = <<"conference">>, + type = <<"text">>, + name = translate:translate(Lang, Name)}, + Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS, + ?NS_MIX_CORE_0, ?NS_MIX_CORE_SEARCHABLE_0, + ?NS_MIX_CORE_CREATE_CHANNEL_0], + xmpp:make_iq_result( + IQ, #disco_info{features = Features, + identities = [Identity], + xdata = X}); +process_disco_info(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To, + sub_els = [#disco_info{node = <<"mix">>}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + Identity = #identity{category = <<"conference">>, + type = <<"mix">>}, + Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS, + ?NS_MIX_CORE_0, ?NS_MAM_2], + xmpp:make_iq_result( + IQ, #disco_info{node = <<"mix">>, + features = Features, + identities = [Identity]}); + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) end; -process_iq(#iq{lang = Lang} = IQ) -> - Txt = <<"Unsupported MIX query">>, - xmpp:make_error(IQ, xmpp:err_bad_request(Txt, Lang)). +process_disco_info(#iq{type = get, sub_els = [#disco_info{node = Node}]} = IQ) -> + xmpp:make_iq_result(IQ, #disco_info{node = Node, features = [?NS_DISCO_INFO]}); +process_disco_info(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +-spec process_disco_items(iq()) -> iq(). +process_disco_items(#iq{type = set, lang = Lang} = IQ) -> + Txt = <<"Value 'set' of 'type' attribute is not allowed">>, + xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)); +process_disco_items(#iq{type = get, to = #jid{luser = <<>>} = To, + sub_els = [#disco_items{node = <<>>}]} = IQ) -> + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channels(ServerHost, Host) of + {ok, Channels} -> + Items = [#disco_item{jid = jid:make(Channel, Host)} + || Channel <- Channels], + xmpp:make_iq_result(IQ, #disco_items{items = Items}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; +process_disco_items(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To, + sub_els = [#disco_items{node = <<"mix">>}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BTo = jid:remove_resource(To), + Items = [#disco_item{jid = BTo, node = Node} || Node <- known_nodes()], + xmpp:make_iq_result(IQ, #disco_items{node = <<"mix">>, items = Items}); + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; +process_disco_items(#iq{type = get, sub_els = [#disco_items{node = Node}]} = IQ) -> + xmpp:make_iq_result(IQ, #disco_items{node = Node}); +process_disco_items(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +-spec process_mix_core(iq()) -> iq(). +process_mix_core(#iq{type = set, to = #jid{luser = <<>>}, + sub_els = [#mix_create{}]} = IQ) -> + process_mix_create(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<>>}, + sub_els = [#mix_destroy{}]} = IQ) -> + process_mix_destroy(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>}, + sub_els = [#mix_join{}]} = IQ) -> + process_mix_join(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>}, + sub_els = [#mix_leave{}]} = IQ) -> + process_mix_leave(IQ); +process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>}, + sub_els = [#mix_setnick{}]} = IQ) -> + process_mix_setnick(IQ); +process_mix_core(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +process_pubsub_query(#iq{type = get, + sub_els = [#pubsub{items = #ps_items{node = Node}}]} = IQ) + when Node == ?NS_MIX_NODES_PARTICIPANTS -> + process_participants_list(IQ); +process_pubsub_query(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). + +process_mam_query(#iq{from = From, to = To, type = T, + sub_els = [#mam_query{}]} = IQ) + when T == get; T == set -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BFrom = jid:remove_resource(From), + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, _} -> + mod_mam:process_iq(ServerHost, IQ, mix); + {error, notfound} -> + xmpp:make_error(IQ, not_joined_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; +process_mam_query(IQ) -> + xmpp:make_error(IQ, unsupported_error(IQ)). %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([ServerHost, Opts]) -> +init([Host, Opts]) -> process_flag(trap_exit, true), - Hosts = gen_mod:get_opt_hosts(ServerHost, Opts), - lists:foreach( - fun(Host) -> - ConfigTab = gen_mod:get_module_proc(Host, config), - ets:new(ConfigTab, [named_table]), - ets:insert(ConfigTab, {plugins, [<<"mix">>]}), - ejabberd_hooks:add(disco_local_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:add(disco_local_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:add(disco_local_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:add(disco_sm_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:add(disco_sm_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:add(disco_info, Host, ?MODULE, disco_info, 100), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, - ?NS_DISCO_ITEMS, mod_disco, - process_local_iq_items), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, - ?NS_DISCO_INFO, mod_disco, - process_local_iq_info), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_DISCO_ITEMS, mod_disco, - process_local_iq_items), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_DISCO_INFO, mod_disco, - process_local_iq_info), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_PUBSUB, mod_pubsub, iq_sm), - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, - ?NS_MIX_0, ?MODULE, process_iq), - ejabberd_router:register_route(Host, ServerHost) - end, Hosts), - {ok, #state{server_host = ServerHost, hosts = Hosts}}. + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + MyHosts = gen_mod:get_opt_hosts(Host, Opts), + case Mod:init(Host, [{hosts, MyHosts}|Opts]) of + ok -> + lists:foreach( + fun(MyHost) -> + ejabberd_router:register_route( + MyHost, Host, {apply, ?MODULE, route}), + register_iq_handlers(MyHost) + end, MyHosts), + {ok, #state{hosts = MyHosts, server_host = Host}}; + {error, db_failure} -> + {stop, db_failure} + end. -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast(_Msg, State) -> +handle_call(Request, _From, State) -> + ?WARNING_MSG("Unexpected call: ~p", [Request]), {noreply, State}. -handle_info({route, Packet}, State) -> - case catch do_route(State, Packet) of - {'EXIT', _} = Err -> - try - ?ERROR_MSG("failed to route packet:~n~s~nReason: ~p", - [xmpp:pp(Packet), Err]), - Error = xmpp:err_internal_server_error(), - ejabberd_router:route_error(Packet, Error) - catch _:_ -> - ok - end; - _ -> - ok - end, - {noreply, State}; -handle_info(_Info, State) -> +handle_cast(Request, State) -> + ?WARNING_MSG("Unexpected cast: ~p", [Request]), {noreply, State}. -terminate(_Reason, #state{hosts = Hosts}) -> +handle_info(Info, State) -> + ?WARNING_MSG("Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, State) -> lists:foreach( - fun(Host) -> - ejabberd_hooks:delete(disco_local_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:delete(disco_local_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, disco_items, 100), - ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, disco_features, 100), - ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, disco_identity, 100), - ejabberd_hooks:delete(disco_info, Host, ?MODULE, disco_info, 100), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_0), - ejabberd_router:unregister_route(Host) - end, Hosts). + fun(MyHost) -> + unregister_iq_handlers(MyHost), + ejabberd_router:unregister_route(MyHost) + end, State#state.hosts). code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_status(_Opt, Status) -> + Status. + %%%=================================================================== %%% Internal functions %%%=================================================================== -do_route(_State, #iq{} = Packet) -> - ejabberd_router:process_iq(Packet); -do_route(_State, #presence{from = From, to = To, type = unavailable}) - when To#jid.luser /= <<"">> -> - delete_presence(From, To); -do_route(_State, _Packet) -> - ok. +-spec process_mix_create(iq()) -> iq(). +process_mix_create(#iq{to = To, from = From, + sub_els = [#mix_create{channel = Chan}]} = IQ) -> + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + Creator = jid:remove_resource(From), + Chan1 = case Chan of + <<>> -> p1_rand:get_string(); + _ -> Chan + end, + Ret = case Mod:get_channel(ServerHost, Chan1, Host) of + {ok, {#jid{luser = U, lserver = S}, _, _}} -> + case {From#jid.luser, From#jid.lserver} of + {U, S} -> ok; + _ -> {error, conflict} + end; + {error, notfound} -> + Key = xmpp_util:hex(p1_rand:bytes(20)), + Mod:set_channel(ServerHost, Chan1, Host, + Creator, Chan == <<>>, Key); + {error, db_failure} = Err -> + Err + end, + case Ret of + ok -> + xmpp:make_iq_result(IQ, #mix_create{channel = Chan1}); + {error, conflict} -> + xmpp:make_error(IQ, channel_exists_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. -subscribe_nodes(From, To, Nodes) -> - LTo = jid:tolower(jid:remove_resource(To)), - LFrom = jid:tolower(jid:remove_resource(From)), - lists:foldl( - fun(_Node, {error, _} = Err) -> - Err; - (Node, {result, _}) -> - case mod_pubsub:subscribe_node(LTo, Node, From, From, []) of - {error, _} = Err -> - case is_item_not_found(Err) of - true -> - case mod_pubsub:create_node( - LTo, To#jid.lserver, Node, LFrom, <<"mix">>) of - {result, _} -> - mod_pubsub:subscribe_node(LTo, Node, From, From, []); - Error -> - Error - end; - false -> - Err - end; - {result, _} = Result -> - Result - end - end, {result, []}, Nodes). +-spec process_mix_destroy(iq()) -> iq(). +process_mix_destroy(#iq{to = To, + from = #jid{luser = U, lserver = S}, + sub_els = [#mix_destroy{channel = Chan}]} = IQ) -> + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, {#jid{luser = U, lserver = S}, _, _}} -> + case Mod:del_channel(ServerHost, Chan, Host) of + ok -> + xmpp:make_iq_result(IQ, #mix_destroy{channel = Chan}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {ok, _} -> + xmpp:make_error(IQ, ownership_error(IQ)); + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. -unsubscribe_nodes(From, To, Nodes) -> - LTo = jid:tolower(jid:remove_resource(To)), - BareFrom = jid:remove_resource(From), - lists:foldl( - fun(_Node, {error, _} = Err) -> - Err; - (Node, {result, _} = Result) -> - case mod_pubsub:unsubscribe_node(LTo, Node, From, BareFrom, <<"">>) of - {error, _} = Err -> - case is_not_subscribed(Err) of - true -> Result; - _ -> Err - end; - {result, _} = Res -> - Res - end - end, {result, []}, Nodes). +-spec process_mix_join(iq()) -> iq(). +process_mix_join(#iq{to = To, from = From, + sub_els = [#mix_join{} = JoinReq]} = IQ) -> + Chan = To#jid.luser, + Host = To#jid.lserver, + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, {_, _, Key}} -> + ID = make_id(From, Key), + Nick = JoinReq#mix_join.nick, + BFrom = jid:remove_resource(From), + Nodes = filter_nodes(JoinReq#mix_join.subscribe), + try + ok = Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick), + ok = Mod:subscribe(ServerHost, Chan, Host, BFrom, Nodes), + notify_participant_joined(Mod, ServerHost, To, From, ID, Nick), + xmpp:make_iq_result(IQ, #mix_join{id = ID, + subscribe = Nodes, + nick = Nick}) + catch _:{badmatch, {error, db_failure}} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. -publish_participant(From, To) -> - BareFrom = jid:remove_resource(From), - LFrom = jid:tolower(BareFrom), - LTo = jid:tolower(jid:remove_resource(To)), - Participant = #mix_participant{jid = BareFrom}, - ItemID = str:sha(jid:encode(LFrom)), - mod_pubsub:publish_item( - LTo, To#jid.lserver, ?NS_MIX_NODES_PARTICIPANTS, - From, ItemID, [xmpp:encode(Participant)]). +-spec process_mix_leave(iq()) -> iq(). +process_mix_leave(#iq{to = To, from = From, + sub_els = [#mix_leave{}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + BFrom = jid:remove_resource(From), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, {ID, _}} -> + try + ok = Mod:unsubscribe(ServerHost, Chan, Host, BFrom), + ok = Mod:del_participant(ServerHost, Chan, Host, BFrom), + notify_participant_left(Mod, ServerHost, To, ID), + xmpp:make_iq_result(IQ, #mix_leave{}) + catch _:{badmatch, {error, db_failure}} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_iq_result(IQ, #mix_leave{}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_iq_result(IQ, #mix_leave{}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. -delete_presence(From, To) -> - LFrom = jid:tolower(From), - LTo = jid:tolower(jid:remove_resource(To)), - case mod_pubsub:get_items(LTo, ?NS_MIX_NODES_PRESENCE) of - Items when is_list(Items) -> +-spec process_mix_setnick(iq()) -> iq(). +process_mix_setnick(#iq{to = To, from = From, + sub_els = [#mix_setnick{nick = Nick}]} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + BFrom = jid:remove_resource(From), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, {_, Nick}} -> + xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick}); + {ok, {ID, _}} -> + case Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick) of + ok -> + notify_participant_joined(Mod, ServerHost, To, From, ID, Nick), + xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick}); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, not_joined_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. + +-spec process_mix_message(message()) -> ok. +process_mix_message(#message{from = From, to = To, + id = SubmissionID} = Msg) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BFrom = jid:remove_resource(From), + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, {_ID, Nick}} -> + MamID = mod_mam:make_id(), + Msg1 = xmpp:put_meta( + xmpp:set_subtag( + Msg#message{from = jid:remove_resource(To), + to = undefined, + id = integer_to_binary(MamID)}, + #mix{jid = BFrom, + nick = Nick, + submission_id = SubmissionID}), + stanza_id, MamID), + case ejabberd_hooks:run_fold( + store_mam_message, ServerHost, Msg1, + [Chan, Host, BFrom, Nick, groupchat, recv]) of + #message{} = Msg2 -> + multicast(Mod, ServerHost, Chan, Host, + ?NS_MIX_NODES_MESSAGES, Msg2); + _ -> + ok + end; + {error, notfound} -> + ejabberd_router:route_error(Msg, not_joined_error(Msg)); + {error, db_failure} -> + ejabberd_router:route_error(Msg, db_error(Msg)) + end; + {error, notfound} -> + ejabberd_router:route_error(Msg, no_channel_error(Msg)); + {error, db_failure} -> + ejabberd_router:route_error(Msg, db_error(Msg)) + end. + +-spec process_participants_list(iq()) -> iq(). +process_participants_list(#iq{from = From, to = To} = IQ) -> + {Chan, Host, _} = jid:tolower(To), + ServerHost = ejabberd_router:host_of_route(Host), + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + case Mod:get_channel(ServerHost, Chan, Host) of + {ok, _} -> + BFrom = jid:remove_resource(From), + case Mod:get_participant(ServerHost, Chan, Host, BFrom) of + {ok, _} -> + case Mod:get_participants(ServerHost, Chan, Host) of + {ok, Participants} -> + Items = items_of_participants(Participants), + Pubsub = #pubsub{ + items = #ps_items{ + node = ?NS_MIX_NODES_PARTICIPANTS, + items = Items}}, + xmpp:make_iq_result(IQ, Pubsub); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, not_joined_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end; + {error, notfound} -> + xmpp:make_error(IQ, no_channel_error(IQ)); + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. + +-spec items_of_participants([{jid(), binary(), binary()}]) -> [ps_item()]. +items_of_participants(Participants) -> + lists:map( + fun({JID, ID, Nick}) -> + Participant = #mix_participant{jid = JID, nick = Nick}, + #ps_item{id = ID, + sub_els = [xmpp:encode(Participant)]} + end, Participants). + +-spec known_nodes() -> [binary()]. +known_nodes() -> + [?NS_MIX_NODES_MESSAGES, + ?NS_MIX_NODES_PARTICIPANTS]. + +-spec filter_nodes(binary()) -> [binary()]. +filter_nodes(Nodes) -> + lists:filter( + fun(Node) -> + lists:member(Node, Nodes) + end, known_nodes()). + +-spec multicast(module(), binary(), binary(), + binary(), binary(), message()) -> ok. +multicast(Mod, LServer, Chan, Service, Node, Msg) -> + case Mod:get_subscribed(LServer, Chan, Service, Node) of + {ok, Subscribers} -> lists:foreach( - fun({pubsub_item, {ItemID, _}, _, {_, LJID}, _}) - when LJID == LFrom -> - delete_item(From, To, ?NS_MIX_NODES_PRESENCE, ItemID); - (_) -> - ok - end, Items); - _ -> + fun(To) -> + ejabberd_router:route(Msg#message{to = To}) + end, Subscribers); + {error, db_failure} -> ok end. -delete_participant(From, To) -> - LFrom = jid:tolower(jid:remove_resource(From)), - ItemID = str:sha(jid:encode(LFrom)), - delete_presence(From, To), - delete_item(From, To, ?NS_MIX_NODES_PARTICIPANTS, ItemID). +-spec notify_participant_joined(module(), binary(), + jid(), jid(), binary(), binary()) -> ok. +notify_participant_joined(Mod, LServer, To, From, ID, Nick) -> + {Chan, Host, _} = jid:tolower(To), + Participant = #mix_participant{jid = jid:remove_resource(From), + nick = Nick}, + Item = #ps_item{id = ID, + sub_els = [xmpp:encode(Participant)]}, + Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS, + items = [Item]}, + Event = #ps_event{items = Items}, + Msg = #message{from = jid:remove_resource(To), + id = p1_rand:get_string(), + sub_els = [Event]}, + multicast(Mod, LServer, Chan, Host, ?NS_MIX_NODES_PARTICIPANTS, Msg). -delete_item(From, To, Node, ItemID) -> - LTo = jid:tolower(jid:remove_resource(To)), - case mod_pubsub:delete_item( - LTo, Node, From, ItemID, true) of - {result, _} = Res -> - Res; - {error, _} = Err -> - case is_item_not_found(Err) of - true -> {result, []}; - false -> Err - end - end. +-spec notify_participant_left(module(), binary(), jid(), binary()) -> ok. +notify_participant_left(Mod, LServer, To, ID) -> + {Chan, Host, _} = jid:tolower(To), + Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS, + retract = ID}, + Event = #ps_event{items = Items}, + Msg = #message{from = jid:remove_resource(To), + id = p1_rand:get_string(), + sub_els = [Event]}, + multicast(Mod, LServer, Chan, Host, ?NS_MIX_NODES_PARTICIPANTS, Msg). --spec is_item_not_found({error, stanza_error()}) -> boolean(). -is_item_not_found({error, #stanza_error{reason = 'item-not-found'}}) -> true; -is_item_not_found({error, _}) -> false. +-spec make_id(jid(), binary()) -> binary(). +make_id(JID, Key) -> + Data = jid:encode(jid:tolower(jid:remove_resource(JID))), + xmpp_util:hex(crypto:hmac(sha256, Data, Key, 10)). --spec is_not_subscribed({error, stanza_error()}) -> boolean(). -is_not_subscribed({error, StanzaError}) -> - xmpp:has_subtag(StanzaError, #ps_error{type = 'not-subscribed'}). +%%%=================================================================== +%%% Error generators +%%%=================================================================== +-spec db_error(stanza()) -> stanza_error(). +db_error(Pkt) -> + Txt = <<"Database failure">>, + xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)). -depends(_Host, _Opts) -> - [{mod_pubsub, hard}]. +-spec channel_exists_error(stanza()) -> stanza_error(). +channel_exists_error(Pkt) -> + Txt = <<"Channel already exists">>, + xmpp:err_conflict(Txt, xmpp:get_lang(Pkt)). -mod_opt_type(host) -> fun ejabberd_config:v_host/1; -mod_opt_type(hosts) -> fun ejabberd_config:v_hosts/1. +-spec no_channel_error(stanza()) -> stanza_error(). +no_channel_error(Pkt) -> + Txt = <<"Channel does not exist">>, + xmpp:err_item_not_found(Txt, xmpp:get_lang(Pkt)). -mod_options(_Host) -> - [{host, <<"mix.@HOST@">>}, - {hosts, []}]. +-spec not_joined_error(stanza()) -> stanza_error(). +not_joined_error(Pkt) -> + Txt = <<"You are not joined to the conference">>, + xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)). + +-spec unsupported_error(stanza()) -> stanza_error(). +unsupported_error(Pkt) -> + Txt = <<"No module is handling this query">>, + xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)). + +-spec ownership_error(stanza()) -> stanza_error(). +ownership_error(Pkt) -> + Txt = <<"Owner privileges required">>, + xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)). + +%%%=================================================================== +%%% IQ handlers +%%%=================================================================== +-spec register_iq_handlers(binary()) -> ok. +register_iq_handlers(Host) -> + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO, + ?MODULE, process_disco_info), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS, + ?MODULE, process_disco_items), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0, + ?MODULE, process_mix_core), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO, + ?MODULE, process_disco_info), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS, + ?MODULE, process_disco_items), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0, + ?MODULE, process_mix_core), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB, + ?MODULE, process_pubsub_query), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MAM_2, + ?MODULE, process_mam_query). + +-spec unregister_iq_handlers(binary()) -> ok. +unregister_iq_handlers(Host) -> + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_2). diff --git a/src/mod_mix_mnesia.erl b/src/mod_mix_mnesia.erl new file mode 100644 index 000000000..5786e191f --- /dev/null +++ b/src/mod_mix_mnesia.erl @@ -0,0 +1,189 @@ +%%%------------------------------------------------------------------- +%%% Created : 1 Dec 2018 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_mnesia). +-behaviour(mod_mix). +-compile([{parse_transform, ejabberd_sql_pt}]). + +%% API +-export([init/2]). +-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]). +-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]). +-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]). + +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +-record(mix_channel, + {chan_serv :: {binary(), binary()}, + service :: binary(), + creator :: jid:jid(), + hidden :: boolean(), + hmac_key :: binary(), + created_at :: erlang:timestamp()}). + +-record(mix_participant, + {user_chan :: {binary(), binary(), binary(), binary()}, + chan_serv :: {binary(), binary()}, + jid :: jid:jid(), + id :: binary(), + nick :: binary(), + created_at :: erlang:timestamp()}). + +-record(mix_subscription, + {user_chan_node :: {binary(), binary(), binary(), binary(), binary()}, + user_chan :: {binary(), binary(), binary(), binary()}, + chan_serv_node :: {binary(), binary(), binary()}, + chan_serv :: {binary(), binary()}, + jid :: jid:jid()}). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + try + {atomic, _} = ejabberd_mnesia:create( + ?MODULE, mix_channel, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_channel)}, + {index, [service]}]), + {atomic, _} = ejabberd_mnesia:create( + ?MODULE, mix_participant, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_participant)}, + {index, [chan_serv]}]), + {atomic, _} = ejabberd_mnesia:create( + ?MODULE, mix_subscription, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_subscription)}, + {index, [user_chan, chan_serv_node, chan_serv]}]), + ok + catch _:{badmatch, _} -> + {error, db_failure} + end. + +set_channel(_LServer, Channel, Service, CreatorJID, Hidden, Key) -> + mnesia:dirty_write( + #mix_channel{chan_serv = {Channel, Service}, + service = Service, + creator = jid:remove_resource(CreatorJID), + hidden = Hidden, + hmac_key = Key, + created_at = p1_time_compat:timestamp()}). + +get_channels(_LServer, Service) -> + Ret = mnesia:dirty_index_read(mix_channel, Service, #mix_channel.service), + {ok, lists:filtermap( + fun(#mix_channel{chan_serv = {Channel, _}, + hidden = false}) -> + {true, Channel}; + (_) -> + false + end, Ret)}. + +get_channel(_LServer, Channel, Service) -> + case mnesia:dirty_read(mix_channel, {Channel, Service}) of + [#mix_channel{creator = JID, + hidden = Hidden, + hmac_key = Key}] -> + {ok, {JID, Hidden, Key}}; + [] -> + {error, notfound} + end. + +del_channel(_LServer, Channel, Service) -> + Key = {Channel, Service}, + L1 = mnesia:dirty_read(mix_channel, Key), + L2 = mnesia:dirty_index_read(mix_participant, Key, + #mix_participant.chan_serv), + L3 = mnesia:dirty_index_read(mix_subscription, Key, + #mix_subscription.chan_serv), + lists:foreach(fun mnesia:dirty_delete_object/1, L1++L2++L3). + +set_participant(_LServer, Channel, Service, JID, ID, Nick) -> + {User, Domain, _} = jid:tolower(JID), + mnesia:dirty_write( + #mix_participant{ + user_chan = {User, Domain, Channel, Service}, + chan_serv = {Channel, Service}, + jid = jid:remove_resource(JID), + id = ID, + nick = Nick, + created_at = p1_time_compat:timestamp()}). + +get_participant(_LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case mnesia:dirty_read(mix_participant, {User, Domain, Channel, Service}) of + [#mix_participant{id = ID, nick = Nick}] -> {ok, {ID, Nick}}; + [] -> {error, notfound} + end. + +get_participants(_LServer, Channel, Service) -> + Ret = mnesia:dirty_index_read(mix_participant, + {Channel, Service}, + #mix_participant.chan_serv), + {ok, lists:map( + fun(#mix_participant{jid = JID, id = ID, nick = Nick}) -> + {ok, {JID, ID, Nick}} + end, Ret)}. + +del_participant(_LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + mnesia:dirty_delete(mix_participant, {User, Domain, Channel, Service}). + +subscribe(_LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + BJID = jid:remove_resource(JID), + lists:foreach( + fun(Node) -> + mnesia:dirty_write( + #mix_subscription{ + user_chan_node = {User, Domain, Channel, Service, Node}, + user_chan = {User, Domain, Channel, Service}, + chan_serv_node = {Channel, Service, Node}, + chan_serv = {Channel, Service}, + jid = BJID}) + end, Nodes). + +get_subscribed(_LServer, Channel, Service, Node) -> + Ret = mnesia:dirty_index_read(mix_subscription, + {Channel, Service, Node}, + #mix_subscription.chan_serv_node), + {ok, [JID || #mix_subscription{jid = JID} <- Ret]}. + +unsubscribe(_LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + Ret = mnesia:dirty_index_read(mix_subscription, + {User, Domain, Channel, Service}, + #mix_subscription.user_chan), + lists:foreach(fun mnesia:dirty_delete_object/1, Ret). + +unsubscribe(_LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + lists:foreach( + fun(Node) -> + mnesia:dirty_delete(mix_subscription, + {User, Domain, Channel, Service, Node}) + end, Nodes). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/mod_mix_pam.erl b/src/mod_mix_pam.erl new file mode 100644 index 000000000..9bcbfbf21 --- /dev/null +++ b/src/mod_mix_pam.erl @@ -0,0 +1,365 @@ +%%%------------------------------------------------------------------- +%%% Author : Evgeny Khramtsov +%%% Created : 4 Dec 2018 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_pam). +-behaviour(gen_mod). +-protocol({xep, 405, '0.2.1'}). + +%% gen_mod callbacks +-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]). +%% Hooks and handlers +-export([bounce_sm_packet/1, + disco_sm_features/5, + remove_user/2, + process_iq/1]). + +-include("xmpp.hrl"). +-include("logger.hrl"). + +-define(MIX_PAM_CACHE, mix_pam_cache). + +-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}. +-callback add_channel(jid(), jid(), binary()) -> ok | {error, db_failure}. +-callback del_channel(jid(), jid()) -> ok | {error, db_failure}. +-callback get_channel(jid(), jid()) -> {ok, binary()} | {error, notfound | db_failure}. +-callback get_channels(jid()) -> {ok, [{jid(), binary()}]} | {error, db_failure}. +-callback del_channels(jid()) -> ok | {error, db_failure}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). + +%%%=================================================================== +%%% API +%%%=================================================================== +start(Host, Opts) -> + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + case Mod:init(Host, Opts) of + ok -> + init_cache(Mod, Host, Opts), + ejabberd_hooks:add(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50), + ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, disco_sm_features, 50), + ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0, + ?MODULE, process_iq); + Err -> + Err + end. + +stop(Host) -> + ejabberd_hooks:delete(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50), + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, disco_sm_features, 50), + ejabberd_hooks:delete(remove_user, Host, ?MODULE, remove_user, 50), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0). + +reload(Host, NewOpts, OldOpts) -> + NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), + if NewMod /= OldMod -> + NewMod:init(Host, NewOpts); + true -> + ok + end, + init_cache(NewMod, Host, NewOpts). + +depends(_Host, _Opts) -> + []. + +mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; +mod_opt_type(O) when O == cache_life_time; O == cache_size -> + fun (I) when is_integer(I), I > 0 -> I; + (infinity) -> infinity + end; +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun (B) when is_boolean(B) -> B end. + +mod_options(Host) -> + [{db_type, ejabberd_config:default_db(Host, ?MODULE)}, + {use_cache, ejabberd_config:use_cache(Host)}, + {cache_size, ejabberd_config:cache_size(Host)}, + {cache_missed, ejabberd_config:cache_missed(Host)}, + {cache_life_time, ejabberd_config:cache_life_time(Host)}]. + +-spec bounce_sm_packet({term(), stanza()}) -> {term(), stanza()}. +bounce_sm_packet({_, #message{to = #jid{lresource = <<>>} = To, + from = From, + type = groupchat} = Msg} = Acc) -> + case xmpp:has_subtag(Msg, #mix{}) of + true -> + {LUser, LServer, _} = jid:tolower(To), + case get_channel(To, From) of + {ok, _} -> + lists:foreach( + fun(R) -> + To1 = jid:replace_resource(To, R), + ejabberd_router:route(xmpp:set_to(Msg, To1)) + end, ejabberd_sm:get_user_resources(LUser, LServer)), + {pass, Msg}; + _ -> + Acc + end; + false -> + Acc + end; +bounce_sm_packet(Acc) -> + Acc. + +-spec disco_sm_features({error, stanza_error()} | empty | {result, [binary()]}, + jid(), jid(), binary(), binary()) -> + {error, stanza_error()} | empty | {result, [binary()]}. +disco_sm_features({error, _Error} = Acc, _From, _To, _Node, _Lang) -> + Acc; +disco_sm_features(Acc, _From, _To, <<"">>, _Lang) -> + {result, [?NS_MIX_PAM_0 | + case Acc of + {result, Features} -> Features; + empty -> [] + end]}; +disco_sm_features(Acc, _From, _To, _Node, _Lang) -> + Acc. + +-spec process_iq(iq()) -> iq() | ignore. +process_iq(#iq{from = #jid{luser = U1, lserver = S1}, + to = #jid{luser = U2, lserver = S2}} = IQ) + when {U1, S1} /= {U2, S2} -> + xmpp:make_error(IQ, forbidden_query_error(IQ)); +process_iq(#iq{type = set, + sub_els = [#mix_client_join{} = Join]} = IQ) -> + case Join#mix_client_join.channel of + undefined -> + xmpp:make_error(IQ, missing_channel_error(IQ)); + _ -> + process_join(IQ) + end; +process_iq(#iq{type = set, + sub_els = [#mix_client_leave{} = Leave]} = IQ) -> + case Leave#mix_client_leave.channel of + undefined -> + xmpp:make_error(IQ, missing_channel_error(IQ)); + _ -> + process_leave(IQ) + end; +process_iq(IQ) -> + xmpp:make_error(IQ, unsupported_query_error(IQ)). + +-spec remove_user(binary(), binary()) -> ok | {error, db_failure}. +remove_user(LUser, LServer) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + JID = jid:make(LUser, LServer), + Chans = case Mod:get_channels(JID) of + {ok, Channels} -> + lists:map( + fun({Channel, _}) -> + ejabberd_router:route( + #iq{from = JID, + to = Channel, + id = p1_rand:get_string(), + type = set, + sub_els = [#mix_leave{}]}), + Channel + end, Channels); + _ -> + [] + end, + Mod:del_channels(jid:make(LUser, LServer)), + lists:foreach( + fun(Chan) -> + delete_cache(Mod, JID, Chan) + end, Chans). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec process_join(iq()) -> ignore. +process_join(#iq{from = From, + sub_els = [#mix_client_join{channel = Channel, + join = Join}]} = IQ) -> + ejabberd_router:route_iq( + #iq{from = jid:remove_resource(From), + to = Channel, type = set, sub_els = [Join]}, + fun(ResIQ) -> process_join_result(ResIQ, IQ) end), + ignore. + +-spec process_leave(iq()) -> iq() | error. +process_leave(#iq{from = From, + sub_els = [#mix_client_leave{channel = Channel, + leave = Leave}]} = IQ) -> + case del_channel(From, Channel) of + ok -> + ejabberd_router:route_iq( + #iq{from = jid:remove_resource(From), + to = Channel, type = set, sub_els = [Leave]}, + fun(ResIQ) -> process_leave_result(ResIQ, IQ) end), + ignore; + {error, db_failure} -> + xmpp:make_error(IQ, db_error(IQ)) + end. + +-spec process_join_result(iq(), iq()) -> ok. +process_join_result(#iq{from = Channel, + type = result, sub_els = [#mix_join{id = ID} = Join]}, + #iq{to = To} = IQ) -> + case add_channel(To, Channel, ID) of + ok -> + ChanID = make_channel_id(Channel, ID), + Join1 = Join#mix_join{id = <<"">>, jid = ChanID}, + ResIQ = xmpp:make_iq_result(IQ, #mix_client_join{join = Join1}), + ejabberd_router:route(ResIQ); + {error, db_failure} -> + ejabberd_router:route_error(IQ, db_error(IQ)) + end; +process_join_result(Err, IQ) -> + process_iq_error(Err, IQ). + +-spec process_leave_result(iq(), iq()) -> ok. +process_leave_result(#iq{type = result, sub_els = [#mix_leave{} = Leave]}, IQ) -> + ResIQ = xmpp:make_iq_result(IQ, #mix_client_leave{leave = Leave}), + ejabberd_router:route(ResIQ); +process_leave_result(Err, IQ) -> + process_iq_error(Err, IQ). + +-spec process_iq_error(iq(), iq()) -> ok. +process_iq_error(#iq{type = error} = ErrIQ, #iq{sub_els = [El]} = IQ) -> + case xmpp:get_error(ErrIQ) of + undefined -> + %% Not sure if this stuff is correct because + %% RFC6120 section 8.3.1 bullet 4 states that + %% an error stanza MUST contain an child element + IQ1 = xmpp:make_iq_result(IQ, El), + ejabberd_router:route(IQ1#iq{type = error}); + Err -> + ejabberd_router:route_error(IQ, Err) + end; +process_iq_error(timeout, IQ) -> + Txt = <<"Request has timed out">>, + Err = xmpp:err_recipient_unavailable(Txt, IQ#iq.lang), + ejabberd_router:route_error(IQ, Err). + +-spec make_channel_id(jid(), binary()) -> jid(). +make_channel_id(JID, ID) -> + {U, S, R} = jid:split(JID), + jid:make(<>, S, R). + +%%%=================================================================== +%%% Error generators +%%%=================================================================== +-spec missing_channel_error(stanza()) -> stanza_error(). +missing_channel_error(Pkt) -> + Txt = <<"Attribute 'channel' is required for this request">>, + xmpp:err_bad_request(Txt, xmpp:get_lang(Pkt)). + +-spec forbidden_query_error(stanza()) -> stanza_error(). +forbidden_query_error(Pkt) -> + Txt = <<"Query to another users is forbidden">>, + xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)). + +-spec unsupported_query_error(stanza()) -> stanza_error(). +unsupported_query_error(Pkt) -> + Txt = <<"No module is handling this query">>, + xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)). + +-spec db_error(stanza()) -> stanza_error(). +db_error(Pkt) -> + Txt = <<"Database failure">>, + xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)). + +%%%=================================================================== +%%% Database queries +%%%=================================================================== +get_channel(JID, Channel) -> + {LUser, LServer, _} = jid:tolower(JID), + {Chan, Service, _} = jid:tolower(Channel), + Mod = gen_mod:db_mod(LServer, ?MODULE), + case use_cache(Mod, LServer) of + false -> Mod:get_channel(JID, Channel); + true -> + case ets_cache:lookup( + ?MIX_PAM_CACHE, {LUser, LServer, Chan, Service}, + fun() -> Mod:get_channel(JID, Channel) end) of + error -> {error, notfound}; + Ret -> Ret + end + end. + +add_channel(JID, Channel, ID) -> + Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE), + case Mod:add_channel(JID, Channel, ID) of + ok -> delete_cache(Mod, JID, Channel); + Err -> Err + end. + +del_channel(JID, Channel) -> + Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE), + case Mod:del_channel(JID, Channel) of + ok -> delete_cache(Mod, JID, Channel); + Err -> Err + end. + +%%%=================================================================== +%%% Cache management +%%%=================================================================== +-spec init_cache(module(), binary(), gen_mod:opts()) -> ok. +init_cache(Mod, Host, Opts) -> + case use_cache(Mod, Host) of + true -> + CacheOpts = cache_opts(Opts), + ets_cache:new(?MIX_PAM_CACHE, CacheOpts); + false -> + ets_cache:delete(?MIX_PAM_CACHE) + end. + +-spec cache_opts(gen_mod:opts()) -> [proplists:property()]. +cache_opts(Opts) -> + MaxSize = gen_mod:get_opt(cache_size, Opts), + CacheMissed = gen_mod:get_opt(cache_missed, Opts), + LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> gen_mod:get_module_opt(Host, ?MODULE, use_cache) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. + +-spec delete_cache(module(), jid(), jid()) -> ok. +delete_cache(Mod, JID, Channel) -> + {LUser, LServer, _} = jid:tolower(JID), + {Chan, Service, _} = jid:tolower(Channel), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?MIX_PAM_CACHE, + {LUser, LServer, Chan, Service}, + cache_nodes(Mod, LServer)); + false -> + ok + end. diff --git a/src/mod_mix_pam_mnesia.erl b/src/mod_mix_pam_mnesia.erl new file mode 100644 index 000000000..568c4b9fa --- /dev/null +++ b/src/mod_mix_pam_mnesia.erl @@ -0,0 +1,91 @@ +%%%------------------------------------------------------------------- +%%% Author : Evgeny Khramtsov +%%% Created : 4 Dec 2018 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_pam_mnesia). +-behaviour(mod_mix_pam). + +%% API +-export([init/2, add_channel/3, get_channel/2, + get_channels/1, del_channel/2, del_channels/1, + use_cache/1]). + +-record(mix_pam, {user_channel :: {binary(), binary(), binary(), binary()}, + user :: {binary(), binary()}, + id :: binary()}). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + case ejabberd_mnesia:create(?MODULE, mix_pam, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, mix_pam)}, + {index, [user]}]) of + {atomic, _} -> ok; + _ -> {error, db_failure} + end. + +use_cache(Host) -> + case mnesia:table_info(mix_pam, storage_type) of + disc_only_copies -> + gen_mod:get_module_opt(Host, mod_mix_pam, use_cache); + _ -> + false + end. + +add_channel(User, Channel, ID) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + mnesia:dirty_write(#mix_pam{user_channel = {LUser, LServer, Chan, Service}, + user = {LUser, LServer}, + id = ID}). + +get_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case mnesia:dirty_read(mix_pam, {LUser, LServer, Chan, Service}) of + [#mix_pam{id = ID}] -> {ok, ID}; + [] -> {error, notfound} + end. + +get_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}), + {ok, lists:map( + fun(#mix_pam{user_channel = {_, _, Chan, Service}, + id = ID}) -> + {jid:make(Chan, Service), ID} + end, Ret)}. + +del_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + mnesia:dirty_delete(mix_pam, {LUser, LServer, Chan, Service}). + +del_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}), + lists:foreach(fun mnesia:dirty_delete_object/1, Ret). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/mod_mix_pam_sql.erl b/src/mod_mix_pam_sql.erl new file mode 100644 index 000000000..eda5966f1 --- /dev/null +++ b/src/mod_mix_pam_sql.erl @@ -0,0 +1,114 @@ +%%%------------------------------------------------------------------- +%%% Author : Evgeny Khramtsov +%%% Created : 4 Dec 2018 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_pam_sql). +-behaviour(mod_mix_pam). +-compile([{parse_transform, ejabberd_sql_pt}]). + +%% API +-export([init/2, add_channel/3, get_channel/2, + get_channels/1, del_channel/2, del_channels/1]). + +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + %% TODO + ok. + +add_channel(User, Channel, ID) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case ?SQL_UPSERT(LServer, "mix_pam", + ["!channel=%(Chan)s", + "!service=%(Service)s", + "!username=%(LUser)s", + "!server_host=%(LServer)s", + "id=%(ID)s"]) of + ok -> ok; + _Err -> {error, db_failure} + end. + +get_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(id)s from mix_pam where " + "channel=%(Chan)s and service=%(Service)s " + "and username=%(LUser)s and %(LServer)H")) of + {selected, [{ID}]} -> {ok, ID}; + {selected, []} -> {error, notfound}; + _Err -> {error, db_failure} + end. + +get_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + SQL = ?SQL("select @(channel)s, @(service)s, @(id)s from mix_pam " + "where username=%(LUser)s and %(LServer)H"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, Ret} -> + {ok, lists:filtermap( + fun({Chan, Service, ID}) -> + case jid:make(Chan, Service) of + error -> + report_corrupted(SQL), + false; + JID -> + {true, {JID, ID}} + end + end, Ret)}; + _Err -> + {error, db_failure} + end. + +del_channel(User, Channel) -> + {LUser, LServer, _} = jid:tolower(User), + {Chan, Service, _} = jid:tolower(Channel), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_pam where " + "channel=%(Chan)s and service=%(Service)s " + "and username=%(LUser)s and %(LServer)H")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +del_channels(User) -> + {LUser, LServer, _} = jid:tolower(User), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_pam where " + "username=%(LUser)s and %(LServer)H")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec report_corrupted(iolist()) -> ok. +report_corrupted(SQL) -> + ?ERROR_MSG("Corrupted values returned by SQL request: ~s", [SQL]). diff --git a/src/mod_mix_sql.erl b/src/mod_mix_sql.erl new file mode 100644 index 000000000..16f7c0d17 --- /dev/null +++ b/src/mod_mix_sql.erl @@ -0,0 +1,236 @@ +%%%------------------------------------------------------------------- +%%% Created : 1 Dec 2018 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2018 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(mod_mix_sql). +-behaviour(mod_mix). +-compile([{parse_transform, ejabberd_sql_pt}]). + +%% API +-export([init/2]). +-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]). +-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]). +-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]). + +-include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + %% TODO + ok. + +set_channel(LServer, Channel, Service, CreatorJID, Hidden, Key) -> + {User, Domain, _} = jid:tolower(CreatorJID), + RawJID = jid:encode(jid:remove_resource(CreatorJID)), + case ?SQL_UPSERT(LServer, "mix_channel", + ["!channel=%(Channel)s", + "!service=%(Service)s", + "username=%(User)s", + "domain=%(Domain)s", + "jid=%(RawJID)s", + "hidden=%(Hidden)b", + "hmac_key=%(Key)s"]) of + ok -> ok; + _Err -> {error, db_failure} + end. + +get_channels(LServer, Service) -> + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(channel)s, @(hidden)b from mix_channel " + "where service=%(Service)s")) of + {selected, Ret} -> + {ok, [Channel || {Channel, Hidden} <- Ret, Hidden == false]}; + _Err -> + {error, db_failure} + end. + +get_channel(LServer, Channel, Service) -> + SQL = ?SQL("select @(jid)s, @(hidden)b, @(hmac_key)s from mix_channel " + "where channel=%(Channel)s and service=%(Service)s"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, [{RawJID, Hidden, Key}]} -> + try jid:decode(RawJID) of + JID -> {ok, {JID, Hidden, Key}} + catch _:{bad_jid, _} -> + report_corrupted(jid, SQL), + {error, db_failure} + end; + {selected, []} -> {error, notfound}; + _Err -> {error, db_failure} + end. + +del_channel(LServer, Channel, Service) -> + F = fun() -> + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_channel where " + "channel=%(Channel)s and service=%(Service)s")), + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_participant where " + "channel=%(Channel)s and service=%(Service)s")), + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_subscription where " + "channel=%(Channel)s and service=%(Service)s")) + end, + case ejabberd_sql:sql_transaction(LServer, F) of + {atomic, _} -> ok; + _Err -> {error, db_failure} + end. + +set_participant(LServer, Channel, Service, JID, ID, Nick) -> + {User, Domain, _} = jid:tolower(JID), + RawJID = jid:encode(jid:remove_resource(JID)), + case ?SQL_UPSERT(LServer, "mix_participant", + ["!channel=%(Channel)s", + "!service=%(Service)s", + "!username=%(User)s", + "!domain=%(Domain)s", + "jid=%(RawJID)s", + "id=%(ID)s", + "nick=%(Nick)s"]) of + ok -> ok; + _Err -> {error, db_failure} + end. + +get_participant(LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(id)s, @(nick)s from mix_participant " + "where channel=%(Channel)s and service=%(Service)s " + "and username=%(User)s and domain=%(Domain)s")) of + {selected, [Ret]} -> {ok, Ret}; + {selected, []} -> {error, notfound}; + _Err -> {error, db_failure} + end. + +get_participants(LServer, Channel, Service) -> + SQL = ?SQL("select @(jid)s, @(id)s, @(nick)s from mix_participant " + "where channel=%(Channel)s and service=%(Service)s"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, Ret} -> + {ok, lists:filtermap( + fun({RawJID, ID, Nick}) -> + try jid:decode(RawJID) of + JID -> {true, {JID, ID, Nick}} + catch _:{bad_jid, _} -> + report_corrupted(jid, SQL), + false + end + end, Ret)}; + _Err -> + {error, db_failure} + end. + +del_participant(LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_participant where " + "channel=%(Channel)s and service=%(Service)s " + "and username=%(User)s and domain=%(Domain)s")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +subscribe(_LServer, _Channel, _Service, _JID, []) -> + ok; +subscribe(LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + RawJID = jid:encode(jid:remove_resource(JID)), + F = fun() -> + lists:foreach( + fun(Node) -> + ?SQL_UPSERT_T( + "mix_subscription", + ["!channel=%(Channel)s", + "!service=%(Service)s", + "!username=%(User)s", + "!domain=%(Domain)s", + "!node=%(Node)s", + "jid=%(RawJID)s"]) + end, Nodes) + end, + case ejabberd_sql:sql_transaction(LServer, F) of + {atomic, _} -> ok; + _Err -> {error, db_failure} + end. + +get_subscribed(LServer, Channel, Service, Node) -> + SQL = ?SQL("select @(jid)s from mix_subscription " + "where channel=%(Channel)s and service=%(Service)s " + "and node=%(Node)s"), + case ejabberd_sql:sql_query(LServer, SQL) of + {selected, Ret} -> + {ok, lists:filtermap( + fun({RawJID}) -> + try jid:decode(RawJID) of + JID -> {true, JID} + catch _:{bad_jid, _} -> + report_corrupted(jid, SQL), + false + end + end, Ret)}; + _Err -> + {error, db_failure} + end. + +unsubscribe(LServer, Channel, Service, JID) -> + {User, Domain, _} = jid:tolower(JID), + case ejabberd_sql:sql_query( + LServer, + ?SQL("delete from mix_subscription " + "where channel=%(Channel)s and service=%(Service)s " + "and username=%(User)s and domain=%(Domain)s")) of + {updated, _} -> ok; + _Err -> {error, db_failure} + end. + +unsubscribe(_LServer, _Channel, _Service, _JID, []) -> + ok; +unsubscribe(LServer, Channel, Service, JID, Nodes) -> + {User, Domain, _} = jid:tolower(JID), + F = fun() -> + lists:foreach( + fun(Node) -> + ejabberd_sql:sql_query_t( + ?SQL("delete from mix_subscription " + "where channel=%(Channel)s " + "and service=%(Service)s " + "and username=%(User)s " + "and domain=%(Domain)s " + "and node=%(Node)s")) + end, Nodes) + end, + case ejabberd_sql:sql_transaction(LServer, F) of + {atomic, ok} -> ok; + _Err -> {error, db_failure} + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec report_corrupted(atom(), iolist()) -> ok. +report_corrupted(Column, SQL) -> + ?ERROR_MSG("Corrupted value of '~s' column returned by " + "SQL request: ~s", [Column, SQL]). diff --git a/src/mod_push.erl b/src/mod_push.erl index cadce92b0..5e75e0f9d 100644 --- a/src/mod_push.erl +++ b/src/mod_push.erl @@ -34,7 +34,7 @@ %% ejabberd_hooks callbacks. -export([disco_sm_features/5, c2s_session_pending/1, c2s_copy_session/2, - c2s_handle_cast/2, c2s_stanza/3, mam_message/6, offline_message/1, + c2s_handle_cast/2, c2s_stanza/3, mam_message/7, offline_message/1, remove_user/2]). %% gen_iq_handler callback. @@ -352,8 +352,8 @@ c2s_stanza(State, _Pkt, _SendResult) -> State. -spec mam_message(message() | drop, binary(), binary(), jid(), - chat | groupchat, recv | send) -> message(). -mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) -> + binary(), chat | groupchat, recv | send) -> message(). +mam_message(#message{} = Pkt, LUser, LServer, _Peer, _Nick, chat, Dir) -> case lookup_sessions(LUser, LServer) of {ok, [_|_] = Clients} -> case drop_online_sessions(LUser, LServer, Clients) of @@ -367,7 +367,7 @@ mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) -> ok end, Pkt; -mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) -> +mam_message(Pkt, _LUser, _LServer, _Peer, _Nick, _Type, _Dir) -> Pkt. -spec offline_message(message()) -> message().