diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 277a6cbb2..0b2f3a722 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -114,15 +114,6 @@ ?SERRT_POLICY_VIOLATION(Lang, Text)). -define(INVALID_FROM, ?SERR_INVALID_FROM). --define(NS_P1_REBIND, "p1:rebind"). --define(NS_P1_PUSH, "p1:push"). --define(NS_P1_ACK, "p1:ack"). --define(NS_P1_PUSHED, "p1:pushed"). --define(NS_P1_ATTACHMENT, "http://process-one.net/attachement"). - --define(C2S_P1_ACK_TIMEOUT, 10000). --define(MAX_OOR_TIMEOUT, 1440). %% Max allowed session duration 24h (24*60) --define(MAX_OOR_MESSAGES, 1000). %%%---------------------------------------------------------------------- %%% API @@ -354,22 +345,9 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> false -> [] end, - P1PushFeature = - [{xmlelement, "push", - [{"xmlns", ?NS_P1_PUSH}], []}], - P1RebindFeature = - [{xmlelement, "rebind", - [{"xmlns", ?NS_P1_REBIND}], []}], - P1AckFeature = - [{xmlelement, "ack", - [{"xmlns", ?NS_P1_ACK}], []}], send_element(StateData, {xmlelement, "stream:features", [], - TLSFeature ++ - CompressFeature ++ - P1PushFeature ++ - P1RebindFeature ++ - P1AckFeature ++ + TLSFeature ++ CompressFeature ++ [{xmlelement, "mechanisms", [{"xmlns", ?NS_SASL}], Mechs}] ++ @@ -390,9 +368,7 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> roster_get_versioning_feature, Server, [], [Server]), StreamFeatures = - [{xmlelement, "push", - [{"xmlns", ?NS_P1_PUSH}], []}, - {xmlelement, "bind", + [{xmlelement, "bind", [{"xmlns", ?NS_BIND}], []}, {xmlelement, "session", [{"xmlns", ?NS_SESSION}], []}] @@ -601,33 +577,8 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> end end; _ -> - {xmlelement, Name, Attrs, _Els} = El, - case {xml:get_attr_s("xmlns", Attrs), Name} of - {?NS_P1_REBIND, "rebind"} -> - SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), - SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), - case jlib:string_to_jid(SJID) of - error -> - send_element(StateData, - {xmlelement, "failure", - [{"xmlns", ?NS_P1_REBIND}], - [{xmlcdata, "Invalid JID"}]}), - fsm_next_state(wait_for_auth, - StateData); - JID -> - case rebind(StateData, JID, SID) of - {next_state, wait_for_feature_request, - NewStateData, Timeout} -> - {next_state, wait_for_auth, - NewStateData, Timeout}; - Res -> - Res - end - end; - _ -> - process_unauthenticated_stanza(StateData, El), - fsm_next_state(wait_for_auth, StateData) - end + process_unauthenticated_stanza(StateData, El), + fsm_next_state(wait_for_auth, StateData) end; wait_for_auth(timeout, StateData) -> @@ -758,23 +709,6 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) -> StateData) end end; - {?NS_P1_REBIND, "rebind"} -> - SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), - SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), - case jlib:string_to_jid(SJID) of - error -> - send_element(StateData, - {xmlelement, "failure", - [{"xmlns", ?NS_P1_REBIND}], - [{xmlcdata, "Invalid JID"}]}), - fsm_next_state(wait_for_feature_request, - StateData); - JID -> - rebind(StateData, JID, SID) - end; - {?NS_P1_ACK, "ack"} -> - fsm_next_state(wait_for_feature_request, - StateData#state{ack_enabled = true}); _ -> if (SockMod == gen_tcp) and TLSRequired -> @@ -1020,9 +954,7 @@ session_established({xmlstreamelement, El}, StateData) -> send_trailer(StateData), {stop, normal, StateData}; _NewEl -> - NSD1 = change_reception(StateData, true), - NSD2 = start_keepalive_timer(NSD1), - session_established2(El, NSD2) + session_established2(El, StateData) end; %% We hibernate the process to reduce memory consumption after a @@ -1049,16 +981,7 @@ session_established({xmlstreamerror, _}, StateData) -> {stop, normal, StateData}; session_established(closed, StateData) -> - if - not StateData#state.reception -> - fsm_next_state(session_established, StateData); - (StateData#state.keepalive_timer /= undefined) -> - NewState1 = change_reception(StateData, false), - NewState = start_keepalive_timer(NewState1), - fsm_next_state(session_established, NewState); - true -> - {stop, normal, StateData} - end. + {stop, normal, StateData}. %% Process packets sent by user (coming from user on c2s XMPP %% connection) @@ -1129,8 +1052,6 @@ session_established2(El, StateData) -> [StateData#state.debug, FromJID, ToJID, NewEl]), process_privacy_iq( FromJID, ToJID, IQ, StateData); - #iq{xmlns = ?NS_P1_PUSH} = IQ -> - process_push_iq(FromJID, ToJID, IQ, StateData); _ -> ejabberd_hooks:run( user_send_packet, @@ -1147,12 +1068,6 @@ session_established2(El, StateData) -> check_privacy_route(FromJID, StateData, FromJID, ToJID, NewEl), StateData; - "standby" -> - StandBy = xml:get_tag_cdata(NewEl) == "true", - change_standby(StateData, StandBy); - "a" -> - SCounter = xml:get_tag_attr_s("h", NewEl), - receive_ack(StateData, SCounter); _ -> StateData end @@ -1384,17 +1299,6 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> _ -> {false, Attrs, StateData} end; - rebind -> - {Pid2, StreamID2} = Els, - if - StreamID2 == StateData#state.streamid -> - Pid2 ! {rebind, prepare_acks_for_rebind(StateData)}, - receive after 1000 -> ok end, - {exit, Attrs, rebind}; - true -> - Pid2 ! {rebind, false}, - {false, Attrs, StateData} - end; "iq" -> IQ = jlib:iq_query_info(Packet), case IQ of @@ -1440,22 +1344,18 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> {From, To, Packet}, in]) of allow -> - if StateData#state.reception -> - case ejabberd_hooks:run_fold( - feature_check_packet, StateData#state.server, - allow, - [StateData#state.jid, - StateData#state.server, - StateData#state.pres_last, - {From, To, Packet}, - in]) of - allow -> - {true, Attrs, StateData}; - deny -> - {false, Attrs, StateData} - end; - true -> - {true, Attrs, StateData} + case ejabberd_hooks:run_fold( + feature_check_packet, StateData#state.server, + allow, + [StateData#state.jid, + StateData#state.server, + StateData#state.pres_last, + {From, To, Packet}, + in]) of + allow -> + {true, Attrs, StateData}; + deny -> + {false, Attrs, StateData} end; deny -> {false, Attrs, StateData} @@ -1465,85 +1365,29 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> end, if Pass == exit -> - catch send_trailer(StateData), - case NewState of - rebind -> - {stop, normal, StateData#state{authenticated = rebinded}}; - _ -> - {stop, normal, StateData} - end; + %% When Pass==exit, NewState contains a string instead of a #state{} + Lang = StateData#state.lang, + send_element(StateData, ?SERRT_CONFLICT(Lang, NewState)), + send_trailer(StateData), + {stop, normal, StateData}; Pass -> Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = {xmlelement, Name, Attrs2, Els}, - NewState2 = - if - NewState#state.reception and - not (NewState#state.standby and (Name /= "message")) -> - send_element(NewState, FixedPacket), - ack(NewState, From, To, FixedPacket); - true -> - NewState1 = send_out_of_reception_message( - NewState, From, To, Packet), - enqueue(NewState1, From, To, FixedPacket) - end, + send_element(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, StateData#state.server, [StateData#state.debug, StateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState2); + fsm_next_state(StateName, NewState); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) end; -handle_info({timeout, Timer, _}, StateName, - #state{keepalive_timer = Timer, reception = true} = StateData) -> - NewState1 = change_reception(StateData, false), - NewState = start_keepalive_timer(NewState1), - fsm_next_state(StateName, NewState); -handle_info({timeout, Timer, _}, _StateName, - #state{keepalive_timer = Timer, reception = false} = StateData) -> - {stop, normal, StateData}; -handle_info({timeout, Timer, PrevCounter}, StateName, - #state{ack_timer = Timer} = StateData) -> - AckCounter = StateData#state.ack_counter, - NewState = - if - PrevCounter >= AckCounter -> - StateData#state{ack_timer = undefined}; - true -> - send_ack_request(StateData#state{ack_timer = undefined}) - end, - fsm_next_state(StateName, NewState); -handle_info({ack_timeout, Counter}, StateName, StateData) -> - AckQueue = StateData#state.ack_queue, - case queue:is_empty(AckQueue) of - true -> - fsm_next_state(StateName, StateData); - false -> - C = element(1, queue:head(AckQueue)), - if - C =< Counter -> - {stop, normal, StateData}; - true -> - fsm_next_state(StateName, StateData) - end - end; -handle_info({'DOWN', Monitor, _Type, _Object, _Info}, StateName, StateData) +handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) when Monitor == StateData#state.socket_monitor -> - if - (StateName == session_established) and - (not StateData#state.reception) -> - fsm_next_state(StateName, StateData); - (StateName == session_established) and - (StateData#state.keepalive_timer /= undefined) -> - NewState1 = change_reception(StateData, false), - NewState = start_keepalive_timer(NewState1), - fsm_next_state(StateName, NewState); - true -> - {stop, normal, StateData} - end; + {stop, normal, StateData}; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1640,13 +1484,6 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_a, Packet), presence_broadcast( StateData, From, StateData#state.pres_i, Packet); - rebinded -> - ejabberd_sm:close_session( - StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource), - ok; _ -> ?INFO_MSG("(~w) Close session for ~s", [StateData#state.socket, @@ -1678,36 +1515,6 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_i, Packet) end end, - case StateData#state.authenticated of - rebinded -> - ok; - _ -> - if - not StateData#state.reception, not StateData#state.oor_offline -> - SFrom = jlib:jid_to_string(StateData#state.jid), - ejabberd_hooks:run( - p1_push_notification, - StateData#state.server, - [StateData#state.server, - StateData#state.jid, - StateData#state.oor_notification, - "Instant messaging session expired", - 0, - false, - StateData#state.oor_appid, - SFrom]); - true -> - ok - end, - lists:foreach( - fun({_Counter, From, To, FixedPacket}) -> - ejabberd_router:route(From, To, FixedPacket) - end, queue:to_list(StateData#state.ack_queue)), - lists:foreach( - fun({From, To, FixedPacket}) -> - ejabberd_router:route(From, To, FixedPacket) - end, queue:to_list(StateData#state.queue)) - end, bounce_messages(); _ -> ok @@ -1878,39 +1685,9 @@ process_presence_probe(From, To, StateData) -> andalso ?SETS:is_element(LFrom, StateData#state.pres_a), if Cond1 -> - Packet = - case StateData#state.reception of - true -> - StateData#state.pres_last; - false -> - case StateData#state.oor_show of - "" -> - StateData#state.pres_last; - _ -> - {xmlelement, _, PresAttrs, PresEls} = - StateData#state.pres_last, - PresEls1 = - lists:flatmap( - fun({xmlelement, Name, _, _}) - when Name == "show"; - Name == "status" -> - []; - (E) -> - [E] - end, PresEls), - {xmlelement, "presence", PresAttrs, - [{xmlelement, "show", [], - [{xmlcdata, - StateData#state.oor_show}]}, - {xmlelement, "status", [], - [{xmlcdata, - StateData#state.oor_status}]}] - ++ PresEls1} - end - end, Timestamp = StateData#state.pres_timestamp, - Packet1 = xml:append_subtags( - Packet, + Packet = xml:append_subtags( + StateData#state.pres_last, %% To is the one sending the presence (the target of the probe) [jlib:timestamp_to_xml(Timestamp, utc, To, ""), %% TODO: Delete the next line once XEP-0091 is Obsolete @@ -1921,7 +1698,7 @@ process_presence_probe(From, To, StateData) -> [StateData#state.user, StateData#state.server, StateData#state.privacy_list, - {To, From, Packet1}, + {To, From, Packet}, out]) of deny -> ok; @@ -1931,7 +1708,7 @@ process_presence_probe(From, To, StateData) -> %% Don't route a presence probe to oneself case From == To of false -> - ejabberd_router:route(To, From, Packet1); + ejabberd_router:route(To, From, Packet); true -> ok end @@ -2259,8 +2036,8 @@ roster_change(IJID, ISubscription, StateData) -> P -> ?DEBUG("roster changed for ~p~n", [StateData#state.user]), From = StateData#state.jid, - To = jlib:make_jid(IJID), -% To = IJID, +% To = jlib:make_jid(IJID) + To = IJID, Cond1 = (not StateData#state.pres_invis) and IsFrom and (not OldIsFrom), Cond2 = (not IsFrom) and OldIsFrom @@ -2557,631 +2334,6 @@ check_from(El, FromJID) -> end end. -start_keepalive_timer(StateData) -> - if - is_reference(StateData#state.keepalive_timer) -> - cancel_timer(StateData#state.keepalive_timer); - true -> - ok - end, - Timeout = - if - StateData#state.reception -> StateData#state.keepalive_timeout; - true -> StateData#state.oor_timeout - end, - Timer = - if - is_integer(Timeout) -> - erlang:start_timer(Timeout * 1000, self(), []); - true -> - undefined - end, - StateData#state{keepalive_timer = Timer}. - -change_reception(#state{reception = Reception} = StateData, Reception) -> - StateData; -change_reception(#state{reception = true} = StateData, false) -> - ?DEBUG("reception -> false", []), - case StateData#state.oor_show of - "" -> - ok; - _ -> - Packet = - {xmlelement, "presence", [], - [{xmlelement, "show", [], - [{xmlcdata, StateData#state.oor_show}]}, - {xmlelement, "status", [], - [{xmlcdata, StateData#state.oor_status}]}]}, - update_priority(0, Packet, StateData), - presence_broadcast_to_trusted( - StateData, - StateData#state.jid, - StateData#state.pres_f, - StateData#state.pres_a, - Packet) - end, - StateData#state{reception = false}; -change_reception(#state{reception = false, standby = true} = StateData, true) -> - ?DEBUG("reception -> standby", []), - NewQueue = - lists:foldl( - fun({_From, _To, {xmlelement, "message", _, _} = FixedPacket}, Q) -> - send_element(StateData, FixedPacket), - Q; - (Item, Q) -> - queue:in(Item, Q) - end, queue:new(), queue:to_list(StateData#state.queue)), - StateData#state{queue = NewQueue, - queue_len = queue:len(NewQueue), - reception = true, - oor_unread = 0, - oor_unread_users = ?SETS:new()}; -change_reception(#state{reception = false} = StateData, true) -> - ?DEBUG("reception -> true", []), - case StateData#state.oor_show of - "" -> - ok; - _ -> - Packet = StateData#state.pres_last, - NewPriority = get_priority_from_presence(Packet), - update_priority(NewPriority, Packet, StateData), - presence_broadcast_to_trusted( - StateData, - StateData#state.jid, - StateData#state.pres_f, - StateData#state.pres_a, - Packet) - end, - lists:foreach( - fun({_From, _To, FixedPacket}) -> - send_element(StateData, FixedPacket) - end, queue:to_list(StateData#state.queue)), - lists:foreach( - fun(FixedPacket) -> - send_element(StateData, FixedPacket) - end, gb_trees:values(StateData#state.pres_queue)), - StateData#state{queue = queue:new(), - queue_len = 0, - pres_queue = gb_trees:empty(), - reception = true, - oor_unread = 0, - oor_unread_users = ?SETS:new()}. - -change_standby(#state{standby = StandBy} = StateData, StandBy) -> - StateData; -change_standby(#state{standby = false} = StateData, true) -> - ?DEBUG("standby -> true", []), - StateData#state{standby = true}; -change_standby(#state{standby = true} = StateData, false) -> - ?DEBUG("standby -> false", []), - lists:foreach( - fun({_From, _To, FixedPacket}) -> - send_element(StateData, FixedPacket) - end, queue:to_list(StateData#state.queue)), - lists:foreach( - fun(FixedPacket) -> - send_element(StateData, FixedPacket) - end, gb_trees:values(StateData#state.pres_queue)), - StateData#state{queue = queue:new(), - queue_len = 0, - pres_queue = gb_trees:empty(), - standby = false}. - -send_out_of_reception_message(StateData, From, To, - {xmlelement, "message", _, _} = Packet) -> - Type = xml:get_tag_attr_s("type", Packet), - if - (Type == "normal") or - (Type == "") or - (Type == "chat") or - (StateData#state.oor_send_groupchat and (Type == "groupchat"))-> - %Lang = case xml:get_tag_attr_s("xml:lang", Packet) of - % "" -> - % StateData#state.lang; - % L -> - % L - % end, - %Text = translate:translate( - % Lang, "User is temporarily out of reception"), - %MsgType = "error", - %Message = {xmlelement, "message", - % [{"type", MsgType}], - % [{xmlelement, "body", [], - % [{xmlcdata, Text}]}]}, - %ejabberd_router:route(To, From, Message), - Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]), - Body = - case check_x_attachment(Packet) of - true -> - case Body1 of - "" -> [238, 128, 136]; - _ -> - [238, 128, 136, 32 | Body1] - end; - false -> - Body1 - end, - Pushed = check_x_pushed(Packet), - if - Body == ""; - Pushed -> - StateData; - true -> - BFrom = jlib:jid_remove_resource(From), - LBFrom = jlib:jid_tolower(BFrom), - UnreadUsers = ?SETS:add_element( - LBFrom, - StateData#state.oor_unread_users), - IncludeBody = - case StateData#state.oor_send_body of - all -> - true; - first_per_user -> - not ?SETS:is_element( - LBFrom, - StateData#state.oor_unread_users); - first -> - StateData#state.oor_unread == 0; - none -> - false - end, - Unread = StateData#state.oor_unread + 1, - SFrom = jlib:jid_to_string(BFrom), - Msg = - if - IncludeBody -> - CBody = utf8_cut(Body, 100), - case StateData#state.oor_send_from of - jid -> SFrom ++ ": " ++ CBody; - username -> BFrom#jid.user ++ ": " ++ CBody; - _ -> CBody - end; - true -> - "" - end, - Sound = IncludeBody, - AppID = StateData#state.oor_appid, - ejabberd_hooks:run( - p1_push_notification, - StateData#state.server, - [StateData#state.server, - StateData#state.jid, - StateData#state.oor_notification, - Msg, - Unread, - Sound, - AppID, - SFrom]), - %% This hook is intended to give other module a - %% chance to notify the sender that the message is - %% not directly delivered to the client (almost - %% equivalent to offline). - ejabberd_hooks:run(delayed_message_hook, - StateData#state.server, - [From, To, Packet]), - StateData#state{oor_unread = Unread, - oor_unread_users = UnreadUsers} - end; - true -> - StateData - end; -send_out_of_reception_message(StateData, _From, _To, _Packet) -> - StateData. - -utf8_cut(S, Bytes) -> - utf8_cut(S, [], [], Bytes + 1). - -utf8_cut(_S, _Cur, Prev, 0) -> - lists:reverse(Prev); -utf8_cut([], Cur, _Prev, _Bytes) -> - lists:reverse(Cur); -utf8_cut([C | S], Cur, Prev, Bytes) -> - if - C bsr 6 == 2 -> - utf8_cut(S, [C | Cur], Prev, Bytes - 1); - true -> - utf8_cut(S, [C | Cur], Cur, Bytes - 1) - end. - - -cancel_timer(Timer) -> - erlang:cancel_timer(Timer), - receive - {timeout, Timer, _} -> - ok - after 0 -> - ok - end. - -enqueue(StateData, From, To, Packet) -> - IsPresence = - case Packet of - {xmlelement, "presence", _, _} -> - case xml:get_tag_attr_s("type", Packet) of - "subscribe" -> - false; - "subscribed" -> - false; - "unsubscribe" -> - false; - "unsubscribed" -> - false; - _ -> - true - end; - _ -> - false - end, - Messages = - StateData#state.queue_len + gb_trees:size(StateData#state.pres_queue), - if - Messages >= ?MAX_OOR_MESSAGES -> - self() ! {timeout, StateData#state.keepalive_timer, []}; - true -> - ok - end, - if - IsPresence -> - LFrom = jlib:jid_tolower(From), - case is_own_presence(StateData#state.jid, LFrom) of - true -> StateData; - false -> - NewQueue = gb_trees:enter(LFrom, Packet, - StateData#state.pres_queue), - StateData#state{pres_queue = NewQueue} - end; - true -> - CleanPacket = xml:remove_subtags(Packet, "x", {"xmlns", ?NS_P1_PUSHED}), - Packet2 = - case CleanPacket of - {xmlelement, "message" = Name, Attrs, Els} -> - {xmlelement, Name, Attrs, - Els ++ - [jlib:timestamp_to_xml( - calendar:now_to_universal_time(now())), - {xmlelement, "x", [{"xmlns", ?NS_P1_PUSHED}], []}]}; - _ -> - Packet - end, - NewQueue = queue:in({From, To, Packet2}, - StateData#state.queue), - NewQueueLen = StateData#state.queue_len + 1, - StateData#state{queue = NewQueue, - queue_len = NewQueueLen} - end. - -%% Is my own presence packet ? -is_own_presence(MyFullJID, MyFullJID) -> - true; -is_own_presence(_MyFullJID, _LFrom) -> - false. - -ack(StateData, From, To, Packet) -> - if - StateData#state.ack_enabled -> - NeedsAck = - case Packet of - {xmlelement, "presence", _, _} -> - case xml:get_tag_attr_s("type", Packet) of - "subscribe" -> - true; - "subscribed" -> - true; - "unsubscribe" -> - true; - "unsubscribed" -> - true; - _ -> - false - end; - {xmlelement, "message", _, _} -> - true; - _ -> - false - end, - if - NeedsAck -> - Counter = StateData#state.ack_counter + 1, - NewAckQueue = queue:in({Counter, From, To, Packet}, - StateData#state.ack_queue), - send_ack_request(StateData#state{ack_queue = NewAckQueue, - ack_counter = Counter}); - true -> - StateData - end; - true -> - StateData - end. - -send_ack_request(StateData) -> - case StateData#state.ack_timer of - undefined -> - AckCounter = StateData#state.ack_counter, - AckTimer = - erlang:start_timer(?C2S_P1_ACK_TIMEOUT, self(), AckCounter), - AckTimeout = StateData#state.keepalive_timeout + - StateData#state.oor_timeout, - erlang:send_after(AckTimeout * 1000, self(), - {ack_timeout, AckTimeout}), - send_element( - StateData, - {xmlelement, "r", - [{"h", integer_to_list(AckCounter)}], []}), - StateData#state{ack_timer = AckTimer}; - _ -> - StateData - end. - -receive_ack(StateData, SCounter) -> - case catch list_to_integer(SCounter) of - Counter when is_integer(Counter) -> - NewQueue = clean_queue(StateData#state.ack_queue, Counter), - StateData#state{ack_queue = NewQueue}; - _ -> - StateData - end. - -clean_queue(Queue, Counter) -> - case queue:is_empty(Queue) of - true -> - Queue; - false -> - C = element(1, queue:head(Queue)), - if - C =< Counter -> - clean_queue(queue:tail(Queue), Counter); - true -> - Queue - end - end. - -prepare_acks_for_rebind(StateData) -> - AckQueue = StateData#state.ack_queue, - case queue:is_empty(AckQueue) of - true -> - StateData; - false -> - Unsent = - lists:map( - fun({_Counter, From, To, FixedPacket}) -> - {From, To, FixedPacket} - end, queue:to_list(AckQueue)), - NewQueue = queue:join(queue:from_list(Unsent), - StateData#state.queue), - StateData#state{queue = NewQueue, - queue_len = queue:len(NewQueue), - ack_queue = queue:new(), - reception = false} - end. - - -rebind(StateData, JID, StreamID) -> - case JID#jid.lresource of - "" -> - send_element(StateData, - {xmlelement, "failure", - [{"xmlns", ?NS_P1_REBIND}], - [{xmlcdata, "Invalid JID"}]}), - fsm_next_state(wait_for_feature_request, - StateData); - _ -> - ejabberd_sm:route( - ?MODULE, JID, - {xmlelement, rebind, [], {self(), StreamID}}), - receive - {rebind, false} -> - send_element(StateData, - {xmlelement, "failure", - [{"xmlns", ?NS_P1_REBIND}], - [{xmlcdata, "Session not found"}]}), - fsm_next_state(wait_for_feature_request, - StateData); - {rebind, NewStateData} -> - ?INFO_MSG("(~w) Reopened session for ~s", - [StateData#state.socket, - jlib:jid_to_string(JID)]), - SID = {now(), self()}, - Conn = get_conn_type(NewStateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, NewStateData#state.auth_module}], - ejabberd_sm:open_session( - SID, - NewStateData#state.user, - NewStateData#state.server, - NewStateData#state.resource, - Info), - StateData2 = - NewStateData#state{ - socket = StateData#state.socket, - sockmod = StateData#state.sockmod, - socket_monitor = StateData#state.socket_monitor, - sid = SID, - ip = StateData#state.ip, - keepalive_timer = StateData#state.keepalive_timer, - ack_timer = undefined - }, - Presence = StateData2#state.pres_last, - case Presence of - undefined -> - ok; - _ -> - NewPriority = get_priority_from_presence(Presence), - update_priority(NewPriority, Presence, StateData2) - end, - send_element(StateData2, - {xmlelement, "rebind", - [{"xmlns", ?NS_P1_REBIND}], - []}), - StateData3 = change_reception(StateData2, true), - StateData4 = start_keepalive_timer(StateData3), - fsm_next_state(session_established, - StateData4) - after 1000 -> - send_element(StateData, - {xmlelement, "failure", - [{"xmlns", ?NS_P1_REBIND}], - [{xmlcdata, "Session not found"}]}), - fsm_next_state(wait_for_feature_request, - StateData) - end - end. - -process_push_iq(From, To, - #iq{type = _Type, sub_el = El} = IQ, - StateData) -> - {Res, NewStateData} = - case El of - {xmlelement, "push", _, _} -> - SKeepAlive = - xml:get_path_s(El, [{elem, "keepalive"}, {attr, "max"}]), - SOORTimeout = - xml:get_path_s(El, [{elem, "session"}, {attr, "duration"}]), - Status = xml:get_path_s(El, [{elem, "status"}, cdata]), - Show = xml:get_path_s(El, [{elem, "status"}, {attr, "type"}]), - SSendBody = xml:get_path_s(El, [{elem, "body"}, {attr, "send"}]), - SendBody = - case SSendBody of - "all" -> all; - "first-per-user" -> first_per_user; - "first" -> first; - "none" -> none; - _ -> none - end, - SendGroupchat = - xml:get_path_s(El, [{elem, "body"}, - {attr, "groupchat"}]) == "true", - SendFrom = send_from(El), - AppID = xml:get_path_s(El, [{elem, "appid"}, cdata]), - {Offline, Keep} = - case xml:get_path_s(El, [{elem, "offline"}, cdata]) of - "true" -> {true, false}; - "keep" -> {false, true}; - _ -> {false, false} - end, - Notification1 = xml:get_path_s(El, [{elem, "notification"}]), - Notification = - case Notification1 of - {xmlelement, _, _, _} -> - Notification1; - _ -> - {xmlelement, "notification", [], - [{xmlelement, "type", [], - [{xmlcdata, "none"}]}]} - end, - case catch {list_to_integer(SKeepAlive), - list_to_integer(SOORTimeout)} of - {KeepAlive, OORTimeout} - when OORTimeout =< ?MAX_OOR_TIMEOUT -> - if - Offline -> - ejabberd_hooks:run( - p1_push_enable_offline, - StateData#state.server, - [StateData#state.jid, - Notification, SendBody, SendFrom, AppID]); - Keep -> - ok; - true -> - ejabberd_hooks:run( - p1_push_disable, - StateData#state.server, - [StateData#state.jid, - Notification, - AppID]) - end, - NSD1 = - StateData#state{keepalive_timeout = KeepAlive, - oor_timeout = OORTimeout * 60, - oor_status = Status, - oor_show = Show, - oor_notification = Notification, - oor_send_body = SendBody, - oor_send_groupchat = SendGroupchat, - oor_send_from = SendFrom, - oor_appid = AppID, - oor_offline = Offline}, - NSD2 = start_keepalive_timer(NSD1), - {{result, []}, NSD2}; - _ -> - {{error, ?ERR_BAD_REQUEST}, StateData} - end; - {xmlelement, "disable", _, _} -> - ejabberd_hooks:run( - p1_push_disable, - StateData#state.server, - [StateData#state.jid, - StateData#state.oor_notification, - StateData#state.oor_appid]), - NSD1 = - StateData#state{keepalive_timeout = undefined, - oor_timeout = undefined, - oor_status = "", - oor_show = "", - oor_notification = undefined, - oor_send_body = all}, - NSD2 = start_keepalive_timer(NSD1), - {{result, []}, NSD2}; - _ -> - {{error, ?ERR_BAD_REQUEST}, StateData} - end, - IQRes = - case Res of - {result, Result} -> - IQ#iq{type = result, sub_el = Result}; - {error, Error} -> - IQ#iq{type = error, sub_el = [El, Error]} - end, - ejabberd_router:route( - To, From, jlib:iq_to_xml(IQRes)), - NewStateData. - -check_x_pushed({xmlelement, _Name, _Attrs, Els}) -> - check_x_pushed1(Els). - -check_x_pushed1([]) -> - false; -check_x_pushed1([{xmlcdata, _} | Els]) -> - check_x_pushed1(Els); -check_x_pushed1([El | Els]) -> - case xml:get_tag_attr_s("xmlns", El) of - ?NS_P1_PUSHED -> - true; - _ -> - check_x_pushed1(Els) - end. - -check_x_attachment({xmlelement, _Name, _Attrs, Els}) -> - check_x_attachment1(Els). - -check_x_attachment1([]) -> - false; -check_x_attachment1([{xmlcdata, _} | Els]) -> - check_x_attachment1(Els); -check_x_attachment1([El | Els]) -> - case xml:get_tag_attr_s("xmlns", El) of - ?NS_P1_ATTACHMENT -> - true; - _ -> - check_x_attachment1(Els) - end. - - -send_from(El) -> - %% First test previous version attribute: - case xml:get_path_s(El, [{elem, "body"}, {attr, "jid"}]) of - "false" -> - none; - "true" -> - jid; - "" -> - case xml:get_path_s(El, [{elem, "body"}, {attr, "from"}]) of - "jid" -> jid; - "username" -> username; - "none" -> none; - _ -> jid - end - end. - fsm_limit_opts(Opts) -> case lists:keysearch(max_fsm_queue, 1, Opts) of {value, {_, N}} when is_integer(N) -> diff --git a/src/ejabberd_c2s.hrl b/src/ejabberd_c2s.hrl index 381e042ba..e2e3c2439 100644 --- a/src/ejabberd_c2s.hrl +++ b/src/ejabberd_c2s.hrl @@ -60,26 +60,4 @@ fsm_limit_opts, lang, debug=false, - flash_connection = false, - reception = true, - standby = false, - queue = queue:new(), - queue_len = 0, - pres_queue = gb_trees:empty(), - keepalive_timer, - keepalive_timeout, - oor_timeout, - oor_status = "", - oor_show = "", - oor_notification, - oor_send_body = all, - oor_send_groupchat = false, - oor_send_from = jid, - oor_appid = "", - oor_unread = 0, - oor_unread_users = ?SETS:new(), - oor_offline = false, - ack_enabled = false, - ack_counter = 0, - ack_queue = queue:new(), - ack_timer}). \ No newline at end of file + flash_connection = false}). \ No newline at end of file diff --git a/src/mod_applepush.erl b/src/mod_applepush.erl deleted file mode 100644 index 5e97f6165..000000000 --- a/src/mod_applepush.erl +++ /dev/null @@ -1,431 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : mod_applepush.erl -%%% Author : Alexey Shchepin -%%% Purpose : Push module support -%%% Created : 5 Jun 2009 by Alexey Shchepin -%%% -%%% ejabberd, Copyright (C) 2002-2009 ProcessOne -%%%---------------------------------------------------------------------- - --module(mod_applepush). --author('alexey@process-one.net'). - --behaviour(gen_mod). - --export([start/2, - stop/1, - push_notification/8, - enable_offline_notification/5, - disable_notification/3, - receive_offline_packet/3, - resend_badge/1, - multi_resend_badge/1, - offline_resend_badge/0]). - -%% Debug commands --export([get_token_by_jid/1]). - - --include("ejabberd.hrl"). --include("jlib.hrl"). --include("mod_privacy.hrl"). - --define(NS_P1_PUSH, "p1:push"). --define(NS_P1_PUSHED, "p1:pushed"). --define(NS_P1_ATTACHMENT, "http://process-one.net/attachement"). - --record(applepush_cache, {us, device_id, options}). - -start(Host, _Opts) -> - case init_host(Host) of - true -> - mnesia:create_table( - applepush_cache, - [{disc_copies, [node()]}, - {attributes, record_info(fields, applepush_cache)}]), - mnesia:add_table_copy(muc_online_room, node(), ram_copies), - ejabberd_hooks:add(p1_push_notification, Host, - ?MODULE, push_notification, 50), - ejabberd_hooks:add(p1_push_enable_offline, Host, - ?MODULE, enable_offline_notification, 50), - ejabberd_hooks:add(p1_push_disable, Host, - ?MODULE, disable_notification, 50), - ejabberd_hooks:add(offline_message_hook, Host, - ?MODULE, receive_offline_packet, 35); - false -> - ok - end. - -stop(Host) -> - ejabberd_hooks:delete(p1_push_notification, Host, - ?MODULE, push_notification, 50), - ejabberd_hooks:delete(p1_push_disable, Host, - ?MODULE, disable_notification, 50), - ejabberd_hooks:delete(offline_message_hook, Host, - ?MODULE, receive_offline_packet, 35). - - -push_notification(Host, JID, Notification, Msg, Unread, Sound, AppID, Sender) -> - Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), - case Type of - "applepush" -> - DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]), - PushService = get_push_service(Host, JID, AppID), - ServiceJID = jlib:make_jid("", PushService, ""), - Badge = integer_to_list(Unread), - SSound = - if - Sound -> "true"; - true -> "false" - end, - Receiver = jlib:jid_to_string(JID), - Packet = - {xmlelement, "message", [], - [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], - [{xmlelement, "id", [], [{xmlcdata, DeviceID}]}, - {xmlelement, "msg", [], [{xmlcdata, Msg}]}, - {xmlelement, "badge", [], [{xmlcdata, Badge}]}, - {xmlelement, "sound", [], [{xmlcdata, SSound}]}, - {xmlelement, "from", [], [{xmlcdata, Sender}]}, - {xmlelement, "to", [], [{xmlcdata, Receiver}]}]}]}, - ejabberd_router:route(JID, ServiceJID, Packet), - stop; - _ -> - ok - end. - -enable_offline_notification(JID, Notification, SendBody, SendFrom, AppID1) -> - Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), - case Type of - "applepush" -> - DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]), - case catch erlang:list_to_integer(DeviceID, 16) of - ID1 when is_integer(ID1) -> - AppID = - case xml:get_path_s(Notification, - [{elem, "appid"}, cdata]) of - "" -> AppID1; - A -> A - end, - {MegaSecs, Secs, _MicroSecs} = now(), - TimeStamp = MegaSecs * 1000000 + Secs, - Options = - [{appid, AppID}, - {send_body, SendBody}, - {send_from, SendFrom}, - {timestamp, TimeStamp}], - store_cache(JID, ID1, Options); - _ -> - ok - end, - stop; - _ -> - ok - end. - -disable_notification(JID, Notification, _AppID) -> - Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), - case Type of - "applepush" -> - delete_cache(JID), - stop; - _ -> - ok - end. - -receive_offline_packet(From, To, Packet) -> - ?DEBUG("mod_applepush offline~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n", - [From, To, Packet, 8]), - Host = To#jid.lserver, - case gen_mod:is_loaded(Host, mod_applepush) of - true -> - case lookup_cache(To) of - false -> - ok; - {ID, AppID, SendBody, SendFrom} -> - ?DEBUG("lookup: ~p~n", [{ID, AppID, SendBody, SendFrom}]), - Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]), - Body = - case check_x_attachment(Packet) of - true -> - case Body1 of - "" -> [238, 128, 136]; - _ -> - [238, 128, 136, 32 | Body1] - end; - false -> - Body1 - end, - Pushed = check_x_pushed(Packet), - PushService = get_push_service(Host, To, AppID), - ServiceJID = jlib:make_jid("", PushService, ""), - if - Body == ""; - Pushed -> - if - From#jid.lserver == ServiceJID#jid.lserver -> - Disable = - xml:get_path_s( - Packet, [{elem, "disable"}]) /= "", - if - Disable -> - delete_cache(To); - true -> - ok - end; - true -> - ok - end, - ok; - true -> - BFrom = jlib:jid_remove_resource(From), - SFrom = jlib:jid_to_string(BFrom), - Offline = ejabberd_hooks:run_fold( - count_offline_messages, - Host, - 0, - [To#jid.luser, Host]), - IncludeBody = - case SendBody of - all -> - true; - first_per_user -> - Offline == 0; - first -> - Offline == 0; - none -> - false - end, - Msg = - if - IncludeBody -> - CBody = utf8_cut(Body, 100), - case SendFrom of - jid -> SFrom ++ ": " ++ CBody; - username -> BFrom#jid.user ++ ": " ++ CBody; - _ -> CBody - end; - true -> - "" - end, - SSound = - if - IncludeBody -> "true"; - true -> "false" - end, - Badge = integer_to_list(Offline + 1), - DeviceID = erlang:integer_to_list(ID, 16), - STo = jlib:jid_to_string(To), - Packet1 = - {xmlelement, "message", [], - [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], - [{xmlelement, "id", [], - [{xmlcdata, DeviceID}]}, - {xmlelement, "msg", [], - [{xmlcdata, Msg}]}, - {xmlelement, "badge", [], - [{xmlcdata, Badge}]}, - {xmlelement, "sound", [], - [{xmlcdata, SSound}]}, - {xmlelement, "from", [], - [{xmlcdata, SFrom}]}, - {xmlelement, "to", [], - [{xmlcdata, STo}]}]}]}, - ejabberd_router:route(To, ServiceJID, Packet1) - end - end; - false -> - ok - end. - -resend_badge(To) -> - Host = To#jid.lserver, - case gen_mod:is_loaded(Host, mod_applepush) of - true -> - case lookup_cache(To) of - false -> - {error, "no cached data for the user"}; - {ID, AppID, SendBody, SendFrom} -> - ?DEBUG("lookup: ~p~n", [{ID, AppID, SendBody, SendFrom}]), - PushService = get_push_service(Host, To, AppID), - ServiceJID = jlib:make_jid("", PushService, ""), - Offline = ejabberd_hooks:run_fold( - count_offline_messages, - Host, - 0, - [To#jid.luser, Host]), - if - Offline == 0 -> - ok; - true -> - Badge = integer_to_list(Offline), - DeviceID = erlang:integer_to_list(ID, 16), - Packet1 = - {xmlelement, "message", [], - [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], - [{xmlelement, "id", [], - [{xmlcdata, DeviceID}]}, - {xmlelement, "badge", [], - [{xmlcdata, Badge}]}]}]}, - ejabberd_router:route(To, ServiceJID, Packet1) - end - end; - false -> - {error, "mod_applepush is not loaded"} - end. - -multi_resend_badge(JIDs) -> - lists:foreach(fun resend_badge/1, JIDs). - -offline_resend_badge() -> - USs = mnesia:dirty_all_keys(applepush_cache), - JIDs = lists:map(fun({U, S}) -> jlib:make_jid(U, S, "") end, USs), - multi_resend_badge(JIDs). - -lookup_cache(JID) -> - #jid{luser = LUser, lserver = LServer} = JID, - LUS = {LUser, LServer}, - case catch mnesia:dirty_read(applepush_cache, LUS) of - [#applepush_cache{device_id = DeviceID, options = Options}] -> - AppID = proplists:get_value(appid, Options, "applepush.localhost"), - SendBody = proplists:get_value(send_body, Options, none), - SendFrom = proplists:get_value(send_from, Options, true), - {DeviceID, AppID, SendBody, SendFrom}; - _ -> - false - end. - -store_cache(JID, DeviceID, Options) -> - #jid{luser = LUser, lserver = LServer} = JID, - LUS = {LUser, LServer}, - R = #applepush_cache{us = LUS, - device_id = DeviceID, - options = Options}, - case catch mnesia:dirty_read(applepush_cache, LUS) of - [R] -> - ok; - _ -> - catch mnesia:dirty_write(R) - end. - -delete_cache(JID) -> - #jid{luser = LUser, lserver = LServer} = JID, - LUS = {LUser, LServer}, - catch mnesia:dirty_delete(applepush_cache, LUS). - - -utf8_cut(S, Bytes) -> - utf8_cut(S, [], [], Bytes + 1). - -utf8_cut(_S, _Cur, Prev, 0) -> - lists:reverse(Prev); -utf8_cut([], Cur, _Prev, _Bytes) -> - lists:reverse(Cur); -utf8_cut([C | S], Cur, Prev, Bytes) -> - if - C bsr 6 == 2 -> - utf8_cut(S, [C | Cur], Prev, Bytes - 1); - true -> - utf8_cut(S, [C | Cur], Cur, Bytes - 1) - end. - -check_x_pushed({xmlelement, _Name, _Attrs, Els}) -> - check_x_pushed1(Els). - -check_x_pushed1([]) -> - false; -check_x_pushed1([{xmlcdata, _} | Els]) -> - check_x_pushed1(Els); -check_x_pushed1([El | Els]) -> - case xml:get_tag_attr_s("xmlns", El) of - ?NS_P1_PUSHED -> - true; - _ -> - check_x_pushed1(Els) - end. - -check_x_attachment({xmlelement, _Name, _Attrs, Els}) -> - check_x_attachment1(Els). - -check_x_attachment1([]) -> - false; -check_x_attachment1([{xmlcdata, _} | Els]) -> - check_x_attachment1(Els); -check_x_attachment1([El | Els]) -> - case xml:get_tag_attr_s("xmlns", El) of - ?NS_P1_ATTACHMENT -> - true; - _ -> - check_x_attachment1(Els) - end. - - -get_push_service(Host, JID, AppID) -> - PushServices = - gen_mod:get_module_opt( - Host, ?MODULE, - push_services, []), - PushService = - case lists:keysearch(AppID, 1, PushServices) of - false -> - DefaultServices = - gen_mod:get_module_opt( - Host, ?MODULE, - default_services, []), - case lists:keysearch(JID#jid.lserver, 1, DefaultServices) of - false -> - gen_mod:get_module_opt( - Host, ?MODULE, - default_service, "applepush.localhost"); - {value, {_, PS}} -> - PS - end; - {value, {AppID, PS}} -> - PS - end, - PushService. - - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Internal module protection - --define(VALID_HOSTS, []). % default is unlimited use --define(MAX_USERS, 0). % default is unlimited use - -init_host(VHost) -> - case ?VALID_HOSTS of - [] -> % unlimited use - true; - ValidList -> % limited use - init_host(VHost, ValidList) - end. -init_host([], _) -> - false; -init_host(VHost, ValidEncryptedList) -> - EncryptedHost = erlang:md5(lists:reverse(VHost)), - case lists:member(EncryptedHost, ValidEncryptedList) of - true -> - case ?MAX_USERS of - 0 -> true; - N -> ejabberd_auth:get_vh_registered_users_number(VHost) =< N - end; - false -> - case string:chr(VHost, $.) of - 0 -> false; - Pos -> init_host(string:substr(VHost, Pos+1), ValidEncryptedList) - end - end. - -%% Debug commands -%% JID is of form -get_token_by_jid(JIDString) -> - #jid{luser = LUser, lserver = LServer} = jlib:string_to_jid(JIDString), - LUS = {LUser, LServer}, - case mnesia:dirty_read(applepush_cache, LUS) of - [{applepush_cache,_,I,_}] -> - erlang:integer_to_list(I, 16); - _ -> - undefined - end. - diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl deleted file mode 100644 index 7c5ab711f..000000000 --- a/src/mod_applepush_service.erl +++ /dev/null @@ -1,598 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : mod_applepush_service.erl -%%% Author : Alexey Shchepin -%%% Purpose : Central push infrastructure -%%% Created : 5 Jun 2009 by Alexey Shchepin -%%% -%%% ejabberd, Copyright (C) 2002-2009 ProcessOne -%%%---------------------------------------------------------------------- - --module(mod_applepush_service). --author('alexey@process-one.net'). - --behaviour(gen_server). --behaviour(gen_mod). - -%% API --export([start_link/2, start/2, stop/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --include("ejabberd.hrl"). --include("jlib.hrl"). - --record(state, {host, - socket, - gateway, - port, - feedback_socket, - feedback, - feedback_port, - feedback_buf = <<>>, - certfile, - queue, - soundfile, - cmd_id = 0, - cmd_cache = dict:new(), - device_cache = dict:new()}). - --define(PROCNAME, ejabberd_mod_applepush_service). --define(RECONNECT_TIMEOUT, 5000). --define(FEEDBACK_RECONNECT_TIMEOUT, 30000). --define(MAX_QUEUE_SIZE, 1000). --define(CACHE_SIZE, 4096). --define(MAX_PAYLOAD_SIZE, 255). - --define(NS_P1_PUSH, "p1:push"). - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- -start_link(Host, Opts) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). - -start(Host, Opts) -> - ssl:start(), - MyHosts = - case catch gen_mod:get_opt(hosts, Opts) of - {'EXIT', _} -> - [{gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"), Opts}]; - Hs -> - Hs - end, - lists:foreach( - fun({MyHost, MyOpts}) -> - Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME), - ChildSpec = - {Proc, - {?MODULE, start_link, [MyHost, MyOpts]}, - transient, - 1000, - worker, - [?MODULE]}, - supervisor:start_child(ejabberd_sup, ChildSpec) - end, MyHosts). - -stop(Host) -> - MyHosts = - case catch gen_mod:get_module_opt(Host, ?MODULE, hosts, []) of - [] -> - [gen_mod:get_module_opt_host( - Host, ?MODULE, "applepush.@HOST@")]; - Hs -> - [H || {H, _} <- Hs] - end, - lists:foreach( - fun(MyHost) -> - Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME), - gen_server:call(Proc, stop), - supervisor:terminate_child(ejabberd_sup, Proc), - supervisor:delete_child(ejabberd_sup, Proc) - end, MyHosts). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init([MyHost, Opts]) -> - CertFile = gen_mod:get_opt(certfile, Opts, ""), - SoundFile = gen_mod:get_opt(sound_file, Opts, "pushalert.wav"), - Gateway = gen_mod:get_opt(gateway, Opts, "gateway.push.apple.com"), - Feedback = gen_mod:get_opt(feedback, Opts, undefined), - Port = gen_mod:get_opt(port, Opts, 2195), - FeedbackPort = gen_mod:get_opt(feedback_port, Opts, 2196), - %MyHost = gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"), - self() ! connect, - case Feedback of - undefined -> - ok; - _ -> - self() ! connect_feedback - end, - ejabberd_router:register_route(MyHost), - {ok, #state{host = MyHost, - gateway = Gateway, - port = Port, - feedback = Feedback, - feedback_port = FeedbackPort, - certfile = CertFile, - queue = {0, queue:new()}, - soundfile = SoundFile}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call(stop, _From, State) -> - {stop, normal, ok, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({route, From, To, Packet}, State) -> - case catch do_route(From, To, Packet, State) of - {'EXIT', Reason} -> - ?ERROR_MSG("~p", [Reason]), - {noreply, State}; - Res -> - Res - end; -handle_info(connect, State) -> - connect(State); -handle_info(connect_feedback, State) - when State#state.feedback /= undefined, - State#state.feedback_socket == undefined -> - Feedback = State#state.feedback, - FeedbackPort = State#state.feedback_port, - CertFile = State#state.certfile, - case ssl:connect(Feedback, FeedbackPort, - [{certfile, CertFile}, - {active, true}, - binary]) of - {ok, Socket} -> - {noreply, State#state{feedback_socket = Socket}}; - {error, Reason} -> - ?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, " - "retrying after ~p seconds", - [State#state.host, Feedback, FeedbackPort, - Reason, ?FEEDBACK_RECONNECT_TIMEOUT div 1000]), - erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(), - connect_feedback), - {noreply, State} - end; -handle_info({ssl, Socket, Packet}, State) - when Socket == State#state.socket -> - case Packet of - <<8, Status, CmdID:32>> when Status /= 0 -> - case dict:find(CmdID, State#state.cmd_cache) of - {ok, {JID, _DeviceID}} -> - ?ERROR_MSG("PUSH ERROR for ~p: ~p", [JID, Status]), - if - Status == 8 -> - From = jlib:make_jid("", State#state.host, ""), - ejabberd_router:route( - From, JID, - {xmlelement, "message", [], - [{xmlelement, "disable", - [{"xmlns", ?NS_P1_PUSH}, - {"status", integer_to_list(Status)}], - []}]}); - true -> - ok - end, - ok; - error -> - ?ERROR_MSG("Unknown cmd ID ~p~n", [CmdID]), - ok - end; - _ -> - ?ERROR_MSG("Received unknown packet ~p~n", [Packet]) - end, - {noreply, State}; -handle_info({ssl, Socket, Packet}, State) - when Socket == State#state.feedback_socket -> - Buf = <<(State#state.feedback_buf)/binary, Packet/binary>>, - Buf2 = parse_feedback_buf(Buf, State), - {noreply, State#state{feedback_buf = Buf2}}; -handle_info({ssl_closed, Socket}, State) - when Socket == State#state.feedback_socket -> - ssl:close(Socket), - erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(), - connect_feedback), - {noreply, State#state{feedback_socket = undefined, - feedback_buf = <<>>}}; -handle_info(_Info, State) -> - %io:format("got info: ~p~n", [_Info]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, State) -> - ejabberd_router:unregister_route(State#state.host), - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%---------------------------------------------------------------------- -%%% Internal functions -%%%---------------------------------------------------------------------- - -do_route(From, To, Packet, State) -> - #jid{user = User, resource = Resource} = To, - if - (User /= "") or (Resource /= "") -> - Err = jlib:make_error_reply(Packet, ?ERR_SERVICE_UNAVAILABLE), - ejabberd_router:route(To, From, Err), - {noreply, State}; - true -> - case Packet of - {xmlelement, "iq", _, _} -> - IQ = jlib:iq_query_info(Packet), - case IQ of - #iq{type = get, xmlns = ?NS_DISCO_INFO = XMLNS, - sub_el = _SubEl, lang = Lang} = IQ -> - Res = IQ#iq{type = result, - sub_el = [{xmlelement, "query", - [{"xmlns", XMLNS}], - iq_disco(Lang)}]}, - ejabberd_router:route(To, - From, - jlib:iq_to_xml(Res)), - {noreply, State}; - #iq{type = get, xmlns = ?NS_DISCO_ITEMS = XMLNS} = IQ -> - Res = IQ#iq{type = result, - sub_el = [{xmlelement, "query", - [{"xmlns", XMLNS}], - []}]}, - ejabberd_router:route(To, - From, - jlib:iq_to_xml(Res)), - {noreply, State}; - %%#iq{type = get, xmlns = ?NS_VCARD, lang = Lang} -> - %% ResIQ = - %% IQ#iq{type = result, - %% sub_el = [{xmlelement, - %% "vCard", - %% [{"xmlns", ?NS_VCARD}], - %% iq_get_vcard(Lang)}]}, - %% ejabberd_router:route(To, - %% From, - %% jlib:iq_to_xml(ResIQ)); - _ -> - Err = jlib:make_error_reply(Packet, - ?ERR_SERVICE_UNAVAILABLE), - ejabberd_router:route(To, From, Err), - {noreply, State} - end; - {xmlelement, "message", _, Els} -> - case xml:remove_cdata(Els) of - [{xmlelement, "push", _, _}] -> - NewState = handle_message(From, To, Packet, State), - {noreply, NewState}; - [{xmlelement, "disable", _, _}] -> - {noreply, State}; - _ -> - {noreply, State} - end; - _ -> - {noreply, State} - end - end. - - -handle_message(From, To, Packet, #state{socket = undefined} = State) -> - queue_message(From, To, Packet, State); -handle_message(From, To, Packet, State) -> - DeviceID = - xml:get_path_s(Packet, - [{elem, "push"}, {elem, "id"}, cdata]), - Msg = - xml:get_path_s(Packet, - [{elem, "push"}, {elem, "msg"}, cdata]), - Badge = - xml:get_path_s(Packet, - [{elem, "push"}, {elem, "badge"}, cdata]), - Sound = - xml:get_path_s(Packet, - [{elem, "push"}, {elem, "sound"}, cdata]), - Sender = - xml:get_path_s(Packet, - [{elem, "push"}, {elem, "from"}, cdata]), - Receiver = - xml:get_path_s(Packet, - [{elem, "push"}, {elem, "to"}, cdata]), - Payload = make_payload(State, Msg, Badge, Sound, Sender), - ID = - case catch erlang:list_to_integer(DeviceID, 16) of - ID1 when is_integer(ID1) -> - ID1; - _ -> - false - end, - if - is_integer(ID) -> - Command = 1, - CmdID = State#state.cmd_id, - {MegaSecs, Secs, _MicroSecs} = now(), - Expiry = MegaSecs * 1000000 + Secs + 24 * 60 * 60, - BDeviceID = <>, - BPayload = list_to_binary(Payload), - IDLen = size(BDeviceID), - PayloadLen = size(BPayload), - Notification = - <>, - ?INFO_MSG("(~p) sending notification for ~s~n~p~npayload:~n~s~n" - "Sender: ~s~n" - "Receiver: ~s~n" - "Device ID: ~s~n", - [State#state.host, erlang:integer_to_list(ID, 16), - Notification, Payload, - Sender, - Receiver, DeviceID]), - case ssl:send(State#state.socket, Notification) of - ok -> - cache(From, ID, State); - {error, Reason} -> - ?INFO_MSG("(~p) Connection closed: ~p, reconnecting", - [State#state.host, Reason]), - ssl:close(State#state.socket), - self() ! connect, - queue_message(From, To, Packet, - State#state{socket = undefined}) - end; - true -> - State - end. - -make_payload(State, Msg, Badge, Sound, Sender) -> - Msg2 = json_escape(Msg), - AlertPayload = - case Msg2 of - "" -> ""; - _ -> "\"alert\":\"" ++ Msg2 ++ "\"" - end, - BadgePayload = - case catch list_to_integer(Badge) of - B when is_integer(B) -> - "\"badge\":" ++ Badge; - _ -> "" - end, - SoundPayload = - case Sound of - "true" -> - SoundFile = State#state.soundfile, - "\"sound\":\"" ++ json_escape(SoundFile) ++ "\""; - _ -> "" - end, - Payloads = lists:filter(fun(S) -> S /= "" end, - [AlertPayload, BadgePayload, SoundPayload]), - Payload = - case Sender of - "" -> - "{\"aps\":{" ++ join(Payloads, ",") ++ "}}"; - _ -> - "{\"aps\":{" ++ join(Payloads, ",") ++ "}," - "\"from\":\"" ++ json_escape(Sender) ++ "\"}" - end, - PayloadLen = length(Payload), - if - PayloadLen > ?MAX_PAYLOAD_SIZE -> - Delta = PayloadLen - ?MAX_PAYLOAD_SIZE, - MsgLen = length(Msg), - if - MsgLen /= 0 -> - CutMsg = - if - MsgLen > Delta -> - lists:sublist(Msg, MsgLen - Delta); - true -> - "" - end, - make_payload(State, CutMsg, Badge, Sound, Sender); - true -> - Payload2 = - "{\"aps\":{" ++ join(Payloads, ",") ++ "}}", - %PayloadLen2 = length(Payload2), - Payload2 - end; - true -> - Payload - end. - -connect(#state{socket = undefined} = State) -> - Gateway = State#state.gateway, - Port = State#state.port, - CertFile = State#state.certfile, - case ssl:connect(Gateway, Port, [{certfile, CertFile}, - {active, true}, - binary]) of - {ok, Socket} -> - {noreply, resend_messages(State#state{socket = Socket})}; - {error, Reason} -> - ?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, " - "retrying after ~p seconds", - [State#state.host, Gateway, Port, - Reason, ?RECONNECT_TIMEOUT div 1000]), - erlang:send_after(?RECONNECT_TIMEOUT, self(), connect), - {noreply, State} - end; -connect(State) -> - {noreply, State}. - -bounce_message(From, To, Packet, Reason) -> - {xmlelement, _, Attrs, _} = Packet, - Type = xml:get_attr_s("type", Attrs), - if Type /= "error"; Type /= "result" -> - ejabberd_router:route( - To, From, - jlib:make_error_reply( - Packet, - ?ERRT_INTERNAL_SERVER_ERROR( - xml:get_attr_s("xml:lang", Attrs), - Reason))); - true -> - ok - end. - -queue_message(From, To, Packet, State) -> - case State#state.queue of - {?MAX_QUEUE_SIZE, Queue} -> - {{value, {From1, To1, Packet1}}, Queue1} = queue:out(Queue), - bounce_message(From1, To1, Packet1, - "Unable to connect to push service"), - Queue2 = queue:in({From, To, Packet}, Queue1), - State#state{queue = {?MAX_QUEUE_SIZE, Queue2}}; - {Size, Queue} -> - Queue1 = queue:in({From, To, Packet}, Queue), - State#state{queue = {Size+1, Queue1}} - end. - -resend_messages(#state{queue = {_, Queue}} = State) -> - lists:foldl( - fun({From, To, Packet}, AccState) -> - case catch handle_message(From, To, Packet, AccState) of - {'EXIT', _} = Err -> - ?ERROR_MSG("error while processing message:~n" - "** From: ~p~n" - "** To: ~p~n" - "** Packet: ~p~n" - "** Reason: ~p", - [From, To, Packet, Err]), - AccState; - NewAccState -> - NewAccState - end - end, State#state{queue = {0, queue:new()}}, queue:to_list(Queue)). - -cache(JID, DeviceID, State) -> - CmdID = State#state.cmd_id, - Key = CmdID rem ?CACHE_SIZE, - C1 = State#state.cmd_cache, - D1 = State#state.device_cache, - D2 = case dict:find(Key, C1) of - {ok, {_, OldDeviceID}} -> - del_device_cache(D1, OldDeviceID); - error -> - D1 - end, - D3 = add_device_cache(D2, DeviceID, JID), - C2 = dict:store(Key, {JID, DeviceID}, C1), - State#state{cmd_id = CmdID + 1, - cmd_cache = C2, - device_cache = D3}. - -add_device_cache(DeviceCache, DeviceID, JID) -> - dict:update( - DeviceID, - fun({Counter, _}) -> {Counter + 1, JID} end, - {1, JID}, - DeviceCache). - -del_device_cache(DeviceCache, DeviceID) -> - case dict:find(DeviceID, DeviceCache) of - {ok, {Counter, JID}} -> - case Counter of - 1 -> - dict:erase(DeviceID, DeviceCache); - _ -> - dict:store(DeviceID, {Counter - 1, JID}, DeviceCache) - end; - error -> - DeviceCache - end. - -json_escape(S) -> - [case C of - $" -> "\\\""; - $\\ -> "\\\\"; - _ when C < 16 -> ["\\u000", erlang:integer_to_list(C, 16)]; - _ when C < 32 -> ["\\u00", erlang:integer_to_list(C, 16)]; - _ -> C - end || C <- S]. - -join(List, Sep) -> - lists:foldr(fun(A, "") -> A; - (A, Acc) -> A ++ Sep ++ Acc - end, "", List). - - - -iq_disco(Lang) -> - [{xmlelement, "identity", - [{"category", "gateway"}, - {"type", "apple"}, - {"name", translate:translate(Lang, "Apple Push Service")}], []}, - {xmlelement, "feature", [{"var", ?NS_DISCO_INFO}], []}]. - - -parse_feedback_buf(Buf, State) -> - case Buf of - <> -> - IDLen8 = IDLen * 8, - <> = BDeviceID, - case dict:find(DeviceID, State#state.device_cache) of - {ok, {_Counter, JID}} -> - From = jlib:make_jid("", State#state.host, ""), - ejabberd_router:route( - From, JID, - {xmlelement, "message", [], - [{xmlelement, "disable", - [{"xmlns", ?NS_P1_PUSH}, - {"status", "feedback"}, - {"ts", integer_to_list(TimeStamp)}], - []}]}); - error -> - ok - end, - parse_feedback_buf(Rest, State); - _ -> - Buf - end. diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 2448a17c0..dfa2f46c2 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -30,18 +30,19 @@ -behaviour(gen_mod). -export([start/2, - init/1, + loop/1, stop/1, store_packet/3, resend_offline_messages/2, pop_offline_messages/3, + get_sm_features/5, remove_expired_messages/0, remove_old_messages/1, remove_user/2, + get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5, - count_offline_messages/3]). + webadmin_user_parse_query/5]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -53,6 +54,9 @@ -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). +%% default value for the maximum number of user messages +-define(MAX_USER_MESSAGES, infinity). + start(Host, Opts) -> mnesia:create_table(offline_msg, [{disc_only_copies, [node()]}, @@ -67,30 +71,28 @@ start(Host, Opts) -> ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), + ejabberd_hooks:add(disco_sm_features, Host, + ?MODULE, get_sm_features, 50), + ejabberd_hooks:add(disco_local_features, Host, + ?MODULE, get_sm_features, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - ejabberd_hooks:add(count_offline_messages, Host, - ?MODULE, count_offline_messages, 50), - MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), + AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, init, [MaxOfflineMsgs])). + spawn(?MODULE, loop, [AccessMaxOfflineMsgs])). -%% MaxOfflineMsgs is either infinity of integer > 0 -init(infinity) -> - loop(infinity); -init(MaxOfflineMsgs) - when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> - loop(MaxOfflineMsgs). - -loop(MaxOfflineMsgs) -> +loop(AccessMaxOfflineMsgs) -> receive #offline_msg{us=US} = Msg -> Msgs = receive_all(US, [Msg]), Len = length(Msgs), + {User, Host} = US, + MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, + User, Host), F = fun() -> %% Only count messages if needed: Count = if MaxOfflineMsgs =/= infinity -> @@ -116,9 +118,18 @@ loop(MaxOfflineMsgs) -> end end, mnesia:transaction(F), - loop(MaxOfflineMsgs); + loop(AccessMaxOfflineMsgs); _ -> - loop(MaxOfflineMsgs) + loop(AccessMaxOfflineMsgs) + end. + +%% Function copied from ejabberd_sm.erl: +get_max_user_messages(AccessRule, LUser, Host) -> + case acl:match_rule( + Host, AccessRule, jlib:make_jid(LUser, Host, "")) of + Max when is_integer(Max) -> Max; + infinity -> infinity; + _ -> ?MAX_USER_MESSAGES end. receive_all(US, Msgs) -> @@ -139,6 +150,8 @@ stop(Host) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), + ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, @@ -149,12 +162,27 @@ stop(Host) -> exit(whereis(Proc), stop), {wait, Proc}. +get_sm_features(Acc, _From, _To, "", _Lang) -> + Feats = case Acc of + {result, I} -> I; + _ -> [] + end, + {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; + +get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> + %% override all lesser features... + {result, []}; + +get_sm_features(Acc, _From, _To, _Node, _Lang) -> + Acc. + + store_packet(From, To, Packet) -> Type = xml:get_tag_attr_s("type", Packet), if (Type /= "error") and (Type /= "groupchat") and (Type /= "headline") -> - case check_event(From, To, Packet) of + case check_event_chatstates(From, To, Packet) of true -> #jid{luser = LUser, lserver = LServer} = To, TimeStamp = now(), @@ -175,12 +203,22 @@ store_packet(From, To, Packet) -> ok end. -check_event(From, To, Packet) -> +%% Check if the packet has any content about XEP-0022 or XEP-0085 +check_event_chatstates(From, To, Packet) -> {xmlelement, Name, Attrs, Els} = Packet, - case find_x_event(Els) of - false -> + case find_x_event_chatstates(Els, {false, false, false}) of + %% There wasn't any x:event or chatstates subelements + {false, false, _} -> true; - El -> + %% There a chatstates subelement and other stuff, but no x:event + {false, CEl, true} when CEl /= false -> + true; + %% There was only a subelement: a chatstates + {false, CEl, false} when CEl /= false -> + %% Don't allow offline storage + false; + %% There was an x:event element, and maybe also other stuff + {El, _, _} when El /= false -> case xml:get_subtag(El, "id") of false -> case xml:get_subtag(El, "offline") of @@ -208,16 +246,19 @@ check_event(From, To, Packet) -> end end. -find_x_event([]) -> - false; -find_x_event([{xmlcdata, _} | Els]) -> - find_x_event(Els); -find_x_event([El | Els]) -> +%% Check if the packet has subelements about XEP-0022, XEP-0085 or other +find_x_event_chatstates([], Res) -> + Res; +find_x_event_chatstates([{xmlcdata, _} | Els], Res) -> + find_x_event_chatstates(Els, Res); +find_x_event_chatstates([El | Els], {A, B, C}) -> case xml:get_tag_attr_s("xmlns", El) of ?NS_EVENT -> - El; + find_x_event_chatstates(Els, {El, B, C}); + ?NS_CHATSTATES -> + find_x_event_chatstates(Els, {A, El, C}); _ -> - find_x_event(Els) + find_x_event_chatstates(Els, {A, B, true}) end. find_x_expire(_, []) -> @@ -266,6 +307,13 @@ resend_offline_messages(User, Server) -> {xmlelement, Name, Attrs, Els ++ [jlib:timestamp_to_xml( + calendar:now_to_universal_time( + R#offline_msg.timestamp), + utc, + jlib:make_jid("", Server, ""), + "Offline Storage"), + %% TODO: Delete the next three lines once XEP-0091 is Obsolete + jlib:timestamp_to_xml( calendar:now_to_universal_time( R#offline_msg.timestamp))]}} end, @@ -295,7 +343,14 @@ pop_offline_messages(Ls, User, Server) -> {xmlelement, Name, Attrs, Els ++ [jlib:timestamp_to_xml( - calendar:now_to_universal_time( + calendar:now_to_universal_time( + R#offline_msg.timestamp), + utc, + jlib:make_jid("", Server, ""), + "Offline Storage"), + %% TODO: Delete the next three lines once XEP-0091 is Obsolete + jlib:timestamp_to_xml( + calendar:now_to_universal_time( R#offline_msg.timestamp))]}} end, lists:filter( @@ -312,6 +367,7 @@ pop_offline_messages(Ls, User, Server) -> Ls end. + remove_expired_messages() -> TimeStamp = now(), F = fun() -> @@ -474,8 +530,9 @@ webadmin_page(Acc, _, _) -> Acc. user_queue(User, Server, Query, Lang) -> US = {jlib:nodeprep(User), jlib:nameprep(Server)}, Res = user_queue_parse_query(US, Query), - Msgs = lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})), + MsgsAll = lists:keysort(#offline_msg.timestamp, + mnesia:dirty_read({offline_msg, US})), + Msgs = get_messages_subset(User, Server, MsgsAll), FMsgs = lists:map( fun(#offline_msg{timestamp = TimeStamp, from = From, to = To, @@ -557,9 +614,32 @@ user_queue_parse_query(US, Query) -> us_to_list({User, Server}) -> jlib:jid_to_string({User, Server, ""}). +get_queue_length(User, Server) -> + length(mnesia:dirty_read({offline_msg, {User, Server}})). + +get_messages_subset(User, Host, MsgsAll) -> + Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, + max_user_offline_messages), + MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of + Number when is_integer(Number) -> Number; + _ -> 100 + end, + Length = length(MsgsAll), + get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). + +get_messages_subset2(Max, Length, MsgsAll) when Length =< Max*2 -> + MsgsAll; +get_messages_subset2(Max, Length, MsgsAll) -> + FirstN = Max, + {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), + MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), + NoJID = jlib:make_jid("...", "...", ""), + IntermediateMsg = #offline_msg{timestamp = now(), from = NoJID, to = NoJID, + packet = {xmlelement, "...", [], []}}, + MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. + webadmin_user(Acc, User, Server, Lang) -> - US = {jlib:nodeprep(User), jlib:nameprep(Server)}, - QueueLen = length(mnesia:dirty_read({offline_msg, US})), + QueueLen = get_queue_length(jlib:nodeprep(User), jlib:nameprep(Server)), FQueueLen = [?AC("queue/", integer_to_list(QueueLen))], Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. @@ -583,23 +663,3 @@ webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> end; webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> Acc. - - -%% ------------------------------------------------ -%% mod_offline: number of messages quota management - -count_offline_messages(_Acc, User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), - US = {LUser, LServer}, - F = fun () -> - p1_mnesia:count_records( - offline_msg, - #offline_msg{us=US, _='_'}) - end, - N = case catch mnesia:async_dirty(F) of - I when is_integer(I) -> I; - _ -> 0 - end, - {stop, N}. - diff --git a/src/mod_offline_odbc.erl b/src/mod_offline_odbc.erl index 420ff581f..063ea709e 100644 --- a/src/mod_offline_odbc.erl +++ b/src/mod_offline_odbc.erl @@ -32,15 +32,16 @@ -export([count_offline_messages/2]). -export([start/2, - init/2, + loop/2, stop/1, store_packet/3, pop_offline_messages/3, + get_sm_features/5, remove_user/2, + get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5, - count_offline_messages/3]). + webadmin_user_parse_query/5]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -52,6 +53,9 @@ -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). +%% default value for the maximum number of user messages +-define(MAX_USER_MESSAGES, infinity). + start(Host, Opts) -> ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), @@ -61,30 +65,27 @@ start(Host, Opts) -> ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), + ejabberd_hooks:add(disco_sm_features, Host, + ?MODULE, get_sm_features, 50), + ejabberd_hooks:add(disco_local_features, Host, + ?MODULE, get_sm_features, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - ejabberd_hooks:add(count_offline_messages, Host, - ?MODULE, count_offline_messages, 50), - MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), + AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, init, [Host, MaxOfflineMsgs])). + spawn(?MODULE, loop, [Host, AccessMaxOfflineMsgs])). -%% MaxOfflineMsgs is either infinity of integer > 0 -init(Host, infinity) -> - loop(Host, infinity); -init(Host, MaxOfflineMsgs) - when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> - loop(Host, MaxOfflineMsgs). - -loop(Host, MaxOfflineMsgs) -> +loop(Host, AccessMaxOfflineMsgs) -> receive #offline_msg{user = User} = Msg -> Msgs = receive_all(User, [Msg]), Len = length(Msgs), + MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, + User, Host), %% Only count existing messages if needed: Count = if MaxOfflineMsgs =/= infinity -> @@ -112,11 +113,17 @@ loop(Host, MaxOfflineMsgs) -> Els ++ [jlib:timestamp_to_xml( calendar:now_to_universal_time( + M#offline_msg.timestamp), + utc, + jlib:make_jid("", Host, ""), + "Offline Storage"), + %% TODO: Delete the next three lines once XEP-0091 is Obsolete + jlib:timestamp_to_xml( + calendar:now_to_universal_time( M#offline_msg.timestamp))]}, XML = ejabberd_odbc:escape( - lists:flatten( - xml:element_to_string(Packet))), + xml:element_to_binary(Packet)), odbc_queries:add_spool_sql(Username, XML) end, Msgs), case catch odbc_queries:add_spool(Host, Query) of @@ -128,9 +135,18 @@ loop(Host, MaxOfflineMsgs) -> ok end end, - loop(Host, MaxOfflineMsgs); + loop(Host, AccessMaxOfflineMsgs); _ -> - loop(Host, MaxOfflineMsgs) + loop(Host, AccessMaxOfflineMsgs) + end. + +%% Function copied from ejabberd_sm.erl: +get_max_user_messages(AccessRule, LUser, Host) -> + case acl:match_rule( + Host, AccessRule, jlib:make_jid(LUser, Host, "")) of + Max when is_integer(Max) -> Max; + infinity -> infinity; + _ -> ?MAX_USER_MESSAGES end. receive_all(Username, Msgs) -> @@ -151,6 +167,8 @@ stop(Host) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), + ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, @@ -161,12 +179,27 @@ stop(Host) -> exit(whereis(Proc), stop), ok. +get_sm_features(Acc, _From, _To, "", _Lang) -> + Feats = case Acc of + {result, I} -> I; + _ -> [] + end, + {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; + +get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> + %% override all lesser features... + {result, []}; + +get_sm_features(Acc, _From, _To, _Node, _Lang) -> + Acc. + + store_packet(From, To, Packet) -> Type = xml:get_tag_attr_s("type", Packet), if (Type /= "error") and (Type /= "groupchat") and (Type /= "headline") -> - case check_event(From, To, Packet) of + case check_event_chatstates(From, To, Packet) of true -> #jid{luser = LUser} = To, TimeStamp = now(), @@ -187,12 +220,22 @@ store_packet(From, To, Packet) -> ok end. -check_event(From, To, Packet) -> +%% Check if the packet has any content about XEP-0022 or XEP-0085 +check_event_chatstates(From, To, Packet) -> {xmlelement, Name, Attrs, Els} = Packet, - case find_x_event(Els) of - false -> + case find_x_event_chatstates(Els, {false, false, false}) of + %% There wasn't any x:event or chatstates subelements + {false, false, _} -> true; - El -> + %% There a chatstates subelement and other stuff, but no x:event + {false, CEl, true} when CEl /= false -> + true; + %% There was only a subelement: a chatstates + {false, CEl, false} when CEl /= false -> + %% Don't allow offline storage + false; + %% There was an x:event element, and maybe also other stuff + {El, _, _} when El /= false -> case xml:get_subtag(El, "id") of false -> case xml:get_subtag(El, "offline") of @@ -214,22 +257,25 @@ check_event(From, To, Packet) -> {xmlelement, "offline", [], []}]}] }), true - end; + end; _ -> false end end. -find_x_event([]) -> - false; -find_x_event([{xmlcdata, _} | Els]) -> - find_x_event(Els); -find_x_event([El | Els]) -> +%% Check if the packet has subelements about XEP-0022, XEP-0085 or other +find_x_event_chatstates([], Res) -> + Res; +find_x_event_chatstates([{xmlcdata, _} | Els], Res) -> + find_x_event_chatstates(Els, Res); +find_x_event_chatstates([El | Els], {A, B, C}) -> case xml:get_tag_attr_s("xmlns", El) of ?NS_EVENT -> - El; + find_x_event_chatstates(Els, {El, B, C}); + ?NS_CHATSTATES -> + find_x_event_chatstates(Els, {A, El, C}); _ -> - find_x_event(Els) + find_x_event_chatstates(Els, {A, B, true}) end. find_x_expire(_, []) -> @@ -329,7 +375,7 @@ user_queue(User, Server, Query, Lang) -> Username = ejabberd_odbc:escape(LUser), US = {LUser, LServer}, Res = user_queue_parse_query(Username, LServer, Query), - Msgs = case catch ejabberd_odbc:sql_query( + MsgsAll = case catch ejabberd_odbc:sql_query( LServer, ["select username, xml from spool" " where username='", Username, "'" @@ -347,6 +393,7 @@ user_queue(User, Server, Query, Lang) -> _ -> [] end, + Msgs = get_messages_subset(User, Server, MsgsAll), FMsgs = lists:map( fun({xmlelement, _Name, _Attrs, _Els} = Msg) -> @@ -433,11 +480,8 @@ user_queue_parse_query(Username, LServer, Query) -> us_to_list({User, Server}) -> jlib:jid_to_string({User, Server, ""}). -webadmin_user(Acc, User, Server, Lang) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), - Username = ejabberd_odbc:escape(LUser), - QueueLen = case catch ejabberd_odbc:sql_query( +get_queue_length(Username, LServer) -> + case catch ejabberd_odbc:sql_query( LServer, ["select count(*) from spool" " where username='", Username, "';"]) of @@ -445,7 +489,32 @@ webadmin_user(Acc, User, Server, Lang) -> SCount; _ -> 0 - end, + end. + +get_messages_subset(User, Host, MsgsAll) -> + Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, + max_user_offline_messages), + MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of + Number when is_integer(Number) -> Number; + _ -> 100 + end, + Length = length(MsgsAll), + get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). + +get_messages_subset2(Max, Length, MsgsAll) when Length =< Max*2 -> + MsgsAll; +get_messages_subset2(Max, Length, MsgsAll) -> + FirstN = Max, + {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), + MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), + IntermediateMsg = {xmlelement, "...", [], []}, + MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. + +webadmin_user(Acc, User, Server, Lang) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = ejabberd_odbc:escape(LUser), + QueueLen = get_queue_length(Username, LServer), FQueueLen = [?AC("queue/", QueueLen)], Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. @@ -477,30 +546,3 @@ count_offline_messages(LUser, LServer) -> _ -> 0 end. - -count_offline_messages(_Acc, User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), - Num = case catch ejabberd_odbc:sql_query( - LServer, - ["select xml from spool" - " where username='", LUser, "';"]) of - {selected, ["xml"], Rs} -> - lists:foldl( - fun({XML}, Acc) -> - case xml_stream:parse_element(XML) of - {error, _Reason} -> - Acc; - El -> - case xml:get_subtag(El, "body") of - false -> - Acc; - _ -> - Acc + 1 - end - end - end, 0, Rs); - _ -> - 0 - end, - {stop, Num}. diff --git a/src/xml.erl b/src/xml.erl index 3e0157313..22746b10a 100644 --- a/src/xml.erl +++ b/src/xml.erl @@ -31,7 +31,6 @@ element_to_binary/1, crypt/1, make_text_node/1, remove_cdata/1, - remove_subtags/3, get_cdata/1, get_tag_cdata/1, get_attr/2, get_attr_s/2, get_tag_attr/2, get_tag_attr_s/2, @@ -187,31 +186,6 @@ remove_cdata_p(_) -> false. remove_cdata(L) -> [E || E <- L, remove_cdata_p(E)]. -%% TODO: Make more generic. -%% For now only support all parameters: -%% xml:remove_subtags({xmlelement,"message", [{"id","81be72"}],[{xmlelement,"on-sender-server",[{"xmlns","urn:xmpp:receipts"},{"server","text-one.com"}], []}]}, "on-sender-server", {"xmlns","urn:xmpp:receipts"}). -remove_subtags({xmlelement, TagName, TagAttrs, Els}, Name, Attr) -> - {xmlelement, TagName, TagAttrs, remove_subtags1(Els, [], Name, Attr)}. - -remove_subtags1([], NewEls, _Name, _Attr) -> - lists:reverse(NewEls); -remove_subtags1([El | Els], NewEls, Name, {AttrName, AttrValue} = Attr) -> - case El of - {xmlelement, Name, Attrs, _} -> - case get_attr(AttrName, Attrs) of - false -> - remove_subtags1(Els, [El|NewEls], Name, Attr); - {value, AttrValue} -> - remove_subtags1(Els, NewEls, Name, Attr); - _ -> - remove_subtags1(Els, [El|NewEls], Name, Attr) - end; - _ -> - remove_subtags1(Els, [El|NewEls], Name, Attr) - end. - - - get_cdata(L) -> binary_to_list(list_to_binary(get_cdata(L, ""))). diff --git a/src/xml_stream.erl b/src/xml_stream.erl index f3aa5502d..b98e3291b 100644 --- a/src/xml_stream.erl +++ b/src/xml_stream.erl @@ -76,9 +76,6 @@ process_data(CallbackPid, Stack, Data) -> {?XML_CDATA, CData} -> case Stack of [El] -> - catch gen_fsm:send_all_state_event( - CallbackPid, - {xmlstreamcdata, CData}), [El]; %% Merge CDATA nodes if they are contiguous %% This does not change the semantic: the split in