diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index d3b1e1815..bde685024 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -119,6 +119,15 @@ ?SERRT_POLICY_VIOLATION(Lang, Text)). -define(INVALID_FROM, ?SERR_INVALID_FROM). +-define(NS_P1_REBIND, "p1:rebind"). +-define(NS_P1_PUSH, "p1:push"). +-define(NS_P1_ACK, "p1:ack"). +-define(NS_P1_PUSHED, "p1:pushed"). +-define(NS_P1_ATTACHMENT, "http://process-one.net/attachement"). + +-define(C2S_P1_ACK_TIMEOUT, 10000). +-define(MAX_OOR_TIMEOUT, 1440). %% Max allowed session duration 24h (24*60) +-define(MAX_OOR_MESSAGES, 1000). %%%---------------------------------------------------------------------- %%% API @@ -395,9 +404,22 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> false -> [] end, + P1PushFeature = + [{xmlelement, "push", + [{"xmlns", ?NS_P1_PUSH}], []}], + P1RebindFeature = + [{xmlelement, "rebind", + [{"xmlns", ?NS_P1_REBIND}], []}], + P1AckFeature = + [{xmlelement, "ack", + [{"xmlns", ?NS_P1_ACK}], []}], send_element(StateData, {xmlelement, "stream:features", [], - TLSFeature ++ CompressFeature ++ + TLSFeature ++ + CompressFeature ++ + P1PushFeature ++ + P1RebindFeature ++ + P1AckFeature ++ [{xmlelement, "mechanisms", [{"xmlns", ?NS_SASL}], Mechs}] ++ @@ -418,7 +440,9 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> roster_get_versioning_feature, Server, [], [Server]), StreamFeatures = - [{xmlelement, "bind", + [{xmlelement, "push", + [{"xmlns", ?NS_P1_PUSH}], []}, + {xmlelement, "bind", [{"xmlns", ?NS_BIND}], []}, {xmlelement, "session", [{"xmlns", ?NS_SESSION}], []}] @@ -581,7 +605,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> privacy_get_user_list, StateData#state.server, #userlist{}, [U, StateData#state.server]), - NewStateData = StateData#state{ + NewStateData = StateData#state{ user = U, resource = R, jid = JID, @@ -627,8 +651,33 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> end end; _ -> - process_unauthenticated_stanza(StateData, El), - fsm_next_state(wait_for_auth, StateData) + {xmlelement, Name, Attrs, _Els} = El, + case {xml:get_attr_s("xmlns", Attrs), Name} of + {?NS_P1_REBIND, "rebind"} -> + SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), + SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), + case jlib:string_to_jid(SJID) of + error -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_auth, + StateData); + JID -> + case rebind(StateData, JID, SID) of + {next_state, wait_for_feature_request, + NewStateData, Timeout} -> + {next_state, wait_for_auth, + NewStateData, Timeout}; + Res -> + Res + end + end; + _ -> + process_unauthenticated_stanza(StateData, El), + fsm_next_state(wait_for_auth, StateData) + end end; wait_for_auth(timeout, StateData) -> @@ -759,6 +808,23 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) -> StateData) end end; + {?NS_P1_REBIND, "rebind"} -> + SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), + SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), + case jlib:string_to_jid(SJID) of + error -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + JID -> + rebind(StateData, JID, SID) + end; + {?NS_P1_ACK, "ack"} -> + fsm_next_state(wait_for_feature_request, + StateData#state{ack_enabled = true}); _ -> if (SockMod == gen_tcp) and TLSRequired -> @@ -1004,7 +1070,9 @@ session_established({xmlstreamelement, El}, StateData) -> send_trailer(StateData), {stop, normal, StateData}; _NewEl -> - session_established2(El, StateData) + NSD1 = change_reception(StateData, true), + NSD2 = start_keepalive_timer(NSD1), + session_established2(El, NSD2) end; %% We hibernate the process to reduce memory consumption after a @@ -1031,7 +1099,16 @@ session_established({xmlstreamerror, _}, StateData) -> {stop, normal, StateData}; session_established(closed, StateData) -> - {stop, normal, StateData}. + if + not StateData#state.reception -> + fsm_next_state(session_established, StateData); + (StateData#state.keepalive_timer /= undefined) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(session_established, NewState); + true -> + {stop, normal, StateData} + end. %% Process packets sent by user (coming from user on c2s XMPP %% connection) @@ -1102,6 +1179,8 @@ session_established2(El, StateData) -> [StateData#state.debug, FromJID, ToJID, NewEl]), process_privacy_iq( FromJID, ToJID, IQ, StateData); + #iq{xmlns = ?NS_P1_PUSH} = IQ -> + process_push_iq(FromJID, ToJID, IQ, StateData); _ -> ejabberd_hooks:run( user_send_packet, @@ -1117,6 +1196,12 @@ session_established2(El, StateData) -> check_privacy_route(FromJID, StateData, FromJID, ToJID, NewEl), StateData; + "standby" -> + StandBy = xml:get_tag_cdata(NewEl) == "true", + change_standby(StateData, StandBy); + "a" -> + SCounter = xml:get_tag_attr_s("h", NewEl), + receive_ack(StateData, SCounter); _ -> StateData end @@ -1156,6 +1241,12 @@ handle_event({del_rosteritem, IJID}, StateName, StateData) -> NewStateData = roster_change(IJID, none, StateData), fsm_next_state(StateName, NewStateData); +handle_event({xmlstreamcdata, _}, StateName, StateData) -> + ?DEBUG("cdata ping", []), + NSD1 = change_reception(StateData, true), + NSD2 = start_keepalive_timer(NSD1), + fsm_next_state(StateName, NSD2); + handle_event(_Event, StateName, StateData) -> fsm_next_state(StateName, StateData). @@ -1345,6 +1436,17 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> _ -> {false, Attrs, StateData} end; + rebind -> + {Pid2, StreamID2} = Els, + if + StreamID2 == StateData#state.streamid -> + Pid2 ! {rebind, prepare_acks_for_rebind(StateData)}, + receive after 1000 -> ok end, + {exit, Attrs, rebind}; + true -> + Pid2 ! {rebind, false}, + {false, Attrs, StateData} + end; "iq" -> IQ = jlib:iq_query_info(Packet), case IQ of @@ -1384,7 +1486,23 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> "message" -> case privacy_check_packet(StateData, From, To, Packet, in) of allow -> - {true, Attrs, StateData}; + if StateData#state.reception -> + case ejabberd_hooks:run_fold( + feature_check_packet, StateData#state.server, + allow, + [StateData#state.jid, + StateData#state.server, + StateData#state.pres_last, + {From, To, Packet}, + in]) of + allow -> + {true, Attrs, StateData}; + deny -> + {false, Attrs, StateData} + end; + true -> + {true, Attrs, StateData} + end; deny -> {false, Attrs, StateData} end; @@ -1393,29 +1511,85 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> end, if Pass == exit -> - %% When Pass==exit, NewState contains a string instead of a #state{} - Lang = StateData#state.lang, - send_element(StateData, ?SERRT_CONFLICT(Lang, NewState)), - send_trailer(StateData), - {stop, normal, StateData}; + catch send_trailer(StateData), + case NewState of + rebind -> + {stop, normal, StateData#state{authenticated = rebinded}}; + _ -> + {stop, normal, StateData} + end; Pass -> Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = {xmlelement, Name, Attrs2, Els}, - send_element(StateData, FixedPacket), + NewState2 = + if + NewState#state.reception and + not (NewState#state.standby and (Name /= "message")) -> + send_element(NewState, FixedPacket), + ack(NewState, From, To, FixedPacket); + true -> + NewState1 = send_out_of_reception_message( + NewState, From, To, Packet), + enqueue(NewState1, From, To, FixedPacket) + end, ejabberd_hooks:run(user_receive_packet, StateData#state.server, [StateData#state.debug, StateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, NewState2); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) end; -handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) - when Monitor == StateData#state.socket_monitor -> +handle_info({timeout, Timer, _}, StateName, + #state{keepalive_timer = Timer, reception = true} = StateData) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(StateName, NewState); +handle_info({timeout, Timer, _}, _StateName, + #state{keepalive_timer = Timer, reception = false} = StateData) -> {stop, normal, StateData}; +handle_info({timeout, Timer, PrevCounter}, StateName, + #state{ack_timer = Timer} = StateData) -> + AckCounter = StateData#state.ack_counter, + NewState = + if + PrevCounter >= AckCounter -> + StateData#state{ack_timer = undefined}; + true -> + send_ack_request(StateData#state{ack_timer = undefined}) + end, + fsm_next_state(StateName, NewState); +handle_info({ack_timeout, Counter}, StateName, StateData) -> + AckQueue = StateData#state.ack_queue, + case queue:is_empty(AckQueue) of + true -> + fsm_next_state(StateName, StateData); + false -> + C = element(1, queue:head(AckQueue)), + if + C =< Counter -> + {stop, normal, StateData}; + true -> + fsm_next_state(StateName, StateData) + end + end; +handle_info({'DOWN', Monitor, _Type, _Object, _Info}, StateName, StateData) + when Monitor == StateData#state.socket_monitor -> + if + (StateName == session_established) and + (not StateData#state.reception) -> + fsm_next_state(StateName, StateData); + (StateName == session_established) and + (StateData#state.keepalive_timer /= undefined) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(StateName, NewState); + true -> + {stop, normal, StateData} + end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1523,6 +1697,13 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_a, Packet), presence_broadcast( StateData, From, StateData#state.pres_i, Packet); + rebinded -> + ejabberd_sm:close_session( + StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource), + ok; _ -> ?INFO_MSG("(~w) Close session for ~s", [StateData#state.socket, @@ -1554,6 +1735,36 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_i, Packet) end end, + case StateData#state.authenticated of + rebinded -> + ok; + _ -> + if + not StateData#state.reception, not StateData#state.oor_offline -> + SFrom = jlib:jid_to_string(StateData#state.jid), + ejabberd_hooks:run( + p1_push_notification, + StateData#state.server, + [StateData#state.server, + StateData#state.jid, + StateData#state.oor_notification, + "Instant messaging session expired", + 0, + false, + StateData#state.oor_appid, + SFrom]); + true -> + ok + end, + lists:foreach( + fun({_Counter, From, To, FixedPacket}) -> + ejabberd_router:route(From, To, FixedPacket) + end, queue:to_list(StateData#state.ack_queue)), + lists:foreach( + fun({From, To, FixedPacket}) -> + ejabberd_router:route(From, To, FixedPacket) + end, queue:to_list(StateData#state.queue)) + end, bounce_messages(); _ -> ok @@ -1586,9 +1797,19 @@ send_text(StateData, Text) -> (StateData#state.sockmod):send(StateData#state.socket, Text1). send_element(StateData, El) when StateData#state.xml_socket -> + ejabberd_hooks:run(feature_inspect_packet, + StateData#state.server, + [StateData#state.jid, + StateData#state.server, + StateData#state.pres_last, El]), (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamelement, El}); send_element(StateData, El) -> + ejabberd_hooks:run(feature_inspect_packet, + StateData#state.server, + [StateData#state.jid, + StateData#state.server, + StateData#state.pres_last, El]), send_text(StateData, xml:element_to_binary(El)). send_header(StateData,Server, Version, Lang) @@ -1728,14 +1949,52 @@ process_presence_probe(From, To, StateData) -> andalso ?SETS:is_element(LFrom, StateData#state.pres_a), if Cond1 -> + Packet = + case StateData#state.reception of + true -> + StateData#state.pres_last; + false -> + case StateData#state.oor_show of + "" -> + StateData#state.pres_last; + _ -> + {xmlelement, _, PresAttrs, PresEls} = + StateData#state.pres_last, + PresEls1 = + lists:flatmap( + fun({xmlelement, Name, _, _}) + when Name == "show"; + Name == "status" -> + []; + (E) -> + [E] + end, PresEls), + {xmlelement, "presence", PresAttrs, + [{xmlelement, "show", [], + [{xmlcdata, + StateData#state.oor_show}]}, + {xmlelement, "status", [], + [{xmlcdata, + StateData#state.oor_status}]}] + ++ PresEls1} + end + end, Timestamp = StateData#state.pres_timestamp, - Packet = xml:append_subtags( - StateData#state.pres_last, + Packet1 = xml:append_subtags( + xml:remove_subtags( + Packet, "x", {"xmlns", ?NS_DELAY91}), %% To is the one sending the presence (the target of the probe) [jlib:timestamp_to_xml(Timestamp, utc, To, ""), %% TODO: Delete the next line once XEP-0091 is Obsolete jlib:timestamp_to_xml(Timestamp)]), - case privacy_check_packet(StateData, To, From, Packet, out) of + case ejabberd_hooks:run_fold( + privacy_check_packet, StateData#state.server, + allow, + [StateData#state.user, + StateData#state.server, + StateData#state.privacy_list, + {To, From, Packet1}, + out]) of deny -> ok; allow -> @@ -1744,7 +2003,7 @@ process_presence_probe(From, To, StateData) -> %% Don't route a presence probe to oneself case From == To of false -> - ejabberd_router:route(To, From, Packet); + ejabberd_router:route(To, From, Packet1); true -> ok end @@ -2050,6 +2309,7 @@ roster_change(IJID, ISubscription, StateData) -> ?DEBUG("roster changed for ~p~n", [StateData#state.user]), From = StateData#state.jid, To = jlib:make_jid(IJID), +% To = IJID, Cond1 = (not StateData#state.pres_invis) and IsFrom and (not OldIsFrom), Cond2 = (not IsFrom) and OldIsFrom @@ -2094,8 +2354,15 @@ roster_change(IJID, ISubscription, StateData) -> update_priority(Priority, Packet, StateData) -> - Info = [{ip, StateData#state.ip}, {conn, StateData#state.conn}, - {auth_module, StateData#state.auth_module}], + Info1 = [{ip, StateData#state.ip}, {conn, StateData#state.conn}, + {auth_module, StateData#state.auth_module}], + Info = + case StateData#state.reception of + false -> + [{oor, true} | Info1]; + _ -> + Info1 + end, ejabberd_sm:set_presence(StateData#state.sid, StateData#state.user, StateData#state.server, @@ -2333,6 +2600,646 @@ check_from(El, FromJID) -> end end. +start_keepalive_timer(StateData) -> + if + is_reference(StateData#state.keepalive_timer) -> + cancel_timer(StateData#state.keepalive_timer); + true -> + ok + end, + Timeout = + if + StateData#state.reception -> StateData#state.keepalive_timeout; + true -> StateData#state.oor_timeout + end, + Timer = + if + is_integer(Timeout) -> + erlang:start_timer(Timeout * 1000, self(), []); + true -> + undefined + end, + StateData#state{keepalive_timer = Timer}. + +change_reception(#state{reception = Reception} = StateData, Reception) -> + StateData; +change_reception(#state{reception = true} = StateData, false) -> + ?DEBUG("reception -> false", []), + case StateData#state.oor_show of + "" -> + ok; + _ -> + Packet = + {xmlelement, "presence", [], + [{xmlelement, "show", [], + [{xmlcdata, StateData#state.oor_show}]}, + {xmlelement, "status", [], + [{xmlcdata, StateData#state.oor_status}]}]}, + update_priority(0, Packet, StateData#state{reception = false}), + presence_broadcast_to_trusted( + StateData, + StateData#state.jid, + StateData#state.pres_f, + StateData#state.pres_a, + Packet) + end, + StateData#state{reception = false}; +change_reception(#state{reception = false, standby = true} = StateData, true) -> + ?DEBUG("reception -> standby", []), + NewQueue = + lists:foldl( + fun({_From, _To, {xmlelement, "message", _, _} = FixedPacket}, Q) -> + send_element(StateData, FixedPacket), + Q; + (Item, Q) -> + queue:in(Item, Q) + end, queue:new(), queue:to_list(StateData#state.queue)), + StateData#state{queue = NewQueue, + queue_len = queue:len(NewQueue), + reception = true, + oor_unread = 0, + oor_unread_users = ?SETS:new()}; +change_reception(#state{reception = false} = StateData, true) -> + ?DEBUG("reception -> true", []), + case StateData#state.oor_show of + "" -> + ok; + _ -> + Packet = StateData#state.pres_last, + NewPriority = get_priority_from_presence(Packet), + update_priority(NewPriority, Packet, + StateData#state{reception = true}), + presence_broadcast_to_trusted( + StateData, + StateData#state.jid, + StateData#state.pres_f, + StateData#state.pres_a, + Packet) + end, + lists:foreach( + fun({_From, _To, FixedPacket}) -> + send_element(StateData, FixedPacket) + end, queue:to_list(StateData#state.queue)), + lists:foreach( + fun(FixedPacket) -> + send_element(StateData, FixedPacket) + end, gb_trees:values(StateData#state.pres_queue)), + StateData#state{queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + reception = true, + oor_unread = 0, + oor_unread_users = ?SETS:new()}. + +change_standby(#state{standby = StandBy} = StateData, StandBy) -> + StateData; +change_standby(#state{standby = false} = StateData, true) -> + ?DEBUG("standby -> true", []), + StateData#state{standby = true}; +change_standby(#state{standby = true} = StateData, false) -> + ?DEBUG("standby -> false", []), + lists:foreach( + fun({_From, _To, FixedPacket}) -> + send_element(StateData, FixedPacket) + end, queue:to_list(StateData#state.queue)), + lists:foreach( + fun(FixedPacket) -> + send_element(StateData, FixedPacket) + end, gb_trees:values(StateData#state.pres_queue)), + StateData#state{queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + standby = false}. + +send_out_of_reception_message(StateData, From, To, + {xmlelement, "message", _, _} = Packet) -> + Type = xml:get_tag_attr_s("type", Packet), + if + (Type == "normal") or + (Type == "") or + (Type == "chat") or + (StateData#state.oor_send_groupchat and (Type == "groupchat"))-> + %Lang = case xml:get_tag_attr_s("xml:lang", Packet) of + % "" -> + % StateData#state.lang; + % L -> + % L + % end, + %Text = translate:translate( + % Lang, "User is temporarily out of reception"), + %MsgType = "error", + %Message = {xmlelement, "message", + % [{"type", MsgType}], + % [{xmlelement, "body", [], + % [{xmlcdata, Text}]}]}, + %ejabberd_router:route(To, From, Message), + Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]), + Body = + case check_x_attachment(Packet) of + true -> + case Body1 of + "" -> [238, 128, 136]; + _ -> + [238, 128, 136, 32 | Body1] + end; + false -> + Body1 + end, + Pushed = check_x_pushed(Packet), + if + Body == ""; + Pushed -> + StateData; + true -> + BFrom = jlib:jid_remove_resource(From), + LBFrom = jlib:jid_tolower(BFrom), + UnreadUsers = ?SETS:add_element( + LBFrom, + StateData#state.oor_unread_users), + IncludeBody = + case StateData#state.oor_send_body of + all -> + true; + first_per_user -> + not ?SETS:is_element( + LBFrom, + StateData#state.oor_unread_users); + first -> + StateData#state.oor_unread == 0; + none -> + false + end, + Unread = StateData#state.oor_unread + 1, + SFrom = jlib:jid_to_string(BFrom), + Msg = + if + IncludeBody -> + CBody = utf8_cut(Body, 100), + case StateData#state.oor_send_from of + jid -> SFrom ++ ": " ++ CBody; + username -> BFrom#jid.user ++ ": " ++ CBody; + _ -> CBody + end; + true -> + "" + end, + Sound = IncludeBody, + AppID = StateData#state.oor_appid, + ejabberd_hooks:run( + p1_push_notification, + StateData#state.server, + [StateData#state.server, + StateData#state.jid, + StateData#state.oor_notification, + Msg, + Unread + StateData#state.oor_unread_client, + Sound, + AppID, + SFrom]), + %% This hook is intended to give other module a + %% chance to notify the sender that the message is + %% not directly delivered to the client (almost + %% equivalent to offline). + ejabberd_hooks:run(delayed_message_hook, + StateData#state.server, + [From, To, Packet]), + StateData#state{oor_unread = Unread, + oor_unread_users = UnreadUsers} + end; + true -> + StateData + end; +send_out_of_reception_message(StateData, _From, _To, _Packet) -> + StateData. + +utf8_cut(S, Bytes) -> + utf8_cut(S, [], [], Bytes + 1). + +utf8_cut(_S, _Cur, Prev, 0) -> + lists:reverse(Prev); +utf8_cut([], Cur, _Prev, _Bytes) -> + lists:reverse(Cur); +utf8_cut([C | S], Cur, Prev, Bytes) -> + if + C bsr 6 == 2 -> + utf8_cut(S, [C | Cur], Prev, Bytes - 1); + true -> + utf8_cut(S, [C | Cur], Cur, Bytes - 1) + end. + + +cancel_timer(Timer) -> + erlang:cancel_timer(Timer), + receive + {timeout, Timer, _} -> + ok + after 0 -> + ok + end. + +enqueue(StateData, From, To, Packet) -> + IsPresence = + case Packet of + {xmlelement, "presence", _, _} -> + case xml:get_tag_attr_s("type", Packet) of + "subscribe" -> + false; + "subscribed" -> + false; + "unsubscribe" -> + false; + "unsubscribed" -> + false; + _ -> + true + end; + _ -> + false + end, + Messages = + StateData#state.queue_len + gb_trees:size(StateData#state.pres_queue), + if + Messages >= ?MAX_OOR_MESSAGES -> + self() ! {timeout, StateData#state.keepalive_timer, []}; + true -> + ok + end, + if + IsPresence -> + LFrom = jlib:jid_tolower(From), + case is_own_presence(StateData#state.jid, LFrom) of + true -> StateData; + false -> + NewQueue = gb_trees:enter(LFrom, Packet, + StateData#state.pres_queue), + StateData#state{pres_queue = NewQueue} + end; + true -> + CleanPacket = xml:remove_subtags( + xml:remove_subtags(Packet, "x", {"xmlns", ?NS_P1_PUSHED}), + "x", {"xmlns", ?NS_DELAY91}), + Packet2 = + case CleanPacket of + {xmlelement, "message" = Name, Attrs, Els} -> + {xmlelement, Name, Attrs, + Els ++ + [jlib:timestamp_to_xml( + calendar:now_to_universal_time(now())), + {xmlelement, "x", [{"xmlns", ?NS_P1_PUSHED}], []}]}; + _ -> + Packet + end, + NewQueue = queue:in({From, To, Packet2}, + StateData#state.queue), + NewQueueLen = StateData#state.queue_len + 1, + StateData#state{queue = NewQueue, + queue_len = NewQueueLen} + end. + +%% Is my own presence packet ? +is_own_presence(MyFullJID, MyFullJID) -> + true; +is_own_presence(_MyFullJID, _LFrom) -> + false. + +ack(StateData, From, To, Packet) -> + if + StateData#state.ack_enabled -> + NeedsAck = + case Packet of + {xmlelement, "presence", _, _} -> + case xml:get_tag_attr_s("type", Packet) of + "subscribe" -> + true; + "subscribed" -> + true; + "unsubscribe" -> + true; + "unsubscribed" -> + true; + _ -> + false + end; + {xmlelement, "message", _, _} -> + true; + _ -> + false + end, + if + NeedsAck -> + Counter = StateData#state.ack_counter + 1, + NewAckQueue = queue:in({Counter, From, To, Packet}, + StateData#state.ack_queue), + send_ack_request(StateData#state{ack_queue = NewAckQueue, + ack_counter = Counter}); + true -> + StateData + end; + true -> + StateData + end. + +send_ack_request(StateData) -> + case StateData#state.ack_timer of + undefined -> + AckCounter = StateData#state.ack_counter, + AckTimer = + erlang:start_timer(?C2S_P1_ACK_TIMEOUT, self(), AckCounter), + AckTimeout = StateData#state.keepalive_timeout + + StateData#state.oor_timeout, + erlang:send_after(AckTimeout * 1000, self(), + {ack_timeout, AckTimeout}), + send_element( + StateData, + {xmlelement, "r", + [{"h", integer_to_list(AckCounter)}], []}), + StateData#state{ack_timer = AckTimer}; + _ -> + StateData + end. + +receive_ack(StateData, SCounter) -> + case catch list_to_integer(SCounter) of + Counter when is_integer(Counter) -> + NewQueue = clean_queue(StateData#state.ack_queue, Counter), + StateData#state{ack_queue = NewQueue}; + _ -> + StateData + end. + +clean_queue(Queue, Counter) -> + case queue:is_empty(Queue) of + true -> + Queue; + false -> + C = element(1, queue:head(Queue)), + if + C =< Counter -> + clean_queue(queue:tail(Queue), Counter); + true -> + Queue + end + end. + +prepare_acks_for_rebind(StateData) -> + AckQueue = StateData#state.ack_queue, + case queue:is_empty(AckQueue) of + true -> + StateData; + false -> + Unsent = + lists:map( + fun({_Counter, From, To, FixedPacket}) -> + {From, To, FixedPacket} + end, queue:to_list(AckQueue)), + NewQueue = queue:join(queue:from_list(Unsent), + StateData#state.queue), + StateData#state{queue = NewQueue, + queue_len = queue:len(NewQueue), + ack_queue = queue:new(), + reception = false} + end. + + +rebind(StateData, JID, StreamID) -> + case JID#jid.lresource of + "" -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + _ -> + ejabberd_sm:route( + ?MODULE, JID, + {xmlelement, rebind, [], {self(), StreamID}}), + receive + {rebind, false} -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Session not found"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + {rebind, NewStateData} -> + ?INFO_MSG("(~w) Reopened session for ~s", + [StateData#state.socket, + jlib:jid_to_string(JID)]), + SID = {now(), self()}, + Conn = get_conn_type(NewStateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, NewStateData#state.auth_module}], + ejabberd_sm:open_session( + SID, + NewStateData#state.user, + NewStateData#state.server, + NewStateData#state.resource, + Info), + StateData2 = + NewStateData#state{ + socket = StateData#state.socket, + sockmod = StateData#state.sockmod, + socket_monitor = StateData#state.socket_monitor, + sid = SID, + ip = StateData#state.ip, + keepalive_timer = StateData#state.keepalive_timer, + ack_timer = undefined + }, + Presence = StateData2#state.pres_last, + case Presence of + undefined -> + ok; + _ -> + NewPriority = get_priority_from_presence(Presence), + update_priority(NewPriority, Presence, StateData2) + end, + send_element(StateData2, + {xmlelement, "rebind", + [{"xmlns", ?NS_P1_REBIND}], + []}), + StateData3 = change_reception(StateData2, true), + StateData4 = start_keepalive_timer(StateData3), + fsm_next_state(session_established, + StateData4) + after 1000 -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Session not found"}]}), + fsm_next_state(wait_for_feature_request, + StateData) + end + end. + +process_push_iq(From, To, + #iq{type = _Type, sub_el = El} = IQ, + StateData) -> + {Res, NewStateData} = + case El of + {xmlelement, "push", _, _} -> + SKeepAlive = + xml:get_path_s(El, [{elem, "keepalive"}, {attr, "max"}]), + SOORTimeout = + xml:get_path_s(El, [{elem, "session"}, {attr, "duration"}]), + Status = xml:get_path_s(El, [{elem, "status"}, cdata]), + Show = xml:get_path_s(El, [{elem, "status"}, {attr, "type"}]), + SSendBody = xml:get_path_s(El, [{elem, "body"}, {attr, "send"}]), + SendBody = + case SSendBody of + "all" -> all; + "first-per-user" -> first_per_user; + "first" -> first; + "none" -> none; + _ -> none + end, + SendGroupchat = + xml:get_path_s(El, [{elem, "body"}, + {attr, "groupchat"}]) == "true", + SendFrom = send_from(El), + AppID = xml:get_path_s(El, [{elem, "appid"}, cdata]), + {Offline, Keep} = + case xml:get_path_s(El, [{elem, "offline"}, cdata]) of + "true" -> {true, false}; + "keep" -> {false, true}; + _ -> {false, false} + end, + Notification1 = xml:get_path_s(El, [{elem, "notification"}]), + Notification = + case Notification1 of + {xmlelement, _, _, _} -> + Notification1; + _ -> + {xmlelement, "notification", [], + [{xmlelement, "type", [], + [{xmlcdata, "none"}]}]} + end, + case catch {list_to_integer(SKeepAlive), + list_to_integer(SOORTimeout)} of + {KeepAlive, OORTimeout} + when OORTimeout =< ?MAX_OOR_TIMEOUT -> + if + Offline -> + ejabberd_hooks:run( + p1_push_enable_offline, + StateData#state.server, + [StateData#state.jid, + Notification, SendBody, SendFrom, AppID]); + Keep -> + ok; + true -> + ejabberd_hooks:run( + p1_push_disable, + StateData#state.server, + [StateData#state.jid, + Notification, + AppID]) + end, + NSD1 = + StateData#state{keepalive_timeout = KeepAlive, + oor_timeout = OORTimeout * 60, + oor_status = Status, + oor_show = Show, + oor_notification = Notification, + oor_send_body = SendBody, + oor_send_groupchat = SendGroupchat, + oor_send_from = SendFrom, + oor_appid = AppID, + oor_offline = Offline}, + NSD2 = start_keepalive_timer(NSD1), + {{result, []}, NSD2}; + _ -> + {{error, ?ERR_BAD_REQUEST}, StateData} + end; + {xmlelement, "disable", _, _} -> + ejabberd_hooks:run( + p1_push_disable, + StateData#state.server, + [StateData#state.jid, + StateData#state.oor_notification, + StateData#state.oor_appid]), + NSD1 = + StateData#state{keepalive_timeout = undefined, + oor_timeout = undefined, + oor_status = "", + oor_show = "", + oor_notification = undefined, + oor_send_body = all}, + NSD2 = start_keepalive_timer(NSD1), + {{result, []}, NSD2}; + {xmlelement, "badge", _, _} -> + SBadge = xml:get_path_s(El, [{attr, "unread"}]), + Badge = + case catch list_to_integer(SBadge) of + B when is_integer(B) -> + B; + _ -> + 0 + end, + NSD1 = + StateData#state{oor_unread_client = Badge}, + {{result, []}, NSD1}; + _ -> + {{error, ?ERR_BAD_REQUEST}, StateData} + end, + IQRes = + case Res of + {result, Result} -> + IQ#iq{type = result, sub_el = Result}; + {error, Error} -> + IQ#iq{type = error, sub_el = [El, Error]} + end, + ejabberd_router:route( + To, From, jlib:iq_to_xml(IQRes)), + NewStateData. + +check_x_pushed({xmlelement, _Name, _Attrs, Els}) -> + check_x_pushed1(Els). + +check_x_pushed1([]) -> + false; +check_x_pushed1([{xmlcdata, _} | Els]) -> + check_x_pushed1(Els); +check_x_pushed1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_PUSHED -> + true; + _ -> + check_x_pushed1(Els) + end. + +check_x_attachment({xmlelement, _Name, _Attrs, Els}) -> + check_x_attachment1(Els). + +check_x_attachment1([]) -> + false; +check_x_attachment1([{xmlcdata, _} | Els]) -> + check_x_attachment1(Els); +check_x_attachment1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_ATTACHMENT -> + true; + _ -> + check_x_attachment1(Els) + end. + + +send_from(El) -> + %% First test previous version attribute: + case xml:get_path_s(El, [{elem, "body"}, {attr, "jid"}]) of + "false" -> + none; + "true" -> + jid; + "" -> + case xml:get_path_s(El, [{elem, "body"}, {attr, "from"}]) of + "jid" -> jid; + "username" -> username; + "none" -> none; + _ -> jid + end + end. + fsm_limit_opts(Opts) -> case lists:keysearch(max_fsm_queue, 1, Opts) of {value, {_, N}} when is_integer(N) -> diff --git a/src/ejabberd_c2s.hrl b/src/ejabberd_c2s.hrl index e64abe95d..9af6b02d5 100644 --- a/src/ejabberd_c2s.hrl +++ b/src/ejabberd_c2s.hrl @@ -61,4 +61,27 @@ fsm_limit_opts, lang, debug=false, - flash_connection = false}). + flash_connection = false, + reception = true, + standby = false, + queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + keepalive_timer, + keepalive_timeout, + oor_timeout, + oor_status = "", + oor_show = "", + oor_notification, + oor_send_body = all, + oor_send_groupchat = false, + oor_send_from = jid, + oor_appid = "", + oor_unread = 0, + oor_unread_users = ?SETS:new(), + oor_unread_client = 0, + oor_offline = false, + ack_enabled = false, + ack_counter = 0, + ack_queue = queue:new(), + ack_timer}). diff --git a/src/mod_offline.erl b/src/mod_offline.erl index dfa2f46c2..2448a17c0 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -30,19 +30,18 @@ -behaviour(gen_mod). -export([start/2, - loop/1, + init/1, stop/1, store_packet/3, resend_offline_messages/2, pop_offline_messages/3, - get_sm_features/5, remove_expired_messages/0, remove_old_messages/1, remove_user/2, - get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5]). + webadmin_user_parse_query/5, + count_offline_messages/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -54,9 +53,6 @@ -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). -%% default value for the maximum number of user messages --define(MAX_USER_MESSAGES, infinity). - start(Host, Opts) -> mnesia:create_table(offline_msg, [{disc_only_copies, [node()]}, @@ -71,28 +67,30 @@ start(Host, Opts) -> ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:add(disco_sm_features, Host, - ?MODULE, get_sm_features, 50), - ejabberd_hooks:add(disco_local_features, Host, - ?MODULE, get_sm_features, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), + ejabberd_hooks:add(count_offline_messages, Host, + ?MODULE, count_offline_messages, 50), + MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, loop, [AccessMaxOfflineMsgs])). + spawn(?MODULE, init, [MaxOfflineMsgs])). -loop(AccessMaxOfflineMsgs) -> +%% MaxOfflineMsgs is either infinity of integer > 0 +init(infinity) -> + loop(infinity); +init(MaxOfflineMsgs) + when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> + loop(MaxOfflineMsgs). + +loop(MaxOfflineMsgs) -> receive #offline_msg{us=US} = Msg -> Msgs = receive_all(US, [Msg]), Len = length(Msgs), - {User, Host} = US, - MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, - User, Host), F = fun() -> %% Only count messages if needed: Count = if MaxOfflineMsgs =/= infinity -> @@ -118,18 +116,9 @@ loop(AccessMaxOfflineMsgs) -> end end, mnesia:transaction(F), - loop(AccessMaxOfflineMsgs); + loop(MaxOfflineMsgs); _ -> - loop(AccessMaxOfflineMsgs) - end. - -%% Function copied from ejabberd_sm.erl: -get_max_user_messages(AccessRule, LUser, Host) -> - case acl:match_rule( - Host, AccessRule, jlib:make_jid(LUser, Host, "")) of - Max when is_integer(Max) -> Max; - infinity -> infinity; - _ -> ?MAX_USER_MESSAGES + loop(MaxOfflineMsgs) end. receive_all(US, Msgs) -> @@ -150,8 +139,6 @@ stop(Host) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), - ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, @@ -162,27 +149,12 @@ stop(Host) -> exit(whereis(Proc), stop), {wait, Proc}. -get_sm_features(Acc, _From, _To, "", _Lang) -> - Feats = case Acc of - {result, I} -> I; - _ -> [] - end, - {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; - -get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> - %% override all lesser features... - {result, []}; - -get_sm_features(Acc, _From, _To, _Node, _Lang) -> - Acc. - - store_packet(From, To, Packet) -> Type = xml:get_tag_attr_s("type", Packet), if (Type /= "error") and (Type /= "groupchat") and (Type /= "headline") -> - case check_event_chatstates(From, To, Packet) of + case check_event(From, To, Packet) of true -> #jid{luser = LUser, lserver = LServer} = To, TimeStamp = now(), @@ -203,22 +175,12 @@ store_packet(From, To, Packet) -> ok end. -%% Check if the packet has any content about XEP-0022 or XEP-0085 -check_event_chatstates(From, To, Packet) -> +check_event(From, To, Packet) -> {xmlelement, Name, Attrs, Els} = Packet, - case find_x_event_chatstates(Els, {false, false, false}) of - %% There wasn't any x:event or chatstates subelements - {false, false, _} -> + case find_x_event(Els) of + false -> true; - %% There a chatstates subelement and other stuff, but no x:event - {false, CEl, true} when CEl /= false -> - true; - %% There was only a subelement: a chatstates - {false, CEl, false} when CEl /= false -> - %% Don't allow offline storage - false; - %% There was an x:event element, and maybe also other stuff - {El, _, _} when El /= false -> + El -> case xml:get_subtag(El, "id") of false -> case xml:get_subtag(El, "offline") of @@ -246,19 +208,16 @@ check_event_chatstates(From, To, Packet) -> end end. -%% Check if the packet has subelements about XEP-0022, XEP-0085 or other -find_x_event_chatstates([], Res) -> - Res; -find_x_event_chatstates([{xmlcdata, _} | Els], Res) -> - find_x_event_chatstates(Els, Res); -find_x_event_chatstates([El | Els], {A, B, C}) -> +find_x_event([]) -> + false; +find_x_event([{xmlcdata, _} | Els]) -> + find_x_event(Els); +find_x_event([El | Els]) -> case xml:get_tag_attr_s("xmlns", El) of ?NS_EVENT -> - find_x_event_chatstates(Els, {El, B, C}); - ?NS_CHATSTATES -> - find_x_event_chatstates(Els, {A, El, C}); + El; _ -> - find_x_event_chatstates(Els, {A, B, true}) + find_x_event(Els) end. find_x_expire(_, []) -> @@ -307,13 +266,6 @@ resend_offline_messages(User, Server) -> {xmlelement, Name, Attrs, Els ++ [jlib:timestamp_to_xml( - calendar:now_to_universal_time( - R#offline_msg.timestamp), - utc, - jlib:make_jid("", Server, ""), - "Offline Storage"), - %% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml( calendar:now_to_universal_time( R#offline_msg.timestamp))]}} end, @@ -343,14 +295,7 @@ pop_offline_messages(Ls, User, Server) -> {xmlelement, Name, Attrs, Els ++ [jlib:timestamp_to_xml( - calendar:now_to_universal_time( - R#offline_msg.timestamp), - utc, - jlib:make_jid("", Server, ""), - "Offline Storage"), - %% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml( - calendar:now_to_universal_time( + calendar:now_to_universal_time( R#offline_msg.timestamp))]}} end, lists:filter( @@ -367,7 +312,6 @@ pop_offline_messages(Ls, User, Server) -> Ls end. - remove_expired_messages() -> TimeStamp = now(), F = fun() -> @@ -530,9 +474,8 @@ webadmin_page(Acc, _, _) -> Acc. user_queue(User, Server, Query, Lang) -> US = {jlib:nodeprep(User), jlib:nameprep(Server)}, Res = user_queue_parse_query(US, Query), - MsgsAll = lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})), - Msgs = get_messages_subset(User, Server, MsgsAll), + Msgs = lists:keysort(#offline_msg.timestamp, + mnesia:dirty_read({offline_msg, US})), FMsgs = lists:map( fun(#offline_msg{timestamp = TimeStamp, from = From, to = To, @@ -614,32 +557,9 @@ user_queue_parse_query(US, Query) -> us_to_list({User, Server}) -> jlib:jid_to_string({User, Server, ""}). -get_queue_length(User, Server) -> - length(mnesia:dirty_read({offline_msg, {User, Server}})). - -get_messages_subset(User, Host, MsgsAll) -> - Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, - max_user_offline_messages), - MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of - Number when is_integer(Number) -> Number; - _ -> 100 - end, - Length = length(MsgsAll), - get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). - -get_messages_subset2(Max, Length, MsgsAll) when Length =< Max*2 -> - MsgsAll; -get_messages_subset2(Max, Length, MsgsAll) -> - FirstN = Max, - {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), - MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), - NoJID = jlib:make_jid("...", "...", ""), - IntermediateMsg = #offline_msg{timestamp = now(), from = NoJID, to = NoJID, - packet = {xmlelement, "...", [], []}}, - MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. - webadmin_user(Acc, User, Server, Lang) -> - QueueLen = get_queue_length(jlib:nodeprep(User), jlib:nameprep(Server)), + US = {jlib:nodeprep(User), jlib:nameprep(Server)}, + QueueLen = length(mnesia:dirty_read({offline_msg, US})), FQueueLen = [?AC("queue/", integer_to_list(QueueLen))], Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. @@ -663,3 +583,23 @@ webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> end; webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> Acc. + + +%% ------------------------------------------------ +%% mod_offline: number of messages quota management + +count_offline_messages(_Acc, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + US = {LUser, LServer}, + F = fun () -> + p1_mnesia:count_records( + offline_msg, + #offline_msg{us=US, _='_'}) + end, + N = case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end, + {stop, N}. + diff --git a/src/mod_offline_odbc.erl b/src/mod_offline_odbc.erl index 063ea709e..420ff581f 100644 --- a/src/mod_offline_odbc.erl +++ b/src/mod_offline_odbc.erl @@ -32,16 +32,15 @@ -export([count_offline_messages/2]). -export([start/2, - loop/2, + init/2, stop/1, store_packet/3, pop_offline_messages/3, - get_sm_features/5, remove_user/2, - get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5]). + webadmin_user_parse_query/5, + count_offline_messages/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -53,9 +52,6 @@ -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). -%% default value for the maximum number of user messages --define(MAX_USER_MESSAGES, infinity). - start(Host, Opts) -> ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), @@ -65,27 +61,30 @@ start(Host, Opts) -> ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:add(disco_sm_features, Host, - ?MODULE, get_sm_features, 50), - ejabberd_hooks:add(disco_local_features, Host, - ?MODULE, get_sm_features, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), + ejabberd_hooks:add(count_offline_messages, Host, + ?MODULE, count_offline_messages, 50), + MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, loop, [Host, AccessMaxOfflineMsgs])). + spawn(?MODULE, init, [Host, MaxOfflineMsgs])). -loop(Host, AccessMaxOfflineMsgs) -> +%% MaxOfflineMsgs is either infinity of integer > 0 +init(Host, infinity) -> + loop(Host, infinity); +init(Host, MaxOfflineMsgs) + when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> + loop(Host, MaxOfflineMsgs). + +loop(Host, MaxOfflineMsgs) -> receive #offline_msg{user = User} = Msg -> Msgs = receive_all(User, [Msg]), Len = length(Msgs), - MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, - User, Host), %% Only count existing messages if needed: Count = if MaxOfflineMsgs =/= infinity -> @@ -113,17 +112,11 @@ loop(Host, AccessMaxOfflineMsgs) -> Els ++ [jlib:timestamp_to_xml( calendar:now_to_universal_time( - M#offline_msg.timestamp), - utc, - jlib:make_jid("", Host, ""), - "Offline Storage"), - %% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml( - calendar:now_to_universal_time( M#offline_msg.timestamp))]}, XML = ejabberd_odbc:escape( - xml:element_to_binary(Packet)), + lists:flatten( + xml:element_to_string(Packet))), odbc_queries:add_spool_sql(Username, XML) end, Msgs), case catch odbc_queries:add_spool(Host, Query) of @@ -135,18 +128,9 @@ loop(Host, AccessMaxOfflineMsgs) -> ok end end, - loop(Host, AccessMaxOfflineMsgs); + loop(Host, MaxOfflineMsgs); _ -> - loop(Host, AccessMaxOfflineMsgs) - end. - -%% Function copied from ejabberd_sm.erl: -get_max_user_messages(AccessRule, LUser, Host) -> - case acl:match_rule( - Host, AccessRule, jlib:make_jid(LUser, Host, "")) of - Max when is_integer(Max) -> Max; - infinity -> infinity; - _ -> ?MAX_USER_MESSAGES + loop(Host, MaxOfflineMsgs) end. receive_all(Username, Msgs) -> @@ -167,8 +151,6 @@ stop(Host) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), - ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, @@ -179,27 +161,12 @@ stop(Host) -> exit(whereis(Proc), stop), ok. -get_sm_features(Acc, _From, _To, "", _Lang) -> - Feats = case Acc of - {result, I} -> I; - _ -> [] - end, - {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; - -get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> - %% override all lesser features... - {result, []}; - -get_sm_features(Acc, _From, _To, _Node, _Lang) -> - Acc. - - store_packet(From, To, Packet) -> Type = xml:get_tag_attr_s("type", Packet), if (Type /= "error") and (Type /= "groupchat") and (Type /= "headline") -> - case check_event_chatstates(From, To, Packet) of + case check_event(From, To, Packet) of true -> #jid{luser = LUser} = To, TimeStamp = now(), @@ -220,22 +187,12 @@ store_packet(From, To, Packet) -> ok end. -%% Check if the packet has any content about XEP-0022 or XEP-0085 -check_event_chatstates(From, To, Packet) -> +check_event(From, To, Packet) -> {xmlelement, Name, Attrs, Els} = Packet, - case find_x_event_chatstates(Els, {false, false, false}) of - %% There wasn't any x:event or chatstates subelements - {false, false, _} -> + case find_x_event(Els) of + false -> true; - %% There a chatstates subelement and other stuff, but no x:event - {false, CEl, true} when CEl /= false -> - true; - %% There was only a subelement: a chatstates - {false, CEl, false} when CEl /= false -> - %% Don't allow offline storage - false; - %% There was an x:event element, and maybe also other stuff - {El, _, _} when El /= false -> + El -> case xml:get_subtag(El, "id") of false -> case xml:get_subtag(El, "offline") of @@ -257,25 +214,22 @@ check_event_chatstates(From, To, Packet) -> {xmlelement, "offline", [], []}]}] }), true - end; + end; _ -> false end end. -%% Check if the packet has subelements about XEP-0022, XEP-0085 or other -find_x_event_chatstates([], Res) -> - Res; -find_x_event_chatstates([{xmlcdata, _} | Els], Res) -> - find_x_event_chatstates(Els, Res); -find_x_event_chatstates([El | Els], {A, B, C}) -> +find_x_event([]) -> + false; +find_x_event([{xmlcdata, _} | Els]) -> + find_x_event(Els); +find_x_event([El | Els]) -> case xml:get_tag_attr_s("xmlns", El) of ?NS_EVENT -> - find_x_event_chatstates(Els, {El, B, C}); - ?NS_CHATSTATES -> - find_x_event_chatstates(Els, {A, El, C}); + El; _ -> - find_x_event_chatstates(Els, {A, B, true}) + find_x_event(Els) end. find_x_expire(_, []) -> @@ -375,7 +329,7 @@ user_queue(User, Server, Query, Lang) -> Username = ejabberd_odbc:escape(LUser), US = {LUser, LServer}, Res = user_queue_parse_query(Username, LServer, Query), - MsgsAll = case catch ejabberd_odbc:sql_query( + Msgs = case catch ejabberd_odbc:sql_query( LServer, ["select username, xml from spool" " where username='", Username, "'" @@ -393,7 +347,6 @@ user_queue(User, Server, Query, Lang) -> _ -> [] end, - Msgs = get_messages_subset(User, Server, MsgsAll), FMsgs = lists:map( fun({xmlelement, _Name, _Attrs, _Els} = Msg) -> @@ -480,8 +433,11 @@ user_queue_parse_query(Username, LServer, Query) -> us_to_list({User, Server}) -> jlib:jid_to_string({User, Server, ""}). -get_queue_length(Username, LServer) -> - case catch ejabberd_odbc:sql_query( +webadmin_user(Acc, User, Server, Lang) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = ejabberd_odbc:escape(LUser), + QueueLen = case catch ejabberd_odbc:sql_query( LServer, ["select count(*) from spool" " where username='", Username, "';"]) of @@ -489,32 +445,7 @@ get_queue_length(Username, LServer) -> SCount; _ -> 0 - end. - -get_messages_subset(User, Host, MsgsAll) -> - Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, - max_user_offline_messages), - MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of - Number when is_integer(Number) -> Number; - _ -> 100 - end, - Length = length(MsgsAll), - get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). - -get_messages_subset2(Max, Length, MsgsAll) when Length =< Max*2 -> - MsgsAll; -get_messages_subset2(Max, Length, MsgsAll) -> - FirstN = Max, - {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), - MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), - IntermediateMsg = {xmlelement, "...", [], []}, - MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. - -webadmin_user(Acc, User, Server, Lang) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), - Username = ejabberd_odbc:escape(LUser), - QueueLen = get_queue_length(Username, LServer), + end, FQueueLen = [?AC("queue/", QueueLen)], Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. @@ -546,3 +477,30 @@ count_offline_messages(LUser, LServer) -> _ -> 0 end. + +count_offline_messages(_Acc, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Num = case catch ejabberd_odbc:sql_query( + LServer, + ["select xml from spool" + " where username='", LUser, "';"]) of + {selected, ["xml"], Rs} -> + lists:foldl( + fun({XML}, Acc) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + Acc; + El -> + case xml:get_subtag(El, "body") of + false -> + Acc; + _ -> + Acc + 1 + end + end + end, 0, Rs); + _ -> + 0 + end, + {stop, Num}. diff --git a/src/xml.erl b/src/xml.erl index 22746b10a..3e0157313 100644 --- a/src/xml.erl +++ b/src/xml.erl @@ -31,6 +31,7 @@ element_to_binary/1, crypt/1, make_text_node/1, remove_cdata/1, + remove_subtags/3, get_cdata/1, get_tag_cdata/1, get_attr/2, get_attr_s/2, get_tag_attr/2, get_tag_attr_s/2, @@ -186,6 +187,31 @@ remove_cdata_p(_) -> false. remove_cdata(L) -> [E || E <- L, remove_cdata_p(E)]. +%% TODO: Make more generic. +%% For now only support all parameters: +%% xml:remove_subtags({xmlelement,"message", [{"id","81be72"}],[{xmlelement,"on-sender-server",[{"xmlns","urn:xmpp:receipts"},{"server","text-one.com"}], []}]}, "on-sender-server", {"xmlns","urn:xmpp:receipts"}). +remove_subtags({xmlelement, TagName, TagAttrs, Els}, Name, Attr) -> + {xmlelement, TagName, TagAttrs, remove_subtags1(Els, [], Name, Attr)}. + +remove_subtags1([], NewEls, _Name, _Attr) -> + lists:reverse(NewEls); +remove_subtags1([El | Els], NewEls, Name, {AttrName, AttrValue} = Attr) -> + case El of + {xmlelement, Name, Attrs, _} -> + case get_attr(AttrName, Attrs) of + false -> + remove_subtags1(Els, [El|NewEls], Name, Attr); + {value, AttrValue} -> + remove_subtags1(Els, NewEls, Name, Attr); + _ -> + remove_subtags1(Els, [El|NewEls], Name, Attr) + end; + _ -> + remove_subtags1(Els, [El|NewEls], Name, Attr) + end. + + + get_cdata(L) -> binary_to_list(list_to_binary(get_cdata(L, ""))). diff --git a/src/xml_stream.erl b/src/xml_stream.erl index b98e3291b..947992ad0 100644 --- a/src/xml_stream.erl +++ b/src/xml_stream.erl @@ -76,6 +76,9 @@ process_data(CallbackPid, Stack, Data) -> {?XML_CDATA, CData} -> case Stack of [El] -> + catch gen_fsm:send_all_state_event( + CallbackPid, + {xmlstreamcdata, CData}), [El]; %% Merge CDATA nodes if they are contiguous %% This does not change the semantic: the split in @@ -89,7 +92,8 @@ process_data(CallbackPid, Stack, Data) -> [{xmlelement, Name, Attrs, Els} | Tail] -> [{xmlelement, Name, Attrs, [{xmlcdata, CData} | Els]} | Tail]; - [] -> [] + [] -> + [] end; {?XML_ERROR, Err} -> catch gen_fsm:send_event(CallbackPid, {xmlstreamerror, Err})