* src/eldap/eldap.erl: implemented queue for pending queries.

SVN Revision: 1855
This commit is contained in:
Evgeniy Khramtsov 2009-01-27 13:24:18 +00:00
parent 8fa76b9015
commit 8530e628b4
2 changed files with 110 additions and 102 deletions

View File

@ -1,3 +1,7 @@
2009-01-27 Evgeniy Khramtsov <ekhramtsov@process-one.net>
* src/eldap/eldap.erl: implemented queue for pending queries.
2009-01-27 Badlop <badlop@process-one.net>
* doc/guide.tex: mod_muc can run in several nodes of cluster

View File

@ -85,6 +85,10 @@
-define(RETRY_TIMEOUT, 500).
-define(BIND_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 100000).
%% Used in gen_fsm sync calls.
-define(CALL_TIMEOUT, ?CMD_TIMEOUT + ?BIND_TIMEOUT + ?RETRY_TIMEOUT).
%% Used as a timeout for gen_tcp:send/2
-define(SEND_TIMEOUT, 30000).
-define(MAX_TRANSACTION_ID, 65535).
-define(MIN_TRANSACTION_ID, 0).
@ -98,7 +102,7 @@
id = 0, % LDAP Request ID
bind_timer, % Ref to bind timeout
dict, % dict holding operation params and results
bind_q % Queue for bind() requests
req_q % Queue for requests
}).
%%%----------------------------------------------------------------------
@ -141,7 +145,8 @@ close(Handle) ->
%%% --------------------------------------------------------------------
add(Handle, Entry, Attributes) when list(Entry),list(Attributes) ->
Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)}).
gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)},
?CALL_TIMEOUT).
%%% Do sanity check !
add_attrs(Attrs) ->
@ -166,7 +171,7 @@ add_attrs(Attrs) ->
%%% --------------------------------------------------------------------
delete(Handle, Entry) when list(Entry) ->
Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {delete, Entry}).
gen_fsm:sync_send_event(Handle1, {delete, Entry}, ?CALL_TIMEOUT).
%%% --------------------------------------------------------------------
%%% Modify an entry. Given an entry a number of modification
@ -181,7 +186,7 @@ delete(Handle, Entry) when list(Entry) ->
%%% --------------------------------------------------------------------
modify(Handle, Object, Mods) when list(Object), list(Mods) ->
Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}).
gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}, ?CALL_TIMEOUT).
%%%
%%% Modification operations.
@ -214,7 +219,10 @@ m(Operation, Type, Values) ->
modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup)
when list(Entry),list(NewRDN),atom(DelOldRDN),list(NewSup) ->
Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)}).
gen_fsm:sync_send_event(
Handle1,
{modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)},
?CALL_TIMEOUT).
%%% --------------------------------------------------------------------
@ -228,7 +236,7 @@ modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup)
bind(Handle, RootDN, Passwd)
when list(RootDN),list(Passwd) ->
Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, infinity).
gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, ?CALL_TIMEOUT).
%%% Sanity checks !
@ -273,7 +281,7 @@ search(Handle, L) when list(L) ->
call_search(Handle, A) ->
Handle1 = get_handle(Handle),
gen_fsm:sync_send_event(Handle1, {search, A}, infinity).
gen_fsm:sync_send_event(Handle1, {search, A}, ?CALL_TIMEOUT).
parse_search_args(Args) ->
parse_search_args(Args, #eldap_search{scope = wholeSubtree}).
@ -382,7 +390,7 @@ init({Hosts, Port, Rootdn, Passwd}) ->
passwd = Passwd,
id = 0,
dict = dict:new(),
bind_q = queue:new()}, 0}.
req_q = queue:new()}, 0}.
%%----------------------------------------------------------------------
%% Func: StateName/2
@ -405,38 +413,20 @@ connecting(timeout, S) ->
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
connecting(_Event, _From, S) ->
Reply = {error, connecting},
{reply, Reply, connecting, S}.
connecting(Event, From, S) ->
Q = queue:in({Event, From}, S#eldap.req_q),
{next_state, connecting, S#eldap{req_q=Q}}.
wait_bind_response(_Event, _From, S) ->
Reply = {error, wait_bind_response},
{reply, Reply, wait_bind_response, S}.
wait_bind_response(Event, From, S) ->
Q = queue:in({Event, From}, S#eldap.req_q),
{next_state, wait_bind_response, S#eldap{req_q=Q}}.
active_bind(Event, From, S) ->
Q = queue:in({Event, From}, S#eldap.req_q),
{next_state, active_bind, S#eldap{req_q=Q}}.
active(Event, From, S) ->
case catch send_command(Event, From, S) of
{ok, NewS} ->
case Event of
{bind, _, _} ->
{next_state, active_bind, NewS};
_ ->
{next_state, active, NewS}
end;
{error, Reason} ->
{reply, {error, Reason}, active, S};
{'EXIT', Reason} ->
{reply, {error, Reason}, active, S}
end.
active_bind({bind, RootDN, Passwd}, From, #eldap{bind_q=Q} = S) ->
NewQ = queue:in({{bind, RootDN, Passwd}, From}, Q),
{next_state, active_bind, S#eldap{bind_q=NewQ}};
active_bind(Event, From, S) ->
case catch send_command(Event, From, S) of
{ok, NewS} -> {next_state, active_bind, NewS};
{error, Reason} -> {reply, {error, Reason}, active_bind, S};
{'EXIT', Reason} -> {reply, {error, Reason}, active_bind, S}
end.
process_command(S, Event, From).
%%----------------------------------------------------------------------
%% Func: handle_event/3
@ -446,21 +436,8 @@ active_bind(Event, From, S) ->
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(close, _StateName, S) ->
gen_tcp:close(S#eldap.fd),
{stop, closed, S};
handle_event(process_bind_q, active_bind, #eldap{bind_q=Q} = S) ->
case queue:out(Q) of
{{value, {BindEvent, To}}, NewQ} ->
NewStateData = case catch send_command(BindEvent, To, S) of
{ok, NewS} -> NewS;
{error, Reason} -> gen_fsm:reply(To, {error, Reason}), S;
{'EXIT', Reason} -> gen_fsm:reply(To, {error, Reason}), S
end,
{next_state, active_bind, NewStateData#eldap{bind_q=NewQ}};
{empty, Q} ->
{next_state, active, S}
end;
catch gen_tcp:close(S#eldap.fd),
{stop, normal, S};
handle_event(_Event, StateName, S) ->
{next_state, StateName, S}.
@ -489,50 +466,61 @@ handle_sync_event(_Event, _From, StateName, S) ->
%% Packets arriving in various states
%%
handle_info({tcp, _Socket, Data}, connecting, S) ->
?DEBUG("eldap. tcp packet received when disconnected!~n~p", [Data]),
?DEBUG("tcp packet received when disconnected!~n~p", [Data]),
{next_state, connecting, S};
handle_info({tcp, _Socket, Data}, wait_bind_response, S) ->
cancel_timer(S#eldap.bind_timer),
case catch recvd_wait_bind_response(Data, S) of
bound -> {next_state, active, S};
{fail_bind, _Reason} -> close_and_retry(S),
{next_state, connecting, S#eldap{fd = null}};
{'EXIT', _Reason} -> close_and_retry(S),
{next_state, connecting, S#eldap{fd = null}};
{error, _Reason} -> close_and_retry(S),
{next_state, connecting, S#eldap{fd = null}}
bound ->
dequeue_commands(S);
{fail_bind, _Reason} ->
{next_state, connecting, close_and_retry(S)};
{'EXIT', _Reason} ->
{next_state, connecting, close_and_retry(S)};
{error, _Reason} ->
{next_state, connecting, close_and_retry(S)}
end;
handle_info({tcp, _Socket, Data}, StateName, S)
when StateName==active; StateName==active_bind ->
when StateName == active orelse StateName == active_bind ->
case catch recvd_packet(Data, S) of
{reply, Reply, To, NewS} -> gen_fsm:reply(To, Reply),
{next_state, StateName, NewS};
{ok, NewS} -> {next_state, StateName, NewS};
{'EXIT', _Reason} -> {next_state, StateName, S};
{error, _Reason} -> {next_state, StateName, S}
{response, Response, RequestType} ->
NewS = case Response of
{reply, Reply, To, S1} ->
gen_fsm:reply(To, Reply),
S1;
{ok, S1} ->
S1
end,
if (StateName == active_bind andalso
RequestType == bindRequest) orelse
(StateName == active) ->
dequeue_commands(NewS);
true ->
{next_state, StateName, NewS}
end;
_ ->
{next_state, StateName, S}
end;
handle_info({tcp_closed, _Socket}, Fsm_state, S) ->
?WARNING_MSG("LDAP server closed the connection: ~s:~p~nIn State: ~p",
[S#eldap.host, S#eldap.port ,Fsm_state]),
{ok, NextState, NewS} = close_and_rebind(S, tcp_closed),
{next_state, NextState, NewS};
{next_state, connecting, close_and_retry(S)};
handle_info({tcp_error, _Socket, Reason}, Fsm_state, S) ->
?DEBUG("eldap received tcp_error: ~p~nIn State: ~p", [Reason, Fsm_state]),
{ok, NextState, NewS} = close_and_rebind(S, tcp_error),
{next_state, NextState, NewS};
{next_state, connecting, close_and_retry(S)};
%%
%% Timers
%%
handle_info({timeout, Timer, {cmd_timeout, Id}}, active, S) ->
handle_info({timeout, Timer, {cmd_timeout, Id}}, StateName, S) ->
case cmd_timeout(Timer, Id, S) of
{reply, To, Reason, NewS} -> gen_fsm:reply(To, Reason),
{next_state, active, NewS};
{error, _Reason} -> {next_state, active, S}
{next_state, StateName, NewS};
{error, _Reason} -> {next_state, StateName, S}
end;
handle_info({timeout, retry_connect}, connecting, S) ->
@ -540,8 +528,7 @@ handle_info({timeout, retry_connect}, connecting, S) ->
{next_state, NextState, NewS};
handle_info({timeout, _Timer, bind_timeout}, wait_bind_response, S) ->
close_and_retry(S),
{next_state, connecting, S#eldap{fd = null}};
{next_state, connecting, close_and_retry(S)};
%%
%% Make sure we don't fill the message queue with rubbish
@ -570,6 +557,34 @@ code_change(_OldVsn, StateName, S, _Extra) ->
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
dequeue_commands(S) ->
case queue:out(S#eldap.req_q) of
{{value, {Event, From}}, Q} ->
case process_command(S#eldap{req_q=Q}, Event, From) of
{_, active, NewS} ->
dequeue_commands(NewS);
Res ->
Res
end;
{empty, _} ->
{next_state, active, S}
end.
process_command(S, Event, From) ->
case send_command(Event, From, S) of
{ok, NewS} ->
case Event of
{bind, _, _} ->
{next_state, active_bind, NewS};
_ ->
{next_state, active, NewS}
end;
{error, _Reason} ->
Q = queue:in_r({Event, From}, S#eldap.req_q),
NewS = close_and_retry(S#eldap{req_q=Q}),
{next_state, connecting, NewS}
end.
send_command(Command, From, S) ->
Id = bump_id(S),
{Name, Request} = gen_req(Command),
@ -640,6 +655,7 @@ recvd_packet(Pkt, S) ->
Dict = S#eldap.dict,
Id = Msg#'LDAPMessage'.messageID,
{Timer, From, Name, Result_so_far} = get_op_rec(Id, Dict),
Answer =
case {Name, Op} of
{searchRequest, {searchResEntry, R}} when
record(R,'SearchResultEntry') ->
@ -687,14 +703,14 @@ recvd_packet(Pkt, S) ->
New_dict = dict:erase(Id, Dict),
cancel_timer(Timer),
Reply = check_bind_reply(Result, From),
gen_fsm:send_all_state_event(self(), process_bind_q),
{reply, Reply, From, S#eldap{dict = New_dict}};
{OtherName, OtherResult} ->
New_dict = dict:erase(Id, Dict),
cancel_timer(Timer),
{reply, {error, {invalid_result, OtherName, OtherResult}},
From, S#eldap{dict = New_dict}}
end;
end,
{response, Answer, Name};
Error -> Error
end.
@ -775,13 +791,9 @@ check_tag(Data) ->
end.
close_and_retry(S) ->
gen_tcp:close(S#eldap.fd),
retry_connect().
retry_connect() ->
erlang:send_after(?RETRY_TIMEOUT, self(),
{timeout, retry_connect}).
catch gen_tcp:close(S#eldap.fd),
erlang:send_after(?RETRY_TIMEOUT, self(), {timeout, retry_connect}),
S#eldap{fd = null}.
%%-----------------------------------------------------------------------
%% Sort out timed out commands
@ -832,7 +844,8 @@ polish([], Res, Ref) ->
%%-----------------------------------------------------------------------
connect_bind(S) ->
Host = next_host(S#eldap.host, S#eldap.hosts),
TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true}, binary],
TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true},
{send_timeout, ?SEND_TIMEOUT}, binary],
?INFO_MSG("LDAP connection on ~s:~p", [Host, S#eldap.port]),
case gen_tcp:connect(Host, S#eldap.port, TcpOpts) of
{ok, Socket} ->
@ -844,15 +857,16 @@ connect_bind(S) ->
host = Host,
bind_timer = Timer}};
{error, Reason} ->
?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]),
gen_tcp:close(Socket),
retry_connect(),
{ok, connecting, S#eldap{host = Host}}
?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p",
[Host, S#eldap.port, Reason]),
NewS = close_and_retry(S),
{ok, connecting, NewS#eldap{host = Host}}
end;
{error, Reason} ->
?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]),
retry_connect(),
{ok, connecting, S#eldap{host = Host}}
?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p",
[Host, S#eldap.port, Reason]),
NewS = close_and_retry(S),
{ok, connecting, NewS#eldap{host = Host}}
end.
bind_request(Socket, S) ->
@ -997,13 +1011,3 @@ bump_id(#eldap{id = Id}) when Id > ?MAX_TRANSACTION_ID ->
?MIN_TRANSACTION_ID;
bump_id(#eldap{id = Id}) ->
Id + 1.
close_and_rebind(State, Err) ->
F = fun(_Id, [{Timer, From, _Name}|_]) ->
gen_fsm:reply(From, {error, Err}),
cancel_timer(Timer)
end,
dict:map(F, State#eldap.dict),
connect_bind(State#eldap{fd = null,
dict = dict:new(),
bind_q=queue:new()}).