* src/odbc/ejabberd_odbc.erl: Support for mnesia-like transaction

interface
* src/mod_roster_odbc.erl: Updated to use
ejabberd_odbc:sql_transaction/2

SVN Revision: 434
This commit is contained in:
Alexey Shchepin 2005-11-16 02:59:05 +00:00
parent bbfd58a822
commit 57a6d0e1d3
3 changed files with 224 additions and 155 deletions

View File

@ -1,3 +1,10 @@
2005-11-16 Alexey Shchepin <alexey@sevcom.net>
* src/odbc/ejabberd_odbc.erl: Support for mnesia-like transaction
interface
* src/mod_roster_odbc.erl: Updated to use
ejabberd_odbc:sql_transaction/2
2005-11-12 Alexey Shchepin <alexey@sevcom.net> 2005-11-12 Alexey Shchepin <alexey@sevcom.net>
* src/ejabberd_s2s_out.erl: Fixed invalid behaviour upon * src/ejabberd_s2s_out.erl: Fixed invalid behaviour upon

View File

@ -229,70 +229,69 @@ process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) ->
LJID = jlib:jid_tolower(JID1), LJID = jlib:jid_tolower(JID1),
Username = ejabberd_odbc:escape(LUser), Username = ejabberd_odbc:escape(LUser),
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
case catch ejabberd_odbc:sql_query( F = fun() ->
LServer, {selected,
["select username, jid, nick, subscription, ask, " ["username", "jid", "nick", "subscription",
"server, subscribe, type from rosterusers " "ask", "server", "subscribe", "type"],
"where username='", Username, "' " Res} =
"and jid='", SJID, "'"]) of ejabberd_odbc:sql_query_t(
{selected, ["username", "jid", "nick", "subscription", "ask", ["select username, jid, nick, subscription, "
"server", "subscribe", "type"], "ask, server, subscribe, type from rosterusers "
Res} -> "where username='", Username, "' "
Item = case Res of "and jid='", SJID, "'"]),
[] -> Item = case Res of
#roster{user = LUser, [] ->
jid = LJID}; #roster{user = LUser,
[I] -> jid = LJID};
(raw_to_record(I))#roster{user = LUser, [I] ->
jid = JID, (raw_to_record(I))#roster{user = LUser,
name = ""} jid = JID,
end, name = ""}
Item1 = process_item_attrs(Item, Attrs), end,
Item2 = process_item_els(Item1, Els), Item1 = process_item_attrs(Item, Attrs),
case Item2#roster.subscription of Item2 = process_item_els(Item1, Els),
case Item2#roster.subscription of
remove ->
ejabberd_odbc:sql_query_t(
["delete from rosterusers "
" where username='", Username, "' "
" and jid='", SJID, "';"
"delete from rostergroups "
" where username='", Username, "' "
" and jid='", SJID, "'"]);
_ ->
ItemVals = record_to_string(Item2),
ItemGroups = groups_to_string(Item2),
ejabberd_odbc:sql_query_t(
["delete from rosterusers "
" where username='", Username, "' "
" and jid='", SJID, "';"
"insert into rosterusers("
" username, jid, nick, "
" subscription, ask, "
" server, subscribe, type) "
" values ", ItemVals, ";"
"delete from rostergroups "
" where username='", Username, "' "
" and jid='", SJID, "';",
[["insert into rostergroups("
" username, jid, grp) "
" values ", ItemGroup, ";"] ||
ItemGroup <- ItemGroups]])
end,
{Item, Item2}
end,
case ejabberd_odbc:sql_transaction(LServer, F) of
{atomic, {OldItem, Item}} ->
push_item(User, LServer, To, Item),
case Item#roster.subscription of
remove -> remove ->
catch ejabberd_odbc:sql_query( IsTo = case OldItem#roster.subscription of
LServer,
["begin;"
"delete from rosterusers "
" where username='", Username, "' "
" and jid='", SJID, "';"
"delete from rostergroups "
" where username='", Username, "' "
" and jid='", SJID, "';"
"commit"]);
_ ->
ItemVals = record_to_string(Item2),
ItemGroups = groups_to_string(Item2),
catch ejabberd_odbc:sql_query(
LServer,
["begin;"
"delete from rosterusers "
" where username='", Username, "' "
" and jid='", SJID, "';"
"insert into rosterusers("
" username, jid, nick, "
" subscription, ask, "
" server, subscribe, type) "
" values ", ItemVals, ";"
"delete from rostergroups "
" where username='", Username, "' "
" and jid='", SJID, "';",
[["insert into rostergroups("
" username, jid, grp) "
" values ", ItemGroup, ";"] ||
ItemGroup <- ItemGroups],
"commit"])
end,
push_item(User, LServer, To, Item2),
case Item2#roster.subscription of
remove ->
IsTo = case Item#roster.subscription of
both -> true; both -> true;
to -> true; to -> true;
_ -> false _ -> false
end, end,
IsFrom = case Item#roster.subscription of IsFrom = case OldItem#roster.subscription of
both -> true; both -> true;
from -> true; from -> true;
_ -> false _ -> false
@ -300,7 +299,7 @@ process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) ->
if IsTo -> if IsTo ->
ejabberd_router:route( ejabberd_router:route(
jlib:jid_remove_resource(From), jlib:jid_remove_resource(From),
jlib:make_jid(Item#roster.jid), jlib:make_jid(OldItem#roster.jid),
{xmlelement, "presence", {xmlelement, "presence",
[{"type", "unsubscribe"}], [{"type", "unsubscribe"}],
[]}); []});
@ -309,7 +308,7 @@ process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) ->
if IsFrom -> if IsFrom ->
ejabberd_router:route( ejabberd_router:route(
jlib:jid_remove_resource(From), jlib:jid_remove_resource(From),
jlib:make_jid(Item#roster.jid), jlib:make_jid(OldItem#roster.jid),
{xmlelement, "presence", {xmlelement, "presence",
[{"type", "unsubscribed"}], [{"type", "unsubscribed"}],
[]}); []});
@ -459,87 +458,96 @@ process_subscription(Direction, User, Server, JID1, Type) ->
LJID = jlib:jid_tolower(JID1), LJID = jlib:jid_tolower(JID1),
Username = ejabberd_odbc:escape(LUser), Username = ejabberd_odbc:escape(LUser),
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
Item = case catch ejabberd_odbc:sql_query( F = fun() ->
LServer, Item =
["select username, jid, nick, subscription, ask, " case ejabberd_odbc:sql_query_t(
"server, subscribe, type from rosterusers " ["select username, jid, nick, subscription, ask, "
"where username='", Username, "' " "server, subscribe, type from rosterusers "
"and jid='", SJID, "'"]) of "where username='", Username, "' "
{selected, ["username", "jid", "nick", "subscription", "ask", "and jid='", SJID, "'"]) of
"server", "subscribe", "type"], {selected,
[I]} -> ["username", "jid", "nick", "subscription", "ask",
R = raw_to_record(I), "server", "subscribe", "type"],
Groups = case catch ejabberd_odbc:sql_query( [I]} ->
LServer, R = raw_to_record(I),
["select grp from rostergroups " Groups =
"where username='", Username, "' " case ejabberd_odbc:sql_query_t(
"and jid='", SJID, "'"]) of ["select grp from rostergroups "
{selected, ["grp"], JGrps} when is_list(JGrps) -> "where username='", Username, "' "
[JGrp || {JGrp} <- JGrps]; "and jid='", SJID, "'"]) of
_ -> {selected, ["grp"], JGrps} when is_list(JGrps) ->
[] [JGrp || {JGrp} <- JGrps];
_ ->
[]
end,
R#roster{groups = Groups};
{selected,
["username", "jid", "nick", "subscription", "ask",
"server", "subscribe", "type"],
[]} ->
#roster{user = LUser,
jid = LJID}
end,
NewState = case Direction of
out ->
out_state_change(Item#roster.subscription,
Item#roster.ask,
Type);
in ->
in_state_change(Item#roster.subscription,
Item#roster.ask,
Type)
end,
AutoReply = case Direction of
out ->
none;
in ->
in_auto_reply(Item#roster.subscription,
Item#roster.ask,
Type)
end, end,
R#roster{groups = Groups}; case NewState of
_ -> none ->
#roster{user = LUser, {none, AutoReply};
jid = LJID} {Subscription, Pending} ->
end, NewItem = Item#roster{subscription = Subscription,
NewState = case Direction of ask = Pending},
out -> ItemVals = record_to_string(NewItem),
out_state_change(Item#roster.subscription, ejabberd_odbc:sql_query_t(
Item#roster.ask, ["delete from rosterusers "
Type); " where username='", Username, "' "
in -> " and jid='", SJID, "';"
in_state_change(Item#roster.subscription, "insert into rosterusers("
Item#roster.ask, " username, jid, nick, "
Type) " subscription, ask, "
end, " server, subscribe, type) "
AutoReply = case Direction of " values ", ItemVals]),
out -> {{push, NewItem}, AutoReply}
none; end
in -> end,
in_auto_reply(Item#roster.subscription, case ejabberd_odbc:sql_transaction(LServer, F) of
Item#roster.ask, {atomic, {Push, AutoReply}} ->
Type) case AutoReply of
end, none ->
Push = case NewState of ok;
none -> _ ->
none; T = case AutoReply of
{Subscription, Pending} -> subscribed -> "subscribed";
NewItem = Item#roster{subscription = Subscription, unsubscribed -> "unsubscribed"
ask = Pending}, end,
ItemVals = record_to_string(NewItem), ejabberd_router:route(
catch ejabberd_odbc:sql_query( jlib:make_jid(User, Server, ""), JID1,
LServer, {xmlelement, "presence", [{"type", T}], []})
["begin;" end,
"delete from rosterusers " case Push of
" where username='", Username, "' " {push, Item} ->
" and jid='", SJID, "';" push_item(User, Server,
"insert into rosterusers(" jlib:make_jid("", Server, ""), Item),
" username, jid, nick, " true;
" subscription, ask, " none ->
" server, subscribe, type) " false
" values ", ItemVals, ";" end;
"commit"]),
{push, NewItem}
end,
case AutoReply of
none ->
ok;
_ -> _ ->
T = case AutoReply of
subscribed -> "subscribed";
unsubscribed -> "unsubscribed"
end,
ejabberd_router:route(
jlib:make_jid(User, Server, ""), JID1,
{xmlelement, "presence", [{"type", T}], []})
end,
case Push of
{push, PushItem} ->
push_item(User, Server, jlib:make_jid("", Server, ""), PushItem),
true;
none ->
false false
end. end.
@ -639,14 +647,15 @@ remove_user(User, Server) ->
LUser = jlib:nodeprep(User), LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server), LServer = jlib:nameprep(Server),
Username = ejabberd_odbc:escape(LUser), Username = ejabberd_odbc:escape(LUser),
catch ejabberd_odbc:sql_query( ejabberd_odbc:sql_transaction(
LServer, LServer,
["begin;" fun() ->
"delete from rosterusers " ejabberd_odbc:sql_query_t(
" where username='", Username, "';" ["delete from rosterusers "
"delete from rostergroups " " where username='", Username, "';"
" where username='", Username, "';" "delete from rostergroups "
"commit"]), " where username='", Username, "'"])
end),
ok. ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -15,6 +15,8 @@
%% External exports %% External exports
-export([start/1, start_link/1, -export([start/1, start_link/1,
sql_query/2, sql_query/2,
sql_query_t/1,
sql_transaction/2,
escape/1]). escape/1]).
%% gen_server callbacks %% gen_server callbacks
@ -27,6 +29,9 @@
-record(state, {db_ref, db_type}). -record(state, {db_ref, db_type}).
-define(STATE_KEY, ejabberd_odbc_state).
-define(MAX_TRANSACTION_RESTARTS, 10).
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% API %%% API
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
@ -40,6 +45,30 @@ sql_query(Host, Query) ->
gen_server:call(ejabberd_odbc_sup:get_random_pid(Host), gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
{sql_query, Query}, 60000). {sql_query, Query}, 60000).
sql_transaction(Host, F) ->
gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
{sql_transaction, F}, 60000).
sql_query_t(Query) ->
State = get(?STATE_KEY),
QRes = sql_query_internal(State, Query),
case QRes of
{error, "No SQL-driver information available."} ->
% workaround for odbc bug
{updated, 0};
{error, _} ->
throw(aborted);
Rs when is_list(Rs) ->
case lists:keymember(error, 1, Rs) of
true ->
throw(aborted);
_ ->
QRes
end;
_ ->
QRes
end.
escape(S) -> escape(S) ->
[case C of [case C of
$\0 -> "\\0"; $\0 -> "\\0";
@ -91,13 +120,13 @@ init([Host]) ->
%% {stop, Reason, State} (terminate/2 is called) %% {stop, Reason, State} (terminate/2 is called)
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
handle_call({sql_query, Query}, _From, State) -> handle_call({sql_query, Query}, _From, State) ->
Reply = case State#state.db_type of Reply = sql_query_internal(State, Query),
odbc ->
odbc:sql_query(State#state.db_ref, Query);
pgsql ->
pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query))
end,
{reply, Reply, State}; {reply, Reply, State};
handle_call({sql_transaction, F}, _From, State) ->
Reply = execute_transaction(State, F, ?MAX_TRANSACTION_RESTARTS),
{reply, Reply, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = ok, Reply = ok,
{reply, Reply, State}. {reply, Reply, State}.
@ -136,6 +165,30 @@ terminate(_Reason, _State) ->
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
sql_query_internal(State, Query) ->
case State#state.db_type of
odbc ->
odbc:sql_query(State#state.db_ref, Query);
pgsql ->
pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query))
end.
execute_transaction(_State, _F, 0) ->
{aborted, restarts_exceeded};
execute_transaction(State, F, NRestarts) ->
put(?STATE_KEY, State),
sql_query_internal(State, "begin"),
case catch F() of
aborted ->
execute_transaction(State, F, NRestarts - 1);
{'EXIT', Reason} ->
sql_query_internal(State, "rollback"),
{aborted, Reason};
Res ->
sql_query_internal(State, "commit"),
{atomic, Res}
end.
pgsql_to_odbc({ok, PGSQLResult}) -> pgsql_to_odbc({ok, PGSQLResult}) ->
case PGSQLResult of case PGSQLResult of
[Item] -> [Item] ->
@ -149,7 +202,7 @@ pgsql_item_to_odbc({"SELECT", Rows, Recs}) ->
[element(1, Row) || Row <- Rows], [element(1, Row) || Row <- Rows],
[list_to_tuple(Rec) || Rec <- Recs]}; [list_to_tuple(Rec) || Rec <- Recs]};
pgsql_item_to_odbc("INSERT " ++ OIDN) -> pgsql_item_to_odbc("INSERT " ++ OIDN) ->
[OID, N] = string:tokens(OIDN, " "), [_OID, N] = string:tokens(OIDN, " "),
{updated, list_to_integer(N)}; {updated, list_to_integer(N)};
pgsql_item_to_odbc("DELETE " ++ N) -> pgsql_item_to_odbc("DELETE " ++ N) ->
{updated, list_to_integer(N)}; {updated, list_to_integer(N)};