diff --git a/ChangeLog b/ChangeLog index d8dd2f29c..6f640b7d1 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +2009-01-27 Evgeniy Khramtsov + + * src/eldap/eldap.erl: implemented queue for pending queries. + 2009-01-27 Badlop * doc/guide.tex: mod_muc can run in several nodes of cluster diff --git a/src/eldap/eldap.erl b/src/eldap/eldap.erl index 799012fc5..24e234cf7 100644 --- a/src/eldap/eldap.erl +++ b/src/eldap/eldap.erl @@ -85,6 +85,10 @@ -define(RETRY_TIMEOUT, 500). -define(BIND_TIMEOUT, 10000). -define(CMD_TIMEOUT, 100000). +%% Used in gen_fsm sync calls. +-define(CALL_TIMEOUT, ?CMD_TIMEOUT + ?BIND_TIMEOUT + ?RETRY_TIMEOUT). +%% Used as a timeout for gen_tcp:send/2 +-define(SEND_TIMEOUT, 30000). -define(MAX_TRANSACTION_ID, 65535). -define(MIN_TRANSACTION_ID, 0). @@ -98,7 +102,7 @@ id = 0, % LDAP Request ID bind_timer, % Ref to bind timeout dict, % dict holding operation params and results - bind_q % Queue for bind() requests + req_q % Queue for requests }). %%%---------------------------------------------------------------------- @@ -141,7 +145,8 @@ close(Handle) -> %%% -------------------------------------------------------------------- add(Handle, Entry, Attributes) when list(Entry),list(Attributes) -> Handle1 = get_handle(Handle), - gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)}). + gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)}, + ?CALL_TIMEOUT). %%% Do sanity check ! add_attrs(Attrs) -> @@ -166,7 +171,7 @@ add_attrs(Attrs) -> %%% -------------------------------------------------------------------- delete(Handle, Entry) when list(Entry) -> Handle1 = get_handle(Handle), - gen_fsm:sync_send_event(Handle1, {delete, Entry}). + gen_fsm:sync_send_event(Handle1, {delete, Entry}, ?CALL_TIMEOUT). %%% -------------------------------------------------------------------- %%% Modify an entry. Given an entry a number of modification @@ -181,7 +186,7 @@ delete(Handle, Entry) when list(Entry) -> %%% -------------------------------------------------------------------- modify(Handle, Object, Mods) when list(Object), list(Mods) -> Handle1 = get_handle(Handle), - gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}). + gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}, ?CALL_TIMEOUT). %%% %%% Modification operations. @@ -214,7 +219,10 @@ m(Operation, Type, Values) -> modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup) when list(Entry),list(NewRDN),atom(DelOldRDN),list(NewSup) -> Handle1 = get_handle(Handle), - gen_fsm:sync_send_event(Handle1, {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)}). + gen_fsm:sync_send_event( + Handle1, + {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)}, + ?CALL_TIMEOUT). %%% -------------------------------------------------------------------- @@ -228,7 +236,7 @@ modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup) bind(Handle, RootDN, Passwd) when list(RootDN),list(Passwd) -> Handle1 = get_handle(Handle), - gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, infinity). + gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, ?CALL_TIMEOUT). %%% Sanity checks ! @@ -273,7 +281,7 @@ search(Handle, L) when list(L) -> call_search(Handle, A) -> Handle1 = get_handle(Handle), - gen_fsm:sync_send_event(Handle1, {search, A}, infinity). + gen_fsm:sync_send_event(Handle1, {search, A}, ?CALL_TIMEOUT). parse_search_args(Args) -> parse_search_args(Args, #eldap_search{scope = wholeSubtree}). @@ -382,7 +390,7 @@ init({Hosts, Port, Rootdn, Passwd}) -> passwd = Passwd, id = 0, dict = dict:new(), - bind_q = queue:new()}, 0}. + req_q = queue:new()}, 0}. %%---------------------------------------------------------------------- %% Func: StateName/2 @@ -405,38 +413,20 @@ connecting(timeout, S) -> %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- -connecting(_Event, _From, S) -> - Reply = {error, connecting}, - {reply, Reply, connecting, S}. +connecting(Event, From, S) -> + Q = queue:in({Event, From}, S#eldap.req_q), + {next_state, connecting, S#eldap{req_q=Q}}. -wait_bind_response(_Event, _From, S) -> - Reply = {error, wait_bind_response}, - {reply, Reply, wait_bind_response, S}. +wait_bind_response(Event, From, S) -> + Q = queue:in({Event, From}, S#eldap.req_q), + {next_state, wait_bind_response, S#eldap{req_q=Q}}. + +active_bind(Event, From, S) -> + Q = queue:in({Event, From}, S#eldap.req_q), + {next_state, active_bind, S#eldap{req_q=Q}}. active(Event, From, S) -> - case catch send_command(Event, From, S) of - {ok, NewS} -> - case Event of - {bind, _, _} -> - {next_state, active_bind, NewS}; - _ -> - {next_state, active, NewS} - end; - {error, Reason} -> - {reply, {error, Reason}, active, S}; - {'EXIT', Reason} -> - {reply, {error, Reason}, active, S} - end. - -active_bind({bind, RootDN, Passwd}, From, #eldap{bind_q=Q} = S) -> - NewQ = queue:in({{bind, RootDN, Passwd}, From}, Q), - {next_state, active_bind, S#eldap{bind_q=NewQ}}; -active_bind(Event, From, S) -> - case catch send_command(Event, From, S) of - {ok, NewS} -> {next_state, active_bind, NewS}; - {error, Reason} -> {reply, {error, Reason}, active_bind, S}; - {'EXIT', Reason} -> {reply, {error, Reason}, active_bind, S} - end. + process_command(S, Event, From). %%---------------------------------------------------------------------- %% Func: handle_event/3 @@ -446,21 +436,8 @@ active_bind(Event, From, S) -> %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_event(close, _StateName, S) -> - gen_tcp:close(S#eldap.fd), - {stop, closed, S}; - -handle_event(process_bind_q, active_bind, #eldap{bind_q=Q} = S) -> - case queue:out(Q) of - {{value, {BindEvent, To}}, NewQ} -> - NewStateData = case catch send_command(BindEvent, To, S) of - {ok, NewS} -> NewS; - {error, Reason} -> gen_fsm:reply(To, {error, Reason}), S; - {'EXIT', Reason} -> gen_fsm:reply(To, {error, Reason}), S - end, - {next_state, active_bind, NewStateData#eldap{bind_q=NewQ}}; - {empty, Q} -> - {next_state, active, S} - end; + catch gen_tcp:close(S#eldap.fd), + {stop, normal, S}; handle_event(_Event, StateName, S) -> {next_state, StateName, S}. @@ -489,50 +466,61 @@ handle_sync_event(_Event, _From, StateName, S) -> %% Packets arriving in various states %% handle_info({tcp, _Socket, Data}, connecting, S) -> - ?DEBUG("eldap. tcp packet received when disconnected!~n~p", [Data]), + ?DEBUG("tcp packet received when disconnected!~n~p", [Data]), {next_state, connecting, S}; handle_info({tcp, _Socket, Data}, wait_bind_response, S) -> cancel_timer(S#eldap.bind_timer), case catch recvd_wait_bind_response(Data, S) of - bound -> {next_state, active, S}; - {fail_bind, _Reason} -> close_and_retry(S), - {next_state, connecting, S#eldap{fd = null}}; - {'EXIT', _Reason} -> close_and_retry(S), - {next_state, connecting, S#eldap{fd = null}}; - {error, _Reason} -> close_and_retry(S), - {next_state, connecting, S#eldap{fd = null}} + bound -> + dequeue_commands(S); + {fail_bind, _Reason} -> + {next_state, connecting, close_and_retry(S)}; + {'EXIT', _Reason} -> + {next_state, connecting, close_and_retry(S)}; + {error, _Reason} -> + {next_state, connecting, close_and_retry(S)} end; handle_info({tcp, _Socket, Data}, StateName, S) - when StateName==active; StateName==active_bind -> + when StateName == active orelse StateName == active_bind -> case catch recvd_packet(Data, S) of - {reply, Reply, To, NewS} -> gen_fsm:reply(To, Reply), - {next_state, StateName, NewS}; - {ok, NewS} -> {next_state, StateName, NewS}; - {'EXIT', _Reason} -> {next_state, StateName, S}; - {error, _Reason} -> {next_state, StateName, S} + {response, Response, RequestType} -> + NewS = case Response of + {reply, Reply, To, S1} -> + gen_fsm:reply(To, Reply), + S1; + {ok, S1} -> + S1 + end, + if (StateName == active_bind andalso + RequestType == bindRequest) orelse + (StateName == active) -> + dequeue_commands(NewS); + true -> + {next_state, StateName, NewS} + end; + _ -> + {next_state, StateName, S} end; handle_info({tcp_closed, _Socket}, Fsm_state, S) -> ?WARNING_MSG("LDAP server closed the connection: ~s:~p~nIn State: ~p", [S#eldap.host, S#eldap.port ,Fsm_state]), - {ok, NextState, NewS} = close_and_rebind(S, tcp_closed), - {next_state, NextState, NewS}; + {next_state, connecting, close_and_retry(S)}; handle_info({tcp_error, _Socket, Reason}, Fsm_state, S) -> ?DEBUG("eldap received tcp_error: ~p~nIn State: ~p", [Reason, Fsm_state]), - {ok, NextState, NewS} = close_and_rebind(S, tcp_error), - {next_state, NextState, NewS}; + {next_state, connecting, close_and_retry(S)}; %% %% Timers %% -handle_info({timeout, Timer, {cmd_timeout, Id}}, active, S) -> +handle_info({timeout, Timer, {cmd_timeout, Id}}, StateName, S) -> case cmd_timeout(Timer, Id, S) of {reply, To, Reason, NewS} -> gen_fsm:reply(To, Reason), - {next_state, active, NewS}; - {error, _Reason} -> {next_state, active, S} + {next_state, StateName, NewS}; + {error, _Reason} -> {next_state, StateName, S} end; handle_info({timeout, retry_connect}, connecting, S) -> @@ -540,8 +528,7 @@ handle_info({timeout, retry_connect}, connecting, S) -> {next_state, NextState, NewS}; handle_info({timeout, _Timer, bind_timeout}, wait_bind_response, S) -> - close_and_retry(S), - {next_state, connecting, S#eldap{fd = null}}; + {next_state, connecting, close_and_retry(S)}; %% %% Make sure we don't fill the message queue with rubbish @@ -570,6 +557,34 @@ code_change(_OldVsn, StateName, S, _Extra) -> %%%---------------------------------------------------------------------- %%% Internal functions %%%---------------------------------------------------------------------- +dequeue_commands(S) -> + case queue:out(S#eldap.req_q) of + {{value, {Event, From}}, Q} -> + case process_command(S#eldap{req_q=Q}, Event, From) of + {_, active, NewS} -> + dequeue_commands(NewS); + Res -> + Res + end; + {empty, _} -> + {next_state, active, S} + end. + +process_command(S, Event, From) -> + case send_command(Event, From, S) of + {ok, NewS} -> + case Event of + {bind, _, _} -> + {next_state, active_bind, NewS}; + _ -> + {next_state, active, NewS} + end; + {error, _Reason} -> + Q = queue:in_r({Event, From}, S#eldap.req_q), + NewS = close_and_retry(S#eldap{req_q=Q}), + {next_state, connecting, NewS} + end. + send_command(Command, From, S) -> Id = bump_id(S), {Name, Request} = gen_req(Command), @@ -640,6 +655,7 @@ recvd_packet(Pkt, S) -> Dict = S#eldap.dict, Id = Msg#'LDAPMessage'.messageID, {Timer, From, Name, Result_so_far} = get_op_rec(Id, Dict), + Answer = case {Name, Op} of {searchRequest, {searchResEntry, R}} when record(R,'SearchResultEntry') -> @@ -687,14 +703,14 @@ recvd_packet(Pkt, S) -> New_dict = dict:erase(Id, Dict), cancel_timer(Timer), Reply = check_bind_reply(Result, From), - gen_fsm:send_all_state_event(self(), process_bind_q), {reply, Reply, From, S#eldap{dict = New_dict}}; {OtherName, OtherResult} -> New_dict = dict:erase(Id, Dict), cancel_timer(Timer), {reply, {error, {invalid_result, OtherName, OtherResult}}, From, S#eldap{dict = New_dict}} - end; + end, + {response, Answer, Name}; Error -> Error end. @@ -775,13 +791,9 @@ check_tag(Data) -> end. close_and_retry(S) -> - gen_tcp:close(S#eldap.fd), - retry_connect(). - -retry_connect() -> - erlang:send_after(?RETRY_TIMEOUT, self(), - {timeout, retry_connect}). - + catch gen_tcp:close(S#eldap.fd), + erlang:send_after(?RETRY_TIMEOUT, self(), {timeout, retry_connect}), + S#eldap{fd = null}. %%----------------------------------------------------------------------- %% Sort out timed out commands @@ -832,7 +844,8 @@ polish([], Res, Ref) -> %%----------------------------------------------------------------------- connect_bind(S) -> Host = next_host(S#eldap.host, S#eldap.hosts), - TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true}, binary], + TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true}, + {send_timeout, ?SEND_TIMEOUT}, binary], ?INFO_MSG("LDAP connection on ~s:~p", [Host, S#eldap.port]), case gen_tcp:connect(Host, S#eldap.port, TcpOpts) of {ok, Socket} -> @@ -844,15 +857,16 @@ connect_bind(S) -> host = Host, bind_timer = Timer}}; {error, Reason} -> - ?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]), - gen_tcp:close(Socket), - retry_connect(), - {ok, connecting, S#eldap{host = Host}} + ?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p", + [Host, S#eldap.port, Reason]), + NewS = close_and_retry(S), + {ok, connecting, NewS#eldap{host = Host}} end; {error, Reason} -> - ?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]), - retry_connect(), - {ok, connecting, S#eldap{host = Host}} + ?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p", + [Host, S#eldap.port, Reason]), + NewS = close_and_retry(S), + {ok, connecting, NewS#eldap{host = Host}} end. bind_request(Socket, S) -> @@ -997,13 +1011,3 @@ bump_id(#eldap{id = Id}) when Id > ?MAX_TRANSACTION_ID -> ?MIN_TRANSACTION_ID; bump_id(#eldap{id = Id}) -> Id + 1. - -close_and_rebind(State, Err) -> - F = fun(_Id, [{Timer, From, _Name}|_]) -> - gen_fsm:reply(From, {error, Err}), - cancel_timer(Timer) - end, - dict:map(F, State#eldap.dict), - connect_bind(State#eldap{fd = null, - dict = dict:new(), - bind_q=queue:new()}).