25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-26 17:38:45 +01:00

Initial applepush git commit

This commit is contained in:
Alexey Shchepin 2010-08-19 15:30:39 +03:00
parent 584fa98564
commit 35cde6787d
6 changed files with 1907 additions and 36 deletions

File diff suppressed because it is too large Load Diff

381
src/mod_applepush.erl Normal file
View File

@ -0,0 +1,381 @@
%%%----------------------------------------------------------------------
%%% File : mod_applepush.erl
%%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Push module support
%%% Created : 5 Jun 2009 by Alexey Shchepin <alexey@process-one.net>
%%%
%%% ejabberd, Copyright (C) 2002-2009 ProcessOne
%%%----------------------------------------------------------------------
-module(mod_applepush).
-author('alexey@process-one.net').
-behaviour(gen_mod).
-export([start/2,
stop/1,
push_notification/8,
enable_offline_notification/5,
disable_notification/3,
receive_offline_packet/3]).
%% Debug commands
-export([get_token_by_jid/1]).
-include("ejabberd.hrl").
-include("jlib.hrl").
-include("mod_privacy.hrl").
-define(NS_P1_PUSH, "p1:push").
-define(NS_P1_PUSHED, "p1:pushed").
-define(NS_P1_ATTACHMENT, "http://process-one.net/attachement").
-record(applepush_cache, {us, device_id, options}).
start(Host, _Opts) ->
case init_host(Host) of
true ->
mnesia:create_table(
applepush_cache,
[{disc_copies, [node()]},
{attributes, record_info(fields, applepush_cache)}]),
mnesia:add_table_copy(muc_online_room, node(), ram_copies),
ejabberd_hooks:add(p1_push_notification, Host,
?MODULE, push_notification, 50),
ejabberd_hooks:add(p1_push_enable_offline, Host,
?MODULE, enable_offline_notification, 50),
ejabberd_hooks:add(p1_push_disable, Host,
?MODULE, disable_notification, 50),
ejabberd_hooks:add(offline_message_hook, Host,
?MODULE, receive_offline_packet, 35);
false ->
ok
end.
stop(Host) ->
ejabberd_hooks:delete(p1_push_notification, Host,
?MODULE, push_notification, 50),
ejabberd_hooks:delete(p1_push_disable, Host,
?MODULE, disable_notification, 50),
ejabberd_hooks:delete(offline_message_hook, Host,
?MODULE, receive_offline_packet, 35).
push_notification(Host, JID, Notification, Msg, Unread, Sound, AppID, Sender) ->
Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]),
case Type of
"applepush" ->
DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]),
PushService = get_push_service(Host, JID, AppID),
ServiceJID = jlib:make_jid("", PushService, ""),
Badge = integer_to_list(Unread),
SSound =
if
Sound -> "true";
true -> "false"
end,
Receiver = jlib:jid_to_string(JID),
Packet =
{xmlelement, "message", [],
[{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}],
[{xmlelement, "id", [], [{xmlcdata, DeviceID}]},
{xmlelement, "msg", [], [{xmlcdata, Msg}]},
{xmlelement, "badge", [], [{xmlcdata, Badge}]},
{xmlelement, "sound", [], [{xmlcdata, SSound}]},
{xmlelement, "from", [], [{xmlcdata, Sender}]},
{xmlelement, "to", [], [{xmlcdata, Receiver}]}]}]},
ejabberd_router:route(JID, ServiceJID, Packet),
stop;
_ ->
ok
end.
enable_offline_notification(JID, Notification, SendBody, SendFrom, AppID1) ->
Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]),
case Type of
"applepush" ->
DeviceID = xml:get_path_s(Notification, [{elem, "id"}, cdata]),
case catch erlang:list_to_integer(DeviceID, 16) of
ID1 when is_integer(ID1) ->
AppID =
case xml:get_path_s(Notification,
[{elem, "appid"}, cdata]) of
"" -> AppID1;
A -> A
end,
{MegaSecs, Secs, _MicroSecs} = now(),
TimeStamp = MegaSecs * 1000000 + Secs,
Options =
[{appid, AppID},
{send_body, SendBody},
{send_from, SendFrom},
{timestamp, TimeStamp}],
store_cache(JID, ID1, Options);
_ ->
ok
end,
stop;
_ ->
ok
end.
disable_notification(JID, Notification, _AppID) ->
Type = xml:get_path_s(Notification, [{elem, "type"}, cdata]),
case Type of
"applepush" ->
delete_cache(JID),
stop;
_ ->
ok
end.
receive_offline_packet(From, To, Packet) ->
?DEBUG("mod_applepush offline~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
[From, To, Packet, 8]),
Host = To#jid.lserver,
case gen_mod:is_loaded(Host, mod_applepush) of
true ->
case lookup_cache(To) of
false ->
ok;
{ID, AppID, SendBody, SendFrom} ->
?DEBUG("lookup: ~p~n", [{ID, AppID, SendBody, SendFrom}]),
Body1 = xml:get_path_s(Packet, [{elem, "body"}, cdata]),
Body =
case check_x_attachment(Packet) of
true ->
case Body1 of
"" -> [238, 128, 136];
_ ->
[238, 128, 136, 32 | Body1]
end;
false ->
Body1
end,
Pushed = check_x_pushed(Packet),
PushService = get_push_service(Host, To, AppID),
ServiceJID = jlib:make_jid("", PushService, ""),
if
Body == "";
Pushed ->
if
From#jid.lserver == ServiceJID#jid.lserver ->
Disable =
xml:get_path_s(
Packet, [{elem, "disable"}]) /= "",
if
Disable ->
delete_cache(To);
true ->
ok
end;
true ->
ok
end,
ok;
true ->
BFrom = jlib:jid_remove_resource(From),
SFrom = jlib:jid_to_string(BFrom),
Offline = ejabberd_hooks:run_fold(
count_offline_messages,
Host,
0,
[To#jid.luser, Host]),
IncludeBody =
case SendBody of
all ->
true;
first_per_user ->
Offline == 0;
first ->
Offline == 0;
none ->
false
end,
Msg =
if
IncludeBody ->
CBody = utf8_cut(Body, 100),
case SendFrom of
jid -> SFrom ++ ": " ++ CBody;
username -> BFrom#jid.user ++ ": " ++ CBody;
_ -> CBody
end;
true ->
""
end,
SSound =
if
IncludeBody -> "true";
true -> "false"
end,
Badge = integer_to_list(Offline + 1),
DeviceID = erlang:integer_to_list(ID, 16),
Packet1 =
{xmlelement, "message", [],
[{xmlelement, "push", [{"xmlns", ?NS_P1_PUSH}],
[{xmlelement, "id", [],
[{xmlcdata, DeviceID}]},
{xmlelement, "msg", [],
[{xmlcdata, Msg}]},
{xmlelement, "badge", [],
[{xmlcdata, Badge}]},
{xmlelement, "sound", [],
[{xmlcdata, SSound}]},
{xmlelement, "from", [],
[{xmlcdata, SFrom}]}]}]},
ejabberd_router:route(To, ServiceJID, Packet1)
end
end;
false ->
ok
end.
lookup_cache(JID) ->
#jid{luser = LUser, lserver = LServer} = JID,
LUS = {LUser, LServer},
case catch mnesia:dirty_read(applepush_cache, LUS) of
[#applepush_cache{device_id = DeviceID, options = Options}] ->
AppID = proplists:get_value(appid, Options, "applepush.localhost"),
SendBody = proplists:get_value(send_body, Options, none),
SendFrom = proplists:get_value(send_from, Options, true),
{DeviceID, AppID, SendBody, SendFrom};
_ ->
false
end.
store_cache(JID, DeviceID, Options) ->
#jid{luser = LUser, lserver = LServer} = JID,
LUS = {LUser, LServer},
R = #applepush_cache{us = LUS,
device_id = DeviceID,
options = Options},
case catch mnesia:dirty_read(applepush_cache, LUS) of
[R] ->
ok;
_ ->
catch mnesia:dirty_write(R)
end.
delete_cache(JID) ->
#jid{luser = LUser, lserver = LServer} = JID,
LUS = {LUser, LServer},
catch mnesia:dirty_delete(applepush_cache, LUS).
utf8_cut(S, Bytes) ->
utf8_cut(S, [], [], Bytes + 1).
utf8_cut(_S, _Cur, Prev, 0) ->
lists:reverse(Prev);
utf8_cut([], Cur, _Prev, _Bytes) ->
lists:reverse(Cur);
utf8_cut([C | S], Cur, Prev, Bytes) ->
if
C bsr 6 == 2 ->
utf8_cut(S, [C | Cur], Prev, Bytes - 1);
true ->
utf8_cut(S, [C | Cur], Cur, Bytes - 1)
end.
check_x_pushed({xmlelement, _Name, _Attrs, Els}) ->
check_x_pushed1(Els).
check_x_pushed1([]) ->
false;
check_x_pushed1([{xmlcdata, _} | Els]) ->
check_x_pushed1(Els);
check_x_pushed1([El | Els]) ->
case xml:get_tag_attr_s("xmlns", El) of
?NS_P1_PUSHED ->
true;
_ ->
check_x_pushed1(Els)
end.
check_x_attachment({xmlelement, _Name, _Attrs, Els}) ->
check_x_attachment1(Els).
check_x_attachment1([]) ->
false;
check_x_attachment1([{xmlcdata, _} | Els]) ->
check_x_attachment1(Els);
check_x_attachment1([El | Els]) ->
case xml:get_tag_attr_s("xmlns", El) of
?NS_P1_ATTACHMENT ->
true;
_ ->
check_x_attachment1(Els)
end.
get_push_service(Host, JID, AppID) ->
PushServices =
gen_mod:get_module_opt(
Host, ?MODULE,
push_services, []),
PushService =
case lists:keysearch(AppID, 1, PushServices) of
false ->
DefaultServices =
gen_mod:get_module_opt(
Host, ?MODULE,
default_services, []),
case lists:keysearch(JID#jid.lserver, 1, DefaultServices) of
false ->
gen_mod:get_module_opt(
Host, ?MODULE,
default_service, "applepush.localhost");
{value, {_, PS}} ->
PS
end;
{value, {AppID, PS}} ->
PS
end,
PushService.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Internal module protection
-define(VALID_HOSTS, []). % default is unlimited use
-define(MAX_USERS, 0). % default is unlimited use
init_host(VHost) ->
case ?VALID_HOSTS of
[] -> % unlimited use
true;
ValidList -> % limited use
init_host(VHost, ValidList)
end.
init_host([], _) ->
false;
init_host(VHost, ValidEncryptedList) ->
EncryptedHost = erlang:md5(lists:reverse(VHost)),
case lists:member(EncryptedHost, ValidEncryptedList) of
true ->
case ?MAX_USERS of
0 -> true;
N -> ejabberd_auth:get_vh_registered_users_number(VHost) =< N
end;
false ->
case string:chr(VHost, $.) of
0 -> false;
Pos -> init_host(string:substr(VHost, Pos+1), ValidEncryptedList)
end
end.
%% Debug commands
%% JID is of form
get_token_by_jid(JIDString) ->
#jid{luser = LUser, lserver = LServer} = jlib:string_to_jid(JIDString),
LUS = {LUser, LServer},
case mnesia:dirty_read(applepush_cache, LUS) of
[{applepush_cache,_,I,_}] ->
erlang:integer_to_list(I, 16);
_ ->
undefined
end.

View File

@ -0,0 +1,558 @@
%%%----------------------------------------------------------------------
%%% File : mod_applepush_service.erl
%%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Central push infrastructure
%%% Created : 5 Jun 2009 by Alexey Shchepin <alexey@process-one.net>
%%%
%%% ejabberd, Copyright (C) 2002-2009 ProcessOne
%%%----------------------------------------------------------------------
-module(mod_applepush_service).
-author('alexey@process-one.net').
-behaviour(gen_server).
-behaviour(gen_mod).
%% API
-export([start_link/2, start/2, stop/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("ejabberd.hrl").
-include("jlib.hrl").
-record(state, {host,
socket,
gateway,
port,
feedback_socket,
feedback,
feedback_port,
feedback_buf = <<>>,
certfile,
queue,
soundfile,
cmd_id = 0,
cmd_cache = dict:new(),
device_cache = dict:new()}).
-define(PROCNAME, ejabberd_mod_applepush_service).
-define(RECONNECT_TIMEOUT, 5000).
-define(FEEDBACK_RECONNECT_TIMEOUT, 30000).
-define(MAX_QUEUE_SIZE, 1000).
-define(CACHE_SIZE, 4096).
-define(NS_P1_PUSH, "p1:push").
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).
start(Host, Opts) ->
ssl:start(),
MyHosts =
case catch gen_mod:get_opt(hosts, Opts) of
{'EXIT', _} ->
[{gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"), Opts}];
Hs ->
Hs
end,
lists:foreach(
fun({MyHost, MyOpts}) ->
Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME),
ChildSpec =
{Proc,
{?MODULE, start_link, [MyHost, MyOpts]},
transient,
1000,
worker,
[?MODULE]},
supervisor:start_child(ejabberd_sup, ChildSpec)
end, MyHosts).
stop(Host) ->
MyHosts =
case catch gen_mod:get_module_opt(Host, ?MODULE, hosts, []) of
[] ->
[gen_mod:get_module_opt_host(
Host, ?MODULE, "applepush.@HOST@")];
Hs ->
[H || {H, _} <- Hs]
end,
lists:foreach(
fun(MyHost) ->
Proc = gen_mod:get_module_proc(MyHost, ?PROCNAME),
gen_server:call(Proc, stop),
supervisor:terminate_child(ejabberd_sup, Proc),
supervisor:delete_child(ejabberd_sup, Proc)
end, MyHosts).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([MyHost, Opts]) ->
CertFile = gen_mod:get_opt(certfile, Opts, ""),
SoundFile = gen_mod:get_opt(sound_file, Opts, "pushalert.wav"),
Gateway = gen_mod:get_opt(gateway, Opts, "gateway.push.apple.com"),
Feedback = gen_mod:get_opt(feedback, Opts, undefined),
Port = gen_mod:get_opt(port, Opts, 2195),
FeedbackPort = gen_mod:get_opt(feedback_port, Opts, 2196),
%MyHost = gen_mod:get_opt_host(Host, Opts, "applepush.@HOST@"),
self() ! connect,
case Feedback of
undefined ->
ok;
_ ->
self() ! connect_feedback
end,
ejabberd_router:register_route(MyHost),
{ok, #state{host = MyHost,
gateway = Gateway,
port = Port,
feedback = Feedback,
feedback_port = FeedbackPort,
certfile = CertFile,
queue = {0, queue:new()},
soundfile = SoundFile}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({route, From, To, Packet}, State) ->
case catch do_route(From, To, Packet, State) of
{'EXIT', Reason} ->
?ERROR_MSG("~p", [Reason]),
{noreply, State};
Res ->
Res
end;
handle_info(connect, State) ->
connect(State);
handle_info(connect_feedback, State)
when State#state.feedback /= undefined,
State#state.feedback_socket == undefined ->
Feedback = State#state.feedback,
FeedbackPort = State#state.feedback_port,
CertFile = State#state.certfile,
case ssl:connect(Feedback, FeedbackPort,
[{certfile, CertFile},
{active, true},
binary]) of
{ok, Socket} ->
{noreply, State#state{feedback_socket = Socket}};
{error, Reason} ->
?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, "
"retrying after ~p seconds",
[State#state.host, Feedback, FeedbackPort,
Reason, ?FEEDBACK_RECONNECT_TIMEOUT div 1000]),
erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(),
connect_feedback),
{noreply, State}
end;
handle_info({ssl, Socket, Packet}, State)
when Socket == State#state.socket ->
case Packet of
<<8, Status, CmdID:32>> when Status /= 0 ->
case dict:find(CmdID, State#state.cmd_cache) of
{ok, {JID, _DeviceID}} ->
From = jlib:make_jid("", State#state.host, ""),
ejabberd_router:route(
From, JID,
{xmlelement, "message", [],
[{xmlelement, "disable",
[{"xmlns", ?NS_P1_PUSH},
{"status", integer_to_list(Status)}],
[]}]});
error ->
?ERROR_MSG("Unknown cmd ID ~p~n", [CmdID]),
ok
end;
_ ->
?ERROR_MSG("Received unknown packet ~p~n", [Packet])
end,
{noreply, State};
handle_info({ssl, Socket, Packet}, State)
when Socket == State#state.feedback_socket ->
Buf = <<(State#state.feedback_buf)/binary, Packet/binary>>,
Buf2 = parse_feedback_buf(Buf, State),
{noreply, State#state{feedback_buf = Buf2}};
handle_info({ssl_closed, Socket}, State)
when Socket == State#state.feedback_socket ->
ssl:close(Socket),
erlang:send_after(?FEEDBACK_RECONNECT_TIMEOUT, self(),
connect_feedback),
{noreply, State#state{feedback_socket = undefined,
feedback_buf = <<>>}};
handle_info(_Info, State) ->
%io:format("got info: ~p~n", [_Info]),
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
ejabberd_router:unregister_route(State#state.host),
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
do_route(From, To, Packet, State) ->
#jid{user = User, resource = Resource} = To,
if
(User /= "") or (Resource /= "") ->
Err = jlib:make_error_reply(Packet, ?ERR_SERVICE_UNAVAILABLE),
ejabberd_router:route(To, From, Err),
{noreply, State};
true ->
case Packet of
{xmlelement, "iq", _, _} ->
IQ = jlib:iq_query_info(Packet),
case IQ of
#iq{type = get, xmlns = ?NS_DISCO_INFO = XMLNS,
sub_el = _SubEl, lang = Lang} = IQ ->
Res = IQ#iq{type = result,
sub_el = [{xmlelement, "query",
[{"xmlns", XMLNS}],
iq_disco(Lang)}]},
ejabberd_router:route(To,
From,
jlib:iq_to_xml(Res)),
{noreply, State};
#iq{type = get, xmlns = ?NS_DISCO_ITEMS = XMLNS} = IQ ->
Res = IQ#iq{type = result,
sub_el = [{xmlelement, "query",
[{"xmlns", XMLNS}],
[]}]},
ejabberd_router:route(To,
From,
jlib:iq_to_xml(Res)),
{noreply, State};
%%#iq{type = get, xmlns = ?NS_VCARD, lang = Lang} ->
%% ResIQ =
%% IQ#iq{type = result,
%% sub_el = [{xmlelement,
%% "vCard",
%% [{"xmlns", ?NS_VCARD}],
%% iq_get_vcard(Lang)}]},
%% ejabberd_router:route(To,
%% From,
%% jlib:iq_to_xml(ResIQ));
_ ->
Err = jlib:make_error_reply(Packet,
?ERR_SERVICE_UNAVAILABLE),
ejabberd_router:route(To, From, Err),
{noreply, State}
end;
{xmlelement, "message", _, Els} ->
case xml:remove_cdata(Els) of
[{xmlelement, "push", _, _}] ->
NewState = handle_message(From, To, Packet, State),
{noreply, NewState};
[{xmlelement, "disable", _, _}] ->
{noreply, State};
_ ->
{noreply, State}
end;
_ ->
{noreply, State}
end
end.
handle_message(From, To, Packet, #state{socket = undefined} = State) ->
queue_message(From, To, Packet, State);
handle_message(From, To, Packet, State) ->
DeviceID =
xml:get_path_s(Packet,
[{elem, "push"}, {elem, "id"}, cdata]),
Msg =
xml:get_path_s(Packet,
[{elem, "push"}, {elem, "msg"}, cdata]),
Badge =
xml:get_path_s(Packet,
[{elem, "push"}, {elem, "badge"}, cdata]),
Sound =
xml:get_path_s(Packet,
[{elem, "push"}, {elem, "sound"}, cdata]),
Sender =
xml:get_path_s(Packet,
[{elem, "push"}, {elem, "from"}, cdata]),
Receiver =
xml:get_path_s(Packet,
[{elem, "push"}, {elem, "to"}, cdata]),
Msg2 = json_escape(Msg),
AlertPayload =
case Msg2 of
"" -> "";
_ -> "\"alert\":\"" ++ Msg2 ++ "\""
end,
BadgePayload =
case catch list_to_integer(Badge) of
B when is_integer(B) ->
"\"badge\":" ++ Badge;
_ -> ""
end,
SoundPayload =
case Sound of
"true" ->
SoundFile = State#state.soundfile,
"\"sound\":\"" ++ json_escape(SoundFile) ++ "\"";
_ -> ""
end,
Payloads = lists:filter(fun(S) -> S /= "" end,
[AlertPayload, BadgePayload, SoundPayload]),
Payload =
"{\"aps\":{" ++ join(Payloads, ",") ++ "},"
"\"from\":\"" ++ json_escape(Sender) ++ "\"}",
ID =
case catch erlang:list_to_integer(DeviceID, 16) of
ID1 when is_integer(ID1) ->
ID1;
_ ->
false
end,
if
is_integer(ID) ->
Command = 1,
CmdID = State#state.cmd_id,
{MegaSecs, Secs, _MicroSecs} = now(),
Expiry = MegaSecs * 1000000 + Secs + 24 * 60 * 60,
BDeviceID = <<ID:256>>,
BPayload = list_to_binary(Payload),
IDLen = size(BDeviceID),
PayloadLen = size(BPayload),
Notification =
<<Command:8,
CmdID:32,
Expiry:32,
IDLen:16,
BDeviceID/binary,
PayloadLen:16,
BPayload/binary>>,
?INFO_MSG("(~p) sending notification for ~s~n~p~npayload:~n~s~n"
"Sender: ~s~n"
"Receiver: ~s~n"
"Device ID: ~s~n",
[State#state.host, erlang:integer_to_list(ID, 16),
Notification, Payload,
jlib:jid_to_string(From),
Receiver, DeviceID]),
case ssl:send(State#state.socket, Notification) of
ok ->
cache(From, ID, State);
{error, Reason} ->
?INFO_MSG("(~p) Connection closed: ~p, reconnecting",
[State#state.host, Reason]),
ssl:close(State#state.socket),
self() ! connect,
queue_message(From, To, Packet,
State#state{socket = undefined})
end;
true ->
State
end.
connect(#state{socket = undefined} = State) ->
Gateway = State#state.gateway,
Port = State#state.port,
CertFile = State#state.certfile,
case ssl:connect(Gateway, Port, [{certfile, CertFile},
{active, true},
binary]) of
{ok, Socket} ->
{noreply, resend_messages(State#state{socket = Socket})};
{error, Reason} ->
?ERROR_MSG("(~p) Connection to ~p:~p failed: ~p, "
"retrying after ~p seconds",
[State#state.host, Gateway, Port,
Reason, ?RECONNECT_TIMEOUT div 1000]),
erlang:send_after(?RECONNECT_TIMEOUT, self(), connect),
{noreply, State}
end;
connect(State) ->
{noreply, State}.
bounce_message(From, To, Packet, Reason) ->
{xmlelement, _, Attrs, _} = Packet,
Type = xml:get_attr_s("type", Attrs),
if Type /= "error"; Type /= "result" ->
ejabberd_router:route(
To, From,
jlib:make_error_reply(
Packet,
?ERRT_INTERNAL_SERVER_ERROR(
xml:get_attr_s("xml:lang", Attrs),
Reason)));
true ->
ok
end.
queue_message(From, To, Packet, State) ->
case State#state.queue of
{?MAX_QUEUE_SIZE, Queue} ->
{{value, {From1, To1, Packet1}}, Queue1} = queue:out(Queue),
bounce_message(From1, To1, Packet1,
"Unable to connect to push service"),
Queue2 = queue:in({From, To, Packet}, Queue1),
State#state{queue = {?MAX_QUEUE_SIZE, Queue2}};
{Size, Queue} ->
Queue1 = queue:in({From, To, Packet}, Queue),
State#state{queue = {Size+1, Queue1}}
end.
resend_messages(#state{queue = {_, Queue}} = State) ->
lists:foldl(
fun({From, To, Packet}, AccState) ->
case catch handle_message(From, To, Packet, AccState) of
{'EXIT', _} = Err ->
?ERROR_MSG("error while processing message:~n"
"** From: ~p~n"
"** To: ~p~n"
"** Packet: ~p~n"
"** Reason: ~p",
[From, To, Packet, Err]),
AccState;
NewAccState ->
NewAccState
end
end, State#state{queue = {0, queue:new()}}, queue:to_list(Queue)).
cache(JID, DeviceID, State) ->
CmdID = State#state.cmd_id,
Key = CmdID rem ?CACHE_SIZE,
C1 = State#state.cmd_cache,
D1 = State#state.device_cache,
D2 = case dict:find(Key, C1) of
{ok, {_, OldDeviceID}} ->
del_device_cache(D1, OldDeviceID);
error ->
D1
end,
D3 = add_device_cache(D2, DeviceID, JID),
C2 = dict:store(Key, {JID, DeviceID}, C1),
State#state{cmd_id = CmdID + 1,
cmd_cache = C2,
device_cache = D3}.
add_device_cache(DeviceCache, DeviceID, JID) ->
dict:update(
DeviceID,
fun({Counter, _}) -> {Counter + 1, JID} end,
{1, JID},
DeviceCache).
del_device_cache(DeviceCache, DeviceID) ->
case dict:find(DeviceID, DeviceCache) of
{ok, {Counter, JID}} ->
case Counter of
1 ->
dict:erase(DeviceID, DeviceCache);
_ ->
dict:store(DeviceID, {Counter - 1, JID}, DeviceCache)
end;
error ->
DeviceCache
end.
json_escape(S) ->
[case C of
$" -> "\\\"";
$\\ -> "\\\\";
_ when C < 16 -> ["\\u000", erlang:integer_to_list(C, 16)];
_ when C < 32 -> ["\\u00", erlang:integer_to_list(C, 16)];
_ -> C
end || C <- S].
join(List, Sep) ->
lists:foldr(fun(A, "") -> A;
(A, Acc) -> A ++ Sep ++ Acc
end, "", List).
iq_disco(Lang) ->
[{xmlelement, "identity",
[{"category", "gateway"},
{"type", "apple"},
{"name", translate:translate(Lang, "Apple Push Service")}], []},
{xmlelement, "feature", [{"var", ?NS_DISCO_INFO}], []}].
parse_feedback_buf(Buf, State) ->
case Buf of
<<TimeStamp:32, IDLen:16, BDeviceID:IDLen/binary, Rest/binary>> ->
IDLen8 = IDLen * 8,
<<DeviceID:IDLen8>> = BDeviceID,
case dict:find(DeviceID, State#state.device_cache) of
{ok, {_Counter, JID}} ->
From = jlib:make_jid("", State#state.host, ""),
ejabberd_router:route(
From, JID,
{xmlelement, "message", [],
[{xmlelement, "disable",
[{"xmlns", ?NS_P1_PUSH},
{"status", "feedback"},
{"ts", integer_to_list(TimeStamp)}],
[]}]});
error ->
ok
end,
parse_feedback_buf(Rest, State);
_ ->
Buf
end.

View File

@ -42,7 +42,8 @@
get_queue_length/2,
webadmin_page/3,
webadmin_user/4,
webadmin_user_parse_query/5]).
webadmin_user_parse_query/5,
count_offline_messages/3]).
-include("ejabberd.hrl").
-include("jlib.hrl").
@ -81,6 +82,8 @@ start(Host, Opts) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
ejabberd_hooks:add(count_offline_messages, Host,
?MODULE, count_offline_messages, 50),
AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages),
register(gen_mod:get_module_proc(Host, ?PROCNAME),
spawn(?MODULE, loop, [AccessMaxOfflineMsgs])).
@ -663,3 +666,23 @@ webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) ->
end;
webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) ->
Acc.
%% ------------------------------------------------
%% mod_offline: number of messages quota management
count_offline_messages(_Acc, User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
US = {LUser, LServer},
F = fun () ->
p1_mnesia:count_records(
offline_msg,
#offline_msg{us=US, _='_'})
end,
N = case catch mnesia:async_dirty(F) of
I when is_integer(I) -> I;
_ -> 0
end,
{stop, N}.

View File

@ -41,7 +41,8 @@
get_queue_length/2,
webadmin_page/3,
webadmin_user/4,
webadmin_user_parse_query/5]).
webadmin_user_parse_query/5,
count_offline_messages/3]).
-include("ejabberd.hrl").
-include("jlib.hrl").
@ -75,6 +76,8 @@ start(Host, Opts) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
ejabberd_hooks:add(count_offline_messages, Host,
?MODULE, count_offline_messages, 50),
AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages),
register(gen_mod:get_module_proc(Host, ?PROCNAME),
spawn(?MODULE, loop, [Host, AccessMaxOfflineMsgs])).
@ -547,3 +550,30 @@ count_offline_messages(LUser, LServer) ->
_ ->
0
end.
count_offline_messages(_Acc, User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
Num = case catch ejabberd_odbc:sql_query(
LServer,
["select xml from spool"
" where username='", LUser, "';"]) of
{selected, ["xml"], Rs} ->
lists:foldl(
fun({XML}, Acc) ->
case xml_stream:parse_element(XML) of
{error, _Reason} ->
Acc;
El ->
case xml:get_subtag(El, "body") of
false ->
Acc;
_ ->
Acc + 1
end
end
end, 0, Rs);
_ ->
0
end,
{stop, Num}.

View File

@ -75,6 +75,9 @@ process_data(CallbackPid, Stack, Data) ->
{?XML_CDATA, CData} ->
case Stack of
[El] ->
catch gen_fsm:send_all_state_event(
CallbackPid,
{xmlstreamcdata, CData}),
[El];
%% Merge CDATA nodes if they are contiguous
%% This does not change the semantic: the split in
@ -88,7 +91,8 @@ process_data(CallbackPid, Stack, Data) ->
[{xmlelement, Name, Attrs, Els} | Tail] ->
[{xmlelement, Name, Attrs, [{xmlcdata, CData} | Els]} |
Tail];
[] -> []
[] ->
[]
end;
{?XML_ERROR, Err} ->
catch gen_fsm:send_event(CallbackPid, {xmlstreamerror, Err})