Merge branch 'mix'

This commit is contained in:
Evgeny Khramtsov 2019-02-21 12:36:33 +03:00
commit 1684436bfe
16 changed files with 2044 additions and 53 deletions

View File

@ -410,3 +410,57 @@ CREATE TABLE push_session (
);
CREATE UNIQUE INDEX i_push_session_susn ON push_session (server_host, username, service, node);
CREATE TABLE mix_channel (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
hidden boolean NOT NULL,
hmac_key text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service);
CREATE INDEX i_mix_channel_serv ON mix_channel (service);
CREATE TABLE mix_participant (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
id text NOT NULL,
nick text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain);
CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service);
CREATE TABLE mix_subscription (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
node text NOT NULL,
jid text NOT NULL
);
CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node);
CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain);
CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node);
CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service);
CREATE TABLE mix_pam (
username text NOT NULL,
server_host text NOT NULL,
channel text NOT NULL,
service text NOT NULL,
id text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, server_host, channel, service);
CREATE INDEX i_mix_pam_us ON mix_pam (username, server_host);

View File

@ -379,3 +379,56 @@ CREATE TABLE push_session (
CREATE UNIQUE INDEX i_push_usn ON push_session (username, service, node);
CREATE UNIQUE INDEX i_push_ut ON push_session (username, timestamp);
CREATE TABLE mix_channel (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
hidden boolean NOT NULL,
hmac_key text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service);
CREATE INDEX i_mix_channel_serv ON mix_channel (service);
CREATE TABLE mix_participant (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
id text NOT NULL,
nick text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain);
CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service);
CREATE TABLE mix_subscription (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
node text NOT NULL,
jid text NOT NULL
);
CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node);
CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain);
CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node);
CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service);
CREATE TABLE mix_pam (
username text NOT NULL,
channel text NOT NULL,
service text NOT NULL,
id text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, channel, service);
CREATE INDEX i_mix_pam_us ON mix_pam (username);

View File

@ -426,3 +426,57 @@ CREATE TABLE push_session (
);
CREATE UNIQUE INDEX i_push_session_susn ON push_session (server_host(191), username(191), service(191), node(191));
CREATE TABLE mix_channel (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
hidden boolean NOT NULL,
hmac_key text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel(191), service(191));
CREATE INDEX i_mix_channel_serv ON mix_channel (service(191));
CREATE TABLE mix_participant (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
id text NOT NULL,
nick text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel(191), service(191), username(191), domain(191));
CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel(191), service(191));
CREATE TABLE mix_subscription (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
node text NOT NULL,
jid text NOT NULL
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel(153), service(153), username(153), domain(153), node(153));
CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel(191), service(191), username(191), domain(191));
CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel(191), service(191), node(191));
CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel(191), service(191));
CREATE TABLE mix_pam (
username text NOT NULL,
server_host text NOT NULL,
channel text NOT NULL,
service text NOT NULL,
id text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username(191), server_host(191), channel(191), service(191));
CREATE INDEX i_mix_pam_us ON mix_pam (username(191), server_host(191));

View File

@ -395,3 +395,56 @@ CREATE TABLE push_session (
CREATE UNIQUE INDEX i_push_usn ON push_session (username(191), service(191), node(191));
CREATE UNIQUE INDEX i_push_ut ON push_session (username(191), timestamp);
CREATE TABLE mix_channel (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
hidden boolean NOT NULL,
hmac_key text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel(191), service(191));
CREATE INDEX i_mix_channel_serv ON mix_channel (service(191));
CREATE TABLE mix_participant (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
id text NOT NULL,
nick text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel(191), service(191), username(191), domain(191));
CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel(191), service(191));
CREATE TABLE mix_subscription (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
node text NOT NULL,
jid text NOT NULL
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel(153), service(153), username(153), domain(153), node(153));
CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel(191), service(191), username(191), domain(191));
CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel(191), service(191), node(191));
CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel(191), service(191));
CREATE TABLE mix_pam (
username text NOT NULL,
channel text NOT NULL,
service text NOT NULL,
id text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username(191), channel(191), service(191));
CREATE INDEX i_mix_pam_u ON mix_pam (username(191));

View File

@ -571,3 +571,57 @@ CREATE TABLE push_session (
);
CREATE UNIQUE INDEX i_push_session_susn ON push_session USING btree (server_host, username, service, node);
CREATE TABLE mix_channel (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
hidden boolean NOT NULL,
hmac_key text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service);
CREATE INDEX i_mix_channel_serv ON mix_channel (service);
CREATE TABLE mix_participant (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
id text NOT NULL,
nick text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain);
CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service);
CREATE TABLE mix_subscription (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
node text NOT NULL,
jid text NOT NULL
);
CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node);
CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain);
CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node);
CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service);
CREATE TABLE mix_pam (
username text NOT NULL,
server_host text NOT NULL,
channel text NOT NULL,
service text NOT NULL,
id text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, server_host, channel, service);
CREATE INDEX i_mix_pam_us ON mix_pam (username, server_host);

View File

@ -399,3 +399,56 @@ CREATE TABLE push_session (
CREATE UNIQUE INDEX i_push_usn ON push_session USING btree (username, service, node);
CREATE UNIQUE INDEX i_push_ut ON push_session USING btree (username, timestamp);
CREATE TABLE mix_channel (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
hidden boolean NOT NULL,
hmac_key text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_channel ON mix_channel (channel, service);
CREATE INDEX i_mix_channel_serv ON mix_channel (service);
CREATE TABLE mix_participant (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
jid text NOT NULL,
id text NOT NULL,
nick text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_participant ON mix_participant (channel, service, username, domain);
CREATE INDEX i_mix_participant_chan_serv ON mix_participant (channel, service);
CREATE TABLE mix_subscription (
channel text NOT NULL,
service text NOT NULL,
username text NOT NULL,
domain text NOT NULL,
node text NOT NULL,
jid text NOT NULL
);
CREATE UNIQUE INDEX i_mix_subscription ON mix_subscription (channel, service, username, domain, node);
CREATE INDEX i_mix_subscription_chan_serv_ud ON mix_subscription (channel, service, username, domain);
CREATE INDEX i_mix_subscription_chan_serv_node ON mix_subscription (channel, service, node);
CREATE INDEX i_mix_subscription_chan_serv ON mix_subscription (channel, service);
CREATE TABLE mix_pam (
username text NOT NULL,
channel text NOT NULL,
service text NOT NULL,
id text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX i_mix_pam ON mix_pam (username, channel, service);
CREATE INDEX i_mix_pam_us ON mix_pam (username);

View File

@ -44,6 +44,7 @@
close_session/4,
check_in_subscription/2,
bounce_offline_message/1,
bounce_sm_packet/1,
disconnect_removed_user/2,
get_user_resources/2,
get_user_present_resources/2,
@ -191,14 +192,22 @@ check_in_subscription(Acc, #presence{to = To}) ->
-spec bounce_offline_message({bounce, message()} | any()) -> any().
bounce_offline_message({bounce, #message{type = T} = Packet} = Acc)
when T == chat; T == groupchat; T == normal ->
bounce_offline_message({bounce, #message{type = T}} = Acc)
when T == chat; T == groupchat; T == normal ->
bounce_sm_packet(Acc);
bounce_offline_message(Acc) ->
Acc.
-spec bounce_sm_packet({bounce | term(), stanza()}) -> any().
bounce_sm_packet({bounce, Packet} = Acc) ->
Lang = xmpp:get_lang(Packet),
Txt = <<"User session not found">>,
Err = xmpp:err_service_unavailable(Txt, Lang),
ejabberd_router:route_error(Packet, Err),
{stop, Acc};
bounce_offline_message(Acc) ->
bounce_sm_packet({_, Packet} = Acc) ->
?DEBUG("dropping packet to unavailable resource:~n~s",
[xmpp:pp(Packet)]),
Acc.
-spec disconnect_removed_user(binary(), binary()) -> ok.
@ -508,6 +517,8 @@ host_up(Host) ->
ejabberd_sm, check_in_subscription, 20),
ejabberd_hooks:add(offline_message_hook, Host,
ejabberd_sm, bounce_offline_message, 100),
ejabberd_hooks:add(bounce_sm_packet, Host,
ejabberd_sm, bounce_sm_packet, 100),
ejabberd_hooks:add(remove_user, Host,
ejabberd_sm, disconnect_removed_user, 100),
ejabberd_c2s:host_up(Host).
@ -532,6 +543,8 @@ host_down(Host) ->
ejabberd_sm, check_in_subscription, 20),
ejabberd_hooks:delete(offline_message_hook, Host,
ejabberd_sm, bounce_offline_message, 100),
ejabberd_hooks:delete(bounce_sm_packet, Host,
ejabberd_sm, bounce_sm_packet, 100),
ejabberd_hooks:delete(remove_user, Host,
ejabberd_sm, disconnect_removed_user, 100),
ejabberd_c2s:host_down(Host).
@ -667,19 +680,22 @@ do_route(#presence{to = #jid{lresource = <<"">>} = To} = Packet) ->
fun({_, R}) ->
do_route(Packet#presence{to = jid:replace_resource(To, R)})
end, get_user_present_resources(LUser, LServer));
do_route(#message{to = #jid{lresource = <<"">>}, type = T} = Packet) ->
do_route(#message{to = #jid{lresource = <<"">>} = To, type = T} = Packet) ->
?DEBUG("processing message to bare JID:~n~s", [xmpp:pp(Packet)]),
if T == chat; T == headline; T == normal ->
route_message(Packet);
true ->
Lang = xmpp:get_lang(Packet),
ErrTxt = <<"User session not found">>,
Err = xmpp:err_service_unavailable(ErrTxt, Lang),
ejabberd_router:route_error(Packet, Err)
ejabberd_hooks:run_fold(bounce_sm_packet,
To#jid.lserver, {bounce, Packet}, [])
end;
do_route(#iq{to = #jid{lresource = <<"">>} = To, type = T} = Packet) ->
if T == set; T == get ->
?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]),
gen_iq_handler:handle(?MODULE, Packet);
true ->
ejabberd_hooks:run_fold(bounce_sm_packet,
To#jid.lserver, {pass, Packet}, [])
end;
do_route(#iq{to = #jid{lresource = <<"">>}} = Packet) ->
?DEBUG("processing IQ to bare JID:~n~s", [xmpp:pp(Packet)]),
gen_iq_handler:handle(?MODULE, Packet);
do_route(Packet) ->
?DEBUG("processing packet to full JID:~n~s", [xmpp:pp(Packet)]),
To = xmpp:get_to(Packet),
@ -691,16 +707,14 @@ do_route(Packet) ->
#message{type = T} when T == chat; T == normal ->
route_message(Packet);
#message{type = T} when T == headline ->
?DEBUG("dropping headline to unavailable resource:~n~s",
[xmpp:pp(Packet)]);
ejabberd_hooks:run_fold(bounce_sm_packet,
LServer, {pass, Packet}, []);
#presence{} ->
?DEBUG("dropping presence to unavailable resource:~n~s",
[xmpp:pp(Packet)]);
ejabberd_hooks:run_fold(bounce_sm_packet,
LServer, {pass, Packet}, []);
_ ->
Lang = xmpp:get_lang(Packet),
ErrTxt = <<"User session not found">>,
Err = xmpp:err_service_unavailable(ErrTxt, Lang),
ejabberd_router:route_error(Packet, Err)
ejabberd_hooks:run_fold(bounce_sm_packet,
LServer, {bounce, Packet}, [])
end;
Ss ->
Session = lists:max(Ss),

View File

@ -41,7 +41,8 @@
delete_old_messages/2, get_commands_spec/0, msg_to_el/4,
get_room_config/4, set_room_option/3, offline_message/1, export/1,
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4]).
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
process_iq/3, store_mam_message/7, make_id/0]).
-include("xmpp.hrl").
-include("logger.hrl").
@ -120,6 +121,8 @@ start(Host, Opts) ->
get_room_config, 50),
ejabberd_hooks:add(set_room_option, Host, ?MODULE,
set_room_option, 50),
ejabberd_hooks:add(store_mam_message, Host, ?MODULE,
store_mam_message, 100),
case gen_mod:get_opt(assume_mam_usage, Opts) of
true ->
ejabberd_hooks:add(message_is_archived, Host, ?MODULE,
@ -135,14 +138,12 @@ start(Host, Opts) ->
ejabberd_hooks:add(check_create_room, Host, ?MODULE,
check_create_room, 50)
end,
ejabberd_commands:register_commands(get_commands_spec());
ejabberd_commands:register_commands(get_commands_spec()),
ok;
Err ->
Err
end.
use_cache(Mod, Host) ->
case erlang:function_exported(Mod, use_cache, 2) of
true -> Mod:use_cache(Host);
@ -196,6 +197,8 @@ stop(Host) ->
get_room_config, 50),
ejabberd_hooks:delete(set_room_option, Host, ?MODULE,
set_room_option, 50),
ejabberd_hooks:delete(store_mam_message, Host, ?MODULE,
store_mam_message, 100),
case gen_mod:get_module_opt(Host, ?MODULE, assume_mam_usage) of
true ->
ejabberd_hooks:delete(message_is_archived, Host, ?MODULE,
@ -435,6 +438,10 @@ muc_filter_message(#message{from = From} = Pkt,
muc_filter_message(Acc, _MUCState, _FromNick) ->
Acc.
-spec make_id() -> binary().
make_id() ->
p1_time_compat:system_time(micro_seconds).
-spec get_stanza_id(stanza()) -> integer().
get_stanza_id(#message{meta = #{stanza_id := ID}}) ->
ID.
@ -445,7 +452,7 @@ init_stanza_id(#message{meta = #{stanza_id := _ID}} = Pkt, _LServer) ->
init_stanza_id(#message{meta = #{from_offline := true}} = Pkt, _LServer) ->
Pkt;
init_stanza_id(Pkt, LServer) ->
ID = p1_time_compat:system_time(micro_seconds),
ID = make_id(),
Pkt1 = strip_my_stanza_id(Pkt, LServer),
xmpp:put_meta(Pkt1, stanza_id, ID).
@ -549,7 +556,7 @@ message_is_archived(false, #{lserver := LServer}, Pkt) ->
delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
TypeBin == <<"groupchat">>;
TypeBin == <<"all">> ->
CurrentTime = p1_time_compat:system_time(micro_seconds),
CurrentTime = make_id(),
Diff = Days * 24 * 60 * 60 * 1000000,
TimeStamp = misc:usec_to_now(CurrentTime - Diff),
Type = misc:binary_to_atom(TypeBin),
@ -656,7 +663,7 @@ process_iq(LServer, #iq{from = #jid{luser = LUser}, lang = Lang,
Ret = case MsgType of
chat ->
maybe_activate_mam(LUser, LServer);
{groupchat, _Role, _MUCState} ->
_ ->
ok
end,
case Ret of
@ -877,15 +884,9 @@ store_msg(Pkt, LUser, LServer, Peer, Dir) ->
ok; % Already stored.
{true, _} ->
case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt,
[LUser, LServer, Peer, chat, Dir]) of
drop ->
pass;
Pkt1 ->
US = {LUser, LServer},
ID = get_stanza_id(Pkt1),
El = xmpp:encode(Pkt1),
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir, ID)
[LUser, LServer, Peer, <<"">>, chat, Dir]) of
#message{} -> ok;
_ -> pass
end;
{false, _} ->
pass
@ -902,20 +903,23 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) ->
{U, S, _} = jid:tolower(RoomJID),
LServer = MUCState#state.server_host,
case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt,
[U, S, Peer, groupchat, recv]) of
drop ->
pass;
Pkt1 ->
US = {U, S},
ID = get_stanza_id(Pkt1),
El = xmpp:encode(Pkt1),
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:store(El, LServer, US, groupchat, Peer, Nick, recv, ID)
[U, S, Peer, Nick, groupchat, recv]) of
#message{} -> ok;
_ -> pass
end;
false ->
pass
end.
store_mam_message(Pkt, U, S, Peer, Nick, Type, Dir) ->
LServer = ejabberd_router:host_of_route(S),
US = {U, S},
ID = get_stanza_id(Pkt),
El = xmpp:encode(Pkt),
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:store(El, LServer, US, Type, Peer, Nick, Dir, ID),
Pkt.
write_prefs(LUser, LServer, Host, Default, Always, Never) ->
Prefs = #archive_prefs{us = {LUser, LServer},
default = Default,
@ -1003,7 +1007,7 @@ select_and_send(LServer, Query, RSM, #iq{from = From, to = To} = IQ, MsgType) ->
Ret = case MsgType of
chat ->
select(LServer, From, From, Query, RSM, MsgType);
{groupchat, _Role, _MUCState} ->
_ ->
select(LServer, From, To, Query, RSM, MsgType)
end,
case Ret of
@ -1072,7 +1076,11 @@ msg_to_el(#archive_msg{timestamp = TS, packet = El, nick = Nick,
CodecOpts = ejabberd_config:codec_options(LServer),
try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
Pkt1 ->
Pkt2 = set_stanza_id(Pkt1, JidArchive, ID),
Pkt2 = case MsgType of
chat -> set_stanza_id(Pkt1, JidArchive, ID);
{groupchat, _, _} -> set_stanza_id(Pkt1, JidArchive, ID);
_ -> Pkt1
end,
Pkt3 = maybe_update_from_to(
Pkt2, JidRequestor, JidArchive, Peer, MsgType, Nick),
Delay = #delay{stamp = TS, from = jid:make(LServer)},
@ -1107,7 +1115,7 @@ maybe_update_from_to(#message{sub_els = Els} = Pkt, JidRequestor, JidArchive,
Pkt#message{from = jid:replace_resource(JidArchive, Nick),
to = undefined,
sub_els = Items ++ Els};
maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, chat, _Nick) ->
maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, _MsgType, _Nick) ->
Pkt.
-spec send([{binary(), integer(), xmlel()}],

View File

@ -176,7 +176,7 @@ select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
MAMQuery, RSM, MsgType) ->
User = case MsgType of
chat -> LUser;
{groupchat, _Role, _MUCState} -> jid:encode(JidArchive)
_ -> jid:encode(JidArchive)
end,
{Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM),
% TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a

653
src/mod_mix.erl Normal file
View File

@ -0,0 +1,653 @@
%%%-------------------------------------------------------------------
%%% File : mod_mix.erl
%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 2 Mar 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_mix).
-behaviour(gen_mod).
-behaviour(gen_server).
-protocol({xep, 369, '0.13.0'}).
%% API
-export([route/1]).
%% gen_mod callbacks
-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, format_status/2]).
%% Hooks
-export([process_disco_info/1,
process_disco_items/1,
process_mix_core/1,
process_mam_query/1,
process_pubsub_query/1]).
-include("xmpp.hrl").
-include("logger.hrl").
-include("translate.hrl").
-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}.
-callback set_channel(binary(), binary(), binary(),
binary(), boolean(), binary()) ->
ok | {error, db_failure}.
-callback get_channels(binary(), binary()) ->
{ok, [binary()]} | {error, db_failure}.
-callback get_channel(binary(), binary(), binary()) ->
{ok, {jid(), boolean(), binary()}} |
{error, notfound | db_failure}.
-callback set_participant(binary(), binary(), binary(), jid(), binary(), binary()) ->
ok | {error, db_failure}.
-callback get_participant(binary(), binary(), binary(), jid()) ->
{ok, {binary(), binary()}} | {error, notfound | db_failure}.
-record(state, {hosts :: [binary()],
server_host :: binary()}).
%%%===================================================================
%%% API
%%%===================================================================
start(Host, Opts) ->
gen_mod:start_child(?MODULE, Host, Opts).
stop(Host) ->
gen_mod:stop_child(?MODULE, Host).
reload(Host, NewOpts, OldOpts) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:cast(Proc, {reload, Host, NewOpts, OldOpts}).
depends(_Host, _Opts) ->
[{mod_mam, hard}].
mod_opt_type(access_create) -> fun acl:access_rules_validator/1;
mod_opt_type(name) -> fun iolist_to_binary/1;
mod_opt_type(host) -> fun ejabberd_config:v_host/1;
mod_opt_type(hosts) -> fun ejabberd_config:v_hosts/1;
mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end.
mod_options(Host) ->
[{access_create, all},
{host, <<"mix.@HOST@">>},
{hosts, []},
{name, ?T("Channels")},
{db_type, ejabberd_config:default_db(Host, ?MODULE)}].
-spec route(stanza()) -> ok.
route(#iq{} = IQ) ->
ejabberd_router:process_iq(IQ);
route(#message{type = groupchat, id = ID, lang = Lang,
to = #jid{luser = <<_, _/binary>>}} = Msg) ->
case ID of
<<>> ->
Txt = <<"Attribute 'id' is mandatory for MIX messages">>,
Err = xmpp:err_bad_request(Txt, Lang),
ejabberd_router:route_error(Msg, Err);
_ ->
process_mix_message(Msg)
end;
route(Pkt) ->
?DEBUG("Dropping packet:~n~s", [xmpp:pp(Pkt)]).
-spec process_disco_info(iq()) -> iq().
process_disco_info(#iq{type = set, lang = Lang} = IQ) ->
Txt = <<"Value 'set' of 'type' attribute is not allowed">>,
xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang));
process_disco_info(#iq{type = get, to = #jid{luser = <<>>} = To,
from = _From, lang = Lang,
sub_els = [#disco_info{node = <<>>}]} = IQ) ->
ServerHost = ejabberd_router:host_of_route(To#jid.lserver),
X = ejabberd_hooks:run_fold(disco_info, ServerHost, [],
[ServerHost, ?MODULE, <<"">>, Lang]),
Name = gen_mod:get_module_opt(ServerHost, ?MODULE, name),
Identity = #identity{category = <<"conference">>,
type = <<"text">>,
name = translate:translate(Lang, Name)},
Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS,
?NS_MIX_CORE_0, ?NS_MIX_CORE_SEARCHABLE_0,
?NS_MIX_CORE_CREATE_CHANNEL_0],
xmpp:make_iq_result(
IQ, #disco_info{features = Features,
identities = [Identity],
xdata = X});
process_disco_info(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To,
sub_els = [#disco_info{node = Node}]} = IQ)
when Node == <<"mix">>; Node == <<>> ->
{Chan, Host, _} = jid:tolower(To),
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, _} ->
Identity = #identity{category = <<"conference">>,
type = <<"mix">>},
Features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS,
?NS_MIX_CORE_0, ?NS_MAM_2],
xmpp:make_iq_result(
IQ, #disco_info{node = Node,
features = Features,
identities = [Identity]});
{error, notfound} ->
xmpp:make_error(IQ, no_channel_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
process_disco_info(#iq{type = get, sub_els = [#disco_info{node = Node}]} = IQ) ->
xmpp:make_iq_result(IQ, #disco_info{node = Node, features = [?NS_DISCO_INFO]});
process_disco_info(IQ) ->
xmpp:make_error(IQ, unsupported_error(IQ)).
-spec process_disco_items(iq()) -> iq().
process_disco_items(#iq{type = set, lang = Lang} = IQ) ->
Txt = <<"Value 'set' of 'type' attribute is not allowed">>,
xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang));
process_disco_items(#iq{type = get, to = #jid{luser = <<>>} = To,
sub_els = [#disco_items{node = <<>>}]} = IQ) ->
Host = To#jid.lserver,
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channels(ServerHost, Host) of
{ok, Channels} ->
Items = [#disco_item{jid = jid:make(Channel, Host)}
|| Channel <- Channels],
xmpp:make_iq_result(IQ, #disco_items{items = Items});
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
process_disco_items(#iq{type = get, to = #jid{luser = <<_, _/binary>>} = To,
sub_els = [#disco_items{node = Node}]} = IQ)
when Node == <<"mix">>; Node == <<>> ->
{Chan, Host, _} = jid:tolower(To),
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, _} ->
BTo = jid:remove_resource(To),
Items = [#disco_item{jid = BTo, node = N} || N <- known_nodes()],
xmpp:make_iq_result(IQ, #disco_items{node = Node, items = Items});
{error, notfound} ->
xmpp:make_error(IQ, no_channel_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
process_disco_items(#iq{type = get, sub_els = [#disco_items{node = Node}]} = IQ) ->
xmpp:make_iq_result(IQ, #disco_items{node = Node});
process_disco_items(IQ) ->
xmpp:make_error(IQ, unsupported_error(IQ)).
-spec process_mix_core(iq()) -> iq().
process_mix_core(#iq{type = set, to = #jid{luser = <<>>},
sub_els = [#mix_create{}]} = IQ) ->
process_mix_create(IQ);
process_mix_core(#iq{type = set, to = #jid{luser = <<>>},
sub_els = [#mix_destroy{}]} = IQ) ->
process_mix_destroy(IQ);
process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>},
sub_els = [#mix_join{}]} = IQ) ->
process_mix_join(IQ);
process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>},
sub_els = [#mix_leave{}]} = IQ) ->
process_mix_leave(IQ);
process_mix_core(#iq{type = set, to = #jid{luser = <<_, _/binary>>},
sub_els = [#mix_setnick{}]} = IQ) ->
process_mix_setnick(IQ);
process_mix_core(IQ) ->
xmpp:make_error(IQ, unsupported_error(IQ)).
process_pubsub_query(#iq{type = get,
sub_els = [#pubsub{items = #ps_items{node = Node}}]} = IQ)
when Node == ?NS_MIX_NODES_PARTICIPANTS ->
process_participants_list(IQ);
process_pubsub_query(IQ) ->
xmpp:make_error(IQ, unsupported_error(IQ)).
process_mam_query(#iq{from = From, to = To, type = T,
sub_els = [#mam_query{}]} = IQ)
when T == get; T == set ->
{Chan, Host, _} = jid:tolower(To),
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, _} ->
BFrom = jid:remove_resource(From),
case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
{ok, _} ->
mod_mam:process_iq(ServerHost, IQ, mix);
{error, notfound} ->
xmpp:make_error(IQ, not_joined_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_error(IQ, no_channel_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
process_mam_query(IQ) ->
xmpp:make_error(IQ, unsupported_error(IQ)).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([Host, Opts]) ->
process_flag(trap_exit, true),
Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
MyHosts = gen_mod:get_opt_hosts(Host, Opts),
case Mod:init(Host, [{hosts, MyHosts}|Opts]) of
ok ->
lists:foreach(
fun(MyHost) ->
ejabberd_router:register_route(
MyHost, Host, {apply, ?MODULE, route}),
register_iq_handlers(MyHost)
end, MyHosts),
{ok, #state{hosts = MyHosts, server_host = Host}};
{error, db_failure} ->
{stop, db_failure}
end.
handle_call(Request, _From, State) ->
?WARNING_MSG("Unexpected call: ~p", [Request]),
{noreply, State}.
handle_cast(Request, State) ->
?WARNING_MSG("Unexpected cast: ~p", [Request]),
{noreply, State}.
handle_info(Info, State) ->
?WARNING_MSG("Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, State) ->
lists:foreach(
fun(MyHost) ->
unregister_iq_handlers(MyHost),
ejabberd_router:unregister_route(MyHost)
end, State#state.hosts).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
format_status(_Opt, Status) ->
Status.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec process_mix_create(iq()) -> iq().
process_mix_create(#iq{to = To, from = From,
sub_els = [#mix_create{channel = Chan}]} = IQ) ->
Host = To#jid.lserver,
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
Creator = jid:remove_resource(From),
Chan1 = case Chan of
<<>> -> p1_rand:get_string();
_ -> Chan
end,
Ret = case Mod:get_channel(ServerHost, Chan1, Host) of
{ok, {#jid{luser = U, lserver = S}, _, _}} ->
case {From#jid.luser, From#jid.lserver} of
{U, S} -> ok;
_ -> {error, conflict}
end;
{error, notfound} ->
Key = xmpp_util:hex(p1_rand:bytes(20)),
Mod:set_channel(ServerHost, Chan1, Host,
Creator, Chan == <<>>, Key);
{error, db_failure} = Err ->
Err
end,
case Ret of
ok ->
xmpp:make_iq_result(IQ, #mix_create{channel = Chan1});
{error, conflict} ->
xmpp:make_error(IQ, channel_exists_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end.
-spec process_mix_destroy(iq()) -> iq().
process_mix_destroy(#iq{to = To,
from = #jid{luser = U, lserver = S},
sub_els = [#mix_destroy{channel = Chan}]} = IQ) ->
Host = To#jid.lserver,
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, {#jid{luser = U, lserver = S}, _, _}} ->
case Mod:del_channel(ServerHost, Chan, Host) of
ok ->
xmpp:make_iq_result(IQ, #mix_destroy{channel = Chan});
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{ok, _} ->
xmpp:make_error(IQ, ownership_error(IQ));
{error, notfound} ->
xmpp:make_error(IQ, no_channel_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end.
-spec process_mix_join(iq()) -> iq().
process_mix_join(#iq{to = To, from = From,
sub_els = [#mix_join{} = JoinReq]} = IQ) ->
Chan = To#jid.luser,
Host = To#jid.lserver,
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, {_, _, Key}} ->
ID = make_id(From, Key),
Nick = JoinReq#mix_join.nick,
BFrom = jid:remove_resource(From),
Nodes = filter_nodes(JoinReq#mix_join.subscribe),
try
ok = Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick),
ok = Mod:subscribe(ServerHost, Chan, Host, BFrom, Nodes),
notify_participant_joined(Mod, ServerHost, To, From, ID, Nick),
xmpp:make_iq_result(IQ, #mix_join{id = ID,
subscribe = Nodes,
nick = Nick})
catch _:{badmatch, {error, db_failure}} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_error(IQ, no_channel_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end.
-spec process_mix_leave(iq()) -> iq().
process_mix_leave(#iq{to = To, from = From,
sub_els = [#mix_leave{}]} = IQ) ->
{Chan, Host, _} = jid:tolower(To),
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
BFrom = jid:remove_resource(From),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, _} ->
case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
{ok, {ID, _}} ->
try
ok = Mod:unsubscribe(ServerHost, Chan, Host, BFrom),
ok = Mod:del_participant(ServerHost, Chan, Host, BFrom),
notify_participant_left(Mod, ServerHost, To, ID),
xmpp:make_iq_result(IQ, #mix_leave{})
catch _:{badmatch, {error, db_failure}} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_iq_result(IQ, #mix_leave{});
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_iq_result(IQ, #mix_leave{});
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end.
-spec process_mix_setnick(iq()) -> iq().
process_mix_setnick(#iq{to = To, from = From,
sub_els = [#mix_setnick{nick = Nick}]} = IQ) ->
{Chan, Host, _} = jid:tolower(To),
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
BFrom = jid:remove_resource(From),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, _} ->
case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
{ok, {_, Nick}} ->
xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick});
{ok, {ID, _}} ->
case Mod:set_participant(ServerHost, Chan, Host, BFrom, ID, Nick) of
ok ->
notify_participant_joined(Mod, ServerHost, To, From, ID, Nick),
xmpp:make_iq_result(IQ, #mix_setnick{nick = Nick});
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_error(IQ, not_joined_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_error(IQ, no_channel_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end.
-spec process_mix_message(message()) -> ok.
process_mix_message(#message{from = From, to = To,
id = SubmissionID} = Msg) ->
{Chan, Host, _} = jid:tolower(To),
{FUser, FServer, _} = jid:tolower(From),
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, _} ->
BFrom = jid:remove_resource(From),
case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
{ok, {StableID, Nick}} ->
MamID = mod_mam:make_id(),
Msg1 = xmpp:set_subtag(
Msg#message{from = jid:replace_resource(To, StableID),
to = undefined,
id = integer_to_binary(MamID)},
#mix{jid = BFrom, nick = Nick}),
Msg2 = xmpp:put_meta(Msg1, stanza_id, MamID),
case ejabberd_hooks:run_fold(
store_mam_message, ServerHost, Msg2,
[Chan, Host, BFrom, Nick, groupchat, recv]) of
#message{} ->
multicast(Mod, ServerHost, Chan, Host,
?NS_MIX_NODES_MESSAGES,
fun(#jid{luser = U, lserver = S})
when U == FUser, S == FServer ->
xmpp:set_subtag(
Msg1, #mix{jid = BFrom,
nick = Nick,
submission_id = SubmissionID});
(_) ->
Msg1
end);
_ ->
ok
end;
{error, notfound} ->
ejabberd_router:route_error(Msg, not_joined_error(Msg));
{error, db_failure} ->
ejabberd_router:route_error(Msg, db_error(Msg))
end;
{error, notfound} ->
ejabberd_router:route_error(Msg, no_channel_error(Msg));
{error, db_failure} ->
ejabberd_router:route_error(Msg, db_error(Msg))
end.
-spec process_participants_list(iq()) -> iq().
process_participants_list(#iq{from = From, to = To} = IQ) ->
{Chan, Host, _} = jid:tolower(To),
ServerHost = ejabberd_router:host_of_route(Host),
Mod = gen_mod:db_mod(ServerHost, ?MODULE),
case Mod:get_channel(ServerHost, Chan, Host) of
{ok, _} ->
BFrom = jid:remove_resource(From),
case Mod:get_participant(ServerHost, Chan, Host, BFrom) of
{ok, _} ->
case Mod:get_participants(ServerHost, Chan, Host) of
{ok, Participants} ->
Items = items_of_participants(Participants),
Pubsub = #pubsub{
items = #ps_items{
node = ?NS_MIX_NODES_PARTICIPANTS,
items = Items}},
xmpp:make_iq_result(IQ, Pubsub);
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_error(IQ, not_joined_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end;
{error, notfound} ->
xmpp:make_error(IQ, no_channel_error(IQ));
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end.
-spec items_of_participants([{jid(), binary(), binary()}]) -> [ps_item()].
items_of_participants(Participants) ->
lists:map(
fun({JID, ID, Nick}) ->
Participant = #mix_participant{jid = JID, nick = Nick},
#ps_item{id = ID,
sub_els = [xmpp:encode(Participant)]}
end, Participants).
-spec known_nodes() -> [binary()].
known_nodes() ->
[?NS_MIX_NODES_MESSAGES,
?NS_MIX_NODES_PARTICIPANTS].
-spec filter_nodes(binary()) -> [binary()].
filter_nodes(Nodes) ->
lists:filter(
fun(Node) ->
lists:member(Node, Nodes)
end, known_nodes()).
-spec multicast(module(), binary(), binary(),
binary(), binary(), fun((jid()) -> message())) -> ok.
multicast(Mod, LServer, Chan, Service, Node, F) ->
case Mod:get_subscribed(LServer, Chan, Service, Node) of
{ok, Subscribers} ->
lists:foreach(
fun(To) ->
Msg = xmpp:set_to(F(To), To),
ejabberd_router:route(Msg)
end, Subscribers);
{error, db_failure} ->
ok
end.
-spec notify_participant_joined(module(), binary(),
jid(), jid(), binary(), binary()) -> ok.
notify_participant_joined(Mod, LServer, To, From, ID, Nick) ->
{Chan, Host, _} = jid:tolower(To),
Participant = #mix_participant{jid = jid:remove_resource(From),
nick = Nick},
Item = #ps_item{id = ID,
sub_els = [xmpp:encode(Participant)]},
Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS,
items = [Item]},
Event = #ps_event{items = Items},
Msg = #message{from = jid:remove_resource(To),
id = p1_rand:get_string(),
sub_els = [Event]},
multicast(Mod, LServer, Chan, Host,
?NS_MIX_NODES_PARTICIPANTS,
fun(_) -> Msg end).
-spec notify_participant_left(module(), binary(), jid(), binary()) -> ok.
notify_participant_left(Mod, LServer, To, ID) ->
{Chan, Host, _} = jid:tolower(To),
Items = #ps_items{node = ?NS_MIX_NODES_PARTICIPANTS,
retract = ID},
Event = #ps_event{items = Items},
Msg = #message{from = jid:remove_resource(To),
id = p1_rand:get_string(),
sub_els = [Event]},
multicast(Mod, LServer, Chan, Host, ?NS_MIX_NODES_PARTICIPANTS,
fun(_) -> Msg end).
-spec make_id(jid(), binary()) -> binary().
make_id(JID, Key) ->
Data = jid:encode(jid:tolower(jid:remove_resource(JID))),
xmpp_util:hex(crypto:hmac(sha256, Data, Key, 10)).
%%%===================================================================
%%% Error generators
%%%===================================================================
-spec db_error(stanza()) -> stanza_error().
db_error(Pkt) ->
Txt = <<"Database failure">>,
xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)).
-spec channel_exists_error(stanza()) -> stanza_error().
channel_exists_error(Pkt) ->
Txt = <<"Channel already exists">>,
xmpp:err_conflict(Txt, xmpp:get_lang(Pkt)).
-spec no_channel_error(stanza()) -> stanza_error().
no_channel_error(Pkt) ->
Txt = <<"Channel does not exist">>,
xmpp:err_item_not_found(Txt, xmpp:get_lang(Pkt)).
-spec not_joined_error(stanza()) -> stanza_error().
not_joined_error(Pkt) ->
Txt = <<"You are not joined to the channel">>,
xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)).
-spec unsupported_error(stanza()) -> stanza_error().
unsupported_error(Pkt) ->
Txt = <<"No module is handling this query">>,
xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)).
-spec ownership_error(stanza()) -> stanza_error().
ownership_error(Pkt) ->
Txt = <<"Owner privileges required">>,
xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)).
%%%===================================================================
%%% IQ handlers
%%%===================================================================
-spec register_iq_handlers(binary()) -> ok.
register_iq_handlers(Host) ->
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO,
?MODULE, process_disco_info),
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS,
?MODULE, process_disco_items),
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0,
?MODULE, process_mix_core),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO,
?MODULE, process_disco_info),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS,
?MODULE, process_disco_items),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0,
?MODULE, process_mix_core),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB,
?MODULE, process_pubsub_query),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MAM_2,
?MODULE, process_mam_query).
-spec unregister_iq_handlers(binary()) -> ok.
unregister_iq_handlers(Host) ->
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS),
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MIX_CORE_0),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_INFO),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_DISCO_ITEMS),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_CORE_0),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUBSUB),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_2).

189
src/mod_mix_mnesia.erl Normal file
View File

@ -0,0 +1,189 @@
%%%-------------------------------------------------------------------
%%% Created : 1 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_mix_mnesia).
-behaviour(mod_mix).
-compile([{parse_transform, ejabberd_sql_pt}]).
%% API
-export([init/2]).
-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]).
-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]).
-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]).
-include("logger.hrl").
-include("ejabberd_sql_pt.hrl").
-record(mix_channel,
{chan_serv :: {binary(), binary()},
service :: binary(),
creator :: jid:jid(),
hidden :: boolean(),
hmac_key :: binary(),
created_at :: erlang:timestamp()}).
-record(mix_participant,
{user_chan :: {binary(), binary(), binary(), binary()},
chan_serv :: {binary(), binary()},
jid :: jid:jid(),
id :: binary(),
nick :: binary(),
created_at :: erlang:timestamp()}).
-record(mix_subscription,
{user_chan_node :: {binary(), binary(), binary(), binary(), binary()},
user_chan :: {binary(), binary(), binary(), binary()},
chan_serv_node :: {binary(), binary(), binary()},
chan_serv :: {binary(), binary()},
jid :: jid:jid()}).
%%%===================================================================
%%% API
%%%===================================================================
init(_Host, _Opts) ->
try
{atomic, _} = ejabberd_mnesia:create(
?MODULE, mix_channel,
[{disc_only_copies, [node()]},
{attributes, record_info(fields, mix_channel)},
{index, [service]}]),
{atomic, _} = ejabberd_mnesia:create(
?MODULE, mix_participant,
[{disc_only_copies, [node()]},
{attributes, record_info(fields, mix_participant)},
{index, [chan_serv]}]),
{atomic, _} = ejabberd_mnesia:create(
?MODULE, mix_subscription,
[{disc_only_copies, [node()]},
{attributes, record_info(fields, mix_subscription)},
{index, [user_chan, chan_serv_node, chan_serv]}]),
ok
catch _:{badmatch, _} ->
{error, db_failure}
end.
set_channel(_LServer, Channel, Service, CreatorJID, Hidden, Key) ->
mnesia:dirty_write(
#mix_channel{chan_serv = {Channel, Service},
service = Service,
creator = jid:remove_resource(CreatorJID),
hidden = Hidden,
hmac_key = Key,
created_at = p1_time_compat:timestamp()}).
get_channels(_LServer, Service) ->
Ret = mnesia:dirty_index_read(mix_channel, Service, #mix_channel.service),
{ok, lists:filtermap(
fun(#mix_channel{chan_serv = {Channel, _},
hidden = false}) ->
{true, Channel};
(_) ->
false
end, Ret)}.
get_channel(_LServer, Channel, Service) ->
case mnesia:dirty_read(mix_channel, {Channel, Service}) of
[#mix_channel{creator = JID,
hidden = Hidden,
hmac_key = Key}] ->
{ok, {JID, Hidden, Key}};
[] ->
{error, notfound}
end.
del_channel(_LServer, Channel, Service) ->
Key = {Channel, Service},
L1 = mnesia:dirty_read(mix_channel, Key),
L2 = mnesia:dirty_index_read(mix_participant, Key,
#mix_participant.chan_serv),
L3 = mnesia:dirty_index_read(mix_subscription, Key,
#mix_subscription.chan_serv),
lists:foreach(fun mnesia:dirty_delete_object/1, L1++L2++L3).
set_participant(_LServer, Channel, Service, JID, ID, Nick) ->
{User, Domain, _} = jid:tolower(JID),
mnesia:dirty_write(
#mix_participant{
user_chan = {User, Domain, Channel, Service},
chan_serv = {Channel, Service},
jid = jid:remove_resource(JID),
id = ID,
nick = Nick,
created_at = p1_time_compat:timestamp()}).
get_participant(_LServer, Channel, Service, JID) ->
{User, Domain, _} = jid:tolower(JID),
case mnesia:dirty_read(mix_participant, {User, Domain, Channel, Service}) of
[#mix_participant{id = ID, nick = Nick}] -> {ok, {ID, Nick}};
[] -> {error, notfound}
end.
get_participants(_LServer, Channel, Service) ->
Ret = mnesia:dirty_index_read(mix_participant,
{Channel, Service},
#mix_participant.chan_serv),
{ok, lists:map(
fun(#mix_participant{jid = JID, id = ID, nick = Nick}) ->
{JID, ID, Nick}
end, Ret)}.
del_participant(_LServer, Channel, Service, JID) ->
{User, Domain, _} = jid:tolower(JID),
mnesia:dirty_delete(mix_participant, {User, Domain, Channel, Service}).
subscribe(_LServer, Channel, Service, JID, Nodes) ->
{User, Domain, _} = jid:tolower(JID),
BJID = jid:remove_resource(JID),
lists:foreach(
fun(Node) ->
mnesia:dirty_write(
#mix_subscription{
user_chan_node = {User, Domain, Channel, Service, Node},
user_chan = {User, Domain, Channel, Service},
chan_serv_node = {Channel, Service, Node},
chan_serv = {Channel, Service},
jid = BJID})
end, Nodes).
get_subscribed(_LServer, Channel, Service, Node) ->
Ret = mnesia:dirty_index_read(mix_subscription,
{Channel, Service, Node},
#mix_subscription.chan_serv_node),
{ok, [JID || #mix_subscription{jid = JID} <- Ret]}.
unsubscribe(_LServer, Channel, Service, JID) ->
{User, Domain, _} = jid:tolower(JID),
Ret = mnesia:dirty_index_read(mix_subscription,
{User, Domain, Channel, Service},
#mix_subscription.user_chan),
lists:foreach(fun mnesia:dirty_delete_object/1, Ret).
unsubscribe(_LServer, Channel, Service, JID, Nodes) ->
{User, Domain, _} = jid:tolower(JID),
lists:foreach(
fun(Node) ->
mnesia:dirty_delete(mix_subscription,
{User, Domain, Channel, Service, Node})
end, Nodes).
%%%===================================================================
%%% Internal functions
%%%===================================================================

365
src/mod_mix_pam.erl Normal file
View File

@ -0,0 +1,365 @@
%%%-------------------------------------------------------------------
%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_mix_pam).
-behaviour(gen_mod).
-protocol({xep, 405, '0.2.1'}).
%% gen_mod callbacks
-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]).
%% Hooks and handlers
-export([bounce_sm_packet/1,
disco_sm_features/5,
remove_user/2,
process_iq/1]).
-include("xmpp.hrl").
-include("logger.hrl").
-define(MIX_PAM_CACHE, mix_pam_cache).
-callback init(binary(), gen_mod:opts()) -> ok | {error, db_failure}.
-callback add_channel(jid(), jid(), binary()) -> ok | {error, db_failure}.
-callback del_channel(jid(), jid()) -> ok | {error, db_failure}.
-callback get_channel(jid(), jid()) -> {ok, binary()} | {error, notfound | db_failure}.
-callback get_channels(jid()) -> {ok, [{jid(), binary()}]} | {error, db_failure}.
-callback del_channels(jid()) -> ok | {error, db_failure}.
-callback use_cache(binary()) -> boolean().
-callback cache_nodes(binary()) -> [node()].
-optional_callbacks([use_cache/1, cache_nodes/1]).
%%%===================================================================
%%% API
%%%===================================================================
start(Host, Opts) ->
Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
case Mod:init(Host, Opts) of
ok ->
init_cache(Mod, Host, Opts),
ejabberd_hooks:add(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50),
ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, disco_sm_features, 50),
ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0,
?MODULE, process_iq);
Err ->
Err
end.
stop(Host) ->
ejabberd_hooks:delete(bounce_sm_packet, Host, ?MODULE, bounce_sm_packet, 50),
ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, disco_sm_features, 50),
ejabberd_hooks:delete(remove_user, Host, ?MODULE, remove_user, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MIX_PAM_0).
reload(Host, NewOpts, OldOpts) ->
NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
if NewMod /= OldMod ->
NewMod:init(Host, NewOpts);
true ->
ok
end,
init_cache(NewMod, Host, NewOpts).
depends(_Host, _Opts) ->
[].
mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
mod_opt_type(O) when O == cache_life_time; O == cache_size ->
fun (I) when is_integer(I), I > 0 -> I;
(infinity) -> infinity
end;
mod_opt_type(O) when O == use_cache; O == cache_missed ->
fun (B) when is_boolean(B) -> B end.
mod_options(Host) ->
[{db_type, ejabberd_config:default_db(Host, ?MODULE)},
{use_cache, ejabberd_config:use_cache(Host)},
{cache_size, ejabberd_config:cache_size(Host)},
{cache_missed, ejabberd_config:cache_missed(Host)},
{cache_life_time, ejabberd_config:cache_life_time(Host)}].
-spec bounce_sm_packet({term(), stanza()}) -> {term(), stanza()}.
bounce_sm_packet({_, #message{to = #jid{lresource = <<>>} = To,
from = From,
type = groupchat} = Msg} = Acc) ->
case xmpp:has_subtag(Msg, #mix{}) of
true ->
{LUser, LServer, _} = jid:tolower(To),
case get_channel(To, From) of
{ok, _} ->
lists:foreach(
fun(R) ->
To1 = jid:replace_resource(To, R),
ejabberd_router:route(xmpp:set_to(Msg, To1))
end, ejabberd_sm:get_user_resources(LUser, LServer)),
{pass, Msg};
_ ->
Acc
end;
false ->
Acc
end;
bounce_sm_packet(Acc) ->
Acc.
-spec disco_sm_features({error, stanza_error()} | empty | {result, [binary()]},
jid(), jid(), binary(), binary()) ->
{error, stanza_error()} | empty | {result, [binary()]}.
disco_sm_features({error, _Error} = Acc, _From, _To, _Node, _Lang) ->
Acc;
disco_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
{result, [?NS_MIX_PAM_0 |
case Acc of
{result, Features} -> Features;
empty -> []
end]};
disco_sm_features(Acc, _From, _To, _Node, _Lang) ->
Acc.
-spec process_iq(iq()) -> iq() | ignore.
process_iq(#iq{from = #jid{luser = U1, lserver = S1},
to = #jid{luser = U2, lserver = S2}} = IQ)
when {U1, S1} /= {U2, S2} ->
xmpp:make_error(IQ, forbidden_query_error(IQ));
process_iq(#iq{type = set,
sub_els = [#mix_client_join{} = Join]} = IQ) ->
case Join#mix_client_join.channel of
undefined ->
xmpp:make_error(IQ, missing_channel_error(IQ));
_ ->
process_join(IQ)
end;
process_iq(#iq{type = set,
sub_els = [#mix_client_leave{} = Leave]} = IQ) ->
case Leave#mix_client_leave.channel of
undefined ->
xmpp:make_error(IQ, missing_channel_error(IQ));
_ ->
process_leave(IQ)
end;
process_iq(IQ) ->
xmpp:make_error(IQ, unsupported_query_error(IQ)).
-spec remove_user(binary(), binary()) -> ok | {error, db_failure}.
remove_user(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
JID = jid:make(LUser, LServer),
Chans = case Mod:get_channels(JID) of
{ok, Channels} ->
lists:map(
fun({Channel, _}) ->
ejabberd_router:route(
#iq{from = JID,
to = Channel,
id = p1_rand:get_string(),
type = set,
sub_els = [#mix_leave{}]}),
Channel
end, Channels);
_ ->
[]
end,
Mod:del_channels(jid:make(LUser, LServer)),
lists:foreach(
fun(Chan) ->
delete_cache(Mod, JID, Chan)
end, Chans).
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec process_join(iq()) -> ignore.
process_join(#iq{from = From,
sub_els = [#mix_client_join{channel = Channel,
join = Join}]} = IQ) ->
ejabberd_router:route_iq(
#iq{from = jid:remove_resource(From),
to = Channel, type = set, sub_els = [Join]},
fun(ResIQ) -> process_join_result(ResIQ, IQ) end),
ignore.
-spec process_leave(iq()) -> iq() | error.
process_leave(#iq{from = From,
sub_els = [#mix_client_leave{channel = Channel,
leave = Leave}]} = IQ) ->
case del_channel(From, Channel) of
ok ->
ejabberd_router:route_iq(
#iq{from = jid:remove_resource(From),
to = Channel, type = set, sub_els = [Leave]},
fun(ResIQ) -> process_leave_result(ResIQ, IQ) end),
ignore;
{error, db_failure} ->
xmpp:make_error(IQ, db_error(IQ))
end.
-spec process_join_result(iq(), iq()) -> ok.
process_join_result(#iq{from = Channel,
type = result, sub_els = [#mix_join{id = ID} = Join]},
#iq{to = To} = IQ) ->
case add_channel(To, Channel, ID) of
ok ->
ChanID = make_channel_id(Channel, ID),
Join1 = Join#mix_join{id = <<"">>, jid = ChanID},
ResIQ = xmpp:make_iq_result(IQ, #mix_client_join{join = Join1}),
ejabberd_router:route(ResIQ);
{error, db_failure} ->
ejabberd_router:route_error(IQ, db_error(IQ))
end;
process_join_result(Err, IQ) ->
process_iq_error(Err, IQ).
-spec process_leave_result(iq(), iq()) -> ok.
process_leave_result(#iq{type = result, sub_els = [#mix_leave{} = Leave]}, IQ) ->
ResIQ = xmpp:make_iq_result(IQ, #mix_client_leave{leave = Leave}),
ejabberd_router:route(ResIQ);
process_leave_result(Err, IQ) ->
process_iq_error(Err, IQ).
-spec process_iq_error(iq(), iq()) -> ok.
process_iq_error(#iq{type = error} = ErrIQ, #iq{sub_els = [El]} = IQ) ->
case xmpp:get_error(ErrIQ) of
undefined ->
%% Not sure if this stuff is correct because
%% RFC6120 section 8.3.1 bullet 4 states that
%% an error stanza MUST contain an <error/> child element
IQ1 = xmpp:make_iq_result(IQ, El),
ejabberd_router:route(IQ1#iq{type = error});
Err ->
ejabberd_router:route_error(IQ, Err)
end;
process_iq_error(timeout, IQ) ->
Txt = <<"Request has timed out">>,
Err = xmpp:err_recipient_unavailable(Txt, IQ#iq.lang),
ejabberd_router:route_error(IQ, Err).
-spec make_channel_id(jid(), binary()) -> jid().
make_channel_id(JID, ID) ->
{U, S, R} = jid:split(JID),
jid:make(<<ID/binary, $#, U/binary>>, S, R).
%%%===================================================================
%%% Error generators
%%%===================================================================
-spec missing_channel_error(stanza()) -> stanza_error().
missing_channel_error(Pkt) ->
Txt = <<"Attribute 'channel' is required for this request">>,
xmpp:err_bad_request(Txt, xmpp:get_lang(Pkt)).
-spec forbidden_query_error(stanza()) -> stanza_error().
forbidden_query_error(Pkt) ->
Txt = <<"Query to another users is forbidden">>,
xmpp:err_forbidden(Txt, xmpp:get_lang(Pkt)).
-spec unsupported_query_error(stanza()) -> stanza_error().
unsupported_query_error(Pkt) ->
Txt = <<"No module is handling this query">>,
xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)).
-spec db_error(stanza()) -> stanza_error().
db_error(Pkt) ->
Txt = <<"Database failure">>,
xmpp:err_internal_server_error(Txt, xmpp:get_lang(Pkt)).
%%%===================================================================
%%% Database queries
%%%===================================================================
get_channel(JID, Channel) ->
{LUser, LServer, _} = jid:tolower(JID),
{Chan, Service, _} = jid:tolower(Channel),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case use_cache(Mod, LServer) of
false -> Mod:get_channel(JID, Channel);
true ->
case ets_cache:lookup(
?MIX_PAM_CACHE, {LUser, LServer, Chan, Service},
fun() -> Mod:get_channel(JID, Channel) end) of
error -> {error, notfound};
Ret -> Ret
end
end.
add_channel(JID, Channel, ID) ->
Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE),
case Mod:add_channel(JID, Channel, ID) of
ok -> delete_cache(Mod, JID, Channel);
Err -> Err
end.
del_channel(JID, Channel) ->
Mod = gen_mod:db_mod(JID#jid.lserver, ?MODULE),
case Mod:del_channel(JID, Channel) of
ok -> delete_cache(Mod, JID, Channel);
Err -> Err
end.
%%%===================================================================
%%% Cache management
%%%===================================================================
-spec init_cache(module(), binary(), gen_mod:opts()) -> ok.
init_cache(Mod, Host, Opts) ->
case use_cache(Mod, Host) of
true ->
CacheOpts = cache_opts(Opts),
ets_cache:new(?MIX_PAM_CACHE, CacheOpts);
false ->
ets_cache:delete(?MIX_PAM_CACHE)
end.
-spec cache_opts(gen_mod:opts()) -> [proplists:property()].
cache_opts(Opts) ->
MaxSize = gen_mod:get_opt(cache_size, Opts),
CacheMissed = gen_mod:get_opt(cache_missed, Opts),
LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of
infinity -> infinity;
I -> timer:seconds(I)
end,
[{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].
-spec use_cache(module(), binary()) -> boolean().
use_cache(Mod, Host) ->
case erlang:function_exported(Mod, use_cache, 1) of
true -> Mod:use_cache(Host);
false -> gen_mod:get_module_opt(Host, ?MODULE, use_cache)
end.
-spec cache_nodes(module(), binary()) -> [node()].
cache_nodes(Mod, Host) ->
case erlang:function_exported(Mod, cache_nodes, 1) of
true -> Mod:cache_nodes(Host);
false -> ejabberd_cluster:get_nodes()
end.
-spec delete_cache(module(), jid(), jid()) -> ok.
delete_cache(Mod, JID, Channel) ->
{LUser, LServer, _} = jid:tolower(JID),
{Chan, Service, _} = jid:tolower(Channel),
case use_cache(Mod, LServer) of
true ->
ets_cache:delete(?MIX_PAM_CACHE,
{LUser, LServer, Chan, Service},
cache_nodes(Mod, LServer));
false ->
ok
end.

View File

@ -0,0 +1,91 @@
%%%-------------------------------------------------------------------
%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_mix_pam_mnesia).
-behaviour(mod_mix_pam).
%% API
-export([init/2, add_channel/3, get_channel/2,
get_channels/1, del_channel/2, del_channels/1,
use_cache/1]).
-record(mix_pam, {user_channel :: {binary(), binary(), binary(), binary()},
user :: {binary(), binary()},
id :: binary()}).
%%%===================================================================
%%% API
%%%===================================================================
init(_Host, _Opts) ->
case ejabberd_mnesia:create(?MODULE, mix_pam,
[{disc_only_copies, [node()]},
{attributes, record_info(fields, mix_pam)},
{index, [user]}]) of
{atomic, _} -> ok;
_ -> {error, db_failure}
end.
use_cache(Host) ->
case mnesia:table_info(mix_pam, storage_type) of
disc_only_copies ->
gen_mod:get_module_opt(Host, mod_mix_pam, use_cache);
_ ->
false
end.
add_channel(User, Channel, ID) ->
{LUser, LServer, _} = jid:tolower(User),
{Chan, Service, _} = jid:tolower(Channel),
mnesia:dirty_write(#mix_pam{user_channel = {LUser, LServer, Chan, Service},
user = {LUser, LServer},
id = ID}).
get_channel(User, Channel) ->
{LUser, LServer, _} = jid:tolower(User),
{Chan, Service, _} = jid:tolower(Channel),
case mnesia:dirty_read(mix_pam, {LUser, LServer, Chan, Service}) of
[#mix_pam{id = ID}] -> {ok, ID};
[] -> {error, notfound}
end.
get_channels(User) ->
{LUser, LServer, _} = jid:tolower(User),
Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}),
{ok, lists:map(
fun(#mix_pam{user_channel = {_, _, Chan, Service},
id = ID}) ->
{jid:make(Chan, Service), ID}
end, Ret)}.
del_channel(User, Channel) ->
{LUser, LServer, _} = jid:tolower(User),
{Chan, Service, _} = jid:tolower(Channel),
mnesia:dirty_delete(mix_pam, {LUser, LServer, Chan, Service}).
del_channels(User) ->
{LUser, LServer, _} = jid:tolower(User),
Ret = mnesia:dirty_index_read(mix_pam, #mix_pam.user, {LUser, LServer}),
lists:foreach(fun mnesia:dirty_delete_object/1, Ret).
%%%===================================================================
%%% Internal functions
%%%===================================================================

114
src/mod_mix_pam_sql.erl Normal file
View File

@ -0,0 +1,114 @@
%%%-------------------------------------------------------------------
%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 4 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_mix_pam_sql).
-behaviour(mod_mix_pam).
-compile([{parse_transform, ejabberd_sql_pt}]).
%% API
-export([init/2, add_channel/3, get_channel/2,
get_channels/1, del_channel/2, del_channels/1]).
-include("logger.hrl").
-include("ejabberd_sql_pt.hrl").
%%%===================================================================
%%% API
%%%===================================================================
init(_Host, _Opts) ->
%% TODO
ok.
add_channel(User, Channel, ID) ->
{LUser, LServer, _} = jid:tolower(User),
{Chan, Service, _} = jid:tolower(Channel),
case ?SQL_UPSERT(LServer, "mix_pam",
["!channel=%(Chan)s",
"!service=%(Service)s",
"!username=%(LUser)s",
"!server_host=%(LServer)s",
"id=%(ID)s"]) of
ok -> ok;
_Err -> {error, db_failure}
end.
get_channel(User, Channel) ->
{LUser, LServer, _} = jid:tolower(User),
{Chan, Service, _} = jid:tolower(Channel),
case ejabberd_sql:sql_query(
LServer,
?SQL("select @(id)s from mix_pam where "
"channel=%(Chan)s and service=%(Service)s "
"and username=%(LUser)s and %(LServer)H")) of
{selected, [{ID}]} -> {ok, ID};
{selected, []} -> {error, notfound};
_Err -> {error, db_failure}
end.
get_channels(User) ->
{LUser, LServer, _} = jid:tolower(User),
SQL = ?SQL("select @(channel)s, @(service)s, @(id)s from mix_pam "
"where username=%(LUser)s and %(LServer)H"),
case ejabberd_sql:sql_query(LServer, SQL) of
{selected, Ret} ->
{ok, lists:filtermap(
fun({Chan, Service, ID}) ->
case jid:make(Chan, Service) of
error ->
report_corrupted(SQL),
false;
JID ->
{true, {JID, ID}}
end
end, Ret)};
_Err ->
{error, db_failure}
end.
del_channel(User, Channel) ->
{LUser, LServer, _} = jid:tolower(User),
{Chan, Service, _} = jid:tolower(Channel),
case ejabberd_sql:sql_query(
LServer,
?SQL("delete from mix_pam where "
"channel=%(Chan)s and service=%(Service)s "
"and username=%(LUser)s and %(LServer)H")) of
{updated, _} -> ok;
_Err -> {error, db_failure}
end.
del_channels(User) ->
{LUser, LServer, _} = jid:tolower(User),
case ejabberd_sql:sql_query(
LServer,
?SQL("delete from mix_pam where "
"username=%(LUser)s and %(LServer)H")) of
{updated, _} -> ok;
_Err -> {error, db_failure}
end.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec report_corrupted(iolist()) -> ok.
report_corrupted(SQL) ->
?ERROR_MSG("Corrupted values returned by SQL request: ~s", [SQL]).

236
src/mod_mix_sql.erl Normal file
View File

@ -0,0 +1,236 @@
%%%-------------------------------------------------------------------
%%% Created : 1 Dec 2018 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_mix_sql).
-behaviour(mod_mix).
-compile([{parse_transform, ejabberd_sql_pt}]).
%% API
-export([init/2]).
-export([set_channel/6, get_channels/2, get_channel/3, del_channel/3]).
-export([set_participant/6, get_participant/4, get_participants/3, del_participant/4]).
-export([subscribe/5, unsubscribe/4, unsubscribe/5, get_subscribed/4]).
-include("logger.hrl").
-include("ejabberd_sql_pt.hrl").
%%%===================================================================
%%% API
%%%===================================================================
init(_Host, _Opts) ->
%% TODO
ok.
set_channel(LServer, Channel, Service, CreatorJID, Hidden, Key) ->
{User, Domain, _} = jid:tolower(CreatorJID),
RawJID = jid:encode(jid:remove_resource(CreatorJID)),
case ?SQL_UPSERT(LServer, "mix_channel",
["!channel=%(Channel)s",
"!service=%(Service)s",
"username=%(User)s",
"domain=%(Domain)s",
"jid=%(RawJID)s",
"hidden=%(Hidden)b",
"hmac_key=%(Key)s"]) of
ok -> ok;
_Err -> {error, db_failure}
end.
get_channels(LServer, Service) ->
case ejabberd_sql:sql_query(
LServer,
?SQL("select @(channel)s, @(hidden)b from mix_channel "
"where service=%(Service)s")) of
{selected, Ret} ->
{ok, [Channel || {Channel, Hidden} <- Ret, Hidden == false]};
_Err ->
{error, db_failure}
end.
get_channel(LServer, Channel, Service) ->
SQL = ?SQL("select @(jid)s, @(hidden)b, @(hmac_key)s from mix_channel "
"where channel=%(Channel)s and service=%(Service)s"),
case ejabberd_sql:sql_query(LServer, SQL) of
{selected, [{RawJID, Hidden, Key}]} ->
try jid:decode(RawJID) of
JID -> {ok, {JID, Hidden, Key}}
catch _:{bad_jid, _} ->
report_corrupted(jid, SQL),
{error, db_failure}
end;
{selected, []} -> {error, notfound};
_Err -> {error, db_failure}
end.
del_channel(LServer, Channel, Service) ->
F = fun() ->
ejabberd_sql:sql_query_t(
?SQL("delete from mix_channel where "
"channel=%(Channel)s and service=%(Service)s")),
ejabberd_sql:sql_query_t(
?SQL("delete from mix_participant where "
"channel=%(Channel)s and service=%(Service)s")),
ejabberd_sql:sql_query_t(
?SQL("delete from mix_subscription where "
"channel=%(Channel)s and service=%(Service)s"))
end,
case ejabberd_sql:sql_transaction(LServer, F) of
{atomic, _} -> ok;
_Err -> {error, db_failure}
end.
set_participant(LServer, Channel, Service, JID, ID, Nick) ->
{User, Domain, _} = jid:tolower(JID),
RawJID = jid:encode(jid:remove_resource(JID)),
case ?SQL_UPSERT(LServer, "mix_participant",
["!channel=%(Channel)s",
"!service=%(Service)s",
"!username=%(User)s",
"!domain=%(Domain)s",
"jid=%(RawJID)s",
"id=%(ID)s",
"nick=%(Nick)s"]) of
ok -> ok;
_Err -> {error, db_failure}
end.
get_participant(LServer, Channel, Service, JID) ->
{User, Domain, _} = jid:tolower(JID),
case ejabberd_sql:sql_query(
LServer,
?SQL("select @(id)s, @(nick)s from mix_participant "
"where channel=%(Channel)s and service=%(Service)s "
"and username=%(User)s and domain=%(Domain)s")) of
{selected, [Ret]} -> {ok, Ret};
{selected, []} -> {error, notfound};
_Err -> {error, db_failure}
end.
get_participants(LServer, Channel, Service) ->
SQL = ?SQL("select @(jid)s, @(id)s, @(nick)s from mix_participant "
"where channel=%(Channel)s and service=%(Service)s"),
case ejabberd_sql:sql_query(LServer, SQL) of
{selected, Ret} ->
{ok, lists:filtermap(
fun({RawJID, ID, Nick}) ->
try jid:decode(RawJID) of
JID -> {true, {JID, ID, Nick}}
catch _:{bad_jid, _} ->
report_corrupted(jid, SQL),
false
end
end, Ret)};
_Err ->
{error, db_failure}
end.
del_participant(LServer, Channel, Service, JID) ->
{User, Domain, _} = jid:tolower(JID),
case ejabberd_sql:sql_query(
LServer,
?SQL("delete from mix_participant where "
"channel=%(Channel)s and service=%(Service)s "
"and username=%(User)s and domain=%(Domain)s")) of
{updated, _} -> ok;
_Err -> {error, db_failure}
end.
subscribe(_LServer, _Channel, _Service, _JID, []) ->
ok;
subscribe(LServer, Channel, Service, JID, Nodes) ->
{User, Domain, _} = jid:tolower(JID),
RawJID = jid:encode(jid:remove_resource(JID)),
F = fun() ->
lists:foreach(
fun(Node) ->
?SQL_UPSERT_T(
"mix_subscription",
["!channel=%(Channel)s",
"!service=%(Service)s",
"!username=%(User)s",
"!domain=%(Domain)s",
"!node=%(Node)s",
"jid=%(RawJID)s"])
end, Nodes)
end,
case ejabberd_sql:sql_transaction(LServer, F) of
{atomic, _} -> ok;
_Err -> {error, db_failure}
end.
get_subscribed(LServer, Channel, Service, Node) ->
SQL = ?SQL("select @(jid)s from mix_subscription "
"where channel=%(Channel)s and service=%(Service)s "
"and node=%(Node)s"),
case ejabberd_sql:sql_query(LServer, SQL) of
{selected, Ret} ->
{ok, lists:filtermap(
fun({RawJID}) ->
try jid:decode(RawJID) of
JID -> {true, JID}
catch _:{bad_jid, _} ->
report_corrupted(jid, SQL),
false
end
end, Ret)};
_Err ->
{error, db_failure}
end.
unsubscribe(LServer, Channel, Service, JID) ->
{User, Domain, _} = jid:tolower(JID),
case ejabberd_sql:sql_query(
LServer,
?SQL("delete from mix_subscription "
"where channel=%(Channel)s and service=%(Service)s "
"and username=%(User)s and domain=%(Domain)s")) of
{updated, _} -> ok;
_Err -> {error, db_failure}
end.
unsubscribe(_LServer, _Channel, _Service, _JID, []) ->
ok;
unsubscribe(LServer, Channel, Service, JID, Nodes) ->
{User, Domain, _} = jid:tolower(JID),
F = fun() ->
lists:foreach(
fun(Node) ->
ejabberd_sql:sql_query_t(
?SQL("delete from mix_subscription "
"where channel=%(Channel)s "
"and service=%(Service)s "
"and username=%(User)s "
"and domain=%(Domain)s "
"and node=%(Node)s"))
end, Nodes)
end,
case ejabberd_sql:sql_transaction(LServer, F) of
{atomic, ok} -> ok;
_Err -> {error, db_failure}
end.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec report_corrupted(atom(), iolist()) -> ok.
report_corrupted(Column, SQL) ->
?ERROR_MSG("Corrupted value of '~s' column returned by "
"SQL request: ~s", [Column, SQL]).

View File

@ -34,7 +34,7 @@
%% ejabberd_hooks callbacks.
-export([disco_sm_features/5, c2s_session_pending/1, c2s_copy_session/2,
c2s_handle_cast/2, c2s_stanza/3, mam_message/6, offline_message/1,
c2s_handle_cast/2, c2s_stanza/3, mam_message/7, offline_message/1,
remove_user/2]).
%% gen_iq_handler callback.
@ -352,8 +352,8 @@ c2s_stanza(State, _Pkt, _SendResult) ->
State.
-spec mam_message(message() | drop, binary(), binary(), jid(),
chat | groupchat, recv | send) -> message().
mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) ->
binary(), chat | groupchat, recv | send) -> message().
mam_message(#message{} = Pkt, LUser, LServer, _Peer, _Nick, chat, Dir) ->
case lookup_sessions(LUser, LServer) of
{ok, [_|_] = Clients} ->
case drop_online_sessions(LUser, LServer, Clients) of
@ -367,7 +367,7 @@ mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) ->
ok
end,
Pkt;
mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) ->
mam_message(Pkt, _LUser, _LServer, _Peer, _Nick, _Type, _Dir) ->
Pkt.
-spec offline_message(message()) -> message().