From 57a6d0e1d3670bf405ddee316df1c1f5164edb8c Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Wed, 16 Nov 2005 02:59:05 +0000 Subject: [PATCH] * src/odbc/ejabberd_odbc.erl: Support for mnesia-like transaction interface * src/mod_roster_odbc.erl: Updated to use ejabberd_odbc:sql_transaction/2 SVN Revision: 434 --- ChangeLog | 7 + src/mod_roster_odbc.erl | 305 +++++++++++++++++++------------------ src/odbc/ejabberd_odbc.erl | 67 +++++++- 3 files changed, 224 insertions(+), 155 deletions(-) diff --git a/ChangeLog b/ChangeLog index 148292925..af12efbf8 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2005-11-16 Alexey Shchepin + + * src/odbc/ejabberd_odbc.erl: Support for mnesia-like transaction + interface + * src/mod_roster_odbc.erl: Updated to use + ejabberd_odbc:sql_transaction/2 + 2005-11-12 Alexey Shchepin * src/ejabberd_s2s_out.erl: Fixed invalid behaviour upon diff --git a/src/mod_roster_odbc.erl b/src/mod_roster_odbc.erl index 36d477d52..66b065473 100644 --- a/src/mod_roster_odbc.erl +++ b/src/mod_roster_odbc.erl @@ -229,70 +229,69 @@ process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) -> LJID = jlib:jid_tolower(JID1), Username = ejabberd_odbc:escape(LUser), SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), - case catch ejabberd_odbc:sql_query( - LServer, - ["select username, jid, nick, subscription, ask, " - "server, subscribe, type from rosterusers " - "where username='", Username, "' " - "and jid='", SJID, "'"]) of - {selected, ["username", "jid", "nick", "subscription", "ask", - "server", "subscribe", "type"], - Res} -> - Item = case Res of - [] -> - #roster{user = LUser, - jid = LJID}; - [I] -> - (raw_to_record(I))#roster{user = LUser, - jid = JID, - name = ""} - end, - Item1 = process_item_attrs(Item, Attrs), - Item2 = process_item_els(Item1, Els), - case Item2#roster.subscription of + F = fun() -> + {selected, + ["username", "jid", "nick", "subscription", + "ask", "server", "subscribe", "type"], + Res} = + ejabberd_odbc:sql_query_t( + ["select username, jid, nick, subscription, " + "ask, server, subscribe, type from rosterusers " + "where username='", Username, "' " + "and jid='", SJID, "'"]), + Item = case Res of + [] -> + #roster{user = LUser, + jid = LJID}; + [I] -> + (raw_to_record(I))#roster{user = LUser, + jid = JID, + name = ""} + end, + Item1 = process_item_attrs(Item, Attrs), + Item2 = process_item_els(Item1, Els), + case Item2#roster.subscription of + remove -> + ejabberd_odbc:sql_query_t( + ["delete from rosterusers " + " where username='", Username, "' " + " and jid='", SJID, "';" + "delete from rostergroups " + " where username='", Username, "' " + " and jid='", SJID, "'"]); + _ -> + ItemVals = record_to_string(Item2), + ItemGroups = groups_to_string(Item2), + ejabberd_odbc:sql_query_t( + ["delete from rosterusers " + " where username='", Username, "' " + " and jid='", SJID, "';" + "insert into rosterusers(" + " username, jid, nick, " + " subscription, ask, " + " server, subscribe, type) " + " values ", ItemVals, ";" + "delete from rostergroups " + " where username='", Username, "' " + " and jid='", SJID, "';", + [["insert into rostergroups(" + " username, jid, grp) " + " values ", ItemGroup, ";"] || + ItemGroup <- ItemGroups]]) + end, + {Item, Item2} + end, + case ejabberd_odbc:sql_transaction(LServer, F) of + {atomic, {OldItem, Item}} -> + push_item(User, LServer, To, Item), + case Item#roster.subscription of remove -> - catch ejabberd_odbc:sql_query( - LServer, - ["begin;" - "delete from rosterusers " - " where username='", Username, "' " - " and jid='", SJID, "';" - "delete from rostergroups " - " where username='", Username, "' " - " and jid='", SJID, "';" - "commit"]); - _ -> - ItemVals = record_to_string(Item2), - ItemGroups = groups_to_string(Item2), - catch ejabberd_odbc:sql_query( - LServer, - ["begin;" - "delete from rosterusers " - " where username='", Username, "' " - " and jid='", SJID, "';" - "insert into rosterusers(" - " username, jid, nick, " - " subscription, ask, " - " server, subscribe, type) " - " values ", ItemVals, ";" - "delete from rostergroups " - " where username='", Username, "' " - " and jid='", SJID, "';", - [["insert into rostergroups(" - " username, jid, grp) " - " values ", ItemGroup, ";"] || - ItemGroup <- ItemGroups], - "commit"]) - end, - push_item(User, LServer, To, Item2), - case Item2#roster.subscription of - remove -> - IsTo = case Item#roster.subscription of + IsTo = case OldItem#roster.subscription of both -> true; to -> true; _ -> false end, - IsFrom = case Item#roster.subscription of + IsFrom = case OldItem#roster.subscription of both -> true; from -> true; _ -> false @@ -300,7 +299,7 @@ process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) -> if IsTo -> ejabberd_router:route( jlib:jid_remove_resource(From), - jlib:make_jid(Item#roster.jid), + jlib:make_jid(OldItem#roster.jid), {xmlelement, "presence", [{"type", "unsubscribe"}], []}); @@ -309,7 +308,7 @@ process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) -> if IsFrom -> ejabberd_router:route( jlib:jid_remove_resource(From), - jlib:make_jid(Item#roster.jid), + jlib:make_jid(OldItem#roster.jid), {xmlelement, "presence", [{"type", "unsubscribed"}], []}); @@ -459,87 +458,96 @@ process_subscription(Direction, User, Server, JID1, Type) -> LJID = jlib:jid_tolower(JID1), Username = ejabberd_odbc:escape(LUser), SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), - Item = case catch ejabberd_odbc:sql_query( - LServer, - ["select username, jid, nick, subscription, ask, " - "server, subscribe, type from rosterusers " - "where username='", Username, "' " - "and jid='", SJID, "'"]) of - {selected, ["username", "jid", "nick", "subscription", "ask", - "server", "subscribe", "type"], - [I]} -> - R = raw_to_record(I), - Groups = case catch ejabberd_odbc:sql_query( - LServer, - ["select grp from rostergroups " - "where username='", Username, "' " - "and jid='", SJID, "'"]) of - {selected, ["grp"], JGrps} when is_list(JGrps) -> - [JGrp || {JGrp} <- JGrps]; - _ -> - [] + F = fun() -> + Item = + case ejabberd_odbc:sql_query_t( + ["select username, jid, nick, subscription, ask, " + "server, subscribe, type from rosterusers " + "where username='", Username, "' " + "and jid='", SJID, "'"]) of + {selected, + ["username", "jid", "nick", "subscription", "ask", + "server", "subscribe", "type"], + [I]} -> + R = raw_to_record(I), + Groups = + case ejabberd_odbc:sql_query_t( + ["select grp from rostergroups " + "where username='", Username, "' " + "and jid='", SJID, "'"]) of + {selected, ["grp"], JGrps} when is_list(JGrps) -> + [JGrp || {JGrp} <- JGrps]; + _ -> + [] + end, + R#roster{groups = Groups}; + {selected, + ["username", "jid", "nick", "subscription", "ask", + "server", "subscribe", "type"], + []} -> + #roster{user = LUser, + jid = LJID} + end, + NewState = case Direction of + out -> + out_state_change(Item#roster.subscription, + Item#roster.ask, + Type); + in -> + in_state_change(Item#roster.subscription, + Item#roster.ask, + Type) + end, + AutoReply = case Direction of + out -> + none; + in -> + in_auto_reply(Item#roster.subscription, + Item#roster.ask, + Type) end, - R#roster{groups = Groups}; - _ -> - #roster{user = LUser, - jid = LJID} - end, - NewState = case Direction of - out -> - out_state_change(Item#roster.subscription, - Item#roster.ask, - Type); - in -> - in_state_change(Item#roster.subscription, - Item#roster.ask, - Type) - end, - AutoReply = case Direction of - out -> - none; - in -> - in_auto_reply(Item#roster.subscription, - Item#roster.ask, - Type) - end, - Push = case NewState of - none -> - none; - {Subscription, Pending} -> - NewItem = Item#roster{subscription = Subscription, - ask = Pending}, - ItemVals = record_to_string(NewItem), - catch ejabberd_odbc:sql_query( - LServer, - ["begin;" - "delete from rosterusers " - " where username='", Username, "' " - " and jid='", SJID, "';" - "insert into rosterusers(" - " username, jid, nick, " - " subscription, ask, " - " server, subscribe, type) " - " values ", ItemVals, ";" - "commit"]), - {push, NewItem} - end, - case AutoReply of - none -> - ok; + case NewState of + none -> + {none, AutoReply}; + {Subscription, Pending} -> + NewItem = Item#roster{subscription = Subscription, + ask = Pending}, + ItemVals = record_to_string(NewItem), + ejabberd_odbc:sql_query_t( + ["delete from rosterusers " + " where username='", Username, "' " + " and jid='", SJID, "';" + "insert into rosterusers(" + " username, jid, nick, " + " subscription, ask, " + " server, subscribe, type) " + " values ", ItemVals]), + {{push, NewItem}, AutoReply} + end + end, + case ejabberd_odbc:sql_transaction(LServer, F) of + {atomic, {Push, AutoReply}} -> + case AutoReply of + none -> + ok; + _ -> + T = case AutoReply of + subscribed -> "subscribed"; + unsubscribed -> "unsubscribed" + end, + ejabberd_router:route( + jlib:make_jid(User, Server, ""), JID1, + {xmlelement, "presence", [{"type", T}], []}) + end, + case Push of + {push, Item} -> + push_item(User, Server, + jlib:make_jid("", Server, ""), Item), + true; + none -> + false + end; _ -> - T = case AutoReply of - subscribed -> "subscribed"; - unsubscribed -> "unsubscribed" - end, - ejabberd_router:route( - jlib:make_jid(User, Server, ""), JID1, - {xmlelement, "presence", [{"type", T}], []}) - end, - case Push of - {push, PushItem} -> - push_item(User, Server, jlib:make_jid("", Server, ""), PushItem), - true; - none -> false end. @@ -639,14 +647,15 @@ remove_user(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), Username = ejabberd_odbc:escape(LUser), - catch ejabberd_odbc:sql_query( - LServer, - ["begin;" - "delete from rosterusers " - " where username='", Username, "';" - "delete from rostergroups " - " where username='", Username, "';" - "commit"]), + ejabberd_odbc:sql_transaction( + LServer, + fun() -> + ejabberd_odbc:sql_query_t( + ["delete from rosterusers " + " where username='", Username, "';" + "delete from rostergroups " + " where username='", Username, "'"]) + end), ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/odbc/ejabberd_odbc.erl b/src/odbc/ejabberd_odbc.erl index 7b91abc8a..49540e34e 100644 --- a/src/odbc/ejabberd_odbc.erl +++ b/src/odbc/ejabberd_odbc.erl @@ -15,6 +15,8 @@ %% External exports -export([start/1, start_link/1, sql_query/2, + sql_query_t/1, + sql_transaction/2, escape/1]). %% gen_server callbacks @@ -27,6 +29,9 @@ -record(state, {db_ref, db_type}). +-define(STATE_KEY, ejabberd_odbc_state). +-define(MAX_TRANSACTION_RESTARTS, 10). + %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- @@ -40,6 +45,30 @@ sql_query(Host, Query) -> gen_server:call(ejabberd_odbc_sup:get_random_pid(Host), {sql_query, Query}, 60000). +sql_transaction(Host, F) -> + gen_server:call(ejabberd_odbc_sup:get_random_pid(Host), + {sql_transaction, F}, 60000). + +sql_query_t(Query) -> + State = get(?STATE_KEY), + QRes = sql_query_internal(State, Query), + case QRes of + {error, "No SQL-driver information available."} -> + % workaround for odbc bug + {updated, 0}; + {error, _} -> + throw(aborted); + Rs when is_list(Rs) -> + case lists:keymember(error, 1, Rs) of + true -> + throw(aborted); + _ -> + QRes + end; + _ -> + QRes + end. + escape(S) -> [case C of $\0 -> "\\0"; @@ -91,13 +120,13 @@ init([Host]) -> %% {stop, Reason, State} (terminate/2 is called) %%---------------------------------------------------------------------- handle_call({sql_query, Query}, _From, State) -> - Reply = case State#state.db_type of - odbc -> - odbc:sql_query(State#state.db_ref, Query); - pgsql -> - pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query)) - end, + Reply = sql_query_internal(State, Query), {reply, Reply, State}; + +handle_call({sql_transaction, F}, _From, State) -> + Reply = execute_transaction(State, F, ?MAX_TRANSACTION_RESTARTS), + {reply, Reply, State}; + handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. @@ -136,6 +165,30 @@ terminate(_Reason, _State) -> %%% Internal functions %%%---------------------------------------------------------------------- +sql_query_internal(State, Query) -> + case State#state.db_type of + odbc -> + odbc:sql_query(State#state.db_ref, Query); + pgsql -> + pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query)) + end. + +execute_transaction(_State, _F, 0) -> + {aborted, restarts_exceeded}; +execute_transaction(State, F, NRestarts) -> + put(?STATE_KEY, State), + sql_query_internal(State, "begin"), + case catch F() of + aborted -> + execute_transaction(State, F, NRestarts - 1); + {'EXIT', Reason} -> + sql_query_internal(State, "rollback"), + {aborted, Reason}; + Res -> + sql_query_internal(State, "commit"), + {atomic, Res} + end. + pgsql_to_odbc({ok, PGSQLResult}) -> case PGSQLResult of [Item] -> @@ -149,7 +202,7 @@ pgsql_item_to_odbc({"SELECT", Rows, Recs}) -> [element(1, Row) || Row <- Rows], [list_to_tuple(Rec) || Rec <- Recs]}; pgsql_item_to_odbc("INSERT " ++ OIDN) -> - [OID, N] = string:tokens(OIDN, " "), + [_OID, N] = string:tokens(OIDN, " "), {updated, list_to_integer(N)}; pgsql_item_to_odbc("DELETE " ++ N) -> {updated, list_to_integer(N)};