From 35cde6787dfb9223a586ab32b34cfd0a743d600b Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Thu, 19 Aug 2010 15:30:39 +0300 Subject: [PATCH 1/9] Initial applepush git commit --- src/ejabberd_c2s.erl | 941 ++++++++++++++++++++++++++++++++-- src/mod_applepush.erl | 381 ++++++++++++++ src/mod_applepush_service.erl | 558 ++++++++++++++++++++ src/mod_offline.erl | 25 +- src/mod_offline_odbc.erl | 32 +- src/xml_stream.erl | 6 +- 6 files changed, 1907 insertions(+), 36 deletions(-) create mode 100644 src/mod_applepush.erl create mode 100644 src/mod_applepush_service.erl diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index ef4614aa2..a9334186c 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -94,7 +94,29 @@ conn = unknown, auth_module = unknown, ip, - lang}). + lang, + reception = true, + standby = false, + queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + keepalive_timer, + keepalive_timeout, + oor_timeout, + oor_status = "", + oor_show = "", + oor_notification, + oor_send_body = all, + oor_send_groupchat = false, + oor_send_from = jid, + oor_appid = "", + oor_unread = 0, + oor_unread_users = ?SETS:new(), + oor_offline = false, + ack_enabled = false, + ack_counter = 0, + ack_queue = queue:new(), + ack_timer}). %-define(DBGFSM, true). @@ -134,6 +156,15 @@ ?SERRT_POLICY_VIOLATION(Lang, Text)). -define(INVALID_FROM, ?SERR_INVALID_FROM). +-define(NS_P1_REBIND, "p1:rebind"). +-define(NS_P1_PUSH, "p1:push"). +-define(NS_P1_ACK, "p1:ack"). +-define(NS_P1_PUSHED, "p1:pushed"). +-define(NS_P1_ATTACHMENT, "http://process-one.net/attachement"). + +-define(C2S_P1_ACK_TIMEOUT, 10000). +-define(MAX_OOR_TIMEOUT, 1440). %% Max allowed session duration 24h (24*60) +-define(MAX_OOR_MESSAGES, 1000). %%%---------------------------------------------------------------------- %%% API @@ -309,9 +340,22 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> false -> [] end, + P1PushFeature = + [{xmlelement, "push", + [{"xmlns", ?NS_P1_PUSH}], []}], + P1RebindFeature = + [{xmlelement, "rebind", + [{"xmlns", ?NS_P1_REBIND}], []}], + P1AckFeature = + [{xmlelement, "ack", + [{"xmlns", ?NS_P1_ACK}], []}], send_element(StateData, {xmlelement, "stream:features", [], - TLSFeature ++ CompressFeature ++ + TLSFeature ++ + CompressFeature ++ + P1PushFeature ++ + P1RebindFeature ++ + P1AckFeature ++ [{xmlelement, "mechanisms", [{"xmlns", ?NS_SASL}], Mechs}] ++ @@ -328,7 +372,9 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> case StateData#state.resource of "" -> RosterVersioningFeature = ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), - StreamFeatures = [{xmlelement, "bind", + StreamFeatures = [{xmlelement, "push", + [{"xmlns", ?NS_P1_PUSH}], []}, + {xmlelement, "bind", [{"xmlns", ?NS_BIND}], []}, {xmlelement, "session", [{"xmlns", ?NS_SESSION}], []} | RosterVersioningFeature], @@ -480,7 +526,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> privacy_get_user_list, StateData#state.server, #userlist{}, [U, StateData#state.server]), - NewStateData = + NewStateData = StateData#state{ user = U, resource = R, @@ -524,8 +570,33 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> end end; _ -> - process_unauthenticated_stanza(StateData, El), - fsm_next_state(wait_for_auth, StateData) + {xmlelement, Name, Attrs, _Els} = El, + case {xml:get_attr_s("xmlns", Attrs), Name} of + {?NS_P1_REBIND, "rebind"} -> + SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), + SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), + case jlib:string_to_jid(SJID) of + error -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_auth, + StateData); + JID -> + case rebind(StateData, JID, SID) of + {next_state, wait_for_feature_request, + NewStateData, Timeout} -> + {next_state, wait_for_auth, + NewStateData, Timeout}; + Res -> + Res + end + end; + _ -> + process_unauthenticated_stanza(StateData, El), + fsm_next_state(wait_for_auth, StateData) + end end; wait_for_auth(timeout, StateData) -> @@ -656,6 +727,23 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) -> StateData) end end; + {?NS_P1_REBIND, "rebind"} -> + SJID = xml:get_path_s(El, [{elem, "jid"}, cdata]), + SID = xml:get_path_s(El, [{elem, "sid"}, cdata]), + case jlib:string_to_jid(SJID) of + error -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + JID -> + rebind(StateData, JID, SID) + end; + {?NS_P1_ACK, "ack"} -> + fsm_next_state(wait_for_feature_request, + StateData#state{ack_enabled = true}); _ -> if (SockMod == gen_tcp) and TLSRequired -> @@ -898,7 +986,9 @@ session_established({xmlstreamelement, El}, StateData) -> send_trailer(StateData), {stop, normal, StateData}; _NewEl -> - session_established2(El, StateData) + NSD1 = change_reception(StateData, true), + NSD2 = start_keepalive_timer(NSD1), + session_established2(El, NSD2) end; %% We hibernate the process to reduce memory consumption after a @@ -925,7 +1015,16 @@ session_established({xmlstreamerror, _}, StateData) -> {stop, normal, StateData}; session_established(closed, StateData) -> - {stop, normal, StateData}. + if + not StateData#state.reception -> + fsm_next_state(session_established, StateData); + (StateData#state.keepalive_timer /= undefined) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(session_established, NewState); + true -> + {stop, normal, StateData} + end. %% Process packets sent by user (coming from user on c2s XMPP %% connection) @@ -992,6 +1091,8 @@ session_established2(El, StateData) -> #iq{xmlns = ?NS_PRIVACY} = IQ -> process_privacy_iq( FromJID, ToJID, IQ, StateData); + #iq{xmlns = ?NS_P1_PUSH} = IQ -> + process_push_iq(FromJID, ToJID, IQ, StateData); _ -> ejabberd_hooks:run( user_send_packet, @@ -1008,6 +1109,12 @@ session_established2(El, StateData) -> check_privacy_route(FromJID, StateData, FromJID, ToJID, NewEl), StateData; + "standby" -> + StandBy = xml:get_tag_cdata(NewEl) == "true", + change_standby(StateData, StandBy); + "a" -> + SCounter = xml:get_tag_attr_s("h", NewEl), + receive_ack(StateData, SCounter); _ -> StateData end @@ -1036,6 +1143,12 @@ session_established2(El, StateData) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +handle_event({xmlstreamcdata, _}, StateName, StateData) -> + ?DEBUG("cdata ping", []), + NSD1 = change_reception(StateData, true), + NSD2 = start_keepalive_timer(NSD1), + fsm_next_state(StateName, NSD2); + handle_event(_Event, StateName, StateData) -> fsm_next_state(StateName, StateData). @@ -1228,6 +1341,17 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> _ -> {false, Attrs, StateData} end; + rebind -> + {Pid2, StreamID2} = Els, + if + StreamID2 == StateData#state.streamid -> + Pid2 ! {rebind, prepare_acks_for_rebind(StateData)}, + receive after 1000 -> ok end, + {exit, Attrs, rebind}; + true -> + Pid2 ! {rebind, false}, + {false, Attrs, StateData} + end; "iq" -> IQ = jlib:iq_query_info(Packet), case IQ of @@ -1273,18 +1397,22 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> {From, To, Packet}, in]) of allow -> - case ejabberd_hooks:run_fold( - feature_check_packet, StateData#state.server, - allow, - [StateData#state.jid, - StateData#state.server, - StateData#state.pres_last, - {From, To, Packet}, - in]) of - allow -> - {true, Attrs, StateData}; - deny -> - {false, Attrs, StateData} + if StateData#state.reception -> + case ejabberd_hooks:run_fold( + feature_check_packet, StateData#state.server, + allow, + [StateData#state.jid, + StateData#state.server, + StateData#state.pres_last, + {From, To, Packet}, + in]) of + allow -> + {true, Attrs, StateData}; + deny -> + {false, Attrs, StateData} + end; + true -> + {true, Attrs, StateData} end; deny -> {false, Attrs, StateData} @@ -1294,29 +1422,85 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> end, if Pass == exit -> - %% When Pass==exit, NewState contains a string instead of a #state{} - Lang = StateData#state.lang, - send_element(StateData, ?SERRT_CONFLICT(Lang, NewState)), - send_trailer(StateData), - {stop, normal, StateData}; + catch send_trailer(StateData), + case NewState of + rebind -> + {stop, normal, StateData#state{authenticated = rebinded}}; + _ -> + {stop, normal, StateData} + end; Pass -> Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = {xmlelement, Name, Attrs2, Els}, - send_element(StateData, FixedPacket), + NewState2 = + if + NewState#state.reception and + not (NewState#state.standby and (Name /= "message")) -> + send_element(NewState, FixedPacket), + ack(NewState, From, To, FixedPacket); + true -> + NewState1 = send_out_of_reception_message( + NewState, From, To, Packet), + enqueue(NewState1, From, To, FixedPacket) + end, ejabberd_hooks:run(user_receive_packet, StateData#state.server, [StateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, NewState2); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) end; -handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) - when Monitor == StateData#state.socket_monitor -> +handle_info({timeout, Timer, _}, StateName, + #state{keepalive_timer = Timer, reception = true} = StateData) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(StateName, NewState); +handle_info({timeout, Timer, _}, _StateName, + #state{keepalive_timer = Timer, reception = false} = StateData) -> {stop, normal, StateData}; +handle_info({timeout, Timer, PrevCounter}, StateName, + #state{ack_timer = Timer} = StateData) -> + AckCounter = StateData#state.ack_counter, + NewState = + if + PrevCounter >= AckCounter -> + StateData#state{ack_timer = undefined}; + true -> + send_ack_request(StateData#state{ack_timer = undefined}) + end, + fsm_next_state(StateName, NewState); +handle_info({ack_timeout, Counter}, StateName, StateData) -> + AckQueue = StateData#state.ack_queue, + case queue:is_empty(AckQueue) of + true -> + fsm_next_state(StateName, StateData); + false -> + C = element(1, queue:head(AckQueue)), + if + C =< Counter -> + {stop, normal, StateData}; + true -> + fsm_next_state(StateName, StateData) + end + end; +handle_info({'DOWN', Monitor, _Type, _Object, _Info}, StateName, StateData) + when Monitor == StateData#state.socket_monitor -> + if + (StateName == session_established) and + (not StateData#state.reception) -> + fsm_next_state(StateName, StateData); + (StateName == session_established) and + (StateData#state.keepalive_timer /= undefined) -> + NewState1 = change_reception(StateData, false), + NewState = start_keepalive_timer(NewState1), + fsm_next_state(StateName, NewState); + true -> + {stop, normal, StateData} + end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1362,6 +1546,13 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_a, Packet), presence_broadcast( StateData, From, StateData#state.pres_i, Packet); + rebinded -> + ejabberd_sm:close_session( + StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource), + ok; _ -> ?INFO_MSG("(~w) Close session for ~s", [StateData#state.socket, @@ -1393,6 +1584,36 @@ terminate(_Reason, StateName, StateData) -> StateData, From, StateData#state.pres_i, Packet) end end, + case StateData#state.authenticated of + rebinded -> + ok; + _ -> + if + not StateData#state.reception, not StateData#state.oor_offline -> + SFrom = jlib:jid_to_string(StateData#state.jid), + ejabberd_hooks:run( + p1_push_notification, + StateData#state.server, + [StateData#state.server, + StateData#state.jid, + StateData#state.oor_notification, + "Instant messaging session expired", + 0, + false, + StateData#state.oor_appid, + SFrom]); + true -> + ok + end, + lists:foreach( + fun({_Counter, From, To, FixedPacket}) -> + ejabberd_router:route(From, To, FixedPacket) + end, queue:to_list(StateData#state.ack_queue)), + lists:foreach( + fun({From, To, FixedPacket}) -> + ejabberd_router:route(From, To, FixedPacket) + end, queue:to_list(StateData#state.queue)) + end, bounce_messages(); _ -> ok @@ -1541,9 +1762,39 @@ process_presence_probe(From, To, StateData) -> andalso ?SETS:is_element(LFrom, StateData#state.pres_a), if Cond1 -> + Packet = + case StateData#state.reception of + true -> + StateData#state.pres_last; + false -> + case StateData#state.oor_show of + "" -> + StateData#state.pres_last; + _ -> + {xmlelement, _, PresAttrs, PresEls} = + StateData#state.pres_last, + PresEls1 = + lists:flatmap( + fun({xmlelement, Name, _, _}) + when Name == "show"; + Name == "status" -> + []; + (E) -> + [E] + end, PresEls), + {xmlelement, "presence", PresAttrs, + [{xmlelement, "show", [], + [{xmlcdata, + StateData#state.oor_show}]}, + {xmlelement, "status", [], + [{xmlcdata, + StateData#state.oor_status}]}] + ++ PresEls1} + end + end, Timestamp = StateData#state.pres_timestamp, - Packet = xml:append_subtags( - StateData#state.pres_last, + Packet1 = xml:append_subtags( + Packet, %% To is the one sending the presence (the target of the probe) [jlib:timestamp_to_xml(Timestamp, utc, To, ""), %% TODO: Delete the next line once XEP-0091 is Obsolete @@ -1554,7 +1805,7 @@ process_presence_probe(From, To, StateData) -> [StateData#state.user, StateData#state.server, StateData#state.privacy_list, - {To, From, Packet}, + {To, From, Packet1}, out]) of deny -> ok; @@ -1564,7 +1815,7 @@ process_presence_probe(From, To, StateData) -> %% Don't route a presence probe to oneself case From == To of false -> - ejabberd_router:route(To, From, Packet); + ejabberd_router:route(To, From, Packet1); true -> ok end @@ -2173,6 +2424,630 @@ check_from(El, FromJID) -> end end. +start_keepalive_timer(StateData) -> + if + is_reference(StateData#state.keepalive_timer) -> + cancel_timer(StateData#state.keepalive_timer); + true -> + ok + end, + Timeout = + if + StateData#state.reception -> StateData#state.keepalive_timeout; + true -> StateData#state.oor_timeout + end, + Timer = + if + is_integer(Timeout) -> + erlang:start_timer(Timeout * 1000, self(), []); + true -> + undefined + end, + StateData#state{keepalive_timer = Timer}. + +change_reception(#state{reception = Reception} = StateData, Reception) -> + StateData; +change_reception(#state{reception = true} = StateData, false) -> + ?DEBUG("reception -> false", []), + case StateData#state.oor_show of + "" -> + ok; + _ -> + Packet = + {xmlelement, "presence", [], + [{xmlelement, "show", [], + [{xmlcdata, StateData#state.oor_show}]}, + {xmlelement, "status", [], + [{xmlcdata, StateData#state.oor_status}]}]}, + update_priority(0, Packet, StateData), + presence_broadcast_to_trusted( + StateData, + StateData#state.jid, + StateData#state.pres_f, + StateData#state.pres_a, + Packet) + end, + StateData#state{reception = false}; +change_reception(#state{reception = false, standby = true} = StateData, true) -> + ?DEBUG("reception -> standby", []), + NewQueue = + lists:foldl( + fun({_From, _To, {xmlelement, "message", _, _} = FixedPacket}, Q) -> + send_element(StateData, FixedPacket), + Q; + (Item, Q) -> + queue:in(Item, Q) + end, queue:new(), queue:to_list(StateData#state.queue)), + StateData#state{queue = NewQueue, + queue_len = queue:len(NewQueue), + reception = true, + oor_unread = 0, + oor_unread_users = ?SETS:new()}; +change_reception(#state{reception = false} = StateData, true) -> + ?DEBUG("reception -> true", []), + case StateData#state.oor_show of + "" -> + ok; + _ -> + Packet = StateData#state.pres_last, + NewPriority = get_priority_from_presence(Packet), + update_priority(NewPriority, Packet, StateData), + presence_broadcast_to_trusted( + StateData, + StateData#state.jid, + StateData#state.pres_f, + StateData#state.pres_a, + Packet) + end, + lists:foreach( + fun({_From, _To, FixedPacket}) -> + send_element(StateData, FixedPacket) + end, queue:to_list(StateData#state.queue)), + lists:foreach( + fun(FixedPacket) -> + send_element(StateData, FixedPacket) + end, gb_trees:values(StateData#state.pres_queue)), + StateData#state{queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + reception = true, + oor_unread = 0, + oor_unread_users = ?SETS:new()}. + +change_standby(#state{standby = StandBy} = StateData, StandBy) -> + StateData; +change_standby(#state{standby = false} = StateData, true) -> + ?DEBUG("standby -> true", []), + StateData#state{standby = true}; +change_standby(#state{standby = true} = StateData, false) -> + ?DEBUG("standby -> false", []), + lists:foreach( + fun({_From, _To, FixedPacket}) -> + send_element(StateData, FixedPacket) + end, queue:to_list(StateData#state.queue)), + lists:foreach( + fun(FixedPacket) -> + send_element(StateData, FixedPacket) + end, gb_trees:values(StateData#state.pres_queue)), + StateData#state{queue = queue:new(), + queue_len = 0, + pres_queue = gb_trees:empty(), + standby = false}. + +send_out_of_reception_message(StateData, From, To, + {xmlelement, "message", _, _} = Packet) -> + Type = xml:get_tag_attr_s("type", Packet), + if + (Type == "normal") or + (Type == "") or + (Type == "chat") or + (StateData#state.oor_send_groupchat and (Type == "groupchat"))-> + %Lang = case xml:get_tag_attr_s("xml:lang", Packet) of + % "" -> + % StateData#state.lang; + % L -> + % L + % end, + %Text = translate:translate( + % Lang, "User is temporarily out of reception"), + %MsgType = "error", + %Message = {xmlelement, "message", + % [{"type", MsgType}], + % [{xmlelement, "body", [], + % [{xmlcdata, Text}]}]}, + %ejabberd_router:route(To, From, Message), + Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]), + Body = + case check_x_attachment(Packet) of + true -> + case Body1 of + "" -> [238, 128, 136]; + _ -> + [238, 128, 136, 32 | Body1] + end; + false -> + Body1 + end, + Pushed = check_x_pushed(Packet), + if + Body == ""; + Pushed -> + StateData; + true -> + BFrom = jlib:jid_remove_resource(From), + LBFrom = jlib:jid_tolower(BFrom), + UnreadUsers = ?SETS:add_element( + LBFrom, + StateData#state.oor_unread_users), + IncludeBody = + case StateData#state.oor_send_body of + all -> + true; + first_per_user -> + not ?SETS:is_element( + LBFrom, + StateData#state.oor_unread_users); + first -> + StateData#state.oor_unread == 0; + none -> + false + end, + Unread = StateData#state.oor_unread + 1, + SFrom = jlib:jid_to_string(BFrom), + Msg = + if + IncludeBody -> + CBody = utf8_cut(Body, 100), + case StateData#state.oor_send_from of + jid -> SFrom ++ ": " ++ CBody; + username -> BFrom#jid.user ++ ": " ++ CBody; + _ -> CBody + end; + true -> + "" + end, + Sound = IncludeBody, + AppID = StateData#state.oor_appid, + ejabberd_hooks:run( + p1_push_notification, + StateData#state.server, + [StateData#state.server, + StateData#state.jid, + StateData#state.oor_notification, + Msg, + Unread, + Sound, + AppID, + SFrom]), + %% This hook is intended to give other module a + %% chance to notify the sender that the message is + %% not directly delivered to the client (almost + %% equivalent to offline). + ejabberd_hooks:run(delayed_message_hook, + StateData#state.server, + [From, To, Packet]), + StateData#state{oor_unread = Unread, + oor_unread_users = UnreadUsers} + end; + true -> + StateData + end; +send_out_of_reception_message(StateData, _From, _To, _Packet) -> + StateData. + +utf8_cut(S, Bytes) -> + utf8_cut(S, [], [], Bytes + 1). + +utf8_cut(_S, _Cur, Prev, 0) -> + lists:reverse(Prev); +utf8_cut([], Cur, _Prev, _Bytes) -> + lists:reverse(Cur); +utf8_cut([C | S], Cur, Prev, Bytes) -> + if + C bsr 6 == 2 -> + utf8_cut(S, [C | Cur], Prev, Bytes - 1); + true -> + utf8_cut(S, [C | Cur], Cur, Bytes - 1) + end. + + +cancel_timer(Timer) -> + erlang:cancel_timer(Timer), + receive + {timeout, Timer, _} -> + ok + after 0 -> + ok + end. + +enqueue(StateData, From, To, Packet) -> + IsPresence = + case Packet of + {xmlelement, "presence", _, _} -> + case xml:get_tag_attr_s("type", Packet) of + "subscribe" -> + false; + "subscribed" -> + false; + "unsubscribe" -> + false; + "unsubscribed" -> + false; + _ -> + true + end; + _ -> + false + end, + Messages = + StateData#state.queue_len + gb_trees:size(StateData#state.pres_queue), + if + Messages >= ?MAX_OOR_MESSAGES -> + self() ! {timeout, StateData#state.keepalive_timer, []}; + true -> + ok + end, + if + IsPresence -> + LFrom = jlib:jid_tolower(From), + case is_own_presence(StateData#state.jid, LFrom) of + true -> StateData; + false -> + NewQueue = gb_trees:enter(LFrom, Packet, + StateData#state.pres_queue), + StateData#state{pres_queue = NewQueue} + end; + true -> + Packet2 = + case Packet of + {xmlelement, "message" = Name, Attrs, Els} -> + {xmlelement, Name, Attrs, + Els ++ + [jlib:timestamp_to_xml( + calendar:now_to_universal_time(now())), + {xmlelement, "x", [{"xmlns", ?NS_P1_PUSHED}], []}]}; + _ -> + Packet + end, + NewQueue = queue:in({From, To, Packet2}, + StateData#state.queue), + NewQueueLen = StateData#state.queue_len + 1, + StateData#state{queue = NewQueue, + queue_len = NewQueueLen} + end. + +%% Is my own presence packet ? +is_own_presence(MyFullJID, MyFullJID) -> + true; +is_own_presence(_MyFullJID, _LFrom) -> + false. + +ack(StateData, From, To, Packet) -> + if + StateData#state.ack_enabled -> + NeedsAck = + case Packet of + {xmlelement, "presence", _, _} -> + case xml:get_tag_attr_s("type", Packet) of + "subscribe" -> + true; + "subscribed" -> + true; + "unsubscribe" -> + true; + "unsubscribed" -> + true; + _ -> + false + end; + {xmlelement, "message", _, _} -> + true; + _ -> + false + end, + if + NeedsAck -> + Counter = StateData#state.ack_counter + 1, + NewAckQueue = queue:in({Counter, From, To, Packet}, + StateData#state.ack_queue), + send_ack_request(StateData#state{ack_queue = NewAckQueue, + ack_counter = Counter}); + true -> + StateData + end; + true -> + StateData + end. + +send_ack_request(StateData) -> + case StateData#state.ack_timer of + undefined -> + AckCounter = StateData#state.ack_counter, + AckTimer = + erlang:start_timer(?C2S_P1_ACK_TIMEOUT, self(), AckCounter), + AckTimeout = StateData#state.keepalive_timeout + + StateData#state.oor_timeout, + erlang:send_after(AckTimeout * 1000, self(), + {ack_timeout, AckTimeout}), + send_element( + StateData, + {xmlelement, "r", + [{"h", integer_to_list(AckCounter)}], []}), + StateData#state{ack_timer = AckTimer}; + _ -> + StateData + end. + +receive_ack(StateData, SCounter) -> + case catch list_to_integer(SCounter) of + Counter when is_integer(Counter) -> + NewQueue = clean_queue(StateData#state.ack_queue, Counter), + StateData#state{ack_queue = NewQueue}; + _ -> + StateData + end. + +clean_queue(Queue, Counter) -> + case queue:is_empty(Queue) of + true -> + Queue; + false -> + C = element(1, queue:head(Queue)), + if + C =< Counter -> + clean_queue(queue:tail(Queue), Counter); + true -> + Queue + end + end. + +prepare_acks_for_rebind(StateData) -> + AckQueue = StateData#state.ack_queue, + case queue:is_empty(AckQueue) of + true -> + StateData; + false -> + Unsent = + lists:map( + fun({_Counter, From, To, FixedPacket}) -> + {From, To, FixedPacket} + end, queue:to_list(AckQueue)), + NewQueue = queue:join(queue:from_list(Unsent), + StateData#state.queue), + StateData#state{queue = NewQueue, + queue_len = queue:len(NewQueue), + ack_queue = queue:new(), + reception = false} + end. + + +rebind(StateData, JID, StreamID) -> + case JID#jid.lresource of + "" -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Invalid JID"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + _ -> + ejabberd_sm:route( + ?MODULE, JID, + {xmlelement, rebind, [], {self(), StreamID}}), + receive + {rebind, false} -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Session not found"}]}), + fsm_next_state(wait_for_feature_request, + StateData); + {rebind, NewStateData} -> + ?INFO_MSG("(~w) Reopened session for ~s", + [StateData#state.socket, + jlib:jid_to_string(JID)]), + SID = {now(), self()}, + Conn = get_conn_type(NewStateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, NewStateData#state.auth_module}], + ejabberd_sm:open_session( + SID, + NewStateData#state.user, + NewStateData#state.server, + NewStateData#state.resource, + Info), + StateData2 = + NewStateData#state{ + socket = StateData#state.socket, + sockmod = StateData#state.sockmod, + socket_monitor = StateData#state.socket_monitor, + sid = SID, + ip = StateData#state.ip, + keepalive_timer = StateData#state.keepalive_timer, + ack_timer = undefined + }, + Presence = StateData2#state.pres_last, + case Presence of + undefined -> + ok; + _ -> + NewPriority = get_priority_from_presence(Presence), + update_priority(NewPriority, Presence, StateData2) + end, + send_element(StateData2, + {xmlelement, "rebind", + [{"xmlns", ?NS_P1_REBIND}], + []}), + StateData3 = change_reception(StateData2, true), + StateData4 = start_keepalive_timer(StateData3), + fsm_next_state(session_established, + StateData4) + after 1000 -> + send_element(StateData, + {xmlelement, "failure", + [{"xmlns", ?NS_P1_REBIND}], + [{xmlcdata, "Session not found"}]}), + fsm_next_state(wait_for_feature_request, + StateData) + end + end. + +process_push_iq(From, To, + #iq{type = _Type, sub_el = El} = IQ, + StateData) -> + {Res, NewStateData} = + case El of + {xmlelement, "push", _, _} -> + SKeepAlive = + xml:get_path_s(El, [{elem, "keepalive"}, {attr, "max"}]), + SOORTimeout = + xml:get_path_s(El, [{elem, "session"}, {attr, "duration"}]), + Status = xml:get_path_s(El, [{elem, "status"}, cdata]), + Show = xml:get_path_s(El, [{elem, "status"}, {attr, "type"}]), + SSendBody = xml:get_path_s(El, [{elem, "body"}, {attr, "send"}]), + SendBody = + case SSendBody of + "all" -> all; + "first-per-user" -> first_per_user; + "first" -> first; + "none" -> none; + _ -> none + end, + SendGroupchat = + xml:get_path_s(El, [{elem, "body"}, + {attr, "groupchat"}]) == "true", + SendFrom = send_from(El), + AppID = xml:get_path_s(El, [{elem, "appid"}, cdata]), + {Offline, Keep} = + case xml:get_path_s(El, [{elem, "offline"}, cdata]) of + "true" -> {true, false}; + "keep" -> {false, true}; + _ -> {false, false} + end, + Notification1 = xml:get_path_s(El, [{elem, "notification"}]), + Notification = + case Notification1 of + {xmlelement, _, _, _} -> + Notification1; + _ -> + {xmlelement, "notification", [], + [{xmlelement, "type", [], + [{xmlcdata, "none"}]}]} + end, + case catch {list_to_integer(SKeepAlive), + list_to_integer(SOORTimeout)} of + {KeepAlive, OORTimeout} + when OORTimeout =< ?MAX_OOR_TIMEOUT -> + if + Offline -> + ejabberd_hooks:run( + p1_push_enable_offline, + StateData#state.server, + [StateData#state.jid, + Notification, SendBody, SendFrom, AppID]); + Keep -> + ok; + true -> + ejabberd_hooks:run( + p1_push_disable, + StateData#state.server, + [StateData#state.jid, + Notification, + AppID]) + end, + NSD1 = + StateData#state{keepalive_timeout = KeepAlive, + oor_timeout = OORTimeout * 60, + oor_status = Status, + oor_show = Show, + oor_notification = Notification, + oor_send_body = SendBody, + oor_send_groupchat = SendGroupchat, + oor_send_from = SendFrom, + oor_appid = AppID, + oor_offline = Offline}, + NSD2 = start_keepalive_timer(NSD1), + {{result, []}, NSD2}; + _ -> + {{error, ?ERR_BAD_REQUEST}, StateData} + end; + {xmlelement, "disable", _, _} -> + ejabberd_hooks:run( + p1_push_disable, + StateData#state.server, + [StateData#state.jid, + StateData#state.oor_notification, + StateData#state.oor_appid]), + NSD1 = + StateData#state{keepalive_timeout = undefined, + oor_timeout = undefined, + oor_status = "", + oor_show = "", + oor_notification = undefined, + oor_send_body = all}, + NSD2 = start_keepalive_timer(NSD1), + {{result, []}, NSD2}; + _ -> + {{error, ?ERR_BAD_REQUEST}, StateData} + end, + IQRes = + case Res of + {result, Result} -> + IQ#iq{type = result, sub_el = Result}; + {error, Error} -> + IQ#iq{type = error, sub_el = [El, Error]} + end, + ejabberd_router:route( + To, From, jlib:iq_to_xml(IQRes)), + NewStateData. + +check_x_pushed({xmlelement, _Name, _Attrs, Els}) -> + check_x_pushed1(Els). + +check_x_pushed1([]) -> + false; +check_x_pushed1([{xmlcdata, _} | Els]) -> + check_x_pushed1(Els); +check_x_pushed1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_PUSHED -> + true; + _ -> + check_x_pushed1(Els) + end. + +check_x_attachment({xmlelement, _Name, _Attrs, Els}) -> + check_x_attachment1(Els). + +check_x_attachment1([]) -> + false; +check_x_attachment1([{xmlcdata, _} | Els]) -> + check_x_attachment1(Els); +check_x_attachment1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_ATTACHMENT -> + true; + _ -> + check_x_attachment1(Els) + end. + + +send_from(El) -> + %% First test previous version attribute: + case xml:get_path_s(El, [{elem, "body"}, {attr, "jid"}]) of + "false" -> + none; + "true" -> + jid; + "" -> + case xml:get_path_s(El, [{elem, "body"}, {attr, "from"}]) of + "jid" -> jid; + "username" -> username; + "none" -> none; + _ -> jid + end + end. + fsm_limit_opts(Opts) -> case lists:keysearch(max_fsm_queue, 1, Opts) of {value, {_, N}} when is_integer(N) -> diff --git a/src/mod_applepush.erl b/src/mod_applepush.erl new file mode 100644 index 000000000..b8659615c --- /dev/null +++ b/src/mod_applepush.erl @@ -0,0 +1,381 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_applepush.erl +%%% Author : Alexey Shchepin +%%% Purpose : Push module support +%%% Created : 5 Jun 2009 by Alexey Shchepin +%%% +%%% ejabberd, Copyright (C) 2002-2009 ProcessOne +%%%---------------------------------------------------------------------- + +-module(mod_applepush). +-author('alexey@process-one.net'). + +-behaviour(gen_mod). + +-export([start/2, + stop/1, + push_notification/8, + enable_offline_notification/5, + disable_notification/3, + receive_offline_packet/3]). + +%% Debug commands +-export([get_token_by_jid/1]). + + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("mod_privacy.hrl"). + +-define(NS_P1_PUSH, "p1:push"). +-define(NS_P1_PUSHED, "p1:pushed"). +-define(NS_P1_ATTACHMENT, "http://process-one.net/attachement"). + +-record(applepush_cache, {us, device_id, options}). + +start(Host, _Opts) -> + case init_host(Host) of + true -> + mnesia:create_table( + applepush_cache, + [{disc_copies, [node()]}, + {attributes, record_info(fields, applepush_cache)}]), + mnesia:add_table_copy(muc_online_room, node(), ram_copies), + ejabberd_hooks:add(p1_push_notification, Host, + ?MODULE, push_notification, 50), + ejabberd_hooks:add(p1_push_enable_offline, Host, + ?MODULE, enable_offline_notification, 50), + ejabberd_hooks:add(p1_push_disable, Host, + ?MODULE, disable_notification, 50), + ejabberd_hooks:add(offline_message_hook, Host, + ?MODULE, receive_offline_packet, 35); + false -> + ok + end. + +stop(Host) -> + ejabberd_hooks:delete(p1_push_notification, Host, + ?MODULE, push_notification, 50), + ejabberd_hooks:delete(p1_push_disable, Host, + ?MODULE, disable_notification, 50), + ejabberd_hooks:delete(offline_message_hook, Host, + ?MODULE, receive_offline_packet, 35). + + +push_notification(Host, JID, Notification, Msg, Unread, Sound, AppID, Sender) -> + Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), + case Type of + "applepush" -> + DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]), + PushService = get_push_service(Host, JID, AppID), + ServiceJID = jlib:make_jid("", PushService, ""), + Badge = integer_to_list(Unread), + SSound = + if + Sound -> "true"; + true -> "false" + end, + Receiver = jlib:jid_to_string(JID), + Packet = + {xmlelement, "message", [], + [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], + [{xmlelement, "id", [], [{xmlcdata, DeviceID}]}, + {xmlelement, "msg", [], [{xmlcdata, Msg}]}, + {xmlelement, "badge", [], [{xmlcdata, Badge}]}, + {xmlelement, "sound", [], [{xmlcdata, SSound}]}, + {xmlelement, "from", [], [{xmlcdata, Sender}]}, + {xmlelement, "to", [], [{xmlcdata, Receiver}]}]}]}, + ejabberd_router:route(JID, ServiceJID, Packet), + stop; + _ -> + ok + end. + +enable_offline_notification(JID, Notification, SendBody, SendFrom, AppID1) -> + Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), + case Type of + "applepush" -> + DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]), + case catch erlang:list_to_integer(DeviceID, 16) of + ID1 when is_integer(ID1) -> + AppID = + case xml:get_path_s(Notification, + [{elem, "appid"}, cdata]) of + "" -> AppID1; + A -> A + end, + {MegaSecs, Secs, _MicroSecs} = now(), + TimeStamp = MegaSecs * 1000000 + Secs, + Options = + [{appid, AppID}, + {send_body, SendBody}, + {send_from, SendFrom}, + {timestamp, TimeStamp}], + store_cache(JID, ID1, Options); + _ -> + ok + end, + stop; + _ -> + ok + end. + +disable_notification(JID, Notification, _AppID) -> + Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]), + case Type of + "applepush" -> + delete_cache(JID), + stop; + _ -> + ok + end. + +receive_offline_packet(From, To, Packet) -> + ?DEBUG("mod_applepush offline~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n", + [From, To, Packet, 8]), + Host = To#jid.lserver, + case gen_mod:is_loaded(Host, mod_applepush) of + true -> + case lookup_cache(To) of + false -> + ok; + {ID, AppID, SendBody, SendFrom} -> + ?DEBUG("lookup: ~p~n", [{ID, AppID, SendBody, SendFrom}]), + Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]), + Body = + case check_x_attachment(Packet) of + true -> + case Body1 of + "" -> [238, 128, 136]; + _ -> + [238, 128, 136, 32 | Body1] + end; + false -> + Body1 + end, + Pushed = check_x_pushed(Packet), + PushService = get_push_service(Host, To, AppID), + ServiceJID = jlib:make_jid("", PushService, ""), + if + Body == ""; + Pushed -> + if + From#jid.lserver == ServiceJID#jid.lserver -> + Disable = + xml:get_path_s( + Packet, [{elem, "disable"}]) /= "", + if + Disable -> + delete_cache(To); + true -> + ok + end; + true -> + ok + end, + ok; + true -> + BFrom = jlib:jid_remove_resource(From), + SFrom = jlib:jid_to_string(BFrom), + Offline = ejabberd_hooks:run_fold( + count_offline_messages, + Host, + 0, + [To#jid.luser, Host]), + IncludeBody = + case SendBody of + all -> + true; + first_per_user -> + Offline == 0; + first -> + Offline == 0; + none -> + false + end, + Msg = + if + IncludeBody -> + CBody = utf8_cut(Body, 100), + case SendFrom of + jid -> SFrom ++ ": " ++ CBody; + username -> BFrom#jid.user ++ ": " ++ CBody; + _ -> CBody + end; + true -> + "" + end, + SSound = + if + IncludeBody -> "true"; + true -> "false" + end, + Badge = integer_to_list(Offline + 1), + DeviceID = erlang:integer_to_list(ID, 16), + Packet1 = + {xmlelement, "message", [], + [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], + [{xmlelement, "id", [], + [{xmlcdata, DeviceID}]}, + {xmlelement, "msg", [], + [{xmlcdata, Msg}]}, + {xmlelement, "badge", [], + [{xmlcdata, Badge}]}, + {xmlelement, "sound", [], + [{xmlcdata, SSound}]}, + {xmlelement, "from", [], + [{xmlcdata, SFrom}]}]}]}, + ejabberd_router:route(To, ServiceJID, Packet1) + end + end; + false -> + ok + end. + +lookup_cache(JID) -> + #jid{luser = LUser, lserver = LServer} = JID, + LUS = {LUser, LServer}, + case catch mnesia:dirty_read(applepush_cache, LUS) of + [#applepush_cache{device_id = DeviceID, options = Options}] -> + AppID = proplists:get_value(appid, Options, "applepush.localhost"), + SendBody = proplists:get_value(send_body, Options, none), + SendFrom = proplists:get_value(send_from, Options, true), + {DeviceID, AppID, SendBody, SendFrom}; + _ -> + false + end. + +store_cache(JID, DeviceID, Options) -> + #jid{luser = LUser, lserver = LServer} = JID, + LUS = {LUser, LServer}, + R = #applepush_cache{us = LUS, + device_id = DeviceID, + options = Options}, + case catch mnesia:dirty_read(applepush_cache, LUS) of + [R] -> + ok; + _ -> + catch mnesia:dirty_write(R) + end. + +delete_cache(JID) -> + #jid{luser = LUser, lserver = LServer} = JID, + LUS = {LUser, LServer}, + catch mnesia:dirty_delete(applepush_cache, LUS). + + +utf8_cut(S, Bytes) -> + utf8_cut(S, [], [], Bytes + 1). + +utf8_cut(_S, _Cur, Prev, 0) -> + lists:reverse(Prev); +utf8_cut([], Cur, _Prev, _Bytes) -> + lists:reverse(Cur); +utf8_cut([C | S], Cur, Prev, Bytes) -> + if + C bsr 6 == 2 -> + utf8_cut(S, [C | Cur], Prev, Bytes - 1); + true -> + utf8_cut(S, [C | Cur], Cur, Bytes - 1) + end. + +check_x_pushed({xmlelement, _Name, _Attrs, Els}) -> + check_x_pushed1(Els). + +check_x_pushed1([]) -> + false; +check_x_pushed1([{xmlcdata, _} | Els]) -> + check_x_pushed1(Els); +check_x_pushed1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_PUSHED -> + true; + _ -> + check_x_pushed1(Els) + end. + +check_x_attachment({xmlelement, _Name, _Attrs, Els}) -> + check_x_attachment1(Els). + +check_x_attachment1([]) -> + false; +check_x_attachment1([{xmlcdata, _} | Els]) -> + check_x_attachment1(Els); +check_x_attachment1([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_P1_ATTACHMENT -> + true; + _ -> + check_x_attachment1(Els) + end. + + +get_push_service(Host, JID, AppID) -> + PushServices = + gen_mod:get_module_opt( + Host, ?MODULE, + push_services, []), + PushService = + case lists:keysearch(AppID, 1, PushServices) of + false -> + DefaultServices = + gen_mod:get_module_opt( + Host, ?MODULE, + default_services, []), + case lists:keysearch(JID#jid.lserver, 1, DefaultServices) of + false -> + gen_mod:get_module_opt( + Host, ?MODULE, + default_service, "applepush.localhost"); + {value, {_, PS}} -> + PS + end; + {value, {AppID, PS}} -> + PS + end, + PushService. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Internal module protection + +-define(VALID_HOSTS, []). % default is unlimited use +-define(MAX_USERS, 0). % default is unlimited use + +init_host(VHost) -> + case ?VALID_HOSTS of + [] -> % unlimited use + true; + ValidList -> % limited use + init_host(VHost, ValidList) + end. +init_host([], _) -> + false; +init_host(VHost, ValidEncryptedList) -> + EncryptedHost = erlang:md5(lists:reverse(VHost)), + case lists:member(EncryptedHost, ValidEncryptedList) of + true -> + case ?MAX_USERS of + 0 -> true; + N -> ejabberd_auth:get_vh_registered_users_number(VHost) =< N + end; + false -> + case string:chr(VHost, $.) of + 0 -> false; + Pos -> init_host(string:substr(VHost, Pos+1), ValidEncryptedList) + end + end. + +%% Debug commands +%% JID is of form +get_token_by_jid(JIDString) -> + #jid{luser = LUser, lserver = LServer} = jlib:string_to_jid(JIDString), + LUS = {LUser, LServer}, + case mnesia:dirty_read(applepush_cache, LUS) of + [{applepush_cache,_,I,_}] -> + erlang:integer_to_list(I, 16); + _ -> + undefined + end. + diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl new file mode 100644 index 000000000..8f7621b89 --- /dev/null +++ b/src/mod_applepush_service.erl @@ -0,0 +1,558 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_applepush_service.erl +%%% Author : Alexey Shchepin +%%% Purpose : Central push infrastructure +%%% Created : 5 Jun 2009 by Alexey Shchepin +%%% +%%% ejabberd, Copyright (C) 2002-2009 ProcessOne +%%%---------------------------------------------------------------------- + +-module(mod_applepush_service). +-author('alexey@process-one.net'). + +-behaviour(gen_server). +-behaviour(gen_mod). + +%% API +-export([start_link/2, start/2, stop/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). + +-record(state, {host, + socket, + gateway, + port, + feedback_socket, + feedback, + feedback_port, + feedback_buf = <<>>, + certfile, + queue, + soundfile, + cmd_id = 0, + cmd_cache = dict:new(), + device_cache = dict:new()}). + +-define(PROCNAME, ejabberd_mod_applepush_service). +-define(RECONNECT_TIMEOUT, 5000). +-define(FEEDBACK_RECONNECT_TIMEOUT, 30000). +-define(MAX_QUEUE_SIZE, 1000). +-define(CACHE_SIZE, 4096). + +-define(NS_P1_PUSH, "p1:push"). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link(Host, Opts) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). + +start(Host, Opts) -> + ssl:start(), + MyHosts = + case catch gen_mod:get_opt(hosts, Opts) of + {'EXIT', _} -> + [{gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"), Opts}]; + Hs -> + Hs + end, + lists:foreach( + fun({MyHost, MyOpts}) -> + Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME), + ChildSpec = + {Proc, + {?MODULE, start_link, [MyHost, MyOpts]}, + transient, + 1000, + worker, + [?MODULE]}, + supervisor:start_child(ejabberd_sup, ChildSpec) + end, MyHosts). + +stop(Host) -> + MyHosts = + case catch gen_mod:get_module_opt(Host, ?MODULE, hosts, []) of + [] -> + [gen_mod:get_module_opt_host( + Host, ?MODULE, "applepush.@HOST@")]; + Hs -> + [H || {H, _} <- Hs] + end, + lists:foreach( + fun(MyHost) -> + Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME), + gen_server:call(Proc, stop), + supervisor:terminate_child(ejabberd_sup, Proc), + supervisor:delete_child(ejabberd_sup, Proc) + end, MyHosts). + + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([MyHost, Opts]) -> + CertFile = gen_mod:get_opt(certfile, Opts, ""), + SoundFile = gen_mod:get_opt(sound_file, Opts, "pushalert.wav"), + Gateway = gen_mod:get_opt(gateway, Opts, "gateway.push.apple.com"), + Feedback = gen_mod:get_opt(feedback, Opts, undefined), + Port = gen_mod:get_opt(port, Opts, 2195), + FeedbackPort = gen_mod:get_opt(feedback_port, Opts, 2196), + %MyHost = gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"), + self() ! connect, + case Feedback of + undefined -> + ok; + _ -> + self() ! connect_feedback + end, + ejabberd_router:register_route(MyHost), + {ok, #state{host = MyHost, + gateway = Gateway, + port = Port, + feedback = Feedback, + feedback_port = FeedbackPort, + certfile = CertFile, + queue = {0, queue:new()}, + soundfile = SoundFile}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info({route, From, To, Packet}, State) -> + case catch do_route(From, To, Packet, State) of + {'EXIT', Reason} -> + ?ERROR_MSG("~p", [Reason]), + {noreply, State}; + Res -> + Res + end; +handle_info(connect, State) -> + connect(State); +handle_info(connect_feedback, State) + when State#state.feedback /= undefined, + State#state.feedback_socket == undefined -> + Feedback = State#state.feedback, + FeedbackPort = State#state.feedback_port, + CertFile = State#state.certfile, + case ssl:connect(Feedback, FeedbackPort, + [{certfile, CertFile}, + {active, true}, + binary]) of + {ok, Socket} -> + {noreply, State#state{feedback_socket = Socket}}; + {error, Reason} -> + ?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, " + "retrying after ~p seconds", + [State#state.host, Feedback, FeedbackPort, + Reason, ?FEEDBACK_RECONNECT_TIMEOUT div 1000]), + erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(), + connect_feedback), + {noreply, State} + end; +handle_info({ssl, Socket, Packet}, State) + when Socket == State#state.socket -> + case Packet of + <<8, Status, CmdID:32>> when Status /= 0 -> + case dict:find(CmdID, State#state.cmd_cache) of + {ok, {JID, _DeviceID}} -> + From = jlib:make_jid("", State#state.host, ""), + ejabberd_router:route( + From, JID, + {xmlelement, "message", [], + [{xmlelement, "disable", + [{"xmlns", ?NS_P1_PUSH}, + {"status", integer_to_list(Status)}], + []}]}); + error -> + ?ERROR_MSG("Unknown cmd ID ~p~n", [CmdID]), + ok + end; + _ -> + ?ERROR_MSG("Received unknown packet ~p~n", [Packet]) + end, + {noreply, State}; +handle_info({ssl, Socket, Packet}, State) + when Socket == State#state.feedback_socket -> + Buf = <<(State#state.feedback_buf)/binary, Packet/binary>>, + Buf2 = parse_feedback_buf(Buf, State), + {noreply, State#state{feedback_buf = Buf2}}; +handle_info({ssl_closed, Socket}, State) + when Socket == State#state.feedback_socket -> + ssl:close(Socket), + erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(), + connect_feedback), + {noreply, State#state{feedback_socket = undefined, + feedback_buf = <<>>}}; +handle_info(_Info, State) -> + %io:format("got info: ~p~n", [_Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, State) -> + ejabberd_router:unregister_route(State#state.host), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +do_route(From, To, Packet, State) -> + #jid{user = User, resource = Resource} = To, + if + (User /= "") or (Resource /= "") -> + Err = jlib:make_error_reply(Packet, ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err), + {noreply, State}; + true -> + case Packet of + {xmlelement, "iq", _, _} -> + IQ = jlib:iq_query_info(Packet), + case IQ of + #iq{type = get, xmlns = ?NS_DISCO_INFO = XMLNS, + sub_el = _SubEl, lang = Lang} = IQ -> + Res = IQ#iq{type = result, + sub_el = [{xmlelement, "query", + [{"xmlns", XMLNS}], + iq_disco(Lang)}]}, + ejabberd_router:route(To, + From, + jlib:iq_to_xml(Res)), + {noreply, State}; + #iq{type = get, xmlns = ?NS_DISCO_ITEMS = XMLNS} = IQ -> + Res = IQ#iq{type = result, + sub_el = [{xmlelement, "query", + [{"xmlns", XMLNS}], + []}]}, + ejabberd_router:route(To, + From, + jlib:iq_to_xml(Res)), + {noreply, State}; + %%#iq{type = get, xmlns = ?NS_VCARD, lang = Lang} -> + %% ResIQ = + %% IQ#iq{type = result, + %% sub_el = [{xmlelement, + %% "vCard", + %% [{"xmlns", ?NS_VCARD}], + %% iq_get_vcard(Lang)}]}, + %% ejabberd_router:route(To, + %% From, + %% jlib:iq_to_xml(ResIQ)); + _ -> + Err = jlib:make_error_reply(Packet, + ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err), + {noreply, State} + end; + {xmlelement, "message", _, Els} -> + case xml:remove_cdata(Els) of + [{xmlelement, "push", _, _}] -> + NewState = handle_message(From, To, Packet, State), + {noreply, NewState}; + [{xmlelement, "disable", _, _}] -> + {noreply, State}; + _ -> + {noreply, State} + end; + _ -> + {noreply, State} + end + end. + + +handle_message(From, To, Packet, #state{socket = undefined} = State) -> + queue_message(From, To, Packet, State); +handle_message(From, To, Packet, State) -> + DeviceID = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "id"}, cdata]), + Msg = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "msg"}, cdata]), + Badge = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "badge"}, cdata]), + Sound = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "sound"}, cdata]), + Sender = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "from"}, cdata]), + Receiver = + xml:get_path_s(Packet, + [{elem, "push"}, {elem, "to"}, cdata]), + Msg2 = json_escape(Msg), + AlertPayload = + case Msg2 of + "" -> ""; + _ -> "\"alert\":\"" ++ Msg2 ++ "\"" + end, + BadgePayload = + case catch list_to_integer(Badge) of + B when is_integer(B) -> + "\"badge\":" ++ Badge; + _ -> "" + end, + SoundPayload = + case Sound of + "true" -> + SoundFile = State#state.soundfile, + "\"sound\":\"" ++ json_escape(SoundFile) ++ "\""; + _ -> "" + end, + Payloads = lists:filter(fun(S) -> S /= "" end, + [AlertPayload, BadgePayload, SoundPayload]), + Payload = + "{\"aps\":{" ++ join(Payloads, ",") ++ "}," + "\"from\":\"" ++ json_escape(Sender) ++ "\"}", + ID = + case catch erlang:list_to_integer(DeviceID, 16) of + ID1 when is_integer(ID1) -> + ID1; + _ -> + false + end, + if + is_integer(ID) -> + Command = 1, + CmdID = State#state.cmd_id, + {MegaSecs, Secs, _MicroSecs} = now(), + Expiry = MegaSecs * 1000000 + Secs + 24 * 60 * 60, + BDeviceID = <>, + BPayload = list_to_binary(Payload), + IDLen = size(BDeviceID), + PayloadLen = size(BPayload), + Notification = + <>, + ?INFO_MSG("(~p) sending notification for ~s~n~p~npayload:~n~s~n" + "Sender: ~s~n" + "Receiver: ~s~n" + "Device ID: ~s~n", + [State#state.host, erlang:integer_to_list(ID, 16), + Notification, Payload, + jlib:jid_to_string(From), + Receiver, DeviceID]), + case ssl:send(State#state.socket, Notification) of + ok -> + cache(From, ID, State); + {error, Reason} -> + ?INFO_MSG("(~p) Connection closed: ~p, reconnecting", + [State#state.host, Reason]), + ssl:close(State#state.socket), + self() ! connect, + queue_message(From, To, Packet, + State#state{socket = undefined}) + end; + true -> + State + end. + +connect(#state{socket = undefined} = State) -> + Gateway = State#state.gateway, + Port = State#state.port, + CertFile = State#state.certfile, + case ssl:connect(Gateway, Port, [{certfile, CertFile}, + {active, true}, + binary]) of + {ok, Socket} -> + {noreply, resend_messages(State#state{socket = Socket})}; + {error, Reason} -> + ?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, " + "retrying after ~p seconds", + [State#state.host, Gateway, Port, + Reason, ?RECONNECT_TIMEOUT div 1000]), + erlang:send_after(?RECONNECT_TIMEOUT, self(), connect), + {noreply, State} + end; +connect(State) -> + {noreply, State}. + +bounce_message(From, To, Packet, Reason) -> + {xmlelement, _, Attrs, _} = Packet, + Type = xml:get_attr_s("type", Attrs), + if Type /= "error"; Type /= "result" -> + ejabberd_router:route( + To, From, + jlib:make_error_reply( + Packet, + ?ERRT_INTERNAL_SERVER_ERROR( + xml:get_attr_s("xml:lang", Attrs), + Reason))); + true -> + ok + end. + +queue_message(From, To, Packet, State) -> + case State#state.queue of + {?MAX_QUEUE_SIZE, Queue} -> + {{value, {From1, To1, Packet1}}, Queue1} = queue:out(Queue), + bounce_message(From1, To1, Packet1, + "Unable to connect to push service"), + Queue2 = queue:in({From, To, Packet}, Queue1), + State#state{queue = {?MAX_QUEUE_SIZE, Queue2}}; + {Size, Queue} -> + Queue1 = queue:in({From, To, Packet}, Queue), + State#state{queue = {Size+1, Queue1}} + end. + +resend_messages(#state{queue = {_, Queue}} = State) -> + lists:foldl( + fun({From, To, Packet}, AccState) -> + case catch handle_message(From, To, Packet, AccState) of + {'EXIT', _} = Err -> + ?ERROR_MSG("error while processing message:~n" + "** From: ~p~n" + "** To: ~p~n" + "** Packet: ~p~n" + "** Reason: ~p", + [From, To, Packet, Err]), + AccState; + NewAccState -> + NewAccState + end + end, State#state{queue = {0, queue:new()}}, queue:to_list(Queue)). + +cache(JID, DeviceID, State) -> + CmdID = State#state.cmd_id, + Key = CmdID rem ?CACHE_SIZE, + C1 = State#state.cmd_cache, + D1 = State#state.device_cache, + D2 = case dict:find(Key, C1) of + {ok, {_, OldDeviceID}} -> + del_device_cache(D1, OldDeviceID); + error -> + D1 + end, + D3 = add_device_cache(D2, DeviceID, JID), + C2 = dict:store(Key, {JID, DeviceID}, C1), + State#state{cmd_id = CmdID + 1, + cmd_cache = C2, + device_cache = D3}. + +add_device_cache(DeviceCache, DeviceID, JID) -> + dict:update( + DeviceID, + fun({Counter, _}) -> {Counter + 1, JID} end, + {1, JID}, + DeviceCache). + +del_device_cache(DeviceCache, DeviceID) -> + case dict:find(DeviceID, DeviceCache) of + {ok, {Counter, JID}} -> + case Counter of + 1 -> + dict:erase(DeviceID, DeviceCache); + _ -> + dict:store(DeviceID, {Counter - 1, JID}, DeviceCache) + end; + error -> + DeviceCache + end. + +json_escape(S) -> + [case C of + $" -> "\\\""; + $\\ -> "\\\\"; + _ when C < 16 -> ["\\u000", erlang:integer_to_list(C, 16)]; + _ when C < 32 -> ["\\u00", erlang:integer_to_list(C, 16)]; + _ -> C + end || C <- S]. + +join(List, Sep) -> + lists:foldr(fun(A, "") -> A; + (A, Acc) -> A ++ Sep ++ Acc + end, "", List). + + + +iq_disco(Lang) -> + [{xmlelement, "identity", + [{"category", "gateway"}, + {"type", "apple"}, + {"name", translate:translate(Lang, "Apple Push Service")}], []}, + {xmlelement, "feature", [{"var", ?NS_DISCO_INFO}], []}]. + + +parse_feedback_buf(Buf, State) -> + case Buf of + <> -> + IDLen8 = IDLen * 8, + <> = BDeviceID, + case dict:find(DeviceID, State#state.device_cache) of + {ok, {_Counter, JID}} -> + From = jlib:make_jid("", State#state.host, ""), + ejabberd_router:route( + From, JID, + {xmlelement, "message", [], + [{xmlelement, "disable", + [{"xmlns", ?NS_P1_PUSH}, + {"status", "feedback"}, + {"ts", integer_to_list(TimeStamp)}], + []}]}); + error -> + ok + end, + parse_feedback_buf(Rest, State); + _ -> + Buf + end. diff --git a/src/mod_offline.erl b/src/mod_offline.erl index dfa2f46c2..88d0572de 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -42,7 +42,8 @@ get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5]). + webadmin_user_parse_query/5, + count_offline_messages/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -81,6 +82,8 @@ start(Host, Opts) -> ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), + ejabberd_hooks:add(count_offline_messages, Host, + ?MODULE, count_offline_messages, 50), AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), register(gen_mod:get_module_proc(Host, ?PROCNAME), spawn(?MODULE, loop, [AccessMaxOfflineMsgs])). @@ -663,3 +666,23 @@ webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> end; webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> Acc. + + +%% ------------------------------------------------ +%% mod_offline: number of messages quota management + +count_offline_messages(_Acc, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + US = {LUser, LServer}, + F = fun () -> + p1_mnesia:count_records( + offline_msg, + #offline_msg{us=US, _='_'}) + end, + N = case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end, + {stop, N}. + diff --git a/src/mod_offline_odbc.erl b/src/mod_offline_odbc.erl index c89beda4e..cb7318e7d 100644 --- a/src/mod_offline_odbc.erl +++ b/src/mod_offline_odbc.erl @@ -41,7 +41,8 @@ get_queue_length/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5]). + webadmin_user_parse_query/5, + count_offline_messages/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). @@ -75,6 +76,8 @@ start(Host, Opts) -> ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), + ejabberd_hooks:add(count_offline_messages, Host, + ?MODULE, count_offline_messages, 50), AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), register(gen_mod:get_module_proc(Host, ?PROCNAME), spawn(?MODULE, loop, [Host, AccessMaxOfflineMsgs])). @@ -547,3 +550,30 @@ count_offline_messages(LUser, LServer) -> _ -> 0 end. + +count_offline_messages(_Acc, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Num = case catch ejabberd_odbc:sql_query( + LServer, + ["select xml from spool" + " where username='", LUser, "';"]) of + {selected, ["xml"], Rs} -> + lists:foldl( + fun({XML}, Acc) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + Acc; + El -> + case xml:get_subtag(El, "body") of + false -> + Acc; + _ -> + Acc + 1 + end + end + end, 0, Rs); + _ -> + 0 + end, + {stop, Num}. diff --git a/src/xml_stream.erl b/src/xml_stream.erl index e97a61476..a918811cb 100644 --- a/src/xml_stream.erl +++ b/src/xml_stream.erl @@ -75,6 +75,9 @@ process_data(CallbackPid, Stack, Data) -> {?XML_CDATA, CData} -> case Stack of [El] -> + catch gen_fsm:send_all_state_event( + CallbackPid, + {xmlstreamcdata, CData}), [El]; %% Merge CDATA nodes if they are contiguous %% This does not change the semantic: the split in @@ -88,7 +91,8 @@ process_data(CallbackPid, Stack, Data) -> [{xmlelement, Name, Attrs, Els} | Tail] -> [{xmlelement, Name, Attrs, [{xmlcdata, CData} | Els]} | Tail]; - [] -> [] + [] -> + [] end; {?XML_ERROR, Err} -> catch gen_fsm:send_event(CallbackPid, {xmlstreamerror, Err}) From ea8aa1f25bcec81a4c337d21bc1f210550b9b634 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Thu, 19 Aug 2010 15:53:44 +0300 Subject: [PATCH 2/9] clean p1:pushed tag (thanks to Mickael Remond) --- src/ejabberd_c2s.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index cb3f45cc3..4d15be94a 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -2741,8 +2741,9 @@ enqueue(StateData, From, To, Packet) -> StateData#state{pres_queue = NewQueue} end; true -> + CleanPacket = xml:remove_subtags(Packet, "x", {"xmlns", ?NS_P1_PUSHED}), Packet2 = - case Packet of + case CleanPacket of {xmlelement, "message" = Name, Attrs, Els} -> {xmlelement, Name, Attrs, Els ++ From ad00ec1518985424af69f81087fb03b77493b75f Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Thu, 2 Sep 2010 13:40:45 +0300 Subject: [PATCH 3/9] Added xml:remove_subtags (thanks to Mickael Remond) --- src/xml.erl | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/xml.erl b/src/xml.erl index 22746b10a..3e0157313 100644 --- a/src/xml.erl +++ b/src/xml.erl @@ -31,6 +31,7 @@ element_to_binary/1, crypt/1, make_text_node/1, remove_cdata/1, + remove_subtags/3, get_cdata/1, get_tag_cdata/1, get_attr/2, get_attr_s/2, get_tag_attr/2, get_tag_attr_s/2, @@ -186,6 +187,31 @@ remove_cdata_p(_) -> false. remove_cdata(L) -> [E || E <- L, remove_cdata_p(E)]. +%% TODO: Make more generic. +%% For now only support all parameters: +%% xml:remove_subtags({xmlelement,"message", [{"id","81be72"}],[{xmlelement,"on-sender-server",[{"xmlns","urn:xmpp:receipts"},{"server","text-one.com"}], []}]}, "on-sender-server", {"xmlns","urn:xmpp:receipts"}). +remove_subtags({xmlelement, TagName, TagAttrs, Els}, Name, Attr) -> + {xmlelement, TagName, TagAttrs, remove_subtags1(Els, [], Name, Attr)}. + +remove_subtags1([], NewEls, _Name, _Attr) -> + lists:reverse(NewEls); +remove_subtags1([El | Els], NewEls, Name, {AttrName, AttrValue} = Attr) -> + case El of + {xmlelement, Name, Attrs, _} -> + case get_attr(AttrName, Attrs) of + false -> + remove_subtags1(Els, [El|NewEls], Name, Attr); + {value, AttrValue} -> + remove_subtags1(Els, NewEls, Name, Attr); + _ -> + remove_subtags1(Els, [El|NewEls], Name, Attr) + end; + _ -> + remove_subtags1(Els, [El|NewEls], Name, Attr) + end. + + + get_cdata(L) -> binary_to_list(list_to_binary(get_cdata(L, ""))). From 6bb0dc12f12d2a17bcd9e12cf9a93f3dc3440927 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Thu, 2 Sep 2010 13:41:18 +0300 Subject: [PATCH 4/9] Cut payload when it's too big --- src/mod_applepush_service.erl | 76 ++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl index 8f7621b89..5469550eb 100644 --- a/src/mod_applepush_service.erl +++ b/src/mod_applepush_service.erl @@ -43,6 +43,7 @@ -define(FEEDBACK_RECONNECT_TIMEOUT, 30000). -define(MAX_QUEUE_SIZE, 1000). -define(CACHE_SIZE, 4096). +-define(MAX_PAYLOAD_SIZE, 255). -define(NS_P1_PUSH, "p1:push"). @@ -335,30 +336,7 @@ handle_message(From, To, Packet, State) -> Receiver = xml:get_path_s(Packet, [{elem, "push"}, {elem, "to"}, cdata]), - Msg2 = json_escape(Msg), - AlertPayload = - case Msg2 of - "" -> ""; - _ -> "\"alert\":\"" ++ Msg2 ++ "\"" - end, - BadgePayload = - case catch list_to_integer(Badge) of - B when is_integer(B) -> - "\"badge\":" ++ Badge; - _ -> "" - end, - SoundPayload = - case Sound of - "true" -> - SoundFile = State#state.soundfile, - "\"sound\":\"" ++ json_escape(SoundFile) ++ "\""; - _ -> "" - end, - Payloads = lists:filter(fun(S) -> S /= "" end, - [AlertPayload, BadgePayload, SoundPayload]), - Payload = - "{\"aps\":{" ++ join(Payloads, ",") ++ "}," - "\"from\":\"" ++ json_escape(Sender) ++ "\"}", + Payload = make_payload(State, Msg, Badge, Sound, Sender), ID = case catch erlang:list_to_integer(DeviceID, 16) of ID1 when is_integer(ID1) -> @@ -407,6 +385,56 @@ handle_message(From, To, Packet, State) -> State end. +make_payload(State, Msg, Badge, Sound, Sender) -> + Msg2 = json_escape(Msg), + AlertPayload = + case Msg2 of + "" -> ""; + _ -> "\"alert\":\"" ++ Msg2 ++ "\"" + end, + BadgePayload = + case catch list_to_integer(Badge) of + B when is_integer(B) -> + "\"badge\":" ++ Badge; + _ -> "" + end, + SoundPayload = + case Sound of + "true" -> + SoundFile = State#state.soundfile, + "\"sound\":\"" ++ json_escape(SoundFile) ++ "\""; + _ -> "" + end, + Payloads = lists:filter(fun(S) -> S /= "" end, + [AlertPayload, BadgePayload, SoundPayload]), + Payload = + "{\"aps\":{" ++ join(Payloads, ",") ++ "}," + "\"from\":\"" ++ json_escape(Sender) ++ "\"}", + PayloadLen = length(Payload), + if + PayloadLen > ?MAX_PAYLOAD_SIZE -> + Delta = PayloadLen - ?MAX_PAYLOAD_SIZE, + MsgLen = length(Msg), + if + MsgLen /= 0 -> + CutMsg = + if + MsgLen > Delta -> + lists:sublist(Msg, MsgLen - Delta); + true -> + "" + end, + make_payload(State, CutMsg, Badge, Sound, Sender); + true -> + Payload2 = + "{\"aps\":{" ++ join(Payloads, ",") ++ "}}", + %PayloadLen2 = length(Payload2), + Payload2 + end; + true -> + Payload + end. + connect(#state{socket = undefined} = State) -> Gateway = State#state.gateway, Port = State#state.port, From 8a693df6e6d8c55b623b23483e973481c6f55df9 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Mon, 13 Sep 2010 06:19:38 +0300 Subject: [PATCH 5/9] Added badge resending functions --- src/mod_applepush.erl | 44 ++++++++++++++++++++++++++++++++++- src/mod_applepush_service.erl | 9 +++++-- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/mod_applepush.erl b/src/mod_applepush.erl index b8659615c..500e7651f 100644 --- a/src/mod_applepush.erl +++ b/src/mod_applepush.erl @@ -17,7 +17,10 @@ push_notification/8, enable_offline_notification/5, disable_notification/3, - receive_offline_packet/3]). + receive_offline_packet/3, + resend_badge/1, + multi_resend_badge/1, + offline_resend_badge/0]). %% Debug commands -export([get_token_by_jid/1]). @@ -232,6 +235,45 @@ receive_offline_packet(From, To, Packet) -> ok end. +resend_badge(To) -> + Host = To#jid.lserver, + case gen_mod:is_loaded(Host, mod_applepush) of + true -> + case lookup_cache(To) of + false -> + {error, "no cached data for the user"}; + {ID, AppID, SendBody, SendFrom} -> + ?DEBUG("lookup: ~p~n", [{ID, AppID, SendBody, SendFrom}]), + PushService = get_push_service(Host, To, AppID), + ServiceJID = jlib:make_jid("", PushService, ""), + Offline = ejabberd_hooks:run_fold( + count_offline_messages, + Host, + 0, + [To#jid.luser, Host]), + Badge = integer_to_list(Offline + 1), + DeviceID = erlang:integer_to_list(ID, 16), + Packet1 = + {xmlelement, "message", [], + [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], + [{xmlelement, "id", [], + [{xmlcdata, DeviceID}]}, + {xmlelement, "badge", [], + [{xmlcdata, Badge}]}]}]}, + ejabberd_router:route(To, ServiceJID, Packet1) + end; + false -> + {error, "mod_applepush is not loaded"} + end. + +multi_resend_badge(JIDs) -> + lists:foreach(fun resend_badge/1, JIDs). + +offline_resend_badge() -> + USs = mnesia:dirty_all_keys(applepush_cache), + JIDs = lists:map(fun({U, S}) -> jlib:make_jid(U, S, "") end, USs), + multi_resend_badge(JIDs). + lookup_cache(JID) -> #jid{luser = LUser, lserver = LServer} = JID, LUS = {LUser, LServer}, diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl index 5469550eb..30956acc1 100644 --- a/src/mod_applepush_service.erl +++ b/src/mod_applepush_service.erl @@ -408,8 +408,13 @@ make_payload(State, Msg, Badge, Sound, Sender) -> Payloads = lists:filter(fun(S) -> S /= "" end, [AlertPayload, BadgePayload, SoundPayload]), Payload = - "{\"aps\":{" ++ join(Payloads, ",") ++ "}," - "\"from\":\"" ++ json_escape(Sender) ++ "\"}", + case Sender of + "" -> + "{\"aps\":{" ++ join(Payloads, ",") ++ "}}"; + _ -> + "{\"aps\":{" ++ join(Payloads, ",") ++ "}," + "\"from\":\"" ++ json_escape(Sender) ++ "\"}" + end, PayloadLen = length(Payload), if PayloadLen > ?MAX_PAYLOAD_SIZE -> From 73f7b2ba3844745142b3a602298696c16fe635c7 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Mon, 13 Sep 2010 06:22:14 +0300 Subject: [PATCH 6/9] Do not disable push on send error --- src/mod_applepush_service.erl | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl index 30956acc1..b2d3251ac 100644 --- a/src/mod_applepush_service.erl +++ b/src/mod_applepush_service.erl @@ -198,14 +198,16 @@ handle_info({ssl, Socket, Packet}, State) <<8, Status, CmdID:32>> when Status /= 0 -> case dict:find(CmdID, State#state.cmd_cache) of {ok, {JID, _DeviceID}} -> - From = jlib:make_jid("", State#state.host, ""), - ejabberd_router:route( - From, JID, - {xmlelement, "message", [], - [{xmlelement, "disable", - [{"xmlns", ?NS_P1_PUSH}, - {"status", integer_to_list(Status)}], - []}]}); + ?ERROR_MSG("PUSH ERROR for ~p: ~p", [JID, Status]), + %From = jlib:make_jid("", State#state.host, ""), + %ejabberd_router:route( + % From, JID, + % {xmlelement, "message", [], + % [{xmlelement, "disable", + % [{"xmlns", ?NS_P1_PUSH}, + % {"status", integer_to_list(Status)}], + % []}]}); + ok; error -> ?ERROR_MSG("Unknown cmd ID ~p~n", [CmdID]), ok From 261acfce54191f1c81262289c0fbb37522ea68e1 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Mon, 13 Sep 2010 15:46:39 +0300 Subject: [PATCH 7/9] Don't resend badge if there are no offline messages --- src/mod_applepush.erl | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/mod_applepush.erl b/src/mod_applepush.erl index 500e7651f..18b8dd2b0 100644 --- a/src/mod_applepush.erl +++ b/src/mod_applepush.erl @@ -251,16 +251,21 @@ resend_badge(To) -> Host, 0, [To#jid.luser, Host]), - Badge = integer_to_list(Offline + 1), - DeviceID = erlang:integer_to_list(ID, 16), - Packet1 = - {xmlelement, "message", [], - [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], - [{xmlelement, "id", [], - [{xmlcdata, DeviceID}]}, - {xmlelement, "badge", [], - [{xmlcdata, Badge}]}]}]}, - ejabberd_router:route(To, ServiceJID, Packet1) + if + Offline == 0 -> + ok; + true -> + Badge = integer_to_list(Offline), + DeviceID = erlang:integer_to_list(ID, 16), + Packet1 = + {xmlelement, "message", [], + [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], + [{xmlelement, "id", [], + [{xmlcdata, DeviceID}]}, + {xmlelement, "badge", [], + [{xmlcdata, Badge}]}]}]}, + ejabberd_router:route(To, ServiceJID, Packet1) + end end; false -> {error, "mod_applepush is not loaded"} From c86e4faba3d988b07c43e9dc8069f4be89a48d6b Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Tue, 14 Sep 2010 18:00:01 +0300 Subject: [PATCH 8/9] Fixed "sender" log field --- src/mod_applepush.erl | 5 ++++- src/mod_applepush_service.erl | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/mod_applepush.erl b/src/mod_applepush.erl index 18b8dd2b0..5e97f6165 100644 --- a/src/mod_applepush.erl +++ b/src/mod_applepush.erl @@ -215,6 +215,7 @@ receive_offline_packet(From, To, Packet) -> end, Badge = integer_to_list(Offline + 1), DeviceID = erlang:integer_to_list(ID, 16), + STo = jlib:jid_to_string(To), Packet1 = {xmlelement, "message", [], [{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}], @@ -227,7 +228,9 @@ receive_offline_packet(From, To, Packet) -> {xmlelement, "sound", [], [{xmlcdata, SSound}]}, {xmlelement, "from", [], - [{xmlcdata, SFrom}]}]}]}, + [{xmlcdata, SFrom}]}, + {xmlelement, "to", [], + [{xmlcdata, STo}]}]}]}, ejabberd_router:route(To, ServiceJID, Packet1) end end; diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl index b2d3251ac..425da7aac 100644 --- a/src/mod_applepush_service.erl +++ b/src/mod_applepush_service.erl @@ -370,7 +370,7 @@ handle_message(From, To, Packet, State) -> "Device ID: ~s~n", [State#state.host, erlang:integer_to_list(ID, 16), Notification, Payload, - jlib:jid_to_string(From), + Sender, Receiver, DeviceID]), case ssl:send(State#state.socket, Notification) of ok -> From f2cfee11de38bd96d329334c6d2d30ed11e049a1 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Mon, 20 Sep 2010 13:35:42 +0300 Subject: [PATCH 9/9] Disable notifications for a user on "Invalid token" error --- src/mod_applepush_service.erl | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/mod_applepush_service.erl b/src/mod_applepush_service.erl index 425da7aac..7c5ab711f 100644 --- a/src/mod_applepush_service.erl +++ b/src/mod_applepush_service.erl @@ -199,14 +199,19 @@ handle_info({ssl, Socket, Packet}, State) case dict:find(CmdID, State#state.cmd_cache) of {ok, {JID, _DeviceID}} -> ?ERROR_MSG("PUSH ERROR for ~p: ~p", [JID, Status]), - %From = jlib:make_jid("", State#state.host, ""), - %ejabberd_router:route( - % From, JID, - % {xmlelement, "message", [], - % [{xmlelement, "disable", - % [{"xmlns", ?NS_P1_PUSH}, - % {"status", integer_to_list(Status)}], - % []}]}); + if + Status == 8 -> + From = jlib:make_jid("", State#state.host, ""), + ejabberd_router:route( + From, JID, + {xmlelement, "message", [], + [{xmlelement, "disable", + [{"xmlns", ?NS_P1_PUSH}, + {"status", integer_to_list(Status)}], + []}]}); + true -> + ok + end, ok; error -> ?ERROR_MSG("Unknown cmd ID ~p~n", [CmdID]),