Add MAM (XEP-0313) support

This commit is contained in:
Evgeniy Khramtsov 2015-06-22 16:56:08 +03:00
parent 6631078884
commit 83cce468a5
15 changed files with 2624 additions and 42 deletions

View File

@ -141,6 +141,8 @@
-define(NS_CAPTCHA, <<"urn:xmpp:captcha">>).
-define(NS_MEDIA, <<"urn:xmpp:media-element">>).
-define(NS_BOB, <<"urn:xmpp:bob">>).
-define(NS_MAM_TMP, <<"urn:xmpp:mam:tmp">>).
-define(NS_MAM_0, <<"urn:xmpp:mam:0">>).
-define(NS_PING, <<"urn:xmpp:ping">>).
-define(NS_CARBONS_2, <<"urn:xmpp:carbons:2">>).
-define(NS_CARBONS_1, <<"urn:xmpp:carbons:1">>).

View File

@ -270,3 +270,27 @@ CREATE TABLE caps_features (
);
CREATE INDEX i_caps_features_node_subnode ON caps_features (node, subnode);
CREATE TABLE archive (
username text NOT NULL,
timestamp BIGINT UNSIGNED NOT NULL,
peer text NOT NULL,
bare_peer text NOT NULL,
xml text NOT NULL,
txt text,
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX i_username ON archive(username);
CREATE INDEX i_timestamp ON archive(timestamp);
CREATE INDEX i_peer ON archive(peer);
CREATE INDEX i_bare_peer ON archive(bare_peer);
CREATE TABLE archive_prefs (
username text NOT NULL PRIMARY KEY,
def text NOT NULL,
always text NOT NULL,
never text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);

View File

@ -85,6 +85,31 @@ CREATE TABLE spool (
CREATE INDEX i_despool USING BTREE ON spool(username);
CREATE INDEX i_spool_created_at USING BTREE ON spool(created_at);
CREATE TABLE archive (
username varchar(250) NOT NULL,
timestamp BIGINT UNSIGNED NOT NULL,
peer varchar(250) NOT NULL,
bare_peer varchar(250) NOT NULL,
xml text NOT NULL,
txt text,
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8;
CREATE FULLTEXT INDEX i_text ON archive(txt);
CREATE INDEX i_username USING BTREE ON archive(username);
CREATE INDEX i_timestamp USING BTREE ON archive(timestamp);
CREATE INDEX i_peer USING BTREE ON archive(peer);
CREATE INDEX i_bare_peer USING BTREE ON archive(bare_peer);
CREATE TABLE archive_prefs (
username varchar(250) NOT NULL PRIMARY KEY,
def text NOT NULL,
always text NOT NULL,
never text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8;
CREATE TABLE vcard (
username varchar(250) PRIMARY KEY,
vcard mediumtext NOT NULL,

View File

@ -85,6 +85,29 @@ CREATE TABLE spool (
CREATE INDEX i_despool ON spool USING btree (username);
CREATE TABLE archive (
username text NOT NULL,
timestamp BIGINT NOT NULL,
peer text NOT NULL,
bare_peer text NOT NULL,
xml text NOT NULL,
txt text,
id SERIAL,
created_at TIMESTAMP NOT NULL DEFAULT now()
);
CREATE INDEX i_username ON archive USING btree (username);
CREATE INDEX i_timestamp ON archive USING btree (timestamp);
CREATE INDEX i_peer ON archive USING btree (peer);
CREATE INDEX i_bare_peer ON archive USING btree (bare_peer);
CREATE TABLE archive_prefs (
username text NOT NULL PRIMARY KEY,
def text NOT NULL,
always text NOT NULL,
never text NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now()
);
CREATE TABLE vcard (
username text PRIMARY KEY,
@ -299,6 +322,6 @@ CREATE TABLE sm (
info text NOT NULL
);
CREATE UNIQUE INDEX i_sid ON sm USING btree (usec, pid);
CREATE INDEX i_node ON sm USING btree (node);
CREATE INDEX i_username ON sm USING btree (username);
CREATE UNIQUE INDEX i_sm_sid ON sm USING btree (usec, pid);
CREATE INDEX i_sm_node ON sm USING btree (node);
CREATE INDEX i_sm_username ON sm USING btree (username);

View File

@ -1260,12 +1260,14 @@ session_established2(El, StateData) ->
_ ->
case Name of
<<"presence">> ->
PresenceEl =
PresenceEl0 =
ejabberd_hooks:run_fold(c2s_update_presence,
Server, NewEl,
[User, Server]),
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, PresenceEl]),
PresenceEl =
ejabberd_hooks:run_fold(
user_send_packet, Server, PresenceEl0,
[NewStateData, FromJID, ToJID]),
case ToJID of
#jid{user = User, server = Server,
resource = <<"">>} ->
@ -1285,16 +1287,18 @@ session_established2(El, StateData) ->
process_privacy_iq(FromJID, ToJID, IQ,
NewStateData);
_ ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
NewEl0 = ejabberd_hooks:run_fold(
user_send_packet, Server, NewEl,
[NewStateData, FromJID, ToJID]),
check_privacy_route(FromJID, NewStateData,
FromJID, ToJID, NewEl)
FromJID, ToJID, NewEl0)
end;
<<"message">> ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
NewEl0 = ejabberd_hooks:run_fold(
user_send_packet, Server, NewEl,
[NewStateData, FromJID, ToJID]),
check_privacy_route(FromJID, NewStateData, FromJID,
ToJID, NewEl);
ToJID, NewEl0);
_ -> NewStateData
end
end,
@ -1692,11 +1696,13 @@ handle_info({route, From, To,
Attrs2 =
jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To), NewAttrs),
FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els},
FixedPacket0 = #xmlel{name = Name, attrs = Attrs2, children = Els},
FixedPacket = ejabberd_hooks:run_fold(
user_receive_packet,
NewState#state.server,
FixedPacket0,
[NewState, NewState#state.jid, From, To]),
SentStateData = send_packet(NewState, FixedPacket),
ejabberd_hooks:run(user_receive_packet,
SentStateData#state.server,
[SentStateData#state.jid, From, To, FixedPacket]),
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, SentStateData);
true ->

View File

@ -47,7 +47,7 @@
-export([init/1, handle_info/2, handle_call/3,
handle_cast/2, terminate/2, code_change/3]).
-export([user_send_packet/3, user_receive_packet/4,
-export([user_send_packet/4, user_receive_packet/5,
c2s_presence_in/2, c2s_filter_packet/6,
c2s_broadcast_recipients/6, mod_opt_type/1]).
@ -143,11 +143,12 @@ read_caps([_ | Tail], Result) ->
read_caps(Tail, Result);
read_caps([], Result) -> Result.
user_send_packet(#jid{luser = User, lserver = Server} = From,
user_send_packet(#xmlel{name = <<"presence">>, attrs = Attrs,
children = Els} = Pkt,
_C2SState,
#jid{luser = User, lserver = Server} = From,
#jid{luser = User, lserver = Server,
lresource = <<"">>},
#xmlel{name = <<"presence">>, attrs = Attrs,
children = Els} = Pkt) ->
lresource = <<"">>}) ->
Type = xml:get_attr_s(<<"type">>, Attrs),
if Type == <<"">>; Type == <<"available">> ->
case read_caps(Els) of
@ -158,13 +159,14 @@ user_send_packet(#jid{luser = User, lserver = Server} = From,
true -> ok
end,
Pkt;
user_send_packet( _From, _To, Pkt) ->
user_send_packet(Pkt, _C2SState, _From, _To) ->
Pkt.
user_receive_packet(#jid{lserver = Server},
From, _To,
#xmlel{name = <<"presence">>, attrs = Attrs,
children = Els} = Pkt) ->
user_receive_packet(#xmlel{name = <<"presence">>, attrs = Attrs,
children = Els} = Pkt,
_C2SState,
#jid{lserver = Server},
From, _To) ->
Type = xml:get_attr_s(<<"type">>, Attrs),
IsRemote = not lists:member(From#jid.lserver, ?MYHOSTS),
if IsRemote and
@ -177,7 +179,7 @@ user_receive_packet(#jid{lserver = Server},
true -> ok
end,
Pkt;
user_receive_packet( _JID, _From, _To, Pkt) ->
user_receive_packet(Pkt, _C2SState, _JID, _From, _To) ->
Pkt.
-spec caps_stream_features([xmlel()], binary()) -> [xmlel()].

View File

@ -34,7 +34,7 @@
-export([start/2,
stop/1]).
-export([user_send_packet/3, user_receive_packet/4,
-export([user_send_packet/4, user_receive_packet/5,
iq_handler2/3, iq_handler1/3, remove_connection/4,
is_carbon_copy/1, mod_opt_type/1]).
@ -124,10 +124,10 @@ iq_handler(From, _To, #iq{type=set, sub_el = #xmlel{name = Operation, children
iq_handler(_From, _To, IQ, _CC)->
IQ#iq{type=error, sub_el = [?ERR_NOT_ALLOWED]}.
user_send_packet(From, To, Packet) ->
user_send_packet(Packet, _C2SState, From, To) ->
check_and_forward(From, To, Packet, sent).
user_receive_packet(JID, _From, To, Packet) ->
user_receive_packet(Packet, _C2SState, JID, _From, To) ->
check_and_forward(JID, To, Packet, received).
% verifier si le trafic est local
@ -142,14 +142,15 @@ check_and_forward(JID, To, Packet, Direction)->
true ->
case is_carbon_copy(Packet) of
false ->
send_copies(JID, To, Packet, Direction);
send_copies(JID, To, Packet, Direction),
Packet;
true ->
%% stop the hook chain, we don't want mod_logdb to register
%% this message (duplicate)
stop
{stop, Packet}
end;
_ ->
ok
Packet
end.
remove_connection(User, Server, Resource, _Status)->

823
src/mod_mam.erl Normal file
View File

@ -0,0 +1,823 @@
%%%-------------------------------------------------------------------
%%% @author Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%% @doc
%%% Message Archive Management (XEP-0313)
%%% @end
%%% Created : 4 Jul 2013 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2013-2015 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License
%%% along with this program; if not, write to the Free Software
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
%%% 02111-1307 USA
%%%
%%%-------------------------------------------------------------------
-module(mod_mam).
-behaviour(gen_mod).
%% API
-export([start/2, stop/1]).
-export([user_send_packet/4, user_receive_packet/5,
process_iq/3, remove_user/2, mod_opt_type/1]).
-include_lib("stdlib/include/ms_transform.hrl").
-include("jlib.hrl").
-include("logger.hrl").
-record(archive_msg,
{us = {<<"">>, <<"">>} :: {binary(), binary()} | '$2',
id = <<>> :: binary() | '_',
timestamp = now() :: erlang:timestamp() | '_' | '$1',
peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3',
bare_peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3',
packet = #xmlel{} :: xmlel() | '_'}).
-record(archive_prefs,
{us = {<<"">>, <<"">>} :: {binary(), binary()},
default = never :: never | always | roster,
always = [] :: [ljid()],
never = [] :: [ljid()]}).
%%%===================================================================
%%% API
%%%===================================================================
start(Host, Opts) ->
IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
one_queue),
DBType = gen_mod:db_type(Host, Opts),
init_db(DBType, Host),
init_cache(DBType, Opts),
gen_iq_handler:add_iq_handler(ejabberd_local, Host,
?NS_MAM_TMP, ?MODULE, process_iq, IQDisc),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
?NS_MAM_TMP, ?MODULE, process_iq, IQDisc),
gen_iq_handler:add_iq_handler(ejabberd_local, Host,
?NS_MAM_0, ?MODULE, process_iq, IQDisc),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
?NS_MAM_0, ?MODULE, process_iq, IQDisc),
ejabberd_hooks:add(user_receive_packet, Host, ?MODULE,
user_receive_packet, 500),
ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
user_send_packet, 500),
ejabberd_hooks:add(remove_user, Host, ?MODULE,
remove_user, 50),
ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE,
remove_user, 50),
ok.
init_db(odbc, Host) ->
Muchost = gen_mod:get_module_opt_host(Host, mod_muc,
<<"conference.@HOST@">>),
ets:insert(ejabberd_modules, {ejabberd_module, {mod_mam, Muchost},
[{db_type, odbc}]}),
mnesia:dirty_write({local_config, {modules,Muchost},
[{mod_mam, [{db_type, odbc}]}]});
init_db(mnesia, _Host) ->
mnesia:create_table(archive_msg,
[{disc_only_copies, [node()]},
{type, bag},
{attributes, record_info(fields, archive_msg)}]),
mnesia:create_table(archive_prefs,
[{disc_only_copies, [node()]},
{attributes, record_info(fields, archive_prefs)}]);
init_db(_, _) ->
ok.
init_cache(_DBType, Opts) ->
MaxSize = gen_mod:get_opt(cache_size, Opts,
fun(I) when is_integer(I), I>0 -> I end,
1000),
LifeTime = gen_mod:get_opt(cache_life_time, Opts,
fun(I) when is_integer(I), I>0 -> I end,
timer:hours(1) div 1000),
cache_tab:new(archive_prefs, [{max_size, MaxSize},
{life_time, LifeTime}]).
stop(Host) ->
ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
user_send_packet, 500),
ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE,
user_receive_packet, 500),
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_TMP),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_TMP),
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_0),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_0),
ejabberd_hooks:delete(remove_user, Host, ?MODULE,
remove_user, 50),
ejabberd_hooks:delete(anonymous_purge_hook, Host,
?MODULE, remove_user, 50),
ok.
remove_user(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
remove_user(LUser, LServer,
gen_mod:db_type(LServer, ?MODULE)).
remove_user(LUser, LServer, mnesia) ->
US = {LUser, LServer},
F = fun () ->
mnesia:delete({archive_msg, US}),
mnesia:delete({archive_prefs, US})
end,
mnesia:transaction(F);
remove_user(LUser, LServer, odbc) ->
SUser = ejabberd_odbc:escape(LUser),
ejabberd_odbc:sql_query(
LServer,
[<<"delete from archive where username='">>, SUser, <<"';">>]),
ejabberd_odbc:sql_query(
LServer,
[<<"delete from archive_prefs where username='">>, SUser, <<"';">>]).
user_receive_packet(Pkt, C2SState, JID, Peer, _To) ->
LUser = JID#jid.luser,
LServer = JID#jid.lserver,
case should_archive(Pkt) of
true ->
NewPkt = strip_my_archived_tag(Pkt, LServer),
case store(C2SState, NewPkt, LUser, LServer,
Peer, true, recv) of
{ok, ID} ->
Archived = #xmlel{name = <<"archived">>,
attrs = [{<<"by">>, LServer},
{<<"xmlns">>, ?NS_MAM_TMP},
{<<"id">>, ID}]},
NewEls = [Archived|NewPkt#xmlel.children],
NewPkt#xmlel{children = NewEls};
_ ->
NewPkt
end;
muc ->
Pkt;
false ->
Pkt
end.
user_send_packet(Pkt, C2SState, JID, Peer) ->
LUser = JID#jid.luser,
LServer = JID#jid.lserver,
case should_archive(Pkt) of
S when (S==true) ->
NewPkt = strip_my_archived_tag(Pkt, LServer),
store0(C2SState, jlib:replace_from_to(JID, Peer, NewPkt),
LUser, LServer, Peer, S, send),
NewPkt;
S when (S==muc) ->
NewPkt = strip_my_archived_tag(Pkt, LServer),
case store0(C2SState, jlib:replace_from_to(JID, Peer, NewPkt),
LUser, LServer, Peer, S, send) of
{ok, ID} ->
By = jlib:jid_to_string(Peer),
Archived = #xmlel{name = <<"archived">>,
attrs = [{<<"by">>, By}, {<<"xmlns">>, ?NS_MAM_TMP},
{<<"id">>, ID}]},
NewEls = [Archived|NewPkt#xmlel.children],
NewPkt#xmlel{children = NewEls};
_ ->
NewPkt
end;
false ->
Pkt
end.
process_iq(#jid{lserver = LServer} = From,
#jid{lserver = LServer} = To,
#iq{type = get, sub_el = #xmlel{name = <<"query">>} = SubEl} = IQ) ->
NS = xml:get_tag_attr_s(<<"xmlns">>, SubEl),
Fs = case NS of
?NS_MAM_TMP ->
lists:flatmap(
fun(#xmlel{name = <<"start">>} = El) ->
[{<<"start">>, [xml:get_tag_cdata(El)]}];
(#xmlel{name = <<"end">>} = El) ->
[{<<"end">>, [xml:get_tag_cdata(El)]}];
(#xmlel{name = <<"with">>} = El) ->
[{<<"with">>, [xml:get_tag_cdata(El)]}];
(#xmlel{name = <<"withroom">>} = El) ->
[{<<"withroom">>, [xml:get_tag_cdata(El)]}];
(#xmlel{name = <<"withtext">>} = El) ->
[{<<"withtext">>, [xml:get_tag_cdata(El)]}];
(#xmlel{name = <<"set">>}) ->
[{<<"set">>, SubEl}];
(_) ->
[]
end, SubEl#xmlel.children);
?NS_MAM_0 ->
case {xml:get_subtag_with_xmlns(SubEl, <<"x">>, ?NS_XDATA),
xml:get_subtag_with_xmlns(SubEl, <<"set">>, ?NS_RSM)} of
{#xmlel{} = XData, false} ->
jlib:parse_xdata_submit(XData);
{#xmlel{} = XData, #xmlel{}} ->
[{<<"set">>, SubEl} | jlib:parse_xdata_submit(XData)];
{false, #xmlel{}} ->
[{<<"set">>, SubEl}];
{false, false} ->
[]
end
end,
case catch lists:foldl(
fun({<<"start">>, [Data|_]}, {_, End, With, RSM}) ->
{{_, _, _} = jlib:datetime_string_to_timestamp(Data),
End, With, RSM};
({<<"end">>, [Data|_]}, {Start, _, With, RSM}) ->
{Start,
{_, _, _} = jlib:datetime_string_to_timestamp(Data),
With, RSM};
({<<"with">>, [Data|_]}, {Start, End, _, RSM}) ->
{Start, End, jlib:jid_tolower(jlib:string_to_jid(Data)), RSM};
({<<"withroom">>, [Data|_]}, {Start, End, _, RSM}) ->
{Start, End,
{room, jlib:jid_tolower(jlib:string_to_jid(Data))},
RSM};
({<<"withtext">>, [Data|_]}, {Start, End, _, RSM}) ->
{Start, End, {text, Data}, RSM};
({<<"set">>, El}, {Start, End, With, _}) ->
{Start, End, With, jlib:rsm_decode(El)};
(_, Acc) ->
Acc
end, {none, [], none, none}, Fs) of
{'EXIT', _} ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]};
{Start, End, With, RSM} ->
select_and_send(From, To, Start, End, With, RSM, IQ)
end;
process_iq(#jid{luser = LUser, lserver = LServer},
#jid{lserver = LServer},
#iq{type = set, sub_el = #xmlel{name = <<"prefs">>} = SubEl} = IQ) ->
try {case xml:get_tag_attr_s(<<"default">>, SubEl) of
<<"always">> -> always;
<<"never">> -> never;
<<"roster">> -> roster
end,
lists:foldl(
fun(#xmlel{name = <<"always">>, children = Els}, {A, N}) ->
{get_jids(Els) ++ A, N};
(#xmlel{name = <<"never">>, children = Els}, {A, N}) ->
{A, get_jids(Els) ++ N};
(_, {A, N}) ->
{A, N}
end, {[], []}, SubEl#xmlel.children)} of
{Default, {Always, Never}} ->
case write_prefs(LUser, LServer, LServer, Default,
lists:usort(Always), lists:usort(Never)) of
ok ->
IQ#iq{type = result, sub_el = []};
_Err ->
IQ#iq{type = error,
sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]}
end
catch _:_ ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]}
end;
process_iq(_, _, #iq{sub_el = SubEl} = IQ) ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
should_archive(#xmlel{name = <<"message">>} = Pkt) ->
case {xml:get_attr_s(<<"type">>, Pkt#xmlel.attrs),
xml:get_subtag_cdata(Pkt, <<"body">>)} of
{<<"error">>, _} ->
false;
{<<"groupchat">>, _} ->
To = xml:get_attr_s(<<"to">>, Pkt#xmlel.attrs),
case (jlib:string_to_jid(To))#jid.resource of
<<"">> -> muc;
_ -> false
end;
{_, <<>>} ->
%% Empty body
false;
_ ->
true
end;
should_archive(#xmlel{}) ->
false.
strip_my_archived_tag(Pkt, LServer) ->
NewEls = lists:filter(
fun(#xmlel{name = <<"archived">>,
attrs = Attrs}) ->
case catch jlib:nameprep(
xml:get_attr_s(
<<"by">>, Attrs)) of
LServer ->
false;
_ ->
true
end;
(_) ->
true
end, Pkt#xmlel.children),
Pkt#xmlel{children = NewEls}.
should_archive_peer(C2SState,
#archive_prefs{default = Default,
always = Always,
never = Never},
Peer) ->
LPeer = jlib:jid_tolower(Peer),
case lists:member(LPeer, Always) of
true ->
true;
false ->
case lists:member(LPeer, Never) of
true ->
false;
false ->
case Default of
always -> true;
never -> false;
roster ->
case ejabberd_c2s:get_subscription(
LPeer, C2SState) of
both -> true;
from -> true;
to -> true;
_ -> false
end
end
end
end.
store0(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) ->
case Type of
muc -> store(C2SState, Pkt, Peer#jid.luser, LServer,
jlib:jid_replace_resource(Peer, LUser), Type, Dir);
true -> store(C2SState, Pkt, LUser, LServer, Peer, Type, Dir)
end.
store(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) ->
Prefs = get_prefs(LUser, LServer),
case should_archive_peer(C2SState, Prefs, Peer) of
true ->
do_store(Pkt, LUser, LServer, Peer, Type, Dir,
gen_mod:db_type(LServer, ?MODULE));
false ->
pass
end.
do_store(Pkt, LUser, LServer, Peer, Type, _Dir, mnesia) ->
LPeer = {PUser, PServer, _} = jlib:jid_tolower(Peer),
LServer2 = case Type of muc -> Peer#jid.lserver; _ -> LServer end,
TS = now(),
ID = jlib:integer_to_binary(now_to_usec(TS)),
case mnesia:dirty_write(
#archive_msg{us = {LUser, LServer2},
id = ID,
timestamp = TS,
peer = LPeer,
bare_peer = {PUser, PServer, <<>>},
packet = Pkt}) of
ok ->
{ok, ID};
Err ->
Err
end;
do_store(Pkt, LUser, LServer, Peer, _Type, _Dir, odbc) ->
TSinteger = now_to_usec(now()),
ID = TS = jlib:integer_to_binary(TSinteger),
BarePeer = jlib:jid_to_string(
jlib:jid_tolower(
jlib:jid_remove_resource(Peer))),
LPeer = jlib:jid_to_string(
jlib:jid_tolower(Peer)),
XML = xml:element_to_binary(Pkt),
Body = xml:get_subtag_cdata(Pkt, <<"body">>),
case ejabberd_odbc:sql_query(
LServer,
[<<"insert into archive (username, timestamp, "
"peer, bare_peer, xml, txt) values (">>,
<<"'">>, ejabberd_odbc:escape(LUser), <<"', ">>,
<<"'">>, TS, <<"', ">>,
<<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>,
<<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>,
<<"'">>, ejabberd_odbc:escape(XML), <<"', ">>,
<<"'">>, ejabberd_odbc:escape(Body), <<"');">>]) of
{updated, _} ->
{ok, ID};
Err ->
Err
end.
write_prefs(LUser, LServer, Host, Default, Always, Never) ->
DBType = case gen_mod:db_type(Host, ?MODULE) of
odbc -> {odbc, Host};
DB -> DB
end,
Prefs = #archive_prefs{us = {LUser, LServer},
default = Default,
always = Always,
never = Never},
cache_tab:dirty_insert(
archive_prefs, {LUser, LServer}, Prefs,
fun() -> write_prefs(LUser, LServer, Prefs, DBType) end).
write_prefs(_LUser, _LServer, Prefs, mnesia) ->
mnesia:dirty_write(Prefs);
write_prefs(LUser, _LServer, #archive_prefs{default = Default,
never = Never,
always = Always},
{odbc, Host}) ->
SUser = ejabberd_odbc:escape(LUser),
SDefault = erlang:atom_to_binary(Default, utf8),
SAlways = ejabberd_odbc:encode_term(Always),
SNever = ejabberd_odbc:encode_term(Never),
case update(Host, <<"archive_prefs">>,
[<<"username">>, <<"def">>, <<"always">>, <<"never">>],
[SUser, SDefault, SAlways, SNever],
[<<"username='">>, SUser, <<"'">>]) of
{updated, _} ->
ok;
Err ->
Err
end.
get_prefs(LUser, LServer) ->
DBType = gen_mod:db_type(LServer, ?MODULE),
Res = cache_tab:lookup(archive_prefs, {LUser, LServer},
fun() -> get_prefs(LUser, LServer,
DBType)
end),
case Res of
{ok, Prefs} ->
Prefs;
error ->
Default = gen_mod:get_module_opt(
LServer, ?MODULE, default,
fun(always) -> always;
(never) -> never;
(roster) -> roster
end, never),
#archive_prefs{us = {LUser, LServer}, default = Default}
end.
get_prefs(LUser, LServer, mnesia) ->
case mnesia:dirty_read(archive_prefs, {LUser, LServer}) of
[Prefs] ->
{ok, Prefs};
_ ->
error
end;
get_prefs(LUser, LServer, odbc) ->
case ejabberd_odbc:sql_query(
LServer,
[<<"select def, always, never from archive_prefs ">>,
<<"where username='">>,
ejabberd_odbc:escape(LUser), <<"';">>]) of
{selected, _, [[SDefault, SAlways, SNever]]} ->
Default = erlang:binary_to_existing_atom(SDefault, utf8),
Always = ejabberd_odbc:decode_term(SAlways),
Never = ejabberd_odbc:decode_term(SNever),
{ok, #archive_prefs{us = {LUser, LServer},
default = Default,
always = Always,
never = Never}};
_ ->
error
end.
select_and_send(#jid{lserver = LServer} = From,
To, Start, End, With, RSM, IQ) ->
DBType = case gen_mod:db_type(LServer, ?MODULE) of
odbc -> {odbc, LServer};
DB -> DB
end,
select_and_send(From, To, Start, End, With, RSM, IQ,
DBType).
select_and_send(From, To, Start, End, With, RSM, IQ, DBType) ->
{Msgs, Count} = select_and_start(From, To, Start, End, With,
RSM, DBType),
SortedMsgs = lists:keysort(2, Msgs),
send(From, To, SortedMsgs, RSM, Count, IQ).
select_and_start(From, _To, StartUser, End, With, RSM, DB) ->
{JidRequestor, Start, With2} = case With of
{room, {LUserRoom, LServerRoom, <<>>} = WithJid} ->
JR = jlib:make_jid(LUserRoom,LServerRoom,<<>>),
St = StartUser,
{JR, St, WithJid};
_ ->
{From, StartUser, With}
end,
select(JidRequestor, Start, End, With2, RSM, DB).
select(#jid{luser = LUser, lserver = LServer} = JidRequestor,
Start, End, With, RSM, mnesia) ->
MS = make_matchspec(LUser, LServer, Start, End, With),
Msgs = mnesia:dirty_select(archive_msg, MS),
FilteredMsgs = filter_by_rsm(Msgs, RSM),
Count = length(Msgs),
{lists:map(
fun(Msg) ->
{Msg#archive_msg.id,
jlib:binary_to_integer(Msg#archive_msg.id),
msg_to_el(Msg, JidRequestor)}
end, FilteredMsgs), Count};
select(#jid{luser = LUser, lserver = LServer} = JidRequestor,
Start, End, With, RSM, {odbc, Host}) ->
{Query, CountQuery} = make_sql_query(LUser, LServer,
Start, End, With, RSM),
case {ejabberd_odbc:sql_query(Host, Query),
ejabberd_odbc:sql_query(Host, CountQuery)} of
{{selected, _, Res}, {selected, _, [[Count]]}} ->
{lists:map(
fun([TS, XML, PeerBin]) ->
#xmlel{} = El = xml_stream:parse_element(XML),
Now = usec_to_now(jlib:binary_to_integer(TS)),
PeerJid = jlib:jid_tolower(jlib:string_to_jid(PeerBin)),
{TS, jlib:binary_to_integer(TS),
msg_to_el(#archive_msg{timestamp = Now,
packet = El,
peer = PeerJid},
JidRequestor)}
end, Res), jlib:binary_to_integer(Count)};
_ ->
{[], 0}
end.
msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, peer = Peer},
JidRequestor) ->
Delay = jlib:now_to_utc_string(TS),
Pkt = maybe_update_from_to(Pkt1, JidRequestor, Peer),
#xmlel{name = <<"forwarded">>,
attrs = [{<<"xmlns">>, ?NS_FORWARD}],
children = [#xmlel{name = <<"delay">>,
attrs = [{<<"xmlns">>, ?NS_DELAY},
{<<"stamp">>, Delay}]},
xml:replace_tag_attr(
<<"xmlns">>, <<"jabber:client">>, Pkt)]}.
maybe_update_from_to(Pkt, _JIDRequestor, undefined) ->
Pkt;
maybe_update_from_to(Pkt, JidRequestor, Peer) ->
case xml:get_attr_s(<<"type">>, Pkt#xmlel.attrs) of
<<"groupchat">> ->
Pkt2 = xml:replace_tag_attr(<<"to">>,
jlib:jid_to_string(JidRequestor),
Pkt),
xml:replace_tag_attr(<<"from">>, jlib:jid_to_string(Peer),
Pkt2);
_ -> Pkt
end.
send(From, To, Msgs, RSM, Count, #iq{sub_el = SubEl} = IQ) ->
QID = xml:get_tag_attr_s(<<"queryid">>, SubEl),
NS = xml:get_tag_attr_s(<<"xmlns">>, SubEl),
QIDAttr = if QID /= <<>> ->
[{<<"queryid">>, QID}];
true ->
[]
end,
Els = lists:map(
fun({ID, _IDInt, El}) ->
#xmlel{name = <<"message">>,
children = [#xmlel{name = <<"result">>,
attrs = [{<<"xmlns">>, NS},
{<<"id">>, ID}|QIDAttr],
children = [El]}]}
end, Msgs),
RSMOut = make_rsm_out(Msgs, RSM, Count, QIDAttr, NS),
case NS of
?NS_MAM_TMP ->
lists:foreach(
fun(El) ->
ejabberd_router:route(To, From, El)
end, Els),
IQ#iq{type = result, sub_el = RSMOut};
?NS_MAM_0 ->
ejabberd_router:route(
To, From, jlib:iq_to_xml(IQ#iq{type = result, sub_el = []})),
lists:foreach(
fun(El) ->
ejabberd_router:route(To, From, El)
end, Els),
ejabberd_router:route(
To, From, #xmlel{name = <<"message">>,
children = RSMOut}),
ignore
end.
make_rsm_out(_Msgs, none, _Count, _QIDAttr, ?NS_MAM_TMP) ->
[];
make_rsm_out(_Msgs, none, _Count, QIDAttr, ?NS_MAM_0) ->
[#xmlel{name = <<"fin">>, attrs = [{<<"xmlns">>, ?NS_MAM_0}|QIDAttr]}];
make_rsm_out([], #rsm_in{}, Count, QIDAttr, NS) ->
Tag = if NS == ?NS_MAM_TMP -> <<"query">>;
true -> <<"fin">>
end,
[#xmlel{name = Tag, attrs = [{<<"xmlns">>, NS}|QIDAttr],
children = jlib:rsm_encode(#rsm_out{count = Count})}];
make_rsm_out([{FirstID, _, _}|_] = Msgs, #rsm_in{}, Count, QIDAttr, NS) ->
{LastID, _, _} = lists:last(Msgs),
Tag = if NS == ?NS_MAM_TMP -> <<"query">>;
true -> <<"fin">>
end,
[#xmlel{name = Tag, attrs = [{<<"xmlns">>, NS}|QIDAttr],
children = jlib:rsm_encode(
#rsm_out{first = FirstID, count = Count,
last = LastID})}].
filter_by_rsm(Msgs, none) ->
Msgs;
filter_by_rsm(_Msgs, #rsm_in{max = Max}) when Max =< 0 ->
[];
filter_by_rsm(Msgs, #rsm_in{max = Max, direction = Direction, id = ID}) ->
NewMsgs = case Direction of
aft ->
lists:filter(
fun(#archive_msg{id = I}) ->
I > ID
end, Msgs);
before ->
lists:foldl(
fun(#archive_msg{id = I} = Msg, Acc) when I < ID ->
[Msg|Acc];
(_, Acc) ->
Acc
end, [], Msgs);
_ ->
Msgs
end,
filter_by_max(NewMsgs, Max).
filter_by_max(Msgs, undefined) ->
Msgs;
filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 ->
lists:sublist(Msgs, Len);
filter_by_max(_Msgs, _Junk) ->
[].
make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) ->
ets:fun2ms(
fun(#archive_msg{timestamp = TS,
us = US,
bare_peer = BPeer} = Msg)
when Start =< TS, End >= TS,
US == {LUser, LServer},
BPeer == With ->
Msg
end);
make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) ->
ets:fun2ms(
fun(#archive_msg{timestamp = TS,
us = US,
peer = Peer} = Msg)
when Start =< TS, End >= TS,
US == {LUser, LServer},
Peer == With ->
Msg
end);
make_matchspec(LUser, LServer, Start, End, none) ->
ets:fun2ms(
fun(#archive_msg{timestamp = TS,
us = US,
peer = Peer} = Msg)
when Start =< TS, End >= TS,
US == {LUser, LServer} ->
Msg
end).
make_sql_query(LUser, _LServer, Start, End, With, RSM) ->
{Max, Direction, ID} = case RSM of
#rsm_in{} ->
{RSM#rsm_in.max,
RSM#rsm_in.direction,
RSM#rsm_in.id};
none ->
{none, none, none}
end,
LimitClause = if is_integer(Max), Max >= 0 ->
[<<" limit ">>, jlib:integer_to_binary(Max)];
true ->
[]
end,
WithClause = case With of
{text, <<>>} ->
[];
{text, Txt} ->
[<<" and match (txt) against ('">>,
ejabberd_odbc:escape(Txt), <<"')">>];
{_, _, <<>>} ->
[<<" and bare_peer='">>,
ejabberd_odbc:escape(jlib:jid_to_string(With)),
<<"'">>];
{_, _, _} ->
[<<" and peer='">>,
ejabberd_odbc:escape(jlib:jid_to_string(With)),
<<"'">>];
none ->
[]
end,
DirectionClause = case catch jlib:binary_to_integer(ID) of
I when is_integer(I), I >= 0 ->
case Direction of
before ->
[<<" and timestamp < ">>, ID,
<<" order by timestamp desc">>];
aft ->
[<<" and timestamp > ">>, ID,
<<" order by timestamp asc">>];
_ ->
[]
end;
_ ->
[]
end,
StartClause = case Start of
{_, _, _} ->
[<<" and timestamp >= ">>,
jlib:integer_to_binary(now_to_usec(Start))];
_ ->
[]
end,
EndClause = case End of
{_, _, _} ->
[<<" and timestamp <= ">>,
jlib:integer_to_binary(now_to_usec(End))];
_ ->
[]
end,
SUser = ejabberd_odbc:escape(LUser),
{[<<"select timestamp, xml, peer from archive where username='">>,
SUser, <<"'">>] ++ WithClause ++ StartClause ++ EndClause ++
DirectionClause ++ LimitClause ++ [<<";">>],
[<<"select count(*) from archive where username='">>,
SUser, <<"'">>] ++ WithClause ++ StartClause ++ EndClause ++ [<<";">>]}.
now_to_usec({MSec, Sec, USec}) ->
(MSec*1000000 + Sec)*1000000 + USec.
usec_to_now(Int) ->
Secs = Int div 1000000,
USec = Int rem 1000000,
MSec = Secs div 1000000,
Sec = Secs rem 1000000,
{MSec, Sec, USec}.
get_jids(Els) ->
lists:flatmap(
fun(#xmlel{name = <<"jid">>} = El) ->
J = jlib:string_to_jid(xml:get_tag_cdata(El)),
[jlib:jid_tolower(jlib:jid_remove_resource(J)),
jlib:jid_tolower(J)];
(_) ->
[]
end, Els).
update(LServer, Table, Fields, Vals, Where) ->
UPairs = lists:zipwith(fun (A, B) ->
<<A/binary, "='", B/binary, "'">>
end,
Fields, Vals),
case ejabberd_odbc:sql_query(LServer,
[<<"update ">>, Table, <<" set ">>,
join(UPairs, <<", ">>), <<" where ">>, Where,
<<";">>])
of
{updated, 1} -> {updated, 1};
_ ->
ejabberd_odbc:sql_query(LServer,
[<<"insert into ">>, Table, <<"(">>,
join(Fields, <<", ">>), <<") values ('">>,
join(Vals, <<"', '">>), <<"');">>])
end.
%% Almost a copy of string:join/2.
join([], _Sep) -> [];
join([H | T], Sep) -> [H, [[Sep, X] || X <- T]].
mod_opt_type(cache_life_time) ->
fun (I) when is_integer(I), I > 0 -> I end;
mod_opt_type(cache_size) ->
fun (I) when is_integer(I), I > 0 -> I end;
mod_opt_type(db_type) -> fun gen_mod:v_db/1;
mod_opt_type(default) ->
fun (always) -> always;
(never) -> never;
(roster) -> roster
end;
mod_opt_type(iqdisc) -> fun gen_iq_handler:check_type/1;
mod_opt_type(store_body_only) ->
fun (B) when is_boolean(B) -> B end;
mod_opt_type(_) ->
[cache_life_time, cache_size, db_type, default, iqdisc,
store_body_only].

View File

@ -57,7 +57,7 @@
handle_cast/2, handle_info/2, code_change/3]).
-export([iq_ping/3, user_online/3, user_offline/3,
user_send/3, mod_opt_type/1]).
user_send/4, mod_opt_type/1]).
-record(state,
{host = <<"">>,
@ -214,8 +214,9 @@ user_online(_SID, JID, _Info) ->
user_offline(_SID, JID, _Info) ->
stop_ping(JID#jid.lserver, JID).
user_send(JID, _From, _Packet) ->
start_ping(JID#jid.lserver, JID).
user_send(Packet, _C2SState, JID, _From) ->
start_ping(JID#jid.lserver, JID),
Packet.
%%====================================================================
%% Internal functions

View File

@ -29,8 +29,8 @@
-behaviour(gen_mod).
-export([start/2, stop/1, log_user_send/3,
log_user_receive/4, mod_opt_type/1]).
-export([start/2, stop/1, log_user_send/4,
log_user_receive/5, mod_opt_type/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
@ -51,11 +51,13 @@ stop(Host) ->
?MODULE, log_user_receive, 50),
ok.
log_user_send(From, To, Packet) ->
log_packet(From, To, Packet, From#jid.lserver).
log_user_send(Packet, _C2SState, From, To) ->
log_packet(From, To, Packet, From#jid.lserver),
Packet.
log_user_receive(_JID, From, To, Packet) ->
log_packet(From, To, Packet, To#jid.lserver).
log_user_receive(Packet, _C2SState, _JID, From, To) ->
log_packet(From, To, Packet, To#jid.lserver),
Packet.
log_packet(From, To,
#xmlel{name = Name, attrs = Attrs, children = Els},

View File

@ -247,6 +247,10 @@ db_tests(mnesia) ->
roster_subscribe_slave]},
{test_offline, [sequence],
[offline_master, offline_slave]},
{test_old_mam, [parallel],
[mam_old_master, mam_old_slave]},
{test_new_mam, [parallel],
[mam_new_master, mam_new_slave]},
{test_carbons, [parallel],
[carbons_master, carbons_slave]},
{test_client_state, [parallel],
@ -283,6 +287,10 @@ db_tests(_) ->
roster_subscribe_slave]},
{test_offline, [sequence],
[offline_master, offline_slave]},
{test_old_mam, [parallel],
[mam_old_master, mam_old_slave]},
{test_new_mam, [parallel],
[mam_new_master, mam_new_slave]},
{test_muc, [parallel],
[muc_master, muc_slave]},
{test_announce, [sequence],
@ -1567,6 +1575,278 @@ carbons_slave(Config) ->
?recv1(#presence{from = Peer, type = unavailable}),
disconnect(Config).
mam_old_master(Config) ->
mam_master(Config, ?NS_MAM_TMP).
mam_new_master(Config) ->
mam_master(Config, ?NS_MAM_0).
mam_master(Config, NS) ->
true = is_feature_advertised(Config, NS),
MyJID = my_jid(Config),
BareMyJID = jlib:jid_remove_resource(MyJID),
Peer = ?config(slave, Config),
send(Config, #presence{}),
?recv1(#presence{}),
wait_for_slave(Config),
?recv1(#presence{from = Peer}),
#iq{type = result, sub_els = []} =
send_recv(Config,
#iq{type = set,
sub_els = [#mam_prefs{xmlns = NS,
default = roster,
never = [MyJID]}]}),
if NS == ?NS_MAM_TMP ->
FakeArchived = #mam_archived{id = randoms:get_string(),
by = server_jid(Config)},
send(Config, #message{to = MyJID,
sub_els = [FakeArchived],
body = [#text{data = <<"a">>}]}),
send(Config, #message{to = BareMyJID,
sub_els = [FakeArchived],
body = [#text{data = <<"b">>}]}),
%% NOTE: The server should strip fake archived tags,
%% i.e. the sub_els received should be [].
?recv2(#message{body = [#text{data = <<"a">>}], sub_els = []},
#message{body = [#text{data = <<"b">>}], sub_els = []});
true ->
ok
end,
wait_for_slave(Config),
lists:foreach(
fun(N) ->
Text = #text{data = jlib:integer_to_binary(N)},
send(Config,
#message{to = Peer, body = [Text]})
end, lists:seq(1, 5)),
?recv1(#presence{type = unavailable, from = Peer}),
mam_query_all(Config, NS),
mam_query_with(Config, Peer, NS),
%% mam_query_with(Config, jlib:jid_remove_resource(Peer)),
mam_query_rsm(Config, NS),
#iq{type = result, sub_els = []} =
send_recv(Config, #iq{type = set,
sub_els = [#mam_prefs{xmlns = NS,
default = never}]}),
disconnect(Config).
mam_old_slave(Config) ->
mam_slave(Config, ?NS_MAM_TMP).
mam_new_slave(Config) ->
mam_slave(Config, ?NS_MAM_0).
mam_slave(Config, NS) ->
Peer = ?config(master, Config),
ServerJID = server_jid(Config),
wait_for_master(Config),
send(Config, #presence{}),
?recv2(#presence{}, #presence{from = Peer}),
#iq{type = result, sub_els = []} =
send_recv(Config,
#iq{type = set,
sub_els = [#mam_prefs{xmlns = NS, default = always}]}),
wait_for_master(Config),
lists:foreach(
fun(N) ->
Text = #text{data = jlib:integer_to_binary(N)},
?recv1(#message{from = Peer, body = [Text],
sub_els = [#mam_archived{by = ServerJID}]})
end, lists:seq(1, 5)),
#iq{type = result, sub_els = []} =
send_recv(Config, #iq{type = set,
sub_els = [#mam_prefs{xmlns = NS, default = never}]}),
disconnect(Config).
mam_query_all(Config, NS) ->
QID = randoms:get_string(),
MyJID = my_jid(Config),
Peer = ?config(slave, Config),
I = send(Config, #iq{type = get, sub_els = [#mam_query{xmlns = NS, id = QID}]}),
maybe_recv_iq_result(NS, I),
Iter = if NS == ?NS_MAM_TMP -> lists:seq(1, 5);
true -> lists:seq(1, 5) ++ lists:seq(1, 5)
end,
lists:foreach(
fun(N) ->
Text = #text{data = jlib:integer_to_binary(N)},
?recv1(#message{to = MyJID,
sub_els =
[#mam_result{
queryid = QID,
sub_els =
[#forwarded{
delay = #delay{},
sub_els =
[#message{
from = MyJID, to = Peer,
body = [Text]}]}]}]})
end, Iter),
if NS == ?NS_MAM_TMP ->
?recv1(#iq{type = result, id = I, sub_els = []});
true ->
?recv1(#message{sub_els = [#mam_fin{id = QID}]})
end.
mam_query_with(Config, JID, NS) ->
MyJID = my_jid(Config),
Peer = ?config(slave, Config),
Query = if NS == ?NS_MAM_TMP ->
#mam_query{xmlns = NS, with = JID};
true ->
Fs = [#xdata_field{var = <<"jid">>,
values = [jlib:jid_to_string(JID)]}],
#mam_query{xmlns = NS,
xdata = #xdata{type = submit, fields = Fs}}
end,
I = send(Config, #iq{type = get, sub_els = [Query]}),
Iter = if NS == ?NS_MAM_TMP -> lists:seq(1, 5);
true -> lists:seq(1, 5) ++ lists:seq(1, 5)
end,
maybe_recv_iq_result(NS, I),
lists:foreach(
fun(N) ->
Text = #text{data = jlib:integer_to_binary(N)},
?recv1(#message{to = MyJID,
sub_els =
[#mam_result{
sub_els =
[#forwarded{
delay = #delay{},
sub_els =
[#message{
from = MyJID, to = Peer,
body = [Text]}]}]}]})
end, Iter),
if NS == ?NS_MAM_TMP ->
?recv1(#iq{type = result, id = I, sub_els = []});
true ->
?recv1(#message{sub_els = [#mam_fin{}]})
end.
maybe_recv_iq_result(?NS_MAM_0, I1) ->
?recv1(#iq{type = result, id = I1});
maybe_recv_iq_result(_, _) ->
ok.
mam_query_rsm(Config, NS) ->
MyJID = my_jid(Config),
Peer = ?config(slave, Config),
%% Get the first 3 items out of 5
I1 = send(Config,
#iq{type = get,
sub_els = [#mam_query{xmlns = NS, rsm = #rsm_set{max = 3}}]}),
maybe_recv_iq_result(NS, I1),
lists:foreach(
fun(N) ->
Text = #text{data = jlib:integer_to_binary(N)},
?recv1(#message{to = MyJID,
sub_els =
[#mam_result{
xmlns = NS,
sub_els =
[#forwarded{
delay = #delay{},
sub_els =
[#message{
from = MyJID, to = Peer,
body = [Text]}]}]}]})
end, lists:seq(1, 3)),
if NS == ?NS_MAM_TMP ->
?recv1(#iq{type = result, id = I1,
sub_els = [#mam_query{xmlns = NS,
rsm = #rsm_set{last = Last, count = 5}}]});
true ->
?recv1(#message{sub_els = [#mam_fin{
rsm = #rsm_set{last = Last, count = 10}}]})
end,
%% Get the next items starting from the `Last`.
%% Limit the response to 2 items.
I2 = send(Config,
#iq{type = get,
sub_els = [#mam_query{xmlns = NS,
rsm = #rsm_set{max = 2,
'after' = Last}}]}),
maybe_recv_iq_result(NS, I2),
lists:foreach(
fun(N) ->
Text = #text{data = jlib:integer_to_binary(N)},
?recv1(#message{to = MyJID,
sub_els =
[#mam_result{
xmlns = NS,
sub_els =
[#forwarded{
delay = #delay{},
sub_els =
[#message{
from = MyJID, to = Peer,
body = [Text]}]}]}]})
end, lists:seq(4, 5)),
if NS == ?NS_MAM_TMP ->
?recv1(#iq{type = result, id = I2,
sub_els = [#mam_query{
xmlns = NS,
rsm = #rsm_set{
count = 5,
first = #rsm_first{data = First}}}]});
true ->
?recv1(#message{
sub_els = [#mam_fin{
rsm = #rsm_set{
count = 10,
first = #rsm_first{data = First}}}]})
end,
%% Paging back. Should receive 2 elements: 2, 3.
I3 = send(Config,
#iq{type = get,
sub_els = [#mam_query{xmlns = NS,
rsm = #rsm_set{max = 2,
before = First}}]}),
maybe_recv_iq_result(NS, I3),
lists:foreach(
fun(N) ->
Text = #text{data = jlib:integer_to_binary(N)},
?recv1(#message{to = MyJID,
sub_els =
[#mam_result{
xmlns = NS,
sub_els =
[#forwarded{
delay = #delay{},
sub_els =
[#message{
from = MyJID, to = Peer,
body = [Text]}]}]}]})
end, lists:seq(2, 3)),
if NS == ?NS_MAM_TMP ->
?recv1(#iq{type = result, id = I3,
sub_els = [#mam_query{xmlns = NS, rsm = #rsm_set{count = 5}}]});
true ->
?recv1(#message{
sub_els = [#mam_fin{rsm = #rsm_set{count = 10}}]})
end,
%% Getting the item count. Should be 5 (or 10).
I4 = send(Config,
#iq{type = get,
sub_els = [#mam_query{xmlns = NS,
rsm = #rsm_set{max = 0}}]}),
maybe_recv_iq_result(NS, I4),
if NS == ?NS_MAM_TMP ->
?recv1(#iq{type = result, id = I4,
sub_els = [#mam_query{
xmlns = NS,
rsm = #rsm_set{count = 5,
first = undefined,
last = undefined}}]});
true ->
?recv1(#message{
sub_els = [#mam_fin{
rsm = #rsm_set{count = 10,
first = undefined,
last = undefined}}]})
end.
client_state_master(Config) ->
true = ?config(csi, Config),
Peer = ?config(slave, Config),

View File

@ -39,6 +39,8 @@ host_config:
versioning: true
store_current_id: true
db_type: odbc
mod_mam:
db_type: odbc
mod_vcard:
db_type: odbc
mod_vcard_xupdate:
@ -90,6 +92,8 @@ Welcome to this XMPP server."
versioning: true
store_current_id: true
db_type: odbc
mod_mam:
db_type: odbc
mod_vcard:
db_type: odbc
mod_vcard_xupdate:
@ -147,6 +151,8 @@ Welcome to this XMPP server."
versioning: true
store_current_id: true
db_type: odbc
mod_mam:
db_type: odbc
mod_vcard:
db_type: odbc
mod_vcard_xupdate:
@ -196,6 +202,8 @@ Welcome to this XMPP server."
versioning: true
store_current_id: true
db_type: internal
mod_mam:
db_type: internal
mod_vcard:
db_type: internal
mod_vcard_xupdate:

File diff suppressed because it is too large Load Diff

View File

@ -9,6 +9,14 @@
-record(sasl_success, {text :: any()}).
-record(mam_result, {xmlns :: binary(),
queryid :: binary(),
id :: binary(),
sub_els = [] :: [any()]}).
-record(rsm_first, {index :: non_neg_integer(),
data :: binary()}).
-record(text, {lang :: binary(),
data :: binary()}).
@ -176,6 +184,11 @@
-record(shim, {headers = [] :: [{binary(),'undefined' | binary()}]}).
-record(mam_prefs, {xmlns :: binary(),
default :: 'always' | 'never' | 'roster',
always = [] :: [any()],
never = [] :: [any()]}).
-record(caps, {hash :: binary(),
node :: binary(),
ver :: any()}).
@ -194,6 +207,9 @@
-record(carbons_sent, {forwarded :: #forwarded{}}).
-record(mam_archived, {by :: any(),
id :: binary()}).
-record(p1_rebind, {}).
-record(compress_failure, {reason :: 'processing-failed' | 'setup-failed' | 'unsupported-method'}).
@ -263,6 +279,17 @@
-record(vcard_org, {name :: binary(),
units = [] :: [binary()]}).
-record(rsm_set, {'after' :: binary(),
before :: binary(),
count :: non_neg_integer(),
first :: #rsm_first{},
index :: non_neg_integer(),
last :: binary(),
max :: non_neg_integer()}).
-record(mam_fin, {id :: binary(),
rsm :: #rsm_set{}}).
-record(vcard_tel, {home = false :: boolean(),
work = false :: boolean(),
voice = false :: boolean(),
@ -343,6 +370,14 @@
items = [] :: [[#xdata_field{}]],
fields = [] :: [#xdata_field{}]}).
-record(mam_query, {xmlns :: binary(),
id :: binary(),
start :: any(),
'end' :: any(),
with :: any(),
rsm :: #rsm_set{},
xdata :: #xdata{}}).
-record(muc_owner, {destroy :: #muc_owner_destroy{},
config :: #xdata{}}).

View File

@ -2063,6 +2063,156 @@
refs = [#ref{name = muc_history, min = 0, max = 1,
label = '$history'}]}).
-xml(rsm_after,
#elem{name = <<"after">>,
xmlns = <<"http://jabber.org/protocol/rsm">>,
result = '$cdata'}).
-xml(rsm_before,
#elem{name = <<"before">>,
xmlns = <<"http://jabber.org/protocol/rsm">>,
result = '$cdata'}).
-xml(rsm_last,
#elem{name = <<"last">>,
xmlns = <<"http://jabber.org/protocol/rsm">>,
result = '$cdata'}).
-xml(rsm_count,
#elem{name = <<"count">>, result = '$cdata',
xmlns = <<"http://jabber.org/protocol/rsm">>,
cdata = #cdata{dec = {dec_int, [0, infinity]},
enc = {enc_int, []}}}).
-xml(rsm_index,
#elem{name = <<"index">>, result = '$cdata',
xmlns = <<"http://jabber.org/protocol/rsm">>,
cdata = #cdata{dec = {dec_int, [0, infinity]},
enc = {enc_int, []}}}).
-xml(rsm_max,
#elem{name = <<"max">>, result = '$cdata',
xmlns = <<"http://jabber.org/protocol/rsm">>,
cdata = #cdata{dec = {dec_int, [0, infinity]},
enc = {enc_int, []}}}).
-xml(rsm_first,
#elem{name = <<"first">>,
xmlns = <<"http://jabber.org/protocol/rsm">>,
result = {rsm_first, '$index', '$data'},
cdata = #cdata{label = '$data'},
attrs = [#attr{name = <<"index">>,
dec = {dec_int, [0, infinity]},
enc = {enc_int, []}}]}).
-xml(rsm_set,
#elem{name = <<"set">>,
xmlns = <<"http://jabber.org/protocol/rsm">>,
result = {rsm_set, '$after', '$before', '$count',
'$first', '$index', '$last', '$max'},
refs = [#ref{name = rsm_after, label = '$after', min = 0, max = 1},
#ref{name = rsm_before, label = '$before', min = 0, max = 1},
#ref{name = rsm_count, label = '$count', min = 0, max = 1},
#ref{name = rsm_first, label = '$first', min = 0, max = 1},
#ref{name = rsm_index, label = '$index', min = 0, max = 1},
#ref{name = rsm_last, label = '$last', min = 0, max = 1},
#ref{name = rsm_max, label = '$max', min = 0, max = 1}]}).
-xml(mam_start,
#elem{name = <<"start">>,
xmlns = <<"urn:xmpp:mam:tmp">>,
result = '$cdata',
cdata = #cdata{required = true,
dec = {dec_utc, []},
enc = {enc_utc, []}}}).
-xml(mam_end,
#elem{name = <<"end">>,
xmlns = <<"urn:xmpp:mam:tmp">>,
result = '$cdata',
cdata = #cdata{required = true,
dec = {dec_utc, []},
enc = {enc_utc, []}}}).
-xml(mam_with,
#elem{name = <<"with">>,
xmlns = <<"urn:xmpp:mam:tmp">>,
result = '$cdata',
cdata = #cdata{required = true,
dec = {dec_jid, []},
enc = {enc_jid, []}}}).
-xml(mam_query,
#elem{name = <<"query">>,
xmlns = [<<"urn:xmpp:mam:0">>, <<"urn:xmpp:mam:tmp">>],
result = {mam_query, '$xmlns', '$id', '$start', '$end', '$with',
'$rsm', '$xdata'},
attrs = [#attr{name = <<"queryid">>, label = '$id'},
#attr{name = <<"xmlns">>}],
refs = [#ref{name = mam_start, min = 0, max = 1, label = '$start'},
#ref{name = mam_end, min = 0, max = 1, label = '$end'},
#ref{name = mam_with, min = 0, max = 1, label = '$with'},
#ref{name = rsm_set, min = 0, max = 1, label = '$rsm'},
#ref{name = xdata, min = 0, max = 1, label = '$xdata'}]}).
-xml(mam_archived,
#elem{name = <<"archived">>,
xmlns = <<"urn:xmpp:mam:tmp">>,
result = {mam_archived, '$by', '$id'},
attrs = [#attr{name = <<"id">>},
#attr{name = <<"by">>,
required = true,
dec = {dec_jid, []},
enc = {enc_jid, []}}]}).
-xml(mam_result,
#elem{name = <<"result">>,
xmlns = [<<"urn:xmpp:mam:0">>, <<"urn:xmpp:mam:tmp">>],
result = {mam_result, '$xmlns', '$queryid', '$id', '$_els'},
attrs = [#attr{name = <<"queryid">>},
#attr{name = <<"xmlns">>},
#attr{name = <<"id">>}]}).
-xml(mam_jid,
#elem{name = <<"jid">>,
xmlns = <<"urn:xmpp:mam:tmp">>,
result = '$cdata',
cdata = #cdata{required = true,
dec = {dec_jid, []},
enc = {enc_jid, []}}}).
-xml(mam_never,
#elem{name = <<"never">>,
xmlns = <<"urn:xmpp:mam:tmp">>,
result = '$jids',
refs = [#ref{name = mam_jid, label = '$jids', default = []}]}).
-xml(mam_always,
#elem{name = <<"always">>,
xmlns = <<"urn:xmpp:mam:tmp">>,
result = '$jids',
refs = [#ref{name = mam_jid, label = '$jids', default = []}]}).
-xml(mam_prefs,
#elem{name = <<"prefs">>,
xmlns = [<<"urn:xmpp:mam:0">>, <<"urn:xmpp:mam:tmp">>],
result = {mam_prefs, '$xmlns', '$default', '$always', '$never'},
attrs = [#attr{name = <<"default">>,
dec = {dec_enum, [[always, never, roster]]},
enc = {enc_enum, []}},
#attr{name = <<"xmlns">>}],
refs = [#ref{name = mam_always, label = '$always',
min = 0, max = 1, default = []},
#ref{name = mam_never, label = '$never',
min = 0, max = 1, default = []}]}).
-xml(mam_fin,
#elem{name = <<"fin">>,
xmlns = <<"urn:xmpp:mam:0">>,
result = {mam_fin, '$id', '$rsm'},
attrs = [#attr{name = <<"queryid">>, label = '$id'}],
refs = [#ref{name = rsm_set, min = 0, max = 1, label = '$rsm'}]}).
-xml(forwarded,
#elem{name = <<"forwarded">>,
xmlns = <<"urn:xmpp:forward:0">>,