From 4134edf8de26242a89a775ac2d5eb23bea3aa72e Mon Sep 17 00:00:00 2001 From: Eric Cestari Date: Thu, 9 Sep 2010 16:16:28 +0200 Subject: [PATCH 1/4] Merge ApplePush to branch 2.2.x --- src/ejabberd_c2s.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 17539829d..eda4ea9ae 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -1532,7 +1532,7 @@ change_shaper(StateData, JID) -> (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). send_text(StateData, Text) -> - ?DEBUG("Send XML on stream = ~p", [lists:flatten(Text)]), + ?DEBUG("Send XML on stream = ~p", [Text]), Text1 = if ?FLASH_HACK and StateData#state.flash_connection -> %% send a null byte after each stanza to Flash clients From 8ecf8d7e277edd0da283b390431a3d6caa35cd94 Mon Sep 17 00:00:00 2001 From: Eric Cestari Date: Thu, 9 Sep 2010 16:16:46 +0200 Subject: [PATCH 2/4] A few additions to .gitignore --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 5b89fb9d5..3e3000059 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ Makefile archives +build/ +*.dSYM/ .gitignore contrib/extract_translations/extract_translations.beam doc/contributed_modules.tex @@ -29,3 +31,5 @@ src/eldap/ELDAPv3.beam src/eldap/ELDAPv3.erl src/eldap/ELDAPv3.hrl src/epam +src/ejabberdctl.example +src/ejabberd.init \ No newline at end of file From b8b6fc0da56e0a58920a52b8d0ec4f82e812aec7 Mon Sep 17 00:00:00 2001 From: Eric Cestari Date: Thu, 9 Sep 2010 17:00:18 +0200 Subject: [PATCH 3/4] Merge ApplePush to 2.2.x --- src/ejabberd_c2s.erl | 914 ++++++++++++++++++++++++++++++++-- src/ejabberd_c2s.hrl | 24 +- src/mod_applepush.erl | 381 ++++++++++++++ src/mod_applepush_service.erl | 586 ++++++++++++++++++++++ src/mod_offline.erl | 168 ++----- src/mod_offline_odbc.erl | 174 +++---- src/xml.erl | 26 + src/xml_stream.erl | 3 + 8 files changed, 2020 insertions(+), 256 deletions(-) create mode 100644 src/mod_applepush.erl create mode 100644 src/mod_applepush_service.erl diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index eda4ea9ae..ce68316e8 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -114,6 +114,15 @@ ?SERRT_POLICY_VIOLATION(Lang, Text)). -define(INVALID_FROM, ?SERR_INVALID_FROM). +-define(NS_P1_REBIND, "p1:rebind"). +-define(NS_P1_PUSH, "p1:push"). +-define(NS_P1_ACK, "p1:ack"). +-define(NS_P1_PUSHED, "p1:pushed"). +-define(NS_P1_ATTACHMENT, "http://process-one.net/attachement"). + +-define(C2S_P1_ACK_TIMEOUT, 10000). +-define(MAX_OOR_TIMEOUT, 1440). %% Max allowed session duration 24h (24*60) +-define(MAX_OOR_MESSAGES, 1000). %%%---------------------------------------------------------------------- %%% API @@ -345,9 +354,22 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> false -> [] end, + P1PushFeature = + [{xmlelement, "push", + [{"xmlns", ?NS_P1_PUSH}], []}], + P1RebindFeature = + [{xmlelement, "rebind", + [{"xmlns", ?NS_P1_REBIND}], []}], + P1AckFeature = + [{xmlelement, "ack", + [{"xmlns", ?NS_P1_ACK}], []}], send_element(StateData, {xmlelement, "stream:features", [], - TLSFeature ++ CompressFeature ++ + TLSFeature ++ + CompressFeature ++ + P1PushFeature ++ + P1RebindFeature ++ + P1AckFeature ++ [{xmlelement, "mechanisms", [{"xmlns", ?NS_SASL}], Mechs}] ++ @@ -368,7 +390,9 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> roster_get_versioning_feature, Server, [], [Server]), StreamFeatures = - [{xmlelement, "bind", + [{xmlelement, "push", + [{"xmlns", ?NS_P1_PUSH}], []}, + {xmlelement, "bind", [{"xmlns", ?NS_BIND}], []}, {xmlelement, "session", [{"xmlns", ?NS_SESSION}], []}] @@ -577,8 +601,33 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> end end; _ -> - process_unauthenticated_stanza(StateData, El), - fsm_next_state(wait_for_auth, StateData) + {xmlelement, Name, Attrs, _Els} = El, + case {xml:get_attr_s("xmlns", Attrs), Name} of + {?NS_P1_REBIND, "rebind"} -> + SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), + SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), + case jlib:string_to_jid(SJID) of + error -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_auth, + StateData); + JID -> + case rebind(StateData, JID, SID) of + {next_state, wait_for_feature_request, + NewStateData, Timeout} -> + {next_state, wait_for_auth, + NewStateData, Timeout}; + Res -> + Res + end + end; + _ -> + process_unauthenticated_stanza(StateData, El), + fsm_next_state(wait_for_auth, StateData) + end end; wait_for_auth(timeout, StateData) -> @@ -709,6 +758,23 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) -> StateData) end end; + {?NS_P1_REBIND, "rebind"} -> + SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), + SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), + case jlib:string_to_jid(SJID) of + error -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + JID -> + rebind(StateData, JID, SID) + end; + {?NS_P1_ACK, "ack"} -> + fsm_next_state(wait_for_feature_request, + StateData#state{ack_enabled = true}); _ -> if (SockMod == gen_tcp) and TLSRequired -> @@ -954,7 +1020,9 @@ session_established({xmlstreamelement, El}, StateData) -> send_trailer(StateData), {stop, normal, StateData}; _NewEl -> - session_established2(El, StateData) + NSD1 = change_reception(StateData, true), + NSD2 = start_keepalive_timer(NSD1), + session_established2(El, NSD2) end; %% We hibernate the process to reduce memory consumption after a @@ -981,7 +1049,16 @@ session_established({xmlstreamerror, _}, StateData) -> {stop, normal, StateData}; session_established(closed, StateData) -> - {stop, normal, StateData}. + if + not StateData#state.reception -> + fsm_next_state(session_established, StateData); + (StateData#state.keepalive_timer /= undefined) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(session_established, NewState); + true -> + {stop, normal, StateData} + end. %% Process packets sent by user (coming from user on c2s XMPP %% connection) @@ -1052,6 +1129,8 @@ session_established2(El, StateData) -> [StateData#state.debug, FromJID, ToJID, NewEl]), process_privacy_iq( FromJID, ToJID, IQ, StateData); + #iq{xmlns = ?NS_P1_PUSH} = IQ -> + process_push_iq(FromJID, ToJID, IQ, StateData); _ -> ejabberd_hooks:run( user_send_packet, @@ -1068,6 +1147,12 @@ session_established2(El, StateData) -> check_privacy_route(FromJID, StateData, FromJID, ToJID, NewEl), StateData; + "standby" -> + StandBy = xml:get_tag_cdata(NewEl) == "true", + change_standby(StateData, StandBy); + "a" -> + SCounter = xml:get_tag_attr_s("h", NewEl), + receive_ack(StateData, SCounter); _ -> StateData end @@ -1299,6 +1384,17 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> _ -> {false, Attrs, StateData} end; + rebind -> + {Pid2, StreamID2} = Els, + if + StreamID2 == StateData#state.streamid -> + Pid2 ! {rebind, prepare_acks_for_rebind(StateData)}, + receive after 1000 -> ok end, + {exit, Attrs, rebind}; + true -> + Pid2 ! {rebind, false}, + {false, Attrs, StateData} + end; "iq" -> IQ = jlib:iq_query_info(Packet), case IQ of @@ -1344,18 +1440,22 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> {From, To, Packet}, in]) of allow -> - case ejabberd_hooks:run_fold( - feature_check_packet, StateData#state.server, - allow, - [StateData#state.jid, - StateData#state.server, - StateData#state.pres_last, - {From, To, Packet}, - in]) of - allow -> - {true, Attrs, StateData}; - deny -> - {false, Attrs, StateData} + if StateData#state.reception -> + case ejabberd_hooks:run_fold( + feature_check_packet, StateData#state.server, + allow, + [StateData#state.jid, + StateData#state.server, + StateData#state.pres_last, + {From, To, Packet}, + in]) of + allow -> + {true, Attrs, StateData}; + deny -> + {false, Attrs, StateData} + end; + true -> + {true, Attrs, StateData} end; deny -> {false, Attrs, StateData} @@ -1365,29 +1465,85 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> end, if Pass == exit -> - %% When Pass==exit, NewState contains a string instead of a #state{} - Lang = StateData#state.lang, - send_element(StateData, ?SERRT_CONFLICT(Lang, NewState)), - send_trailer(StateData), - {stop, normal, StateData}; + catch send_trailer(StateData), + case NewState of + rebind -> + {stop, normal, StateData#state{authenticated = rebinded}}; + _ -> + {stop, normal, StateData} + end; Pass -> Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = {xmlelement, Name, Attrs2, Els}, - send_element(StateData, FixedPacket), + NewState2 = + if + NewState#state.reception and + not (NewState#state.standby and (Name /= "message")) -> + send_element(NewState, FixedPacket), + ack(NewState, From, To, FixedPacket); + true -> + NewState1 = send_out_of_reception_message( + NewState, From, To, Packet), + enqueue(NewState1, From, To, FixedPacket) + end, ejabberd_hooks:run(user_receive_packet, StateData#state.server, [StateData#state.debug, StateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, NewState2); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) end; -handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) - when Monitor == StateData#state.socket_monitor -> +handle_info({timeout, Timer, _}, StateName, + #state{keepalive_timer = Timer, reception = true} = StateData) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(StateName, NewState); +handle_info({timeout, Timer, _}, _StateName, + #state{keepalive_timer = Timer, reception = false} = StateData) -> {stop, normal, StateData}; +handle_info({timeout, Timer, PrevCounter}, StateName, + #state{ack_timer = Timer} = StateData) -> + AckCounter = StateData#state.ack_counter, + NewState = + if + PrevCounter >= AckCounter -> + StateData#state{ack_timer = undefined}; + true -> + send_ack_request(StateData#state{ack_timer = undefined}) + end, + fsm_next_state(StateName, NewState); +handle_info({ack_timeout, Counter}, StateName, StateData) -> + AckQueue = StateData#state.ack_queue, + case queue:is_empty(AckQueue) of + true -> + fsm_next_state(StateName, StateData); + false -> + C = element(1, queue:head(AckQueue)), + if + C =< Counter -> + {stop, normal, StateData}; + true -> + fsm_next_state(StateName, StateData) + end + end; +handle_info({'DOWN', Monitor, _Type, _Object, _Info}, StateName, StateData) + when Monitor == StateData#state.socket_monitor -> + if + (StateName == session_established) and + (not StateData#state.reception) -> + fsm_next_state(StateName, StateData); + (StateName == session_established) and + (StateData#state.keepalive_timer /= undefined) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(StateName, NewState); + true -> + {stop, normal, StateData} + end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1484,6 +1640,13 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_a, Packet), presence_broadcast( StateData, From, StateData#state.pres_i, Packet); + rebinded -> + ejabberd_sm:close_session( + StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource), + ok; _ -> ?INFO_MSG("(~w) Close session for ~s", [StateData#state.socket, @@ -1515,6 +1678,36 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_i, Packet) end end, + case StateData#state.authenticated of + rebinded -> + ok; + _ -> + if + not StateData#state.reception, not StateData#state.oor_offline -> + SFrom = jlib:jid_to_string(StateData#state.jid), + ejabberd_hooks:run( + p1_push_notification, + StateData#state.server, + [StateData#state.server, + StateData#state.jid, + StateData#state.oor_notification, + "Instant messaging session expired", + 0, + false, + StateData#state.oor_appid, + SFrom]); + true -> + ok + end, + lists:foreach( + fun({_Counter, From, To, FixedPacket}) -> + ejabberd_router:route(From, To, FixedPacket) + end, queue:to_list(StateData#state.ack_queue)), + lists:foreach( + fun({From, To, FixedPacket}) -> + ejabberd_router:route(From, To, FixedPacket) + end, queue:to_list(StateData#state.queue)) + end, bounce_messages(); _ -> ok @@ -1684,9 +1877,39 @@ process_presence_probe(From, To, StateData) -> andalso ?SETS:is_element(LFrom, StateData#state.pres_a), if Cond1 -> + Packet = + case StateData#state.reception of + true -> + StateData#state.pres_last; + false -> + case StateData#state.oor_show of + "" -> + StateData#state.pres_last; + _ -> + {xmlelement, _, PresAttrs, PresEls} = + StateData#state.pres_last, + PresEls1 = + lists:flatmap( + fun({xmlelement, Name, _, _}) + when Name == "show"; + Name == "status" -> + []; + (E) -> + [E] + end, PresEls), + {xmlelement, "presence", PresAttrs, + [{xmlelement, "show", [], + [{xmlcdata, + StateData#state.oor_show}]}, + {xmlelement, "status", [], + [{xmlcdata, + StateData#state.oor_status}]}] + ++ PresEls1} + end + end, Timestamp = StateData#state.pres_timestamp, - Packet = xml:append_subtags( - StateData#state.pres_last, + Packet1 = xml:append_subtags( + Packet, %% To is the one sending the presence (the target of the probe) [jlib:timestamp_to_xml(Timestamp, utc, To, ""), %% TODO: Delete the next line once XEP-0091 is Obsolete @@ -1697,7 +1920,7 @@ process_presence_probe(From, To, StateData) -> [StateData#state.user, StateData#state.server, StateData#state.privacy_list, - {To, From, Packet}, + {To, From, Packet1}, out]) of deny -> ok; @@ -1707,7 +1930,7 @@ process_presence_probe(From, To, StateData) -> %% Don't route a presence probe to oneself case From == To of false -> - ejabberd_router:route(To, From, Packet); + ejabberd_router:route(To, From, Packet1); true -> ok end @@ -2035,8 +2258,8 @@ roster_change(IJID, ISubscription, StateData) -> P -> ?DEBUG("roster changed for ~p~n", [StateData#state.user]), From = StateData#state.jid, -% To = jlib:make_jid(IJID) - To = IJID, + To = jlib:make_jid(IJID), +% To = IJID, Cond1 = (not StateData#state.pres_invis) and IsFrom and (not OldIsFrom), Cond2 = (not IsFrom) and OldIsFrom @@ -2333,6 +2556,631 @@ check_from(El, FromJID) -> end end. +start_keepalive_timer(StateData) -> + if + is_reference(StateData#state.keepalive_timer) -> + cancel_timer(StateData#state.keepalive_timer); + true -> + ok + end, + Timeout = + if + StateData#state.reception -> StateData#state.keepalive_timeout; + true -> StateData#state.oor_timeout + end, + Timer = + if + is_integer(Timeout) -> + erlang:start_timer(Timeout * 1000, self(), []); + true -> + undefined + end, + StateData#state{keepalive_timer = Timer}. + +change_reception(#state{reception = Reception} = StateData, Reception) -> + StateData; +change_reception(#state{reception = true} = StateData, false) -> + ?DEBUG("reception -> false", []), + case StateData#state.oor_show of + "" -> + ok; + _ -> + Packet = + {xmlelement, "presence", [], + [{xmlelement, "show", [], + [{xmlcdata, StateData#state.oor_show}]}, + {xmlelement, "status", [], + [{xmlcdata, StateData#state.oor_status}]}]}, + update_priority(0, Packet, StateData), + presence_broadcast_to_trusted( + StateData, + StateData#state.jid, + StateData#state.pres_f, + StateData#state.pres_a, + Packet) + end, + StateData#state{reception = false}; +change_reception(#state{reception = false, standby = true} = StateData, true) -> + ?DEBUG("reception -> standby", []), + NewQueue = + lists:foldl( + fun({_From, _To, {xmlelement, "message", _, _} = FixedPacket}, Q) -> + send_element(StateData, FixedPacket), + Q; + (Item, Q) -> + queue:in(Item, Q) + end, queue:new(), queue:to_list(StateData#state.queue)), + StateData#state{queue = NewQueue, + queue_len = queue:len(NewQueue), + reception = true, + oor_unread = 0, + oor_unread_users = ?SETS:new()}; +change_reception(#state{reception = false} = StateData, true) -> + ?DEBUG("reception -> true", []), + case StateData#state.oor_show of + "" -> + ok; + _ -> + Packet = StateData#state.pres_last, + NewPriority = get_priority_from_presence(Packet), + update_priority(NewPriority, Packet, StateData), + presence_broadcast_to_trusted( + StateData, + StateData#state.jid, + StateData#state.pres_f, + StateData#state.pres_a, + Packet) + end, + lists:foreach( + fun({_From, _To, FixedPacket}) -> + send_element(StateData, FixedPacket) + end, queue:to_list(StateData#state.queue)), + lists:foreach( + fun(FixedPacket) -> + send_element(StateData, FixedPacket) + end, gb_trees:values(StateData#state.pres_queue)), + StateData#state{queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + reception = true, + oor_unread = 0, + oor_unread_users = ?SETS:new()}. + +change_standby(#state{standby = StandBy} = StateData, StandBy) -> + StateData; +change_standby(#state{standby = false} = StateData, true) -> + ?DEBUG("standby -> true", []), + StateData#state{standby = true}; +change_standby(#state{standby = true} = StateData, false) -> + ?DEBUG("standby -> false", []), + lists:foreach( + fun({_From, _To, FixedPacket}) -> + send_element(StateData, FixedPacket) + end, queue:to_list(StateData#state.queue)), + lists:foreach( + fun(FixedPacket) -> + send_element(StateData, FixedPacket) + end, gb_trees:values(StateData#state.pres_queue)), + StateData#state{queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + standby = false}. + +send_out_of_reception_message(StateData, From, To, + {xmlelement, "message", _, _} = Packet) -> + Type = xml:get_tag_attr_s("type", Packet), + if + (Type == "normal") or + (Type == "") or + (Type == "chat") or + (StateData#state.oor_send_groupchat and (Type == "groupchat"))-> + %Lang = case xml:get_tag_attr_s("xml:lang", Packet) of + % "" -> + % StateData#state.lang; + % L -> + % L + % end, + %Text = translate:translate( + % Lang, "User is temporarily out of reception"), + %MsgType = "error", + %Message = {xmlelement, "message", + % [{"type", MsgType}], + % [{xmlelement, "body", [], + % [{xmlcdata, Text}]}]}, + %ejabberd_router:route(To, From, Message), + Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]), + Body = + case check_x_attachment(Packet) of + true -> + case Body1 of + "" -> [238, 128, 136]; + _ -> + [238, 128, 136, 32 | Body1] + end; + false -> + Body1 + end, + Pushed = check_x_pushed(Packet), + if + Body == ""; + Pushed -> + StateData; + true -> + BFrom = jlib:jid_remove_resource(From), + LBFrom = jlib:jid_tolower(BFrom), + UnreadUsers = ?SETS:add_element( + LBFrom, + StateData#state.oor_unread_users), + IncludeBody = + case StateData#state.oor_send_body of + all -> + true; + first_per_user -> + not ?SETS:is_element( + LBFrom, + StateData#state.oor_unread_users); + first -> + StateData#state.oor_unread == 0; + none -> + false + end, + Unread = StateData#state.oor_unread + 1, + SFrom = jlib:jid_to_string(BFrom), + Msg = + if + IncludeBody -> + CBody = utf8_cut(Body, 100), + case StateData#state.oor_send_from of + jid -> SFrom ++ ": " ++ CBody; + username -> BFrom#jid.user ++ ": " ++ CBody; + _ -> CBody + end; + true -> + "" + end, + Sound = IncludeBody, + AppID = StateData#state.oor_appid, + ejabberd_hooks:run( + p1_push_notification, + StateData#state.server, + [StateData#state.server, + StateData#state.jid, + StateData#state.oor_notification, + Msg, + Unread, + Sound, + AppID, + SFrom]), + %% This hook is intended to give other module a + %% chance to notify the sender that the message is + %% not directly delivered to the client (almost + %% equivalent to offline). + ejabberd_hooks:run(delayed_message_hook, + StateData#state.server, + [From, To, Packet]), + StateData#state{oor_unread = Unread, + oor_unread_users = UnreadUsers} + end; + true -> + StateData + end; +send_out_of_reception_message(StateData, _From, _To, _Packet) -> + StateData. + +utf8_cut(S, Bytes) -> + utf8_cut(S, [], [], Bytes + 1). + +utf8_cut(_S, _Cur, Prev, 0) -> + lists:reverse(Prev); +utf8_cut([], Cur, _Prev, _Bytes) -> + lists:reverse(Cur); +utf8_cut([C | S], Cur, Prev, Bytes) -> + if + C bsr 6 == 2 -> + utf8_cut(S, [C | Cur], Prev, Bytes - 1); + true -> + utf8_cut(S, [C | Cur], Cur, Bytes - 1) + end. + + +cancel_timer(Timer) -> + erlang:cancel_timer(Timer), + receive + {timeout, Timer, _} -> + ok + after 0 -> + ok + end. + +enqueue(StateData, From, To, Packet) -> + IsPresence = + case Packet of + {xmlelement, "presence", _, _} -> + case xml:get_tag_attr_s("type", Packet) of + "subscribe" -> + false; + "subscribed" -> + false; + "unsubscribe" -> + false; + "unsubscribed" -> + false; + _ -> + true + end; + _ -> + false + end, + Messages = + StateData#state.queue_len + gb_trees:size(StateData#state.pres_queue), + if + Messages >= ?MAX_OOR_MESSAGES -> + self() ! {timeout, StateData#state.keepalive_timer, []}; + true -> + ok + end, + if + IsPresence -> + LFrom = jlib:jid_tolower(From), + case is_own_presence(StateData#state.jid, LFrom) of + true -> StateData; + false -> + NewQueue = gb_trees:enter(LFrom, Packet, + StateData#state.pres_queue), + StateData#state{pres_queue = NewQueue} + end; + true -> + CleanPacket = xml:remove_subtags(Packet, "x", {"xmlns", ?NS_P1_PUSHED}), + Packet2 = + case CleanPacket of + {xmlelement, "message" = Name, Attrs, Els} -> + {xmlelement, Name, Attrs, + Els ++ + [jlib:timestamp_to_xml( + calendar:now_to_universal_time(now())), + {xmlelement, "x", [{"xmlns", ?NS_P1_PUSHED}], []}]}; + _ -> + Packet + end, + NewQueue = queue:in({From, To, Packet2}, + StateData#state.queue), + NewQueueLen = StateData#state.queue_len + 1, + StateData#state{queue = NewQueue, + queue_len = NewQueueLen} + end. + +%% Is my own presence packet ? +is_own_presence(MyFullJID, MyFullJID) -> + true; +is_own_presence(_MyFullJID, _LFrom) -> + false. + +ack(StateData, From, To, Packet) -> + if + StateData#state.ack_enabled -> + NeedsAck = + case Packet of + {xmlelement, "presence", _, _} -> + case xml:get_tag_attr_s("type", Packet) of + "subscribe" -> + true; + "subscribed" -> + true; + "unsubscribe" -> + true; + "unsubscribed" -> + true; + _ -> + false + end; + {xmlelement, "message", _, _} -> + true; + _ -> + false + end, + if + NeedsAck -> + Counter = StateData#state.ack_counter + 1, + NewAckQueue = queue:in({Counter, From, To, Packet}, + StateData#state.ack_queue), + send_ack_request(StateData#state{ack_queue = NewAckQueue, + ack_counter = Counter}); + true -> + StateData + end; + true -> + StateData + end. + +send_ack_request(StateData) -> + case StateData#state.ack_timer of + undefined -> + AckCounter = StateData#state.ack_counter, + AckTimer = + erlang:start_timer(?C2S_P1_ACK_TIMEOUT, self(), AckCounter), + AckTimeout = StateData#state.keepalive_timeout + + StateData#state.oor_timeout, + erlang:send_after(AckTimeout * 1000, self(), + {ack_timeout, AckTimeout}), + send_element( + StateData, + {xmlelement, "r", + [{"h", integer_to_list(AckCounter)}], []}), + StateData#state{ack_timer = AckTimer}; + _ -> + StateData + end. + +receive_ack(StateData, SCounter) -> + case catch list_to_integer(SCounter) of + Counter when is_integer(Counter) -> + NewQueue = clean_queue(StateData#state.ack_queue, Counter), + StateData#state{ack_queue = NewQueue}; + _ -> + StateData + end. + +clean_queue(Queue, Counter) -> + case queue:is_empty(Queue) of + true -> + Queue; + false -> + C = element(1, queue:head(Queue)), + if + C =< Counter -> + clean_queue(queue:tail(Queue), Counter); + true -> + Queue + end + end. + +prepare_acks_for_rebind(StateData) -> + AckQueue = StateData#state.ack_queue, + case queue:is_empty(AckQueue) of + true -> + StateData; + false -> + Unsent = + lists:map( + fun({_Counter, From, To, FixedPacket}) -> + {From, To, FixedPacket} + end, queue:to_list(AckQueue)), + NewQueue = queue:join(queue:from_list(Unsent), + StateData#state.queue), + StateData#state{queue = NewQueue, + queue_len = queue:len(NewQueue), + ack_queue = queue:new(), + reception = false} + end. + + +rebind(StateData, JID, StreamID) -> + case JID#jid.lresource of + "" -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + _ -> + ejabberd_sm:route( + ?MODULE, JID, + {xmlelement, rebind, [], {self(), StreamID}}), + receive + {rebind, false} -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Session not found"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + {rebind, NewStateData} -> + ?INFO_MSG("(~w) Reopened session for ~s", + [StateData#state.socket, + jlib:jid_to_string(JID)]), + SID = {now(), self()}, + Conn = get_conn_type(NewStateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, NewStateData#state.auth_module}], + ejabberd_sm:open_session( + SID, + NewStateData#state.user, + NewStateData#state.server, + NewStateData#state.resource, + Info), + StateData2 = + NewStateData#state{ + socket = StateData#state.socket, + sockmod = StateData#state.sockmod, + socket_monitor = StateData#state.socket_monitor, + sid = SID, + ip = StateData#state.ip, + keepalive_timer = StateData#state.keepalive_timer, + ack_timer = undefined + }, + Presence = StateData2#state.pres_last, + case Presence of + undefined -> + ok; + _ -> + NewPriority = get_priority_from_presence(Presence), + update_priority(NewPriority, Presence, StateData2) + end, + send_element(StateData2, + {xmlelement, "rebind", + [{"xmlns", ?NS_P1_REBIND}], + []}), + StateData3 = change_reception(StateData2, true), + StateData4 = start_keepalive_timer(StateData3), + fsm_next_state(session_established, + StateData4) + after 1000 -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Session not found"}]}), + fsm_next_state(wait_for_feature_request, + StateData) + end + end. + +process_push_iq(From, To, + #iq{type = _Type, sub_el = El} = IQ, + StateData) -> + {Res, NewStateData} = + case El of + {xmlelement, "push", _, _} -> + SKeepAlive = + xml:get_path_s(El, [{elem, "keepalive"}, {attr, "max"}]), + SOORTimeout = + xml:get_path_s(El, [{elem, "session"}, {attr, "duration"}]), + Status = xml:get_path_s(El, [{elem, "status"}, cdata]), + Show = xml:get_path_s(El, [{elem, "status"}, {attr, "type"}]), + SSendBody = xml:get_path_s(El, [{elem, "body"}, {attr, "send"}]), + SendBody = + case SSendBody of + "all" -> all; + "first-per-user" -> first_per_user; + "first" -> first; + "none" -> none; + _ -> none + end, + SendGroupchat = + xml:get_path_s(El, [{elem, "body"}, + {attr, "groupchat"}]) == "true", + SendFrom = send_from(El), + AppID = xml:get_path_s(El, [{elem, "appid"}, cdata]), + {Offline, Keep} = + case xml:get_path_s(El, [{elem, "offline"}, cdata]) of + "true" -> {true, false}; + "keep" -> {false, true}; + _ -> {false, false} + end, + Notification1 = xml:get_path_s(El, [{elem, "notification"}]), + Notification = + case Notification1 of + {xmlelement, _, _, _} -> + Notification1; + _ -> + {xmlelement, "notification", [], + [{xmlelement, "type", [], + [{xmlcdata, "none"}]}]} + end, + case catch {list_to_integer(SKeepAlive), + list_to_integer(SOORTimeout)} of + {KeepAlive, OORTimeout} + when OORTimeout =< ?MAX_OOR_TIMEOUT -> + if + Offline -> + ejabberd_hooks:run( + p1_push_enable_offline, + StateData#state.server, + [StateData#state.jid, + Notification, SendBody, SendFrom, AppID]); + Keep -> + ok; + true -> + ejabberd_hooks:run( + p1_push_disable, + StateData#state.server, + [StateData#state.jid, + Notification, + AppID]) + end, + NSD1 = + StateData#state{keepalive_timeout = KeepAlive, + oor_timeout = OORTimeout * 60, + oor_status = Status, + oor_show = Show, + oor_notification = Notification, + oor_send_body = SendBody, + oor_send_groupchat = SendGroupchat, + oor_send_from = SendFrom, + oor_appid = AppID, + oor_offline = Offline}, + NSD2 = start_keepalive_timer(NSD1), + {{result, []}, NSD2}; + _ -> + {{error, ?ERR_BAD_REQUEST}, StateData} + end; + {xmlelement, "disable", _, _} -> + ejabberd_hooks:run( + p1_push_disable, + StateData#state.server, + [StateData#state.jid, + StateData#state.oor_notification, + StateData#state.oor_appid]), + NSD1 = + StateData#state{keepalive_timeout = undefined, + oor_timeout = undefined, + oor_status = "", + oor_show = "", + oor_notification = undefined, + oor_send_body = all}, + NSD2 = start_keepalive_timer(NSD1), + {{result, []}, NSD2}; + _ -> + {{error, ?ERR_BAD_REQUEST}, StateData} + end, + IQRes = + case Res of + {result, Result} -> + IQ#iq{type = result, sub_el = Result}; + {error, Error} -> + IQ#iq{type = error, sub_el = [El, Error]} + end, + ejabberd_router:route( + To, From, jlib:iq_to_xml(IQRes)), + NewStateData. + +check_x_pushed({xmlelement, _Name, _Attrs, Els}) -> + check_x_pushed1(Els). + +check_x_pushed1([]) -> + false; +check_x_pushed1([{xmlcdata, _} | Els]) -> + check_x_pushed1(Els); +check_x_pushed1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_PUSHED -> + true; + _ -> + check_x_pushed1(Els) + end. + +check_x_attachment({xmlelement, _Name, _Attrs, Els}) -> + check_x_attachment1(Els). + +check_x_attachment1([]) -> + false; +check_x_attachment1([{xmlcdata, _} | Els]) -> + check_x_attachment1(Els); +check_x_attachment1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_ATTACHMENT -> + true; + _ -> + check_x_attachment1(Els) + end. + + +send_from(El) -> + %% First test previous version attribute: + case xml:get_path_s(El, [{elem, "body"}, {attr, "jid"}]) of + "false" -> + none; + "true" -> + jid; + "" -> + case xml:get_path_s(El, [{elem, "body"}, {attr, "from"}]) of + "jid" -> jid; + "username" -> username; + "none" -> none; + _ -> jid + end + end. + fsm_limit_opts(Opts) -> case lists:keysearch(max_fsm_queue, 1, Opts) of {value, {_, N}} when is_integer(N) -> diff --git a/src/ejabberd_c2s.hrl b/src/ejabberd_c2s.hrl index e2e3c2439..381e042ba 100644 --- a/src/ejabberd_c2s.hrl +++ b/src/ejabberd_c2s.hrl @@ -60,4 +60,26 @@ fsm_limit_opts, lang, debug=false, - flash_connection = false}). \ No newline at end of file + flash_connection = false, + reception = true, + standby = false, + queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + keepalive_timer, + keepalive_timeout, + oor_timeout, + oor_status = "", + oor_show = "", + oor_notification, + oor_send_body = all, + oor_send_groupchat = false, + oor_send_from = jid, + oor_appid = "", + oor_unread = 0, + oor_unread_users = ?SETS:new(), + oor_offline = false, + ack_enabled = false, + ack_counter = 0, + ack_queue = queue:new(), + ack_timer}). \ No newline at end of file diff --git a/src/mod_applepush.erl b/src/mod_applepush.erl new file mode 100644 index 000000000..b8659615c --- /dev/null +++ b/src/mod_applepush.erl @@ -0,0 +1,381 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_applepush.erl +%%% Author : Alexey Shchepin +%%% Purpose : Push module support +%%% Created : 5 Jun 2009 by Alexey Shchepin +%%% +%%% ejabberd, Copyright (C) 2002-2009 ProcessOne +%%%---------------------------------------------------------------------- + +-module(mod_applepush). +-author('alexey@process-one.net'). + +-behaviour(gen_mod). + +-export([start/2, + stop/1, + push_notification/8, + enable_offline_notification/5, + disable_notification/3, + receive_offline_packet/3]). + +%% Debug commands +-export([get_token_by_jid/1]). + + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("mod_privacy.hrl"). + +-define(NS_P1_PUSH, "p1:push"). +-define(NS_P1_PUSHED, "p1:pushed"). +-define(NS_P1_ATTACHMENT, "http://process-one.net/attachement"). + +-record(applepush_cache, {us, device_id, options}). + +start(Host, _Opts) -> + case init_host(Host) of + true -> + mnesia:create_table( + applepush_cache, + [{disc_copies, [node()]}, + {attributes, record_info(fields, applepush_cache)}]), + mnesia:add_table_copy(muc_online_room, node(), ram_copies), + ejabberd_hooks:add(p1_push_notification, Host, + ?MODULE, push_notification, 50), + ejabberd_hooks:add(p1_push_enable_offline, Host, + ?MODULE, enable_offline_notification, 50), + ejabberd_hooks:add(p1_push_disable, Host, + ?MODULE, disable_notification, 50), + ejabberd_hooks:add(offline_message_hook, Host, + ?MODULE, receive_offline_packet, 35); + false -> + ok + end. + +stop(Host) -> + ejabberd_hooks:delete(p1_push_notification, Host, + ?MODULE, push_notification, 50), + ejabberd_hooks:delete(p1_push_disable, Host, + ?MODULE, disable_notification, 50), + ejabberd_hooks:delete(offline_message_hook, Host, + ?MODULE, receive_offline_packet, 35). + + +push_notification(Host, JID, Notification, Msg, Unread, Sound, AppID, Sender) -> + Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), + case Type of + "applepush" -> + DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]), + PushService = get_push_service(Host, JID, AppID), + ServiceJID = jlib:make_jid("", PushService, ""), + Badge = integer_to_list(Unread), + SSound = + if + Sound -> "true"; + true -> "false" + end, + Receiver = jlib:jid_to_string(JID), + Packet = + {xmlelement, "message", [], + [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], + [{xmlelement, "id", [], [{xmlcdata, DeviceID}]}, + {xmlelement, "msg", [], [{xmlcdata, Msg}]}, + {xmlelement, "badge", [], [{xmlcdata, Badge}]}, + {xmlelement, "sound", [], [{xmlcdata, SSound}]}, + {xmlelement, "from", [], [{xmlcdata, Sender}]}, + {xmlelement, "to", [], [{xmlcdata, Receiver}]}]}]}, + ejabberd_router:route(JID, ServiceJID, Packet), + stop; + _ -> + ok + end. + +enable_offline_notification(JID, Notification, SendBody, SendFrom, AppID1) -> + Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), + case Type of + "applepush" -> + DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]), + case catch erlang:list_to_integer(DeviceID, 16) of + ID1 when is_integer(ID1) -> + AppID = + case xml:get_path_s(Notification, + [{elem, "appid"}, cdata]) of + "" -> AppID1; + A -> A + end, + {MegaSecs, Secs, _MicroSecs} = now(), + TimeStamp = MegaSecs * 1000000 + Secs, + Options = + [{appid, AppID}, + {send_body, SendBody}, + {send_from, SendFrom}, + {timestamp, TimeStamp}], + store_cache(JID, ID1, Options); + _ -> + ok + end, + stop; + _ -> + ok + end. + +disable_notification(JID, Notification, _AppID) -> + Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), + case Type of + "applepush" -> + delete_cache(JID), + stop; + _ -> + ok + end. + +receive_offline_packet(From, To, Packet) -> + ?DEBUG("mod_applepush offline~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n", + [From, To, Packet, 8]), + Host = To#jid.lserver, + case gen_mod:is_loaded(Host, mod_applepush) of + true -> + case lookup_cache(To) of + false -> + ok; + {ID, AppID, SendBody, SendFrom} -> + ?DEBUG("lookup: ~p~n", [{ID, AppID, SendBody, SendFrom}]), + Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]), + Body = + case check_x_attachment(Packet) of + true -> + case Body1 of + "" -> [238, 128, 136]; + _ -> + [238, 128, 136, 32 | Body1] + end; + false -> + Body1 + end, + Pushed = check_x_pushed(Packet), + PushService = get_push_service(Host, To, AppID), + ServiceJID = jlib:make_jid("", PushService, ""), + if + Body == ""; + Pushed -> + if + From#jid.lserver == ServiceJID#jid.lserver -> + Disable = + xml:get_path_s( + Packet, [{elem, "disable"}]) /= "", + if + Disable -> + delete_cache(To); + true -> + ok + end; + true -> + ok + end, + ok; + true -> + BFrom = jlib:jid_remove_resource(From), + SFrom = jlib:jid_to_string(BFrom), + Offline = ejabberd_hooks:run_fold( + count_offline_messages, + Host, + 0, + [To#jid.luser, Host]), + IncludeBody = + case SendBody of + all -> + true; + first_per_user -> + Offline == 0; + first -> + Offline == 0; + none -> + false + end, + Msg = + if + IncludeBody -> + CBody = utf8_cut(Body, 100), + case SendFrom of + jid -> SFrom ++ ": " ++ CBody; + username -> BFrom#jid.user ++ ": " ++ CBody; + _ -> CBody + end; + true -> + "" + end, + SSound = + if + IncludeBody -> "true"; + true -> "false" + end, + Badge = integer_to_list(Offline + 1), + DeviceID = erlang:integer_to_list(ID, 16), + Packet1 = + {xmlelement, "message", [], + [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], + [{xmlelement, "id", [], + [{xmlcdata, DeviceID}]}, + {xmlelement, "msg", [], + [{xmlcdata, Msg}]}, + {xmlelement, "badge", [], + [{xmlcdata, Badge}]}, + {xmlelement, "sound", [], + [{xmlcdata, SSound}]}, + {xmlelement, "from", [], + [{xmlcdata, SFrom}]}]}]}, + ejabberd_router:route(To, ServiceJID, Packet1) + end + end; + false -> + ok + end. + +lookup_cache(JID) -> + #jid{luser = LUser, lserver = LServer} = JID, + LUS = {LUser, LServer}, + case catch mnesia:dirty_read(applepush_cache, LUS) of + [#applepush_cache{device_id = DeviceID, options = Options}] -> + AppID = proplists:get_value(appid, Options, "applepush.localhost"), + SendBody = proplists:get_value(send_body, Options, none), + SendFrom = proplists:get_value(send_from, Options, true), + {DeviceID, AppID, SendBody, SendFrom}; + _ -> + false + end. + +store_cache(JID, DeviceID, Options) -> + #jid{luser = LUser, lserver = LServer} = JID, + LUS = {LUser, LServer}, + R = #applepush_cache{us = LUS, + device_id = DeviceID, + options = Options}, + case catch mnesia:dirty_read(applepush_cache, LUS) of + [R] -> + ok; + _ -> + catch mnesia:dirty_write(R) + end. + +delete_cache(JID) -> + #jid{luser = LUser, lserver = LServer} = JID, + LUS = {LUser, LServer}, + catch mnesia:dirty_delete(applepush_cache, LUS). + + +utf8_cut(S, Bytes) -> + utf8_cut(S, [], [], Bytes + 1). + +utf8_cut(_S, _Cur, Prev, 0) -> + lists:reverse(Prev); +utf8_cut([], Cur, _Prev, _Bytes) -> + lists:reverse(Cur); +utf8_cut([C | S], Cur, Prev, Bytes) -> + if + C bsr 6 == 2 -> + utf8_cut(S, [C | Cur], Prev, Bytes - 1); + true -> + utf8_cut(S, [C | Cur], Cur, Bytes - 1) + end. + +check_x_pushed({xmlelement, _Name, _Attrs, Els}) -> + check_x_pushed1(Els). + +check_x_pushed1([]) -> + false; +check_x_pushed1([{xmlcdata, _} | Els]) -> + check_x_pushed1(Els); +check_x_pushed1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_PUSHED -> + true; + _ -> + check_x_pushed1(Els) + end. + +check_x_attachment({xmlelement, _Name, _Attrs, Els}) -> + check_x_attachment1(Els). + +check_x_attachment1([]) -> + false; +check_x_attachment1([{xmlcdata, _} | Els]) -> + check_x_attachment1(Els); +check_x_attachment1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_ATTACHMENT -> + true; + _ -> + check_x_attachment1(Els) + end. + + +get_push_service(Host, JID, AppID) -> + PushServices = + gen_mod:get_module_opt( + Host, ?MODULE, + push_services, []), + PushService = + case lists:keysearch(AppID, 1, PushServices) of + false -> + DefaultServices = + gen_mod:get_module_opt( + Host, ?MODULE, + default_services, []), + case lists:keysearch(JID#jid.lserver, 1, DefaultServices) of + false -> + gen_mod:get_module_opt( + Host, ?MODULE, + default_service, "applepush.localhost"); + {value, {_, PS}} -> + PS + end; + {value, {AppID, PS}} -> + PS + end, + PushService. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Internal module protection + +-define(VALID_HOSTS, []). % default is unlimited use +-define(MAX_USERS, 0). % default is unlimited use + +init_host(VHost) -> + case ?VALID_HOSTS of + [] -> % unlimited use + true; + ValidList -> % limited use + init_host(VHost, ValidList) + end. +init_host([], _) -> + false; +init_host(VHost, ValidEncryptedList) -> + EncryptedHost = erlang:md5(lists:reverse(VHost)), + case lists:member(EncryptedHost, ValidEncryptedList) of + true -> + case ?MAX_USERS of + 0 -> true; + N -> ejabberd_auth:get_vh_registered_users_number(VHost) =< N + end; + false -> + case string:chr(VHost, $.) of + 0 -> false; + Pos -> init_host(string:substr(VHost, Pos+1), ValidEncryptedList) + end + end. + +%% Debug commands +%% JID is of form +get_token_by_jid(JIDString) -> + #jid{luser = LUser, lserver = LServer} = jlib:string_to_jid(JIDString), + LUS = {LUser, LServer}, + case mnesia:dirty_read(applepush_cache, LUS) of + [{applepush_cache,_,I,_}] -> + erlang:integer_to_list(I, 16); + _ -> + undefined + end. + diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl new file mode 100644 index 000000000..5469550eb --- /dev/null +++ b/src/mod_applepush_service.erl @@ -0,0 +1,586 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_applepush_service.erl +%%% Author : Alexey Shchepin +%%% Purpose : Central push infrastructure +%%% Created : 5 Jun 2009 by Alexey Shchepin +%%% +%%% ejabberd, Copyright (C) 2002-2009 ProcessOne +%%%---------------------------------------------------------------------- + +-module(mod_applepush_service). +-author('alexey@process-one.net'). + +-behaviour(gen_server). +-behaviour(gen_mod). + +%% API +-export([start_link/2, start/2, stop/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). + +-record(state, {host, + socket, + gateway, + port, + feedback_socket, + feedback, + feedback_port, + feedback_buf = <<>>, + certfile, + queue, + soundfile, + cmd_id = 0, + cmd_cache = dict:new(), + device_cache = dict:new()}). + +-define(PROCNAME, ejabberd_mod_applepush_service). +-define(RECONNECT_TIMEOUT, 5000). +-define(FEEDBACK_RECONNECT_TIMEOUT, 30000). +-define(MAX_QUEUE_SIZE, 1000). +-define(CACHE_SIZE, 4096). +-define(MAX_PAYLOAD_SIZE, 255). + +-define(NS_P1_PUSH, "p1:push"). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link(Host, Opts) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). + +start(Host, Opts) -> + ssl:start(), + MyHosts = + case catch gen_mod:get_opt(hosts, Opts) of + {'EXIT', _} -> + [{gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"), Opts}]; + Hs -> + Hs + end, + lists:foreach( + fun({MyHost, MyOpts}) -> + Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME), + ChildSpec = + {Proc, + {?MODULE, start_link, [MyHost, MyOpts]}, + transient, + 1000, + worker, + [?MODULE]}, + supervisor:start_child(ejabberd_sup, ChildSpec) + end, MyHosts). + +stop(Host) -> + MyHosts = + case catch gen_mod:get_module_opt(Host, ?MODULE, hosts, []) of + [] -> + [gen_mod:get_module_opt_host( + Host, ?MODULE, "applepush.@HOST@")]; + Hs -> + [H || {H, _} <- Hs] + end, + lists:foreach( + fun(MyHost) -> + Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME), + gen_server:call(Proc, stop), + supervisor:terminate_child(ejabberd_sup, Proc), + supervisor:delete_child(ejabberd_sup, Proc) + end, MyHosts). + + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([MyHost, Opts]) -> + CertFile = gen_mod:get_opt(certfile, Opts, ""), + SoundFile = gen_mod:get_opt(sound_file, Opts, "pushalert.wav"), + Gateway = gen_mod:get_opt(gateway, Opts, "gateway.push.apple.com"), + Feedback = gen_mod:get_opt(feedback, Opts, undefined), + Port = gen_mod:get_opt(port, Opts, 2195), + FeedbackPort = gen_mod:get_opt(feedback_port, Opts, 2196), + %MyHost = gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"), + self() ! connect, + case Feedback of + undefined -> + ok; + _ -> + self() ! connect_feedback + end, + ejabberd_router:register_route(MyHost), + {ok, #state{host = MyHost, + gateway = Gateway, + port = Port, + feedback = Feedback, + feedback_port = FeedbackPort, + certfile = CertFile, + queue = {0, queue:new()}, + soundfile = SoundFile}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info({route, From, To, Packet}, State) -> + case catch do_route(From, To, Packet, State) of + {'EXIT', Reason} -> + ?ERROR_MSG("~p", [Reason]), + {noreply, State}; + Res -> + Res + end; +handle_info(connect, State) -> + connect(State); +handle_info(connect_feedback, State) + when State#state.feedback /= undefined, + State#state.feedback_socket == undefined -> + Feedback = State#state.feedback, + FeedbackPort = State#state.feedback_port, + CertFile = State#state.certfile, + case ssl:connect(Feedback, FeedbackPort, + [{certfile, CertFile}, + {active, true}, + binary]) of + {ok, Socket} -> + {noreply, State#state{feedback_socket = Socket}}; + {error, Reason} -> + ?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, " + "retrying after ~p seconds", + [State#state.host, Feedback, FeedbackPort, + Reason, ?FEEDBACK_RECONNECT_TIMEOUT div 1000]), + erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(), + connect_feedback), + {noreply, State} + end; +handle_info({ssl, Socket, Packet}, State) + when Socket == State#state.socket -> + case Packet of + <<8, Status, CmdID:32>> when Status /= 0 -> + case dict:find(CmdID, State#state.cmd_cache) of + {ok, {JID, _DeviceID}} -> + From = jlib:make_jid("", State#state.host, ""), + ejabberd_router:route( + From, JID, + {xmlelement, "message", [], + [{xmlelement, "disable", + [{"xmlns", ?NS_P1_PUSH}, + {"status", integer_to_list(Status)}], + []}]}); + error -> + ?ERROR_MSG("Unknown cmd ID ~p~n", [CmdID]), + ok + end; + _ -> + ?ERROR_MSG("Received unknown packet ~p~n", [Packet]) + end, + {noreply, State}; +handle_info({ssl, Socket, Packet}, State) + when Socket == State#state.feedback_socket -> + Buf = <<(State#state.feedback_buf)/binary, Packet/binary>>, + Buf2 = parse_feedback_buf(Buf, State), + {noreply, State#state{feedback_buf = Buf2}}; +handle_info({ssl_closed, Socket}, State) + when Socket == State#state.feedback_socket -> + ssl:close(Socket), + erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(), + connect_feedback), + {noreply, State#state{feedback_socket = undefined, + feedback_buf = <<>>}}; +handle_info(_Info, State) -> + %io:format("got info: ~p~n", [_Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, State) -> + ejabberd_router:unregister_route(State#state.host), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +do_route(From, To, Packet, State) -> + #jid{user = User, resource = Resource} = To, + if + (User /= "") or (Resource /= "") -> + Err = jlib:make_error_reply(Packet, ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err), + {noreply, State}; + true -> + case Packet of + {xmlelement, "iq", _, _} -> + IQ = jlib:iq_query_info(Packet), + case IQ of + #iq{type = get, xmlns = ?NS_DISCO_INFO = XMLNS, + sub_el = _SubEl, lang = Lang} = IQ -> + Res = IQ#iq{type = result, + sub_el = [{xmlelement, "query", + [{"xmlns", XMLNS}], + iq_disco(Lang)}]}, + ejabberd_router:route(To, + From, + jlib:iq_to_xml(Res)), + {noreply, State}; + #iq{type = get, xmlns = ?NS_DISCO_ITEMS = XMLNS} = IQ -> + Res = IQ#iq{type = result, + sub_el = [{xmlelement, "query", + [{"xmlns", XMLNS}], + []}]}, + ejabberd_router:route(To, + From, + jlib:iq_to_xml(Res)), + {noreply, State}; + %%#iq{type = get, xmlns = ?NS_VCARD, lang = Lang} -> + %% ResIQ = + %% IQ#iq{type = result, + %% sub_el = [{xmlelement, + %% "vCard", + %% [{"xmlns", ?NS_VCARD}], + %% iq_get_vcard(Lang)}]}, + %% ejabberd_router:route(To, + %% From, + %% jlib:iq_to_xml(ResIQ)); + _ -> + Err = jlib:make_error_reply(Packet, + ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err), + {noreply, State} + end; + {xmlelement, "message", _, Els} -> + case xml:remove_cdata(Els) of + [{xmlelement, "push", _, _}] -> + NewState = handle_message(From, To, Packet, State), + {noreply, NewState}; + [{xmlelement, "disable", _, _}] -> + {noreply, State}; + _ -> + {noreply, State} + end; + _ -> + {noreply, State} + end + end. + + +handle_message(From, To, Packet, #state{socket = undefined} = State) -> + queue_message(From, To, Packet, State); +handle_message(From, To, Packet, State) -> + DeviceID = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "id"}, cdata]), + Msg = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "msg"}, cdata]), + Badge = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "badge"}, cdata]), + Sound = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "sound"}, cdata]), + Sender = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "from"}, cdata]), + Receiver = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "to"}, cdata]), + Payload = make_payload(State, Msg, Badge, Sound, Sender), + ID = + case catch erlang:list_to_integer(DeviceID, 16) of + ID1 when is_integer(ID1) -> + ID1; + _ -> + false + end, + if + is_integer(ID) -> + Command = 1, + CmdID = State#state.cmd_id, + {MegaSecs, Secs, _MicroSecs} = now(), + Expiry = MegaSecs * 1000000 + Secs + 24 * 60 * 60, + BDeviceID = <>, + BPayload = list_to_binary(Payload), + IDLen = size(BDeviceID), + PayloadLen = size(BPayload), + Notification = + <>, + ?INFO_MSG("(~p) sending notification for ~s~n~p~npayload:~n~s~n" + "Sender: ~s~n" + "Receiver: ~s~n" + "Device ID: ~s~n", + [State#state.host, erlang:integer_to_list(ID, 16), + Notification, Payload, + jlib:jid_to_string(From), + Receiver, DeviceID]), + case ssl:send(State#state.socket, Notification) of + ok -> + cache(From, ID, State); + {error, Reason} -> + ?INFO_MSG("(~p) Connection closed: ~p, reconnecting", + [State#state.host, Reason]), + ssl:close(State#state.socket), + self() ! connect, + queue_message(From, To, Packet, + State#state{socket = undefined}) + end; + true -> + State + end. + +make_payload(State, Msg, Badge, Sound, Sender) -> + Msg2 = json_escape(Msg), + AlertPayload = + case Msg2 of + "" -> ""; + _ -> "\"alert\":\"" ++ Msg2 ++ "\"" + end, + BadgePayload = + case catch list_to_integer(Badge) of + B when is_integer(B) -> + "\"badge\":" ++ Badge; + _ -> "" + end, + SoundPayload = + case Sound of + "true" -> + SoundFile = State#state.soundfile, + "\"sound\":\"" ++ json_escape(SoundFile) ++ "\""; + _ -> "" + end, + Payloads = lists:filter(fun(S) -> S /= "" end, + [AlertPayload, BadgePayload, SoundPayload]), + Payload = + "{\"aps\":{" ++ join(Payloads, ",") ++ "}," + "\"from\":\"" ++ json_escape(Sender) ++ "\"}", + PayloadLen = length(Payload), + if + PayloadLen > ?MAX_PAYLOAD_SIZE -> + Delta = PayloadLen - ?MAX_PAYLOAD_SIZE, + MsgLen = length(Msg), + if + MsgLen /= 0 -> + CutMsg = + if + MsgLen > Delta -> + lists:sublist(Msg, MsgLen - Delta); + true -> + "" + end, + make_payload(State, CutMsg, Badge, Sound, Sender); + true -> + Payload2 = + "{\"aps\":{" ++ join(Payloads, ",") ++ "}}", + %PayloadLen2 = length(Payload2), + Payload2 + end; + true -> + Payload + end. + +connect(#state{socket = undefined} = State) -> + Gateway = State#state.gateway, + Port = State#state.port, + CertFile = State#state.certfile, + case ssl:connect(Gateway, Port, [{certfile, CertFile}, + {active, true}, + binary]) of + {ok, Socket} -> + {noreply, resend_messages(State#state{socket = Socket})}; + {error, Reason} -> + ?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, " + "retrying after ~p seconds", + [State#state.host, Gateway, Port, + Reason, ?RECONNECT_TIMEOUT div 1000]), + erlang:send_after(?RECONNECT_TIMEOUT, self(), connect), + {noreply, State} + end; +connect(State) -> + {noreply, State}. + +bounce_message(From, To, Packet, Reason) -> + {xmlelement, _, Attrs, _} = Packet, + Type = xml:get_attr_s("type", Attrs), + if Type /= "error"; Type /= "result" -> + ejabberd_router:route( + To, From, + jlib:make_error_reply( + Packet, + ?ERRT_INTERNAL_SERVER_ERROR( + xml:get_attr_s("xml:lang", Attrs), + Reason))); + true -> + ok + end. + +queue_message(From, To, Packet, State) -> + case State#state.queue of + {?MAX_QUEUE_SIZE, Queue} -> + {{value, {From1, To1, Packet1}}, Queue1} = queue:out(Queue), + bounce_message(From1, To1, Packet1, + "Unable to connect to push service"), + Queue2 = queue:in({From, To, Packet}, Queue1), + State#state{queue = {?MAX_QUEUE_SIZE, Queue2}}; + {Size, Queue} -> + Queue1 = queue:in({From, To, Packet}, Queue), + State#state{queue = {Size+1, Queue1}} + end. + +resend_messages(#state{queue = {_, Queue}} = State) -> + lists:foldl( + fun({From, To, Packet}, AccState) -> + case catch handle_message(From, To, Packet, AccState) of + {'EXIT', _} = Err -> + ?ERROR_MSG("error while processing message:~n" + "** From: ~p~n" + "** To: ~p~n" + "** Packet: ~p~n" + "** Reason: ~p", + [From, To, Packet, Err]), + AccState; + NewAccState -> + NewAccState + end + end, State#state{queue = {0, queue:new()}}, queue:to_list(Queue)). + +cache(JID, DeviceID, State) -> + CmdID = State#state.cmd_id, + Key = CmdID rem ?CACHE_SIZE, + C1 = State#state.cmd_cache, + D1 = State#state.device_cache, + D2 = case dict:find(Key, C1) of + {ok, {_, OldDeviceID}} -> + del_device_cache(D1, OldDeviceID); + error -> + D1 + end, + D3 = add_device_cache(D2, DeviceID, JID), + C2 = dict:store(Key, {JID, DeviceID}, C1), + State#state{cmd_id = CmdID + 1, + cmd_cache = C2, + device_cache = D3}. + +add_device_cache(DeviceCache, DeviceID, JID) -> + dict:update( + DeviceID, + fun({Counter, _}) -> {Counter + 1, JID} end, + {1, JID}, + DeviceCache). + +del_device_cache(DeviceCache, DeviceID) -> + case dict:find(DeviceID, DeviceCache) of + {ok, {Counter, JID}} -> + case Counter of + 1 -> + dict:erase(DeviceID, DeviceCache); + _ -> + dict:store(DeviceID, {Counter - 1, JID}, DeviceCache) + end; + error -> + DeviceCache + end. + +json_escape(S) -> + [case C of + $" -> "\\\""; + $\\ -> "\\\\"; + _ when C < 16 -> ["\\u000", erlang:integer_to_list(C, 16)]; + _ when C < 32 -> ["\\u00", erlang:integer_to_list(C, 16)]; + _ -> C + end || C <- S]. + +join(List, Sep) -> + lists:foldr(fun(A, "") -> A; + (A, Acc) -> A ++ Sep ++ Acc + end, "", List). + + + +iq_disco(Lang) -> + [{xmlelement, "identity", + [{"category", "gateway"}, + {"type", "apple"}, + {"name", translate:translate(Lang, "Apple Push Service")}], []}, + {xmlelement, "feature", [{"var", ?NS_DISCO_INFO}], []}]. + + +parse_feedback_buf(Buf, State) -> + case Buf of + <> -> + IDLen8 = IDLen * 8, + <> = BDeviceID, + case dict:find(DeviceID, State#state.device_cache) of + {ok, {_Counter, JID}} -> + From = jlib:make_jid("", State#state.host, ""), + ejabberd_router:route( + From, JID, + {xmlelement, "message", [], + [{xmlelement, "disable", + [{"xmlns", ?NS_P1_PUSH}, + {"status", "feedback"}, + {"ts", integer_to_list(TimeStamp)}], + []}]}); + error -> + ok + end, + parse_feedback_buf(Rest, State); + _ -> + Buf + end. diff --git a/src/mod_offline.erl b/src/mod_offline.erl index dfa2f46c2..2448a17c0 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -30,19 +30,18 @@ -behaviour(gen_mod). -export([start/2, - loop/1, + init/1, stop/1, store_packet/3, resend_offline_messages/2, pop_offline_messages/3, - get_sm_features/5, remove_expired_messages/0, remove_old_messages/1, remove_user/2, - get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5]). + webadmin_user_parse_query/5, + count_offline_messages/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -54,9 +53,6 @@ -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). -%% default value for the maximum number of user messages --define(MAX_USER_MESSAGES, infinity). - start(Host, Opts) -> mnesia:create_table(offline_msg, [{disc_only_copies, [node()]}, @@ -71,28 +67,30 @@ start(Host, Opts) -> ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:add(disco_sm_features, Host, - ?MODULE, get_sm_features, 50), - ejabberd_hooks:add(disco_local_features, Host, - ?MODULE, get_sm_features, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), + ejabberd_hooks:add(count_offline_messages, Host, + ?MODULE, count_offline_messages, 50), + MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, loop, [AccessMaxOfflineMsgs])). + spawn(?MODULE, init, [MaxOfflineMsgs])). -loop(AccessMaxOfflineMsgs) -> +%% MaxOfflineMsgs is either infinity of integer > 0 +init(infinity) -> + loop(infinity); +init(MaxOfflineMsgs) + when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> + loop(MaxOfflineMsgs). + +loop(MaxOfflineMsgs) -> receive #offline_msg{us=US} = Msg -> Msgs = receive_all(US, [Msg]), Len = length(Msgs), - {User, Host} = US, - MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, - User, Host), F = fun() -> %% Only count messages if needed: Count = if MaxOfflineMsgs =/= infinity -> @@ -118,18 +116,9 @@ loop(AccessMaxOfflineMsgs) -> end end, mnesia:transaction(F), - loop(AccessMaxOfflineMsgs); + loop(MaxOfflineMsgs); _ -> - loop(AccessMaxOfflineMsgs) - end. - -%% Function copied from ejabberd_sm.erl: -get_max_user_messages(AccessRule, LUser, Host) -> - case acl:match_rule( - Host, AccessRule, jlib:make_jid(LUser, Host, "")) of - Max when is_integer(Max) -> Max; - infinity -> infinity; - _ -> ?MAX_USER_MESSAGES + loop(MaxOfflineMsgs) end. receive_all(US, Msgs) -> @@ -150,8 +139,6 @@ stop(Host) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), - ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, @@ -162,27 +149,12 @@ stop(Host) -> exit(whereis(Proc), stop), {wait, Proc}. -get_sm_features(Acc, _From, _To, "", _Lang) -> - Feats = case Acc of - {result, I} -> I; - _ -> [] - end, - {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; - -get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> - %% override all lesser features... - {result, []}; - -get_sm_features(Acc, _From, _To, _Node, _Lang) -> - Acc. - - store_packet(From, To, Packet) -> Type = xml:get_tag_attr_s("type", Packet), if (Type /= "error") and (Type /= "groupchat") and (Type /= "headline") -> - case check_event_chatstates(From, To, Packet) of + case check_event(From, To, Packet) of true -> #jid{luser = LUser, lserver = LServer} = To, TimeStamp = now(), @@ -203,22 +175,12 @@ store_packet(From, To, Packet) -> ok end. -%% Check if the packet has any content about XEP-0022 or XEP-0085 -check_event_chatstates(From, To, Packet) -> +check_event(From, To, Packet) -> {xmlelement, Name, Attrs, Els} = Packet, - case find_x_event_chatstates(Els, {false, false, false}) of - %% There wasn't any x:event or chatstates subelements - {false, false, _} -> + case find_x_event(Els) of + false -> true; - %% There a chatstates subelement and other stuff, but no x:event - {false, CEl, true} when CEl /= false -> - true; - %% There was only a subelement: a chatstates - {false, CEl, false} when CEl /= false -> - %% Don't allow offline storage - false; - %% There was an x:event element, and maybe also other stuff - {El, _, _} when El /= false -> + El -> case xml:get_subtag(El, "id") of false -> case xml:get_subtag(El, "offline") of @@ -246,19 +208,16 @@ check_event_chatstates(From, To, Packet) -> end end. -%% Check if the packet has subelements about XEP-0022, XEP-0085 or other -find_x_event_chatstates([], Res) -> - Res; -find_x_event_chatstates([{xmlcdata, _} | Els], Res) -> - find_x_event_chatstates(Els, Res); -find_x_event_chatstates([El | Els], {A, B, C}) -> +find_x_event([]) -> + false; +find_x_event([{xmlcdata, _} | Els]) -> + find_x_event(Els); +find_x_event([El | Els]) -> case xml:get_tag_attr_s("xmlns", El) of ?NS_EVENT -> - find_x_event_chatstates(Els, {El, B, C}); - ?NS_CHATSTATES -> - find_x_event_chatstates(Els, {A, El, C}); + El; _ -> - find_x_event_chatstates(Els, {A, B, true}) + find_x_event(Els) end. find_x_expire(_, []) -> @@ -307,13 +266,6 @@ resend_offline_messages(User, Server) -> {xmlelement, Name, Attrs, Els ++ [jlib:timestamp_to_xml( - calendar:now_to_universal_time( - R#offline_msg.timestamp), - utc, - jlib:make_jid("", Server, ""), - "Offline Storage"), - %% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml( calendar:now_to_universal_time( R#offline_msg.timestamp))]}} end, @@ -343,14 +295,7 @@ pop_offline_messages(Ls, User, Server) -> {xmlelement, Name, Attrs, Els ++ [jlib:timestamp_to_xml( - calendar:now_to_universal_time( - R#offline_msg.timestamp), - utc, - jlib:make_jid("", Server, ""), - "Offline Storage"), - %% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml( - calendar:now_to_universal_time( + calendar:now_to_universal_time( R#offline_msg.timestamp))]}} end, lists:filter( @@ -367,7 +312,6 @@ pop_offline_messages(Ls, User, Server) -> Ls end. - remove_expired_messages() -> TimeStamp = now(), F = fun() -> @@ -530,9 +474,8 @@ webadmin_page(Acc, _, _) -> Acc. user_queue(User, Server, Query, Lang) -> US = {jlib:nodeprep(User), jlib:nameprep(Server)}, Res = user_queue_parse_query(US, Query), - MsgsAll = lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})), - Msgs = get_messages_subset(User, Server, MsgsAll), + Msgs = lists:keysort(#offline_msg.timestamp, + mnesia:dirty_read({offline_msg, US})), FMsgs = lists:map( fun(#offline_msg{timestamp = TimeStamp, from = From, to = To, @@ -614,32 +557,9 @@ user_queue_parse_query(US, Query) -> us_to_list({User, Server}) -> jlib:jid_to_string({User, Server, ""}). -get_queue_length(User, Server) -> - length(mnesia:dirty_read({offline_msg, {User, Server}})). - -get_messages_subset(User, Host, MsgsAll) -> - Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, - max_user_offline_messages), - MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of - Number when is_integer(Number) -> Number; - _ -> 100 - end, - Length = length(MsgsAll), - get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). - -get_messages_subset2(Max, Length, MsgsAll) when Length =< Max*2 -> - MsgsAll; -get_messages_subset2(Max, Length, MsgsAll) -> - FirstN = Max, - {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), - MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), - NoJID = jlib:make_jid("...", "...", ""), - IntermediateMsg = #offline_msg{timestamp = now(), from = NoJID, to = NoJID, - packet = {xmlelement, "...", [], []}}, - MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. - webadmin_user(Acc, User, Server, Lang) -> - QueueLen = get_queue_length(jlib:nodeprep(User), jlib:nameprep(Server)), + US = {jlib:nodeprep(User), jlib:nameprep(Server)}, + QueueLen = length(mnesia:dirty_read({offline_msg, US})), FQueueLen = [?AC("queue/", integer_to_list(QueueLen))], Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. @@ -663,3 +583,23 @@ webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> end; webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> Acc. + + +%% ------------------------------------------------ +%% mod_offline: number of messages quota management + +count_offline_messages(_Acc, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + US = {LUser, LServer}, + F = fun () -> + p1_mnesia:count_records( + offline_msg, + #offline_msg{us=US, _='_'}) + end, + N = case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end, + {stop, N}. + diff --git a/src/mod_offline_odbc.erl b/src/mod_offline_odbc.erl index 063ea709e..420ff581f 100644 --- a/src/mod_offline_odbc.erl +++ b/src/mod_offline_odbc.erl @@ -32,16 +32,15 @@ -export([count_offline_messages/2]). -export([start/2, - loop/2, + init/2, stop/1, store_packet/3, pop_offline_messages/3, - get_sm_features/5, remove_user/2, - get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5]). + webadmin_user_parse_query/5, + count_offline_messages/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -53,9 +52,6 @@ -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). -%% default value for the maximum number of user messages --define(MAX_USER_MESSAGES, infinity). - start(Host, Opts) -> ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), @@ -65,27 +61,30 @@ start(Host, Opts) -> ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:add(disco_sm_features, Host, - ?MODULE, get_sm_features, 50), - ejabberd_hooks:add(disco_local_features, Host, - ?MODULE, get_sm_features, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), + ejabberd_hooks:add(count_offline_messages, Host, + ?MODULE, count_offline_messages, 50), + MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, loop, [Host, AccessMaxOfflineMsgs])). + spawn(?MODULE, init, [Host, MaxOfflineMsgs])). -loop(Host, AccessMaxOfflineMsgs) -> +%% MaxOfflineMsgs is either infinity of integer > 0 +init(Host, infinity) -> + loop(Host, infinity); +init(Host, MaxOfflineMsgs) + when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> + loop(Host, MaxOfflineMsgs). + +loop(Host, MaxOfflineMsgs) -> receive #offline_msg{user = User} = Msg -> Msgs = receive_all(User, [Msg]), Len = length(Msgs), - MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, - User, Host), %% Only count existing messages if needed: Count = if MaxOfflineMsgs =/= infinity -> @@ -113,17 +112,11 @@ loop(Host, AccessMaxOfflineMsgs) -> Els ++ [jlib:timestamp_to_xml( calendar:now_to_universal_time( - M#offline_msg.timestamp), - utc, - jlib:make_jid("", Host, ""), - "Offline Storage"), - %% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml( - calendar:now_to_universal_time( M#offline_msg.timestamp))]}, XML = ejabberd_odbc:escape( - xml:element_to_binary(Packet)), + lists:flatten( + xml:element_to_string(Packet))), odbc_queries:add_spool_sql(Username, XML) end, Msgs), case catch odbc_queries:add_spool(Host, Query) of @@ -135,18 +128,9 @@ loop(Host, AccessMaxOfflineMsgs) -> ok end end, - loop(Host, AccessMaxOfflineMsgs); + loop(Host, MaxOfflineMsgs); _ -> - loop(Host, AccessMaxOfflineMsgs) - end. - -%% Function copied from ejabberd_sm.erl: -get_max_user_messages(AccessRule, LUser, Host) -> - case acl:match_rule( - Host, AccessRule, jlib:make_jid(LUser, Host, "")) of - Max when is_integer(Max) -> Max; - infinity -> infinity; - _ -> ?MAX_USER_MESSAGES + loop(Host, MaxOfflineMsgs) end. receive_all(Username, Msgs) -> @@ -167,8 +151,6 @@ stop(Host) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), - ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, @@ -179,27 +161,12 @@ stop(Host) -> exit(whereis(Proc), stop), ok. -get_sm_features(Acc, _From, _To, "", _Lang) -> - Feats = case Acc of - {result, I} -> I; - _ -> [] - end, - {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; - -get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> - %% override all lesser features... - {result, []}; - -get_sm_features(Acc, _From, _To, _Node, _Lang) -> - Acc. - - store_packet(From, To, Packet) -> Type = xml:get_tag_attr_s("type", Packet), if (Type /= "error") and (Type /= "groupchat") and (Type /= "headline") -> - case check_event_chatstates(From, To, Packet) of + case check_event(From, To, Packet) of true -> #jid{luser = LUser} = To, TimeStamp = now(), @@ -220,22 +187,12 @@ store_packet(From, To, Packet) -> ok end. -%% Check if the packet has any content about XEP-0022 or XEP-0085 -check_event_chatstates(From, To, Packet) -> +check_event(From, To, Packet) -> {xmlelement, Name, Attrs, Els} = Packet, - case find_x_event_chatstates(Els, {false, false, false}) of - %% There wasn't any x:event or chatstates subelements - {false, false, _} -> + case find_x_event(Els) of + false -> true; - %% There a chatstates subelement and other stuff, but no x:event - {false, CEl, true} when CEl /= false -> - true; - %% There was only a subelement: a chatstates - {false, CEl, false} when CEl /= false -> - %% Don't allow offline storage - false; - %% There was an x:event element, and maybe also other stuff - {El, _, _} when El /= false -> + El -> case xml:get_subtag(El, "id") of false -> case xml:get_subtag(El, "offline") of @@ -257,25 +214,22 @@ check_event_chatstates(From, To, Packet) -> {xmlelement, "offline", [], []}]}] }), true - end; + end; _ -> false end end. -%% Check if the packet has subelements about XEP-0022, XEP-0085 or other -find_x_event_chatstates([], Res) -> - Res; -find_x_event_chatstates([{xmlcdata, _} | Els], Res) -> - find_x_event_chatstates(Els, Res); -find_x_event_chatstates([El | Els], {A, B, C}) -> +find_x_event([]) -> + false; +find_x_event([{xmlcdata, _} | Els]) -> + find_x_event(Els); +find_x_event([El | Els]) -> case xml:get_tag_attr_s("xmlns", El) of ?NS_EVENT -> - find_x_event_chatstates(Els, {El, B, C}); - ?NS_CHATSTATES -> - find_x_event_chatstates(Els, {A, El, C}); + El; _ -> - find_x_event_chatstates(Els, {A, B, true}) + find_x_event(Els) end. find_x_expire(_, []) -> @@ -375,7 +329,7 @@ user_queue(User, Server, Query, Lang) -> Username = ejabberd_odbc:escape(LUser), US = {LUser, LServer}, Res = user_queue_parse_query(Username, LServer, Query), - MsgsAll = case catch ejabberd_odbc:sql_query( + Msgs = case catch ejabberd_odbc:sql_query( LServer, ["select username, xml from spool" " where username='", Username, "'" @@ -393,7 +347,6 @@ user_queue(User, Server, Query, Lang) -> _ -> [] end, - Msgs = get_messages_subset(User, Server, MsgsAll), FMsgs = lists:map( fun({xmlelement, _Name, _Attrs, _Els} = Msg) -> @@ -480,8 +433,11 @@ user_queue_parse_query(Username, LServer, Query) -> us_to_list({User, Server}) -> jlib:jid_to_string({User, Server, ""}). -get_queue_length(Username, LServer) -> - case catch ejabberd_odbc:sql_query( +webadmin_user(Acc, User, Server, Lang) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = ejabberd_odbc:escape(LUser), + QueueLen = case catch ejabberd_odbc:sql_query( LServer, ["select count(*) from spool" " where username='", Username, "';"]) of @@ -489,32 +445,7 @@ get_queue_length(Username, LServer) -> SCount; _ -> 0 - end. - -get_messages_subset(User, Host, MsgsAll) -> - Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, - max_user_offline_messages), - MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of - Number when is_integer(Number) -> Number; - _ -> 100 - end, - Length = length(MsgsAll), - get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). - -get_messages_subset2(Max, Length, MsgsAll) when Length =< Max*2 -> - MsgsAll; -get_messages_subset2(Max, Length, MsgsAll) -> - FirstN = Max, - {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), - MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), - IntermediateMsg = {xmlelement, "...", [], []}, - MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. - -webadmin_user(Acc, User, Server, Lang) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), - Username = ejabberd_odbc:escape(LUser), - QueueLen = get_queue_length(Username, LServer), + end, FQueueLen = [?AC("queue/", QueueLen)], Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. @@ -546,3 +477,30 @@ count_offline_messages(LUser, LServer) -> _ -> 0 end. + +count_offline_messages(_Acc, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Num = case catch ejabberd_odbc:sql_query( + LServer, + ["select xml from spool" + " where username='", LUser, "';"]) of + {selected, ["xml"], Rs} -> + lists:foldl( + fun({XML}, Acc) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + Acc; + El -> + case xml:get_subtag(El, "body") of + false -> + Acc; + _ -> + Acc + 1 + end + end + end, 0, Rs); + _ -> + 0 + end, + {stop, Num}. diff --git a/src/xml.erl b/src/xml.erl index 22746b10a..3e0157313 100644 --- a/src/xml.erl +++ b/src/xml.erl @@ -31,6 +31,7 @@ element_to_binary/1, crypt/1, make_text_node/1, remove_cdata/1, + remove_subtags/3, get_cdata/1, get_tag_cdata/1, get_attr/2, get_attr_s/2, get_tag_attr/2, get_tag_attr_s/2, @@ -186,6 +187,31 @@ remove_cdata_p(_) -> false. remove_cdata(L) -> [E || E <- L, remove_cdata_p(E)]. +%% TODO: Make more generic. +%% For now only support all parameters: +%% xml:remove_subtags({xmlelement,"message", [{"id","81be72"}],[{xmlelement,"on-sender-server",[{"xmlns","urn:xmpp:receipts"},{"server","text-one.com"}], []}]}, "on-sender-server", {"xmlns","urn:xmpp:receipts"}). +remove_subtags({xmlelement, TagName, TagAttrs, Els}, Name, Attr) -> + {xmlelement, TagName, TagAttrs, remove_subtags1(Els, [], Name, Attr)}. + +remove_subtags1([], NewEls, _Name, _Attr) -> + lists:reverse(NewEls); +remove_subtags1([El | Els], NewEls, Name, {AttrName, AttrValue} = Attr) -> + case El of + {xmlelement, Name, Attrs, _} -> + case get_attr(AttrName, Attrs) of + false -> + remove_subtags1(Els, [El|NewEls], Name, Attr); + {value, AttrValue} -> + remove_subtags1(Els, NewEls, Name, Attr); + _ -> + remove_subtags1(Els, [El|NewEls], Name, Attr) + end; + _ -> + remove_subtags1(Els, [El|NewEls], Name, Attr) + end. + + + get_cdata(L) -> binary_to_list(list_to_binary(get_cdata(L, ""))). diff --git a/src/xml_stream.erl b/src/xml_stream.erl index b98e3291b..f3aa5502d 100644 --- a/src/xml_stream.erl +++ b/src/xml_stream.erl @@ -76,6 +76,9 @@ process_data(CallbackPid, Stack, Data) -> {?XML_CDATA, CData} -> case Stack of [El] -> + catch gen_fsm:send_all_state_event( + CallbackPid, + {xmlstreamcdata, CData}), [El]; %% Merge CDATA nodes if they are contiguous %% This does not change the semantic: the split in From 660a2735f04308a5cf58aac7df51f18d2ff18931 Mon Sep 17 00:00:00 2001 From: Eric Cestari Date: Fri, 10 Sep 2010 14:11:56 +0200 Subject: [PATCH 4/4] mod_keepalive added to repos --- src/keepalive.hrl | 1 + src/mod_keepalive.erl | 71 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 src/keepalive.hrl create mode 100644 src/mod_keepalive.erl diff --git a/src/keepalive.hrl b/src/keepalive.hrl new file mode 100644 index 000000000..aad8aeca6 --- /dev/null +++ b/src/keepalive.hrl @@ -0,0 +1 @@ +-define(MODS, []). \ No newline at end of file diff --git a/src/mod_keepalive.erl b/src/mod_keepalive.erl new file mode 100644 index 000000000..1b468d94a --- /dev/null +++ b/src/mod_keepalive.erl @@ -0,0 +1,71 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_keepalive.erl +%%% Author : Christophe romain +%%% Purpose : Hidden code autoload +%%% +%%% ejabberd, Copyright (C) 2002-2009 ProcessOne +%%%---------------------------------------------------------------------- + +-module(mod_keepalive). +-author('cromain@process-one.net'). + +-behaviour(gen_mod). + +-export([start/2, stop/1, init/1]). + +-include("keepalive.hrl"). + +start(Host, _Opts) -> + case init_host(Host) of + true -> + lists:foreach(fun({Mod, Beam}) -> + code:purge(Mod), + load_module(Mod, Beam) + end, ?MODS); + false -> + ok + end. + +stop(_Host) -> + ok. + +init(["pack"|Mods]) -> + Code = lists:foldl(fun(Mod, Acc) -> + case file:read_file(Mod++".beam") of + {error, _} -> Acc; + {ok, Bin} -> [{list_to_atom(Mod), Bin}|Acc] + end + end, [], Mods), + io:format("-define(MODS, ~p).", [Code]); +init(_) -> + error. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Internal module protection + +-define(VALID_HOSTS, []). % default is unlimited use +-define(MAX_USERS, 0). % default is unlimited use + +init_host(VHost) -> + case ?VALID_HOSTS of + [] -> % unlimited use + true; + ValidList -> % limited use + init_host(VHost, ValidList) + end. +init_host([], _) -> + false; +init_host(VHost, ValidEncryptedList) -> + EncryptedHost = erlang:md5(lists:reverse(VHost)), + case lists:member(EncryptedHost, ValidEncryptedList) of + true -> + case ?MAX_USERS of + 0 -> true; + N -> ejabberd_auth:get_vh_registered_users_number(VHost) =< N + end; + false -> + case string:chr(VHost, $.) of + 0 -> false; + Pos -> init_host(string:substr(VHost, Pos+1), ValidEncryptedList) + end + end. \ No newline at end of file