25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-22 16:20:52 +01:00

Merge 1855 from trunk.

* src/eldap/eldap.erl: implemented queue for pending
queries (thanks to Evgeniy Khramtsov)

SVN Revision: 1973
This commit is contained in:
Badlop 2009-03-06 11:42:56 +00:00
parent 66c60c4e6d
commit d23ebd354b
2 changed files with 109 additions and 102 deletions

View File

@ -1,5 +1,8 @@
2009-03-06 Badlop <badlop@process-one.net> 2009-03-06 Badlop <badlop@process-one.net>
* src/eldap/eldap.erl: implemented queue for pending
queries (thanks to Evgeniy Khramtsov)
* src/eldap/eldap.erl: Close a connection on tcp_error (thanks to * src/eldap/eldap.erl: Close a connection on tcp_error (thanks to
Evgeniy Khramtsov) Evgeniy Khramtsov)

View File

@ -85,6 +85,10 @@
-define(RETRY_TIMEOUT, 500). -define(RETRY_TIMEOUT, 500).
-define(BIND_TIMEOUT, 10000). -define(BIND_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 100000). -define(CMD_TIMEOUT, 100000).
%% Used in gen_fsm sync calls.
-define(CALL_TIMEOUT, ?CMD_TIMEOUT + ?BIND_TIMEOUT + ?RETRY_TIMEOUT).
%% Used as a timeout for gen_tcp:send/2
-define(SEND_TIMEOUT, 30000).
-define(MAX_TRANSACTION_ID, 65535). -define(MAX_TRANSACTION_ID, 65535).
-define(MIN_TRANSACTION_ID, 0). -define(MIN_TRANSACTION_ID, 0).
@ -98,7 +102,7 @@
id = 0, % LDAP Request ID id = 0, % LDAP Request ID
bind_timer, % Ref to bind timeout bind_timer, % Ref to bind timeout
dict, % dict holding operation params and results dict, % dict holding operation params and results
bind_q % Queue for bind() requests req_q % Queue for requests
}). }).
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
@ -141,7 +145,8 @@ close(Handle) ->
%%% -------------------------------------------------------------------- %%% --------------------------------------------------------------------
add(Handle, Entry, Attributes) when list(Entry),list(Attributes) -> add(Handle, Entry, Attributes) when list(Entry),list(Attributes) ->
Handle1 = get_handle(Handle), Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)}). gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)},
?CALL_TIMEOUT).
%%% Do sanity check ! %%% Do sanity check !
add_attrs(Attrs) -> add_attrs(Attrs) ->
@ -166,7 +171,7 @@ add_attrs(Attrs) ->
%%% -------------------------------------------------------------------- %%% --------------------------------------------------------------------
delete(Handle, Entry) when list(Entry) -> delete(Handle, Entry) when list(Entry) ->
Handle1 = get_handle(Handle), Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {delete, Entry}). gen_fsm:sync_send_event(Handle1, {delete, Entry}, ?CALL_TIMEOUT).
%%% -------------------------------------------------------------------- %%% --------------------------------------------------------------------
%%% Modify an entry. Given an entry a number of modification %%% Modify an entry. Given an entry a number of modification
@ -181,7 +186,7 @@ delete(Handle, Entry) when list(Entry) ->
%%% -------------------------------------------------------------------- %%% --------------------------------------------------------------------
modify(Handle, Object, Mods) when list(Object), list(Mods) -> modify(Handle, Object, Mods) when list(Object), list(Mods) ->
Handle1 = get_handle(Handle), Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}). gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}, ?CALL_TIMEOUT).
%%% %%%
%%% Modification operations. %%% Modification operations.
@ -214,7 +219,10 @@ m(Operation, Type, Values) ->
modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup) modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup)
when list(Entry),list(NewRDN),atom(DelOldRDN),list(NewSup) -> when list(Entry),list(NewRDN),atom(DelOldRDN),list(NewSup) ->
Handle1 = get_handle(Handle), Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)}). gen_fsm:sync_send_event(
Handle1,
{modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)},
?CALL_TIMEOUT).
%%% -------------------------------------------------------------------- %%% --------------------------------------------------------------------
@ -228,7 +236,7 @@ modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup)
bind(Handle, RootDN, Passwd) bind(Handle, RootDN, Passwd)
when list(RootDN),list(Passwd) -> when list(RootDN),list(Passwd) ->
Handle1 = get_handle(Handle), Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, infinity). gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, ?CALL_TIMEOUT).
%%% Sanity checks ! %%% Sanity checks !
@ -273,7 +281,7 @@ search(Handle, L) when list(L) ->
call_search(Handle, A) -> call_search(Handle, A) ->
Handle1 = get_handle(Handle), Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {search, A}, infinity). gen_fsm:sync_send_event(Handle1, {search, A}, ?CALL_TIMEOUT).
parse_search_args(Args) -> parse_search_args(Args) ->
parse_search_args(Args, #eldap_search{scope = wholeSubtree}). parse_search_args(Args, #eldap_search{scope = wholeSubtree}).
@ -382,7 +390,7 @@ init({Hosts, Port, Rootdn, Passwd}) ->
passwd = Passwd, passwd = Passwd,
id = 0, id = 0,
dict = dict:new(), dict = dict:new(),
bind_q = queue:new()}, 0}. req_q = queue:new()}, 0}.
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
%% Func: StateName/2 %% Func: StateName/2
@ -405,38 +413,20 @@ connecting(timeout, S) ->
%% {stop, Reason, NewStateData} | %% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData} %% {stop, Reason, Reply, NewStateData}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
connecting(_Event, _From, S) -> connecting(Event, From, S) ->
Reply = {error, connecting}, Q = queue:in({Event, From}, S#eldap.req_q),
{reply, Reply, connecting, S}. {next_state, connecting, S#eldap{req_q=Q}}.
wait_bind_response(_Event, _From, S) -> wait_bind_response(Event, From, S) ->
Reply = {error, wait_bind_response}, Q = queue:in({Event, From}, S#eldap.req_q),
{reply, Reply, wait_bind_response, S}. {next_state, wait_bind_response, S#eldap{req_q=Q}}.
active_bind(Event, From, S) ->
Q = queue:in({Event, From}, S#eldap.req_q),
{next_state, active_bind, S#eldap{req_q=Q}}.
active(Event, From, S) -> active(Event, From, S) ->
case catch send_command(Event, From, S) of process_command(S, Event, From).
{ok, NewS} ->
case Event of
{bind, _, _} ->
{next_state, active_bind, NewS};
_ ->
{next_state, active, NewS}
end;
{error, Reason} ->
{reply, {error, Reason}, active, S};
{'EXIT', Reason} ->
{reply, {error, Reason}, active, S}
end.
active_bind({bind, RootDN, Passwd}, From, #eldap{bind_q=Q} = S) ->
NewQ = queue:in({{bind, RootDN, Passwd}, From}, Q),
{next_state, active_bind, S#eldap{bind_q=NewQ}};
active_bind(Event, From, S) ->
case catch send_command(Event, From, S) of
{ok, NewS} -> {next_state, active_bind, NewS};
{error, Reason} -> {reply, {error, Reason}, active_bind, S};
{'EXIT', Reason} -> {reply, {error, Reason}, active_bind, S}
end.
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
%% Func: handle_event/3 %% Func: handle_event/3
@ -446,21 +436,8 @@ active_bind(Event, From, S) ->
%% {stop, Reason, NewStateData} %% {stop, Reason, NewStateData}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
handle_event(close, _StateName, S) -> handle_event(close, _StateName, S) ->
gen_tcp:close(S#eldap.fd), catch gen_tcp:close(S#eldap.fd),
{stop, closed, S}; {stop, normal, S};
handle_event(process_bind_q, active_bind, #eldap{bind_q=Q} = S) ->
case queue:out(Q) of
{{value, {BindEvent, To}}, NewQ} ->
NewStateData = case catch send_command(BindEvent, To, S) of
{ok, NewS} -> NewS;
{error, Reason} -> gen_fsm:reply(To, {error, Reason}), S;
{'EXIT', Reason} -> gen_fsm:reply(To, {error, Reason}), S
end,
{next_state, active_bind, NewStateData#eldap{bind_q=NewQ}};
{empty, Q} ->
{next_state, active, S}
end;
handle_event(_Event, StateName, S) -> handle_event(_Event, StateName, S) ->
{next_state, StateName, S}. {next_state, StateName, S}.
@ -489,50 +466,61 @@ handle_sync_event(_Event, _From, StateName, S) ->
%% Packets arriving in various states %% Packets arriving in various states
%% %%
handle_info({tcp, _Socket, Data}, connecting, S) -> handle_info({tcp, _Socket, Data}, connecting, S) ->
?DEBUG("eldap. tcp packet received when disconnected!~n~p", [Data]), ?DEBUG("tcp packet received when disconnected!~n~p", [Data]),
{next_state, connecting, S}; {next_state, connecting, S};
handle_info({tcp, _Socket, Data}, wait_bind_response, S) -> handle_info({tcp, _Socket, Data}, wait_bind_response, S) ->
cancel_timer(S#eldap.bind_timer), cancel_timer(S#eldap.bind_timer),
case catch recvd_wait_bind_response(Data, S) of case catch recvd_wait_bind_response(Data, S) of
bound -> {next_state, active, S}; bound ->
{fail_bind, _Reason} -> close_and_retry(S), dequeue_commands(S);
{next_state, connecting, S#eldap{fd = null}}; {fail_bind, _Reason} ->
{'EXIT', _Reason} -> close_and_retry(S), {next_state, connecting, close_and_retry(S)};
{next_state, connecting, S#eldap{fd = null}}; {'EXIT', _Reason} ->
{error, _Reason} -> close_and_retry(S), {next_state, connecting, close_and_retry(S)};
{next_state, connecting, S#eldap{fd = null}} {error, _Reason} ->
{next_state, connecting, close_and_retry(S)}
end; end;
handle_info({tcp, _Socket, Data}, StateName, S) handle_info({tcp, _Socket, Data}, StateName, S)
when StateName==active; StateName==active_bind -> when StateName == active orelse StateName == active_bind ->
case catch recvd_packet(Data, S) of case catch recvd_packet(Data, S) of
{reply, Reply, To, NewS} -> gen_fsm:reply(To, Reply), {response, Response, RequestType} ->
{next_state, StateName, NewS}; NewS = case Response of
{ok, NewS} -> {next_state, StateName, NewS}; {reply, Reply, To, S1} ->
{'EXIT', _Reason} -> {next_state, StateName, S}; gen_fsm:reply(To, Reply),
{error, _Reason} -> {next_state, StateName, S} S1;
{ok, S1} ->
S1
end,
if (StateName == active_bind andalso
RequestType == bindRequest) orelse
(StateName == active) ->
dequeue_commands(NewS);
true ->
{next_state, StateName, NewS}
end;
_ ->
{next_state, StateName, S}
end; end;
handle_info({tcp_closed, _Socket}, Fsm_state, S) -> handle_info({tcp_closed, _Socket}, Fsm_state, S) ->
?WARNING_MSG("LDAP server closed the connection: ~s:~p~nIn State: ~p", ?WARNING_MSG("LDAP server closed the connection: ~s:~p~nIn State: ~p",
[S#eldap.host, S#eldap.port ,Fsm_state]), [S#eldap.host, S#eldap.port ,Fsm_state]),
{ok, NextState, NewS} = close_and_rebind(S, tcp_closed), {next_state, connecting, close_and_retry(S)};
{next_state, NextState, NewS};
handle_info({tcp_error, _Socket, Reason}, Fsm_state, S) -> handle_info({tcp_error, _Socket, Reason}, Fsm_state, S) ->
?DEBUG("eldap received tcp_error: ~p~nIn State: ~p", [Reason, Fsm_state]), ?DEBUG("eldap received tcp_error: ~p~nIn State: ~p", [Reason, Fsm_state]),
{ok, NextState, NewS} = close_and_rebind(S, tcp_error), {next_state, connecting, close_and_retry(S)};
{next_state, NextState, NewS};
%% %%
%% Timers %% Timers
%% %%
handle_info({timeout, Timer, {cmd_timeout, Id}}, active, S) -> handle_info({timeout, Timer, {cmd_timeout, Id}}, StateName, S) ->
case cmd_timeout(Timer, Id, S) of case cmd_timeout(Timer, Id, S) of
{reply, To, Reason, NewS} -> gen_fsm:reply(To, Reason), {reply, To, Reason, NewS} -> gen_fsm:reply(To, Reason),
{next_state, active, NewS}; {next_state, StateName, NewS};
{error, _Reason} -> {next_state, active, S} {error, _Reason} -> {next_state, StateName, S}
end; end;
handle_info({timeout, retry_connect}, connecting, S) -> handle_info({timeout, retry_connect}, connecting, S) ->
@ -540,8 +528,7 @@ handle_info({timeout, retry_connect}, connecting, S) ->
{next_state, NextState, NewS}; {next_state, NextState, NewS};
handle_info({timeout, _Timer, bind_timeout}, wait_bind_response, S) -> handle_info({timeout, _Timer, bind_timeout}, wait_bind_response, S) ->
close_and_retry(S), {next_state, connecting, close_and_retry(S)};
{next_state, connecting, S#eldap{fd = null}};
%% %%
%% Make sure we don't fill the message queue with rubbish %% Make sure we don't fill the message queue with rubbish
@ -570,6 +557,34 @@ code_change(_OldVsn, StateName, S, _Extra) ->
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
dequeue_commands(S) ->
case queue:out(S#eldap.req_q) of
{{value, {Event, From}}, Q} ->
case process_command(S#eldap{req_q=Q}, Event, From) of
{_, active, NewS} ->
dequeue_commands(NewS);
Res ->
Res
end;
{empty, _} ->
{next_state, active, S}
end.
process_command(S, Event, From) ->
case send_command(Event, From, S) of
{ok, NewS} ->
case Event of
{bind, _, _} ->
{next_state, active_bind, NewS};
_ ->
{next_state, active, NewS}
end;
{error, _Reason} ->
Q = queue:in_r({Event, From}, S#eldap.req_q),
NewS = close_and_retry(S#eldap{req_q=Q}),
{next_state, connecting, NewS}
end.
send_command(Command, From, S) -> send_command(Command, From, S) ->
Id = bump_id(S), Id = bump_id(S),
{Name, Request} = gen_req(Command), {Name, Request} = gen_req(Command),
@ -640,6 +655,7 @@ recvd_packet(Pkt, S) ->
Dict = S#eldap.dict, Dict = S#eldap.dict,
Id = Msg#'LDAPMessage'.messageID, Id = Msg#'LDAPMessage'.messageID,
{Timer, From, Name, Result_so_far} = get_op_rec(Id, Dict), {Timer, From, Name, Result_so_far} = get_op_rec(Id, Dict),
Answer =
case {Name, Op} of case {Name, Op} of
{searchRequest, {searchResEntry, R}} when {searchRequest, {searchResEntry, R}} when
record(R,'SearchResultEntry') -> record(R,'SearchResultEntry') ->
@ -687,14 +703,14 @@ recvd_packet(Pkt, S) ->
New_dict = dict:erase(Id, Dict), New_dict = dict:erase(Id, Dict),
cancel_timer(Timer), cancel_timer(Timer),
Reply = check_bind_reply(Result, From), Reply = check_bind_reply(Result, From),
gen_fsm:send_all_state_event(self(), process_bind_q),
{reply, Reply, From, S#eldap{dict = New_dict}}; {reply, Reply, From, S#eldap{dict = New_dict}};
{OtherName, OtherResult} -> {OtherName, OtherResult} ->
New_dict = dict:erase(Id, Dict), New_dict = dict:erase(Id, Dict),
cancel_timer(Timer), cancel_timer(Timer),
{reply, {error, {invalid_result, OtherName, OtherResult}}, {reply, {error, {invalid_result, OtherName, OtherResult}},
From, S#eldap{dict = New_dict}} From, S#eldap{dict = New_dict}}
end; end,
{response, Answer, Name};
Error -> Error Error -> Error
end. end.
@ -775,13 +791,9 @@ check_tag(Data) ->
end. end.
close_and_retry(S) -> close_and_retry(S) ->
gen_tcp:close(S#eldap.fd), catch gen_tcp:close(S#eldap.fd),
retry_connect(). erlang:send_after(?RETRY_TIMEOUT, self(), {timeout, retry_connect}),
S#eldap{fd = null}.
retry_connect() ->
erlang:send_after(?RETRY_TIMEOUT, self(),
{timeout, retry_connect}).
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
%% Sort out timed out commands %% Sort out timed out commands
@ -832,7 +844,8 @@ polish([], Res, Ref) ->
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
connect_bind(S) -> connect_bind(S) ->
Host = next_host(S#eldap.host, S#eldap.hosts), Host = next_host(S#eldap.host, S#eldap.hosts),
TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true}, binary], TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true},
{send_timeout, ?SEND_TIMEOUT}, binary],
?INFO_MSG("LDAP connection on ~s:~p", [Host, S#eldap.port]), ?INFO_MSG("LDAP connection on ~s:~p", [Host, S#eldap.port]),
case gen_tcp:connect(Host, S#eldap.port, TcpOpts) of case gen_tcp:connect(Host, S#eldap.port, TcpOpts) of
{ok, Socket} -> {ok, Socket} ->
@ -844,15 +857,16 @@ connect_bind(S) ->
host = Host, host = Host,
bind_timer = Timer}}; bind_timer = Timer}};
{error, Reason} -> {error, Reason} ->
?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]), ?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p",
gen_tcp:close(Socket), [Host, S#eldap.port, Reason]),
retry_connect(), NewS = close_and_retry(S),
{ok, connecting, S#eldap{host = Host}} {ok, connecting, NewS#eldap{host = Host}}
end; end;
{error, Reason} -> {error, Reason} ->
?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]), ?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p",
retry_connect(), [Host, S#eldap.port, Reason]),
{ok, connecting, S#eldap{host = Host}} NewS = close_and_retry(S),
{ok, connecting, NewS#eldap{host = Host}}
end. end.
bind_request(Socket, S) -> bind_request(Socket, S) ->
@ -997,13 +1011,3 @@ bump_id(#eldap{id = Id}) when Id > ?MAX_TRANSACTION_ID ->
?MIN_TRANSACTION_ID; ?MIN_TRANSACTION_ID;
bump_id(#eldap{id = Id}) -> bump_id(#eldap{id = Id}) ->
Id + 1. Id + 1.
close_and_rebind(State, Err) ->
F = fun(_Id, [{Timer, From, _Name}|_]) ->
gen_fsm:reply(From, {error, Err}),
cancel_timer(Timer)
end,
dict:map(F, State#eldap.dict),
connect_bind(State#eldap{fd = null,
dict = dict:new(),
bind_q=queue:new()}).