From ebadcf71c2e12e379afd31829c58717565d541c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielowski?= Date: Tue, 22 Nov 2016 13:17:05 +0100 Subject: [PATCH] New bosh module --- src/ejabberd_bosh.erl | 1095 +++++++++++++++++++++++++++++++++++++++ src/ejabberd_socket.erl | 6 +- src/mod_bosh.erl | 296 +++++++++++ 3 files changed, 1395 insertions(+), 2 deletions(-) create mode 100644 src/ejabberd_bosh.erl create mode 100644 src/mod_bosh.erl diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl new file mode 100644 index 000000000..d4fc6809d --- /dev/null +++ b/src/ejabberd_bosh.erl @@ -0,0 +1,1095 @@ +%%%------------------------------------------------------------------- +%%% File : ejabberd_bosh.erl +%%% Author : Evgeniy Khramtsov +%%% Purpose : Manage BOSH sockets +%%% Created : 20 Jul 2011 by Evgeniy Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%------------------------------------------------------------------- +-module(ejabberd_bosh). + +-protocol({xep, 124, '1.11'}). +-protocol({xep, 206, '1.4'}). + +-define(GEN_FSM, p1_fsm). + +-behaviour(?GEN_FSM). + +%% API +-export([start/2, start/3, start_link/3]). + +-export([send_xml/2, setopts/2, controlling_process/2, + migrate/3, custom_receiver/1, become_controller/2, + reset_stream/1, change_shaper/2, monitor/1, close/1, + sockname/1, peername/1, process_request/3, send/2, + change_controller/2]). + +%% gen_fsm callbacks +-export([init/1, wait_for_session/2, wait_for_session/3, + active/2, active/3, handle_event/3, print_state/1, + handle_sync_event/4, handle_info/3, terminate/3, + code_change/4]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). + +-include("jlib.hrl"). + +-include("ejabberd_http.hrl"). + +-include("bosh.hrl"). + +%%-define(DBGFSM, true). +-ifdef(DBGFSM). + +-define(FSMOPTS, [{debug, [trace]}]). + +-else. + +-define(FSMOPTS, []). + +-endif. + +-define(BOSH_VERSION, <<"1.11">>). + +-define(NS_BOSH, <<"urn:xmpp:xbosh">>). + +-define(NS_HTTP_BIND, + <<"http://jabber.org/protocol/httpbind">>). + +-define(DEFAULT_MAXPAUSE, 120). + +-define(DEFAULT_WAIT, 300). + +-define(DEFAULT_HOLD, 1). + +-define(DEFAULT_POLLING, 2). + +-define(DEFAULT_INACTIVITY, 30). + +-define(MAX_SHAPED_REQUESTS_QUEUE_LEN, 1000). + +-define(SEND_TIMEOUT, 15000). + +-type bosh_socket() :: {http_bind, pid(), + {inet:ip_address(), + inet:port_number()}}. + +-export_type([bosh_socket/0]). + +-record(state, + {host = <<"">> :: binary(), + sid = <<"">> :: binary(), + el_ibuf = buf_new() :: ?TQUEUE, + el_obuf = buf_new() :: ?TQUEUE, + shaper_state = none :: shaper:shaper(), + c2s_pid :: pid(), + xmpp_ver = <<"">> :: binary(), + inactivity_timer :: reference(), + wait_timer :: reference(), + wait_timeout = ?DEFAULT_WAIT :: timeout(), + inactivity_timeout = ?DEFAULT_INACTIVITY :: timeout(), + prev_rid = 0 :: non_neg_integer(), + prev_key = <<"">> :: binary(), + prev_poll :: erlang:timestamp(), + max_concat = unlimited :: unlimited | non_neg_integer(), + responses = gb_trees:empty() :: ?TGB_TREE, + receivers = gb_trees:empty() :: ?TGB_TREE, + shaped_receivers = queue:new() :: ?TQUEUE, + ip :: inet:ip_address(), + max_requests = 1 :: non_neg_integer()}). + +-record(body, + {http_reason = <<"">> :: binary(), + attrs = [] :: [{any(), any()}], + els = [] :: [fxml_stream:xml_stream_el()], + size = 0 :: non_neg_integer()}). + +start(#body{attrs = Attrs} = Body, IP, SID) -> + XMPPDomain = get_attr(to, Attrs), + SupervisorProc = gen_mod:get_module_proc(XMPPDomain, ?PROCNAME), + case catch supervisor:start_child(SupervisorProc, + [Body, IP, SID]) + of + {ok, Pid} -> {ok, Pid}; + {'EXIT', {noproc, _}} -> + check_bosh_module(XMPPDomain), + {error, module_not_loaded}; + Err -> + ?ERROR_MSG("Failed to start BOSH session: ~p", [Err]), + {error, Err} + end. + +start(StateName, State) -> + (?GEN_FSM):start_link(?MODULE, [StateName, State], + ?FSMOPTS). + +start_link(Body, IP, SID) -> + (?GEN_FSM):start_link(?MODULE, [Body, IP, SID], + ?FSMOPTS). + +send({http_bind, FsmRef, IP}, Packet) -> + send_xml({http_bind, FsmRef, IP}, Packet). + +send_xml({http_bind, FsmRef, _IP}, Packet) -> + case catch (?GEN_FSM):sync_send_all_state_event(FsmRef, + {send_xml, Packet}, + ?SEND_TIMEOUT) + of + {'EXIT', {timeout, _}} -> {error, timeout}; + {'EXIT', _} -> {error, einval}; + Res -> Res + end. + +setopts({http_bind, FsmRef, _IP}, Opts) -> + case lists:member({active, once}, Opts) of + true -> + (?GEN_FSM):send_all_state_event(FsmRef, + {activate, self()}); + _ -> + case lists:member({active, false}, Opts) of + true -> + case catch (?GEN_FSM):sync_send_all_state_event(FsmRef, + deactivate_socket) + of + {'EXIT', _} -> {error, einval}; + Res -> Res + end; + _ -> ok + end + end. + +controlling_process(_Socket, _Pid) -> ok. + +custom_receiver({http_bind, FsmRef, _IP}) -> + {receiver, ?MODULE, FsmRef}. + +become_controller(FsmRef, C2SPid) -> + (?GEN_FSM):send_all_state_event(FsmRef, + {become_controller, C2SPid}). + +change_controller({http_bind, FsmRef, _IP}, C2SPid) -> + become_controller(FsmRef, C2SPid). + +reset_stream({http_bind, _FsmRef, _IP}) -> ok. + +change_shaper({http_bind, FsmRef, _IP}, Shaper) -> + (?GEN_FSM):send_all_state_event(FsmRef, + {change_shaper, Shaper}). + +monitor({http_bind, FsmRef, _IP}) -> + erlang:monitor(process, FsmRef). + +close({http_bind, FsmRef, _IP}) -> + catch (?GEN_FSM):sync_send_all_state_event(FsmRef, + close). + +sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}. + +peername({http_bind, _FsmRef, IP}) -> {ok, IP}. + +migrate(FsmRef, Node, After) when node(FsmRef) == node() -> + catch erlang:send_after(After, FsmRef, {migrate, Node}); +migrate(_FsmRef, _Node, _After) -> + ok. + +process_request(Data, IP, Type) -> + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts = case Type of + xml -> + [{xml_socket, true} | Opts1]; + json -> + Opts1 + end, + MaxStanzaSize = case lists:keysearch(max_stanza_size, 1, + Opts) + of + {value, {_, Size}} -> Size; + _ -> infinity + end, + PayloadSize = iolist_size(Data), + if PayloadSize > MaxStanzaSize -> + http_error(403, <<"Request Too Large">>, Type); + true -> + case decode_body(Data, PayloadSize, Type) of + {ok, #body{attrs = Attrs} = Body} -> + SID = get_attr(sid, Attrs), + To = get_attr(to, Attrs), + if SID == <<"">>, To == <<"">> -> + bosh_response_with_msg(#body{http_reason = + <<"Missing 'to' attribute">>, + attrs = + [{type, <<"terminate">>}, + {condition, + <<"improper-addressing">>}]}, + Type, Body); + SID == <<"">> -> + case start(Body, IP, make_sid()) of + {ok, Pid} -> process_request(Pid, Body, IP, Type); + _Err -> + bosh_response_with_msg(#body{http_reason = + <<"Failed to start BOSH session">>, + attrs = + [{type, <<"terminate">>}, + {condition, + <<"internal-server-error">>}]}, + Type, Body) + end; + true -> + case mod_bosh:find_session(SID) of + {ok, Pid} -> process_request(Pid, Body, IP, Type); + error -> + bosh_response_with_msg(#body{http_reason = + <<"Session ID mismatch">>, + attrs = + [{type, <<"terminate">>}, + {condition, + <<"item-not-found">>}]}, + Type, Body) + end + end; + {error, Reason} -> http_error(400, Reason, Type) + end + end. + +process_request(Pid, Req, _IP, Type) -> + case catch (?GEN_FSM):sync_send_event(Pid, Req, + infinity) + of + #body{} = Resp -> bosh_response(Resp, Type); + {'EXIT', {Reason, _}} + when Reason == noproc; Reason == normal -> + bosh_response(#body{http_reason = + <<"BOSH session not found">>, + attrs = + [{type, <<"terminate">>}, + {condition, <<"item-not-found">>}]}, + Type); + {'EXIT', _} -> + bosh_response(#body{http_reason = + <<"Unexpected error">>, + attrs = + [{type, <<"terminate">>}, + {condition, <<"internal-server-error">>}]}, + Type) + end. + +init([#body{attrs = Attrs}, IP, SID]) -> + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts2 = [{xml_socket, true} | Opts1], + Shaper = none, + ShaperState = shaper:new(Shaper), + Socket = make_socket(self(), IP), + XMPPVer = get_attr('xmpp:version', Attrs), + XMPPDomain = get_attr(to, Attrs), + {InBuf, Opts} = case gen_mod:get_module_opt( + XMPPDomain, + mod_bosh, prebind, + fun(B) when is_boolean(B) -> B end, + false) of + true -> + JID = make_random_jid(XMPPDomain), + {buf_new(), [{jid, JID} | Opts2]}; + false -> + {buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)], + buf_new()), + Opts2} + end, + ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, + Opts), + Inactivity = gen_mod:get_module_opt(XMPPDomain, + mod_bosh, max_inactivity, + fun(I) when is_integer(I), I>0 -> I end, + ?DEFAULT_INACTIVITY), + MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat, + fun(unlimited) -> unlimited; + (N) when is_integer(N), N>0 -> N + end, unlimited), + State = #state{host = XMPPDomain, sid = SID, ip = IP, + xmpp_ver = XMPPVer, el_ibuf = InBuf, + max_concat = MaxConcat, el_obuf = buf_new(), + inactivity_timeout = Inactivity, + shaper_state = ShaperState}, + NewState = restart_inactivity_timer(State), + mod_bosh:open_session(SID, self()), + {ok, wait_for_session, NewState}; +init([StateName, State]) -> + mod_bosh:open_session(State#state.sid, self()), + case State#state.c2s_pid of + C2SPid when is_pid(C2SPid) -> + NewSocket = make_socket(self(), State#state.ip), + C2SPid ! {change_socket, NewSocket}, + NewState = restart_inactivity_timer(State), + {ok, StateName, NewState}; + _ -> {stop, normal} + end. + +wait_for_session(_Event, State) -> + ?ERROR_MSG("unexpected event in 'wait_for_session': ~p", + [_Event]), + {next_state, wait_for_session, State}. + +wait_for_session(#body{attrs = Attrs} = Req, From, + State) -> + RID = get_attr(rid, Attrs), + ?DEBUG("got request:~n** RequestID: ~p~n** Request: " + "~p~n** From: ~p~n** State: ~p", + [RID, Req, From, State]), + Wait = min(get_attr(wait, Attrs, undefined), + ?DEFAULT_WAIT), + Hold = min(get_attr(hold, Attrs, undefined), + ?DEFAULT_HOLD), + NewKey = get_attr(newkey, Attrs), + Type = get_attr(type, Attrs), + Requests = Hold + 1, + {PollTime, Polling} = if Wait == 0, Hold == 0 -> + {p1_time_compat:timestamp(), [{polling, ?DEFAULT_POLLING}]}; + true -> {undefined, []} + end, + MaxPause = gen_mod:get_module_opt(State#state.host, + mod_bosh, max_pause, + fun(I) when is_integer(I), I>0 -> I end, + ?DEFAULT_MAXPAUSE), + Resp = #body{attrs = + [{sid, State#state.sid}, {wait, Wait}, + {ver, ?BOSH_VERSION}, {polling, ?DEFAULT_POLLING}, + {inactivity, State#state.inactivity_timeout}, + {hold, Hold}, {'xmpp:restartlogic', true}, + {requests, Requests}, {secure, true}, + {maxpause, MaxPause}, {'xmlns:xmpp', ?NS_BOSH}, + {'xmlns:stream', ?NS_STREAM}, {from, State#state.host} + | Polling]}, + {ShaperState, _} = + shaper:update(State#state.shaper_state, Req#body.size), + State1 = State#state{wait_timeout = Wait, + prev_rid = RID, prev_key = NewKey, + prev_poll = PollTime, shaper_state = ShaperState, + max_requests = Requests}, + Els = maybe_add_xmlstreamend(Req#body.els, Type), + State2 = route_els(State1, Els), + {State3, RespEls} = get_response_els(State2), + State4 = stop_inactivity_timer(State3), + case RespEls of + [] -> + State5 = restart_wait_timer(State4), + Receivers = gb_trees:insert(RID, {From, Resp}, + State5#state.receivers), + {next_state, active, + State5#state{receivers = Receivers}}; + _ -> + reply_next_state(State4, Resp#body{els = RespEls}, RID, + From) + end; +wait_for_session(_Event, _From, State) -> + ?ERROR_MSG("unexpected sync event in 'wait_for_session': ~p", + [_Event]), + {reply, {error, badarg}, wait_for_session, State}. + +active({#body{} = Body, From}, State) -> + active1(Body, From, State); +active(_Event, State) -> + ?ERROR_MSG("unexpected event in 'active': ~p", + [_Event]), + {next_state, active, State}. + +active(#body{attrs = Attrs, size = Size} = Req, From, + State) -> + ?DEBUG("got request:~n** Request: ~p~n** From: " + "~p~n** State: ~p", + [Req, From, State]), + {ShaperState, Pause} = + shaper:update(State#state.shaper_state, Size), + State1 = State#state{shaper_state = ShaperState}, + if Pause > 0 -> + QLen = queue:len(State1#state.shaped_receivers), + if QLen < (?MAX_SHAPED_REQUESTS_QUEUE_LEN) -> + TRef = start_shaper_timer(Pause), + Q = queue:in({TRef, From, Req}, + State1#state.shaped_receivers), + State2 = stop_inactivity_timer(State1), + {next_state, active, + State2#state{shaped_receivers = Q}}; + true -> + RID = get_attr(rid, Attrs), + reply_stop(State1, + #body{http_reason = <<"Too many requests">>, + attrs = + [{<<"type">>, <<"terminate">>}, + {<<"condition">>, + <<"policy-violation">>}]}, + From, RID) + end; + true -> active1(Req, From, State1) + end; +active(_Event, _From, State) -> + ?ERROR_MSG("unexpected sync event in 'active': ~p", + [_Event]), + {reply, {error, badarg}, active, State}. + +active1(#body{attrs = Attrs} = Req, From, State) -> + RID = get_attr(rid, Attrs), + Key = get_attr(key, Attrs), + IsValidKey = is_valid_key(State#state.prev_key, Key), + IsOveractivity = is_overactivity(State#state.prev_poll), + Type = get_attr(type, Attrs), + if RID > + State#state.prev_rid + State#state.max_requests -> + reply_stop(State, + #body{http_reason = <<"Request ID is out of range">>, + attrs = + [{<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"item-not-found">>}]}, + From, RID); + RID > State#state.prev_rid + 1 -> + State1 = restart_inactivity_timer(State), + Receivers = gb_trees:insert(RID, {From, Req}, + State1#state.receivers), + {next_state, active, + State1#state{receivers = Receivers}}; + RID =< State#state.prev_rid -> + %% TODO: do we need to check 'key' here? It seems so... + case gb_trees:lookup(RID, State#state.responses) of + {value, PrevBody} -> + {next_state, active, + do_reply(State, From, PrevBody, RID)}; + none -> + State1 = drop_holding_receiver(State), + State2 = stop_inactivity_timer(State1), + State3 = restart_wait_timer(State2), + Receivers = gb_trees:insert(RID, {From, Req}, + State3#state.receivers), + {next_state, active, State3#state{receivers = Receivers}} + end; + not IsValidKey -> + reply_stop(State, + #body{http_reason = <<"Session key mismatch">>, + attrs = + [{<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"item-not-found">>}]}, + From, RID); + IsOveractivity -> + reply_stop(State, + #body{http_reason = <<"Too many requests">>, + attrs = + [{<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"policy-violation">>}]}, + From, RID); + true -> + State1 = stop_inactivity_timer(State), + State2 = stop_wait_timer(State1), + Els = case get_attr('xmpp:restart', Attrs, false) of + true -> + XMPPDomain = get_attr(to, Attrs, State#state.host), + XMPPVer = get_attr('xmpp:version', Attrs, + State#state.xmpp_ver), + [make_xmlstreamstart(XMPPDomain, XMPPVer)]; + false -> Req#body.els + end, + State3 = route_els(State2, + maybe_add_xmlstreamend(Els, Type)), + {State4, RespEls} = get_response_els(State3), + NewKey = get_attr(newkey, Attrs, Key), + Pause = get_attr(pause, Attrs, undefined), + NewPoll = case State#state.prev_poll of + undefined -> undefined; + _ -> p1_time_compat:timestamp() + end, + State5 = State4#state{prev_poll = NewPoll, + prev_key = NewKey}, + if Type == <<"terminate">> -> + reply_stop(State5, + #body{http_reason = <<"Session close">>, + attrs = [{<<"type">>, <<"terminate">>}], + els = RespEls}, + From, RID); + Pause /= undefined -> + State6 = drop_holding_receiver(State5), + State7 = restart_inactivity_timer(State6, Pause), + InBuf = buf_in(RespEls, State7#state.el_ibuf), + {next_state, active, + State7#state{prev_rid = RID, el_ibuf = InBuf}}; + RespEls == [] -> + State6 = drop_holding_receiver(State5), + State7 = stop_inactivity_timer(State6), + State8 = restart_wait_timer(State7), + Receivers = gb_trees:insert(RID, {From, #body{}}, + State8#state.receivers), + {next_state, active, + State8#state{prev_rid = RID, receivers = Receivers}}; + true -> + State6 = drop_holding_receiver(State5), + reply_next_state(State6#state{prev_rid = RID}, + #body{els = RespEls}, RID, From) + end + end. + +handle_event({become_controller, C2SPid}, StateName, + State) -> + State1 = route_els(State#state{c2s_pid = C2SPid}), + {next_state, StateName, State1}; +handle_event({change_shaper, Shaper}, StateName, + State) -> + NewShaperState = shaper:new(Shaper), + {next_state, StateName, + State#state{shaper_state = NewShaperState}}; +handle_event(_Event, StateName, State) -> + ?ERROR_MSG("unexpected event in '~s': ~p", + [StateName, _Event]), + {next_state, StateName, State}. + +handle_sync_event({send_xml, + {xmlstreamstart, _, _} = El}, + _From, StateName, State) + when State#state.xmpp_ver >= <<"1.0">> -> + OutBuf = buf_in([El], State#state.el_obuf), + {reply, ok, StateName, State#state{el_obuf = OutBuf}}; +handle_sync_event({send_xml, El}, _From, StateName, + State) -> + OutBuf = buf_in([El], State#state.el_obuf), + State1 = State#state{el_obuf = OutBuf}, + case gb_trees:lookup(State1#state.prev_rid, + State1#state.receivers) + of + {value, {From, Body}} -> + {State2, Els} = get_response_els(State1), + {reply, ok, StateName, + reply(State2, Body#body{els = Els}, + State2#state.prev_rid, From)}; + none -> + State2 = case queue:out(State1#state.shaped_receivers) + of + {{value, {TRef, From, Body}}, Q} -> + cancel_timer(TRef), + (?GEN_FSM):send_event(self(), {Body, From}), + State1#state{shaped_receivers = Q}; + _ -> State1 + end, + {reply, ok, StateName, State2} + end; +handle_sync_event(close, _From, _StateName, State) -> + {stop, normal, State}; +handle_sync_event(deactivate_socket, _From, StateName, + StateData) -> + {reply, ok, StateName, + StateData#state{c2s_pid = undefined}}; +handle_sync_event(_Event, _From, StateName, State) -> + ?ERROR_MSG("unexpected sync event in '~s': ~p", + [StateName, _Event]), + {reply, {error, badarg}, StateName, State}. + +handle_info({timeout, TRef, wait_timeout}, StateName, + #state{wait_timer = TRef} = State) -> + {next_state, StateName, drop_holding_receiver(State)}; +handle_info({timeout, TRef, inactive}, _StateName, + #state{inactivity_timer = TRef} = State) -> + {stop, normal, State}; +handle_info({timeout, TRef, shaper_timeout}, StateName, + State) -> + case queue:out(State#state.shaped_receivers) of + {{value, {TRef, From, Req}}, Q} -> + (?GEN_FSM):send_event(self(), {Req, From}), + {next_state, StateName, + State#state{shaped_receivers = Q}}; + {{value, _}, _} -> + ?ERROR_MSG("shaper_timeout mismatch:~n** TRef: ~p~n** " + "State: ~p", + [TRef, State]), + {stop, normal, State}; + _ -> {next_state, StateName, State} + end; +handle_info({migrate, Node}, StateName, State) -> + if Node /= node() -> + NewState = bounce_receivers(State, migrated), + {migrate, NewState, + {Node, ?MODULE, start, [StateName, NewState]}, 0}; + true -> {next_state, StateName, State} + end; +handle_info(_Info, StateName, State) -> + ?ERROR_MSG("unexpected info:~n** Msg: ~p~n** StateName: ~p", + [_Info, StateName]), + {next_state, StateName, State}. + +terminate({migrated, ClonePid}, _StateName, State) -> + ?INFO_MSG("Migrating session \"~s\" (c2s_pid = " + "~p) to ~p on node ~p", + [State#state.sid, State#state.c2s_pid, ClonePid, + node(ClonePid)]), + mod_bosh:close_session(State#state.sid); +terminate(_Reason, _StateName, State) -> + mod_bosh:close_session(State#state.sid), + case State#state.c2s_pid of + C2SPid when is_pid(C2SPid) -> + (?GEN_FSM):send_event(C2SPid, closed); + _ -> ok + end, + bounce_receivers(State, closed), + bounce_els_from_obuf(State). + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. + +print_state(State) -> State. + +route_els(#state{el_ibuf = Buf} = State) -> + route_els(State#state{el_ibuf = buf_new()}, + buf_to_list(Buf)). + +route_els(State, Els) -> + case State#state.c2s_pid of + C2SPid when is_pid(C2SPid) -> + lists:foreach(fun (El) -> + (?GEN_FSM):send_event(C2SPid, El) + end, + Els), + State; + _ -> + InBuf = buf_in(Els, State#state.el_ibuf), + State#state{el_ibuf = InBuf} + end. + +get_response_els(#state{el_obuf = OutBuf, + max_concat = MaxConcat} = + State) -> + {Els, NewOutBuf} = buf_out(OutBuf, MaxConcat), + {State#state{el_obuf = NewOutBuf}, Els}. + +reply(State, Body, RID, From) -> + State1 = restart_inactivity_timer(State), + Receivers = gb_trees:delete_any(RID, + State1#state.receivers), + State2 = do_reply(State1, From, Body, RID), + case catch gb_trees:take_smallest(Receivers) of + {NextRID, {From1, Req}, Receivers1} + when NextRID == RID + 1 -> + (?GEN_FSM):send_event(self(), {Req, From1}), + State2#state{receivers = Receivers1}; + _ -> State2#state{receivers = Receivers} + end. + +reply_next_state(State, Body, RID, From) -> + State1 = restart_inactivity_timer(State), + Receivers = gb_trees:delete_any(RID, + State1#state.receivers), + State2 = do_reply(State1, From, Body, RID), + case catch gb_trees:take_smallest(Receivers) of + {NextRID, {From1, Req}, Receivers1} + when NextRID == RID + 1 -> + active(Req, From1, + State2#state{receivers = Receivers1}); + _ -> + {next_state, active, + State2#state{receivers = Receivers}} + end. + +reply_stop(State, Body, From, RID) -> + {stop, normal, do_reply(State, From, Body, RID)}. + +drop_holding_receiver(State) -> + RID = State#state.prev_rid, + case gb_trees:lookup(RID, State#state.receivers) of + {value, {From, Body}} -> + State1 = restart_inactivity_timer(State), + Receivers = gb_trees:delete_any(RID, + State1#state.receivers), + State2 = State1#state{receivers = Receivers}, + do_reply(State2, From, Body, RID); + none -> State + end. + +do_reply(State, From, Body, RID) -> + ?DEBUG("send reply:~n** RequestID: ~p~n** Reply: " + "~p~n** To: ~p~n** State: ~p", + [RID, Body, From, State]), + (?GEN_FSM):reply(From, Body), + Responses = gb_trees:delete_any(RID, + State#state.responses), + Responses1 = case gb_trees:size(Responses) of + N when N < State#state.max_requests; N == 0 -> + Responses; + _ -> element(3, gb_trees:take_smallest(Responses)) + end, + Responses2 = gb_trees:insert(RID, Body, Responses1), + State#state{responses = Responses2}. + +bounce_receivers(State, Reason) -> + Receivers = gb_trees:to_list(State#state.receivers), + ShapedReceivers = lists:map(fun ({_, From, + #body{attrs = Attrs} = Body}) -> + RID = get_attr(rid, Attrs), + {RID, {From, Body}} + end, + queue:to_list(State#state.shaped_receivers)), + lists:foldl(fun ({RID, {From, Body}}, AccState) -> + NewBody = if Reason == closed -> + #body{http_reason = + <<"Session closed">>, + attrs = + [{type, <<"terminate">>}, + {condition, + <<"other-request">>}]}; + Reason == migrated -> + Body#body{http_reason = + <<"Session migrated">>} + end, + do_reply(AccState, From, NewBody, RID) + end, + State, Receivers ++ ShapedReceivers). + +bounce_els_from_obuf(State) -> + lists:foreach(fun ({xmlstreamelement, El}) -> + case El of + #xmlel{name = Name, attrs = Attrs} + when Name == <<"presence">>; + Name == <<"message">>; + Name == <<"iq">> -> + FromS = fxml:get_attr_s(<<"from">>, Attrs), + ToS = fxml:get_attr_s(<<"to">>, Attrs), + case {jid:from_string(FromS), + jid:from_string(ToS)} + of + {#jid{} = From, #jid{} = To} -> + ejabberd_router:route(From, To, El); + _ -> ok + end; + _ -> ok + end; + (_) -> ok + end, + buf_to_list(State#state.el_obuf)). + +is_valid_key(<<"">>, <<"">>) -> true; +is_valid_key(PrevKey, Key) -> + p1_sha:sha(Key) == PrevKey. + +is_overactivity(undefined) -> false; +is_overactivity(PrevPoll) -> + PollPeriod = timer:now_diff(p1_time_compat:timestamp(), PrevPoll) div + 1000000, + if PollPeriod < (?DEFAULT_POLLING) -> true; + true -> false + end. + +make_xmlstreamstart(XMPPDomain, Version) -> + VersionEl = case Version of + <<"">> -> []; + _ -> [{<<"version">>, Version}] + end, + {xmlstreamstart, <<"stream:stream">>, + [{<<"to">>, XMPPDomain}, {<<"xmlns">>, ?NS_CLIENT}, + {<<"xmlns:xmpp">>, ?NS_BOSH}, + {<<"xmlns:stream">>, ?NS_STREAM} + | VersionEl]}. + +maybe_add_xmlstreamend(Els, <<"terminate">>) -> + Els ++ [{xmlstreamend, <<"stream:stream">>}]; +maybe_add_xmlstreamend(Els, _) -> Els. + +encode_body(#body{attrs = Attrs, els = Els}, Type) -> + Attrs1 = lists:map(fun ({K, V}) when is_atom(K) -> + AmK = iolist_to_binary(atom_to_list(K)), + case V of + true -> {AmK, <<"true">>}; + false -> {AmK, <<"false">>}; + I when is_integer(I), I >= 0 -> + {AmK, iolist_to_binary(integer_to_list(I))}; + _ -> {AmK, V} + end; + ({K, V}) -> {K, V} + end, + Attrs), + Attrs2 = [{<<"xmlns">>, ?NS_HTTP_BIND} | Attrs1], + {Attrs3, XMLs} = lists:foldr(fun ({xmlstreamraw, XML}, + {AttrsAcc, XMLBuf}) -> + {AttrsAcc, [XML | XMLBuf]}; + ({xmlstreamelement, + #xmlel{name = <<"stream:error">>} = El}, + {AttrsAcc, XMLBuf}) -> + {[{<<"type">>, <<"terminate">>}, + {<<"condition">>, + <<"remote-stream-error">>}, + {<<"xmlns:stream">>, ?NS_STREAM} + | AttrsAcc], + [encode_element(El, Type) | XMLBuf]}; + ({xmlstreamelement, + #xmlel{name = <<"stream:features">>} = + El}, + {AttrsAcc, XMLBuf}) -> + {lists:keystore(<<"xmlns:stream">>, 1, + AttrsAcc, + {<<"xmlns:stream">>, + ?NS_STREAM}), + [encode_element(El, Type) | XMLBuf]}; + ({xmlstreamelement, + #xmlel{name = Name, attrs = EAttrs} = El}, + {AttrsAcc, XMLBuf}) + when Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> -> + NewAttrs = lists:keystore( + <<"xmlns">>, 1, EAttrs, + {<<"xmlns">>, ?NS_CLIENT}), + NewEl = El#xmlel{attrs = NewAttrs}, + {AttrsAcc, + [encode_element(NewEl, Type) | XMLBuf]}; + ({xmlstreamelement, El}, + {AttrsAcc, XMLBuf}) -> + {AttrsAcc, + [encode_element(El, Type) | XMLBuf]}; + ({xmlstreamend, _}, {AttrsAcc, XMLBuf}) -> + {[{<<"type">>, <<"terminate">>}, + {<<"condition">>, + <<"remote-stream-error">>} + | AttrsAcc], + XMLBuf}; + ({xmlstreamstart, <<"stream:stream">>, + SAttrs}, + {AttrsAcc, XMLBuf}) -> + StreamID = fxml:get_attr_s(<<"id">>, + SAttrs), + NewAttrs = case + fxml:get_attr_s(<<"version">>, + SAttrs) + of + <<"">> -> + [{<<"authid">>, + StreamID} + | AttrsAcc]; + V -> + lists:keystore(<<"xmlns:xmpp">>, + 1, + [{<<"xmpp:version">>, + V}, + {<<"authid">>, + StreamID} + | AttrsAcc], + {<<"xmlns:xmpp">>, + ?NS_BOSH}) + end, + {NewAttrs, XMLBuf}; + ({xmlstreamerror, _}, + {AttrsAcc, XMLBuf}) -> + {[{<<"type">>, <<"terminate">>}, + {<<"condition">>, + <<"remote-stream-error">>} + | AttrsAcc], + XMLBuf}; + (_, Acc) -> Acc + end, + {Attrs2, []}, Els), + case XMLs of + [] when Type == xml -> + [<<">, attrs_to_list(Attrs3), <<"/>">>]; + _ when Type == xml -> + [<<">, attrs_to_list(Attrs3), $>, XMLs, + <<"">>] + end. + +encode_element(El, xml) -> + fxml:element_to_binary(El); +encode_element(El, json) -> + El. + +decode_body(Data, Size, Type) -> + case decode(Data, Type) of + #xmlel{name = <<"body">>, attrs = Attrs, + children = Els} -> + case attrs_to_body_attrs(Attrs) of + {error, _} = Err -> Err; + BodyAttrs -> + case get_attr(rid, BodyAttrs) of + <<"">> -> {error, <<"Missing \"rid\" attribute">>}; + _ -> + Els1 = lists:flatmap(fun (#xmlel{} = El) -> + [{xmlstreamelement, El}]; + (_) -> [] + end, + Els), + {ok, #body{attrs = BodyAttrs, size = Size, els = Els1}} + end + end; + #xmlel{} -> {error, <<"Unexpected payload">>}; + _ when Type == xml -> + {error, <<"XML is not well-formed">>}; + _ when Type == json -> + {error, <<"JSON is not well-formed">>} + end. + +decode(Data, xml) -> + fxml_stream:parse_element(Data). + +attrs_to_body_attrs(Attrs) -> + lists:foldl(fun (_, {error, Reason}) -> {error, Reason}; + ({Attr, Val}, Acc) -> + try case Attr of + <<"ver">> -> [{ver, Val} | Acc]; + <<"xmpp:version">> -> + [{'xmpp:version', Val} | Acc]; + <<"type">> -> [{type, Val} | Acc]; + <<"key">> -> [{key, Val} | Acc]; + <<"newkey">> -> [{newkey, Val} | Acc]; + <<"xmlns">> -> Val = (?NS_HTTP_BIND), Acc; + <<"secure">> -> [{secure, to_bool(Val)} | Acc]; + <<"xmpp:restart">> -> + [{'xmpp:restart', to_bool(Val)} | Acc]; + <<"to">> -> + [{to, jid:nameprep(Val)} | Acc]; + <<"wait">> -> [{wait, to_int(Val, 0)} | Acc]; + <<"ack">> -> [{ack, to_int(Val, 0)} | Acc]; + <<"sid">> -> [{sid, Val} | Acc]; + <<"hold">> -> [{hold, to_int(Val, 0)} | Acc]; + <<"rid">> -> [{rid, to_int(Val, 0)} | Acc]; + <<"pause">> -> [{pause, to_int(Val, 0)} | Acc]; + _ -> [{Attr, Val} | Acc] + end + catch + _:_ -> + {error, + <<"Invalid \"", Attr/binary, "\" attribute">>} + end + end, + [], Attrs). + +to_int(S, Min) -> + case jlib:binary_to_integer(S) of + I when I >= Min -> I; + _ -> erlang:error(badarg) + end. + +to_bool(<<"true">>) -> true; +to_bool(<<"1">>) -> true; +to_bool(<<"false">>) -> false; +to_bool(<<"0">>) -> false. + +attrs_to_list(Attrs) -> [attr_to_list(A) || A <- Attrs]. + +attr_to_list({Name, Value}) -> + [$\s, Name, $=, $', fxml:crypt(Value), $']. + +bosh_response(Body, Type) -> + CType = case Type of + xml -> ?CT_XML; + json -> ?CT_JSON + end, + {200, Body#body.http_reason, ?HEADER(CType), + encode_body(Body, Type)}. + +bosh_response_with_msg(Body, Type, RcvBody) -> + ?DEBUG("send error reply:~p~n** Receiced body: ~p", + [Body, RcvBody]), + bosh_response(Body, Type). + +http_error(Status, Reason, Type) -> + CType = case Type of + xml -> ?CT_XML; + json -> ?CT_JSON + end, + {Status, Reason, ?HEADER(CType), <<"">>}. + +make_sid() -> p1_sha:sha(randoms:get_string()). + +-compile({no_auto_import, [{min, 2}]}). + +min(undefined, B) -> B; +min(A, B) -> erlang:min(A, B). + +check_bosh_module(XmppDomain) -> + case gen_mod:is_loaded(XmppDomain, mod_bosh) of + true -> ok; + false -> + ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) " + "in host ~p, but the module mod_bosh " + "is not started in that host. Configure " + "your BOSH client to connect to the correct " + "host, or add your desired host to the " + "configuration, or check your 'modules' " + "section in your ejabberd configuration " + "file.", + [XmppDomain]) + end. + +get_attr(Attr, Attrs) -> get_attr(Attr, Attrs, <<"">>). + +get_attr(Attr, Attrs, Default) -> + case lists:keysearch(Attr, 1, Attrs) of + {value, {_, Val}} -> Val; + _ -> Default + end. + +buf_new() -> queue:new(). + +buf_in(Xs, Buf) -> + lists:foldl(fun (X, Acc) -> queue:in(X, Acc) end, Buf, + Xs). + +buf_out(Buf, Num) when is_integer(Num), Num > 0 -> + buf_out(Buf, Num, []); +buf_out(Buf, _) -> {queue:to_list(Buf), buf_new()}. + +buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf}; +buf_out(Buf, I, Els) -> + case queue:out(Buf) of + {{value, El}, NewBuf} -> + buf_out(NewBuf, I - 1, [El | Els]); + {empty, _} -> buf_out(Buf, 0, Els) + end. + +buf_to_list(Buf) -> queue:to_list(Buf). + +cancel_timer(TRef) when is_reference(TRef) -> + (?GEN_FSM):cancel_timer(TRef); +cancel_timer(_) -> false. + +restart_timer(TRef, Timeout, Msg) -> + cancel_timer(TRef), + erlang:start_timer(timer:seconds(Timeout), self(), Msg). + +restart_inactivity_timer(#state{inactivity_timeout = + Timeout} = + State) -> + restart_inactivity_timer(State, Timeout). + +restart_inactivity_timer(#state{inactivity_timer = + TRef} = + State, + Timeout) -> + NewTRef = restart_timer(TRef, Timeout, inactive), + State#state{inactivity_timer = NewTRef}. + +stop_inactivity_timer(#state{inactivity_timer = TRef} = + State) -> + cancel_timer(TRef), + State#state{inactivity_timer = undefined}. + +restart_wait_timer(#state{wait_timer = TRef, + wait_timeout = Timeout} = + State) -> + NewTRef = restart_timer(TRef, Timeout, wait_timeout), + State#state{wait_timer = NewTRef}. + +stop_wait_timer(#state{wait_timer = TRef} = State) -> + cancel_timer(TRef), State#state{wait_timer = undefined}. + +start_shaper_timer(Timeout) -> + erlang:start_timer(Timeout, self(), shaper_timeout). + +make_random_jid(Host) -> + User = randoms:get_string(), + jid:make(User, Host, randoms:get_string()). + +make_socket(Pid, IP) -> {http_bind, Pid, IP}. diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl index b5fa52ded..e26fc8652 100644 --- a/src/ejabberd_socket.erl +++ b/src/ejabberd_socket.erl @@ -52,12 +52,14 @@ -include("logger.hrl"). -type sockmod() :: ejabberd_http_bind | + ejabberd_bosh | ejabberd_http_ws | gen_tcp | fast_tls | ezlib. -type receiver() :: pid () | atom(). -type socket() :: pid() | inet:socket() | fast_tls:tls_socket() | - ezlib:zlib_socket() | + ezlib:zlib_socket() | + ejabberd_bosh:bind_socket() | ejabberd_http_bind:bind_socket(). -record(socket_state, {sockmod = gen_tcp :: sockmod(), @@ -228,6 +230,7 @@ get_transport(#socket_state{sockmod = SockMod, tcp -> tcp_zlib; tls -> tls_zlib end; + ejabberd_bosh -> http_bind; ejabberd_http_bind -> http_bind; ejabberd_http_ws -> websocket end. @@ -254,4 +257,3 @@ peername(#socket_state{sockmod = SockMod, gen_tcp -> inet:peername(Socket); _ -> SockMod:peername(Socket) end. - diff --git a/src/mod_bosh.erl b/src/mod_bosh.erl new file mode 100644 index 000000000..13d85b3cb --- /dev/null +++ b/src/mod_bosh.erl @@ -0,0 +1,296 @@ +%%%------------------------------------------------------------------- +%%% File : mod_bosh.erl +%%% Author : Evgeniy Khramtsov +%%% Purpose : This module acts as a bridge to ejabberd_bosh which implements +%%% the real stuff, this is to handle the new pluggable architecture +%%% for extending ejabberd's http service. +%%% Created : 20 Jul 2011 by Evgeniy Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%------------------------------------------------------------------- +-module(mod_bosh). + +-author('steve@zeank.in-berlin.de'). + +%%-define(ejabberd_debug, true). + +-behaviour(gen_server). +-behaviour(gen_mod). + +-export([start_link/0]). +-export([start/2, stop/1, process/2, open_session/2, + close_session/1, find_session/1]). + +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3, + depends/2, mod_opt_type/1]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). +-include("jlib.hrl"). + +-include("ejabberd_http.hrl"). + +-include("bosh.hrl"). + +-record(bosh, {sid = <<"">> :: binary() | '_', + timestamp = p1_time_compat:timestamp() :: erlang:timestamp() | '_', + pid = self() :: pid() | '$1'}). + +-record(state, {}). + +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +process([], #request{method = 'POST', data = <<>>}) -> + ?DEBUG("Bad Request: no data", []), + {400, ?HEADER(?CT_XML), + #xmlel{name = <<"h1">>, attrs = [], + children = [{xmlcdata, <<"400 Bad Request">>}]}}; +process([], + #request{method = 'POST', data = Data, ip = IP, headers = Hdrs}) -> + ?DEBUG("Incoming data: ~p", [Data]), + Type = get_type(Hdrs), + ejabberd_bosh:process_request(Data, IP, Type); +process([], #request{method = 'GET', data = <<>>}) -> + {200, ?HEADER(?CT_XML), get_human_html_xmlel()}; +process([], #request{method = 'OPTIONS', data = <<>>}) -> + {200, ?OPTIONS_HEADER, []}; +process(_Path, _Request) -> + ?DEBUG("Bad Request: ~p", [_Request]), + {400, ?HEADER(?CT_XML), + #xmlel{name = <<"h1">>, attrs = [], + children = [{xmlcdata, <<"400 Bad Request">>}]}}. + +get_human_html_xmlel() -> + Heading = <<"ejabberd ", (jlib:atom_to_binary(?MODULE))/binary>>, + #xmlel{name = <<"html">>, + attrs = + [{<<"xmlns">>, <<"http://www.w3.org/1999/xhtml">>}], + children = + [#xmlel{name = <<"head">>, attrs = [], + children = + [#xmlel{name = <<"title">>, attrs = [], + children = [{xmlcdata, Heading}]}]}, + #xmlel{name = <<"body">>, attrs = [], + children = + [#xmlel{name = <<"h1">>, attrs = [], + children = [{xmlcdata, Heading}]}, + #xmlel{name = <<"p">>, attrs = [], + children = + [{xmlcdata, <<"An implementation of ">>}, + #xmlel{name = <<"a">>, + attrs = + [{<<"href">>, + <<"http://xmpp.org/extensions/xep-0206.html">>}], + children = + [{xmlcdata, + <<"XMPP over BOSH (XEP-0206)">>}]}]}, + #xmlel{name = <<"p">>, attrs = [], + children = + [{xmlcdata, + <<"This web page is only informative. To " + "use HTTP-Bind you need a Jabber/XMPP " + "client that supports it.">>}]}]}]}. + +open_session(SID, Pid) -> + Session = #bosh{sid = SID, timestamp = p1_time_compat:timestamp(), pid = Pid}, + lists:foreach( + fun(Node) when Node == node() -> + gen_server:call(?MODULE, {write, Session}); + (Node) -> + cluster_send({?MODULE, Node}, {write, Session}) + end, ejabberd_cluster:get_nodes()). + +close_session(SID) -> + case mnesia:dirty_read(bosh, SID) of + [Session] -> + lists:foreach( + fun(Node) when Node == node() -> + gen_server:call(?MODULE, {delete, Session}); + (Node) -> + cluster_send({?MODULE, Node}, {delete, Session}) + end, ejabberd_cluster:get_nodes()); + [] -> + ok + end. + +write_session(#bosh{pid = Pid1, sid = SID, timestamp = T1} = S1) -> + case mnesia:dirty_read(bosh, SID) of + [#bosh{pid = Pid2, timestamp = T2} = S2] -> + if Pid1 == Pid2 -> + mnesia:dirty_write(S1); + T1 < T2 -> + cluster_send(Pid2, replaced), + mnesia:dirty_write(S1); + true -> + cluster_send(Pid1, replaced), + mnesia:dirty_write(S2) + end; + [] -> + mnesia:dirty_write(S1) + end. + +delete_session(#bosh{sid = SID, pid = Pid1}) -> + case mnesia:dirty_read(bosh, SID) of + [#bosh{pid = Pid2}] -> + if Pid1 == Pid2 -> + mnesia:dirty_delete(bosh, SID); + true -> + ok + end; + [] -> + ok + end. + +find_session(SID) -> + case mnesia:dirty_read(bosh, SID) of + [#bosh{pid = Pid}] -> + {ok, Pid}; + [] -> + error + end. + +start(Host, Opts) -> + setup_database(), + start_jiffy(Opts), + TmpSup = gen_mod:get_module_proc(Host, ?PROCNAME), + TmpSupSpec = {TmpSup, + {ejabberd_tmp_sup, start_link, [TmpSup, ejabberd_bosh]}, + permanent, infinity, supervisor, [ejabberd_tmp_sup]}, + ProcSpec = {?MODULE, + {?MODULE, start_link, []}, + transient, 2000, worker, [?MODULE]}, + case supervisor:start_child(ejabberd_sup, ProcSpec) of + {ok, _} -> + supervisor:start_child(ejabberd_sup, TmpSupSpec); + {error, {already_started, _}} -> + supervisor:start_child(ejabberd_sup, TmpSupSpec); + Err -> + Err + end. + +stop(Host) -> + TmpSup = gen_mod:get_module_proc(Host, ?PROCNAME), + supervisor:terminate_child(ejabberd_sup, TmpSup), + supervisor:delete_child(ejabberd_sup, TmpSup). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + {ok, #state{}}. + +handle_call({write, Session}, _From, State) -> + Res = write_session(Session), + {reply, Res, State}; +handle_call({delete, Session}, _From, State) -> + Res = delete_session(Session), + {reply, Res, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({write, Session}, State) -> + write_session(Session), + {noreply, State}; +handle_info({delete, Session}, State) -> + delete_session(Session), + {noreply, State}; +handle_info(_Info, State) -> + ?ERROR_MSG("got unexpected info: ~p", [_Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +setup_database() -> + case catch mnesia:table_info(bosh, attributes) of + [sid, pid] -> + mnesia:delete_table(bosh); + _ -> + ok + end, + mnesia:create_table(bosh, + [{ram_copies, [node()]}, {local_content, true}, + {attributes, record_info(fields, bosh)}]), + mnesia:add_table_copy(bosh, node(), ram_copies). + +start_jiffy(Opts) -> + case gen_mod:get_opt(json, Opts, + fun(false) -> false; + (true) -> true + end, false) of + false -> + ok; + true -> + case catch ejabberd:start_app(jiffy) of + ok -> + ok; + Err -> + ?WARNING_MSG("Failed to start JSON codec (jiffy): ~p. " + "JSON support will be disabled", [Err]) + end + end. + +get_type(Hdrs) -> + try + {_, S} = lists:keyfind('Content-Type', 1, Hdrs), + [T|_] = str:tokens(S, <<";">>), + [_, <<"json">>] = str:tokens(T, <<"/">>), + json + catch _:_ -> + xml + end. + +cluster_send(NodePid, Msg) -> + erlang:send(NodePid, Msg, [noconnect, nosuspend]). + +depends(_Host, _Opts) -> + []. + +mod_opt_type(json) -> + fun (false) -> false; + (true) -> true + end; +mod_opt_type(max_concat) -> + fun (unlimited) -> unlimited; + (N) when is_integer(N), N > 0 -> N + end; +mod_opt_type(max_inactivity) -> + fun (I) when is_integer(I), I > 0 -> I end; +mod_opt_type(max_pause) -> + fun (I) when is_integer(I), I > 0 -> I end; +mod_opt_type(prebind) -> + fun (B) when is_boolean(B) -> B end; +mod_opt_type(_) -> + [json, max_concat, max_inactivity, max_pause, prebind].