From 13fad04d147ef4cfde96d07fd3e0985296a170d5 Mon Sep 17 00:00:00 2001 From: Badlop Date: Thu, 22 Jul 2010 18:47:36 +0200 Subject: [PATCH] Fix GS --- src/gen_storage.erl | 36 +++++++- src/gen_storage_migration.erl | 157 ++++++++++++++++++---------------- src/gen_storage_odbc.erl | 61 +++++++++---- 3 files changed, 162 insertions(+), 92 deletions(-) diff --git a/src/gen_storage.erl b/src/gen_storage.erl index 87e8eef9c..414d40c58 100644 --- a/src/gen_storage.erl +++ b/src/gen_storage.erl @@ -15,7 +15,7 @@ dirty_read/3, dirty_write/3, dirty_delete/3, dirty_delete_object/3, dirty_select/3, dirty_count_records/2, dirty_count_records/3, dirty_delete_where/3, - async_dirty/3, + async_dirty/3, sync_dirty/3, transaction/3, write_lock_table/2]). @@ -41,11 +41,12 @@ behaviour_info(callbacks) -> {delete_where, 2}, {dirty_delete_where, 2}, {async_dirty, 2}, + {sync_dirty, 2}, {transaction, 2}]; behaviour_info(_) -> undefined. --type storage_host() :: string(). +-type storage_host() :: binary(). -type storage_table() :: atom(). -type lock_kind() :: read | write | sticky_write. -record(table, {host_name :: {storage_host(), storage_table()}, @@ -54,6 +55,7 @@ behaviour_info(_) -> -record(mnesia_def, {table :: atom(), tabdef :: list()}). +-include("ejabberd.hrl"). % This is used for ERROR_MSG %% Returns all hosts where the table Tab is defined -spec all_table_hosts(atom()) -> @@ -92,7 +94,7 @@ table_info(Host, Tab, InfoKey) -> end. -%% @spec create_table(backend(), Host::string(), Name::atom(), options()) -> {atomic, ok} | {aborted, Reason} +%% @spec create_table(backend(), Host::binary(), Name::atom(), options()) -> {atomic, ok} | {aborted, Reason} %% @type options() = [option()] %% @type option() = {odbc_host, string()} %% | {Table::atom(), [tabdef()]} @@ -532,6 +534,15 @@ write_lock_table(Host, Tab) -> %% Warning: all tabs touched by the transaction must use the same %% storage backend! transaction(Host, Tab, Fun) -> + %% This is just to ensure an error is logged when error appears: + case transaction2(Host, Tab, Fun) of + {atomic, _} = Good -> + Good; + {aborted, Reason} = Bad -> + ?ERROR_MSG("Transaction failed for host ~p in tab ~p with fun ~p:~n~p", [Host, Tab, Fun, Reason]), + Bad + end. +transaction2(Host, Tab, Fun) -> case get_table(Host, Tab) of #table{backend = mnesia} -> mnesia:transaction(Fun); @@ -540,6 +551,18 @@ transaction(Host, Tab, Fun) -> Backend:transaction(Def, Fun) end. +-spec sync_dirty(storage_host(), storage_table(), fun()) -> + {atomic, any()}. +%% Warning: all tabs touched by the sync_dirty must use the same +%% storage backend! +sync_dirty(Host, Tab, Fun) -> + case get_table(Host, Tab) of + #table{backend = mnesia} -> + mnesia:sync_dirty(Fun); + #table{backend = Backend, + def = Def} -> + Backend:sync_dirty(Def, Fun) + end. -spec async_dirty(storage_host(), storage_table(), fun()) -> {atomic, any()}. @@ -555,12 +578,17 @@ async_dirty(Host, Tab, Fun) -> end. +%% TODO: fix the calling code so this function clause isn't needed +get_table(Host, Tab) when is_list(Host) -> + get_table(list_to_binary(Host), Tab); get_table(Host, Tab) -> case mnesia:dirty_read(table, {Host, Tab}) of [T] -> T; _ -> - error_logger:error_msg("gen_storage: Table ~p not found on ~p~n", [Tab, Host]), + catch throw(error123), + Stacktrace = erlang:get_stacktrace(), + error_logger:error_msg("gen_storage: Table ~p not found on ~p~nStacktrace: ~p", [Tab, Host, Stacktrace]), exit(table_not_found) end. diff --git a/src/gen_storage_migration.erl b/src/gen_storage_migration.erl index 55ea45509..7d40658b3 100644 --- a/src/gen_storage_migration.erl +++ b/src/gen_storage_migration.erl @@ -98,7 +98,7 @@ migrate_mnesia1(Host, Table, {OldTable, OldAttributes, MigrateFun}) -> end end, ok, OldTable) end, - {atomic, ok} = mnesia:transaction(F1), + {atomic, _} = mnesia:transaction(F1), mnesia:delete_table(OldTable), ?INFO_MSG("Migration of mnesia table ~p successfully finished", [Table]), ok @@ -135,85 +135,98 @@ migrate_odbc1(Host, Tables, {OldTablesColumns, MigrateFun}) -> {[OldTable | _] = OldTables, [OldColumns | _] = OldColumnsAll} = lists:unzip(OldTablesColumns), OldTablesA = [list_to_atom(Table) || Table <- OldTables], - case [odbc_table_columns_t(OldTable1) - || OldTable1 <- OldTables] of - OldColumnsAll -> - ?INFO_MSG("Migrating ODBC table ~p to gen_storage tables ~p", [OldTable, Tables]), + ColumnsT = [odbc_table_columns_t(OldTable1) || OldTable1 <- OldTables], + migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, ColumnsT, MigrateFun). - %% rename old tables to *_old - lists:foreach(fun(OldTable1) -> - {updated, _} = - ejabberd_odbc:sql_query_t("alter table " ++ OldTable1 ++ - " rename to " ++ OldTable1 ++ "_old") - end, OldTables), - %% recreate new tables - lists:foreach(fun(NewTable) -> - case lists:member(NewTable, OldTablesA) of - true -> - TableInfo = - gen_storage:table_info(Host, NewTable, all), - {value, {_, Backend}} = - lists:keysearch(backend, 1, TableInfo), - gen_storage:create_table(Backend, Host, - NewTable, TableInfo); - false -> ignored - end - end, Tables), +migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, ColumnsT, MigrateFun) + when ColumnsT == OldColumnsAll -> + ?INFO_MSG("Migrating ODBC table ~p to gen_storage tables ~p", [OldTable, Tables]), - SELECT = - fun(Columns, Table, Keys) -> - Table1 = case lists:member(Table, OldTables) of - true -> Table ++ "_old"; - false -> Table - end, - WherePart = case Keys of - [] -> ""; - _ -> " WHERE " ++ - string:join([K ++ "=" ++ - if - is_list(V) -> - "\"" ++ ejabberd_odbc:escape(V) ++ "\""; - is_integer(V) -> - integer_to_list(V) - end - || {K, V} <- Keys], - " AND ") - end, - {selected, _, Rows} = - ejabberd_odbc:sql_query_t("SELECT " ++ string:join(Columns, ", ") ++ - " FROM " ++ Table1 ++ - WherePart), - [tuple_to_list(Row) || Row <- Rows] - end, + %% rename old tables to *_old + lists:foreach(fun(OldTable1) -> + {updated, _} = + ejabberd_odbc:sql_query_t("alter table " ++ OldTable1 ++ + " rename to " ++ OldTable1 ++ "_old") + end, OldTables), + HostB = list_to_binary(Host), + %% recreate new tables + lists:foreach(fun(NewTable) -> + case lists:member(NewTable, OldTablesA) of + true -> + TableInfo = + gen_storage:table_info(Host, NewTable, all), + {value, {_, Backend}} = + lists:keysearch(backend, 1, TableInfo), + gen_storage:create_table(Backend, HostB, + NewTable, TableInfo); + false -> ignored + end + end, Tables), - %% TODO: this will need lots of RAM, make it batched - OldRows = SELECT(OldColumns, OldTable, []), - NRows = - lists:foldl(fun(OldRow, NRow) -> - NewRecords = apply(MigrateFun, [SELECT | OldRow]), - if - is_list(NewRecords) -> - lists:foreach( - fun(NewRecord) -> - %% TODO: gen_storage transaction? - gen_storage:dirty_write(Host, NewRecord) - end, NewRecords); - is_tuple(NewRecords) -> - gen_storage:dirty_write(Host, NewRecords) - end, - NRow + 1 - end, 0, OldRows), + SELECT = + fun(Columns, Table, Keys) -> + Table1 = case lists:member(Table, OldTables) of + true -> Table ++ "_old"; + false -> Table + end, + WherePart = case Keys of + [] -> ""; + _ -> " WHERE " ++ + string:join([K ++ "=" ++ + if + is_list(V) -> + "\"" ++ ejabberd_odbc:escape(V) ++ "\""; + is_integer(V) -> + integer_to_list(V) + end + || {K, V} <- Keys], + " AND ") + end, + {selected, _, Rows} = + ejabberd_odbc:sql_query_t("SELECT " ++ string:join(Columns, ", ") ++ + " FROM " ++ Table1 ++ + WherePart), + [tuple_to_list(Row) || Row <- Rows] + end, - lists:foreach(fun(OldTable1) -> - {updated, _} = ejabberd_odbc:sql_query_t("drop table " ++ OldTable1 ++ "_old") - end, OldTables), + %% TODO: this will need lots of RAM, make it batched + OldRows = SELECT(OldColumns, OldTable, []), + NRows = + lists:foldl(fun(OldRow, NRow) -> + NewRecords = apply(MigrateFun, [SELECT | OldRow]), + if + is_list(NewRecords) -> + lists:foreach( + fun(NewRecord) -> + %% TODO: gen_storage transaction? + gen_storage:dirty_write(HostB, NewRecord) + end, NewRecords); + is_tuple(NewRecords) -> + gen_storage:dirty_write(HostB, NewRecords) + end, + NRow + 1 + end, 0, OldRows), - ?INFO_MSG("Migrated ODBC table ~p to gen_storage tables ~p (~p rows)", [OldTable, Tables, NRows]); - _ -> + lists:foreach(fun(OldTable1) -> + {updated, _} = ejabberd_odbc:sql_query_t("drop table " ++ OldTable1 ++ "_old") + end, OldTables), + + ?INFO_MSG("Migrated ODBC table ~p to gen_storage tables ~p (~p rows)", [OldTable, Tables, NRows]), + ok; + +migrate_odbc2(_Host, _Tables, _OldTable, _OldTables, _OldColumns, _OldColumnsAll, _OldTablesA, [[]], _MigrateFun) -> + ignored; + +migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, [ColumnsTAndCreatedat | MoreCTAC], MigrateFun) + when ColumnsTAndCreatedat /= OldColumnsAll -> + case lists:last(ColumnsTAndCreatedat) of + "created_at" -> + ColumnsT = ColumnsTAndCreatedat -- ["created_at"], + migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, [ColumnsT | MoreCTAC], MigrateFun); + _ -> ignored end. - odbc_table_columns_t(Table) -> case ejabberd_odbc:sql_query_t("select column_name from information_schema.columns where table_name='" ++ Table ++ "'") of {selected, _, Columns1} -> diff --git a/src/gen_storage_odbc.erl b/src/gen_storage_odbc.erl index e3901c438..6da489da2 100644 --- a/src/gen_storage_odbc.erl +++ b/src/gen_storage_odbc.erl @@ -11,7 +11,7 @@ dirty_read/2, dirty_select/2, dirty_count_records/2, dirty_write/2, dirty_delete/2, dirty_delete_object/2, delete_where/2, dirty_delete_where/2, - async_dirty/2, + async_dirty/2, sync_dirty/2, transaction/2]). %% TODO: append 's' to table names in SQL? @@ -27,7 +27,8 @@ }). -record(odbc_cont, {tabdef, sql, offset = 0, limit}). --include_lib("exmpp/include/exmpp.hrl"). % for #jid{} +-include("ejabberd.hrl"). % for ?DEBUG macro +-include_lib("exmpp/include/exmpp.hrl"). % for #jid{} and #xmlel{} table_info(#tabdef{record_name = RecordName, @@ -121,8 +122,11 @@ create_table(#tabdef{name = Tab, fun(Attribute, {Q, K}) -> IsKey = TableType =:= bag orelse Attribute =:= KeyName, - NoTextKeys = IsKey andalso - ejabberd_odbc:db_type(Host) =:= mysql, + %% The "packet" column in the table offline_msg, + %% must be "text" in order to be large enough to + %% contain a full stanza, not limited to a small VARCHAR(): + NoTextKeys = IsKey andalso ejabberd_odbc:db_type(Host) =:= mysql + andalso Attribute /= "packet", KN = tabdef_column_names(TabDef, Attribute), case lists:keysearch(Attribute, 1, Types) of {value, {_, Tuple}} when is_tuple(Tuple) -> @@ -158,7 +162,12 @@ create_table(#tabdef{name = Tab, end, {"", []}, Attributes), TabS = atom_to_list(Tab), PKey = case TableType of - set -> [", PRIMARY KEY (", string:join(K, ", "), ")"]; + %% This 105 limits the size of fields in the primary key. + %% That prevents MySQL from complaining when setting the + %% last_activity key (text, text) with this error: + %% #42000Specified key was too long; max key length is 1000bytes" + %% Similarly for rosteritem and other tables, maybe also PgSQL. + set -> [", PRIMARY KEY (", string:join(K, "(105), "), "(105))"]; bag -> [] end, case odbc_command(Host, @@ -169,7 +178,7 @@ create_table(#tabdef{name = Tab, bag -> KeyColumns = tabdef_column_names(TabDef, KeyName), Q = ["CREATE INDEX ", TabS, "_bag ON ", - TabS, " USING (", string:join(KeyColumns, ", "), $)], + TabS, " (", string:join(KeyColumns, "(75), "), "(75))"], case odbc_command(Host, Q) of ok -> {atomic, ok}; @@ -193,9 +202,11 @@ type_to_sql_type(Type, true = _NoTextKeys) -> end. type_to_sql_type(pid) -> "TEXT"; +type_to_sql_type(xmlel) -> "TEXT"; type_to_sql_type(jid) -> "TEXT"; type_to_sql_type(ljid) -> "TEXT"; type_to_sql_type(atom) -> "TEXT"; +type_to_sql_type(binary) -> "TEXT"; type_to_sql_type(A) when is_atom(A) -> atom_to_list(A). @@ -215,7 +226,7 @@ add_table_index(#tabdef{name = Tab, host = Host} = TabDef, Attribute) -> AttributeS = atom_to_list(Attribute), A = tabdef_column_names(TabDef, AttributeS), Q = ["CREATE INDEX ", TabS, $_, AttributeS, - " ON ", TabS, " USING (", string:join(A, ", "), ")"], + " ON ", TabS, " (", string:join(A, "(75), "), "(75))"], case odbc_command(Host, Q) of ok -> {atomic, ok}; @@ -365,8 +376,9 @@ prepare_match_op(Tab, Op, Column, Value) -> io_lib:format("~s.~s ~s ~s", [Tab, Column, Op, format(Value)]). make_pattern(S) -> - R = make_pattern(S, []), - lists:reverse(R). + make_pattern(S, []). +make_pattern([], R) -> + lists:reverse(R); make_pattern(['_' | S], R) -> make_pattern(S, [$% | R]); make_pattern([C | S], R) -> @@ -411,15 +423,21 @@ row_to_result([Field | Row], [Type | Types], Result) -> text -> Row2 = Row, R = Field; + binary -> + Row2 = Row, + R = list_to_binary(Field); pid -> Row2 = Row, R = list_to_pid(Field); + xmlel -> + Row2 = Row, + [R] = exmpp_xml:parse_document(Field, [names_as_atom]); jid -> Row2 = Row, - R = jlib:string_to_jid(Field); + R = exmpp_jid:parse(Field); ljid -> Row2 = Row, - R = jlib:jid_tolower(jlib:string_to_jid(Field)); + R = jlib:short_prepd_jid(exmpp_jid:parse(Field)); atom -> Row2 = Row, R = list_to_atom(Field); @@ -437,7 +455,7 @@ dirty_count_records(#tabdef{host = Host, [Column | _] = tabdef_column_names(TabDef, KeyAttr), Q = ["SELECT count(", Column, ") FROM ", atom_to_list(Tab), WherePart], - {selected, [_], [{Count}]} = odbc_query(Host, Q), + [{Count}] = odbc_query(Host, Q), list_to_integer(Count). @@ -447,7 +465,7 @@ count_records(#tabdef{attributes = [KeyAttr | _], [Column | _] = tabdef_column_names(TabDef, KeyAttr), Q = ["SELECT count(", Column, ") FROM ", atom_to_list(Tab), WherePart], - {selected, [_], [{Count}]} = odbc_query_t(Q), + [{Count}] = odbc_query_t(Q), list_to_integer(Count). @@ -570,7 +588,7 @@ prepare_insert_command(#tabdef{name = Tab, fun(Attribute, {V, [Value | Values1]}) -> case lists:keysearch(Attribute, 1, Types) of {value, {_, Type}} when is_tuple(Type) -> - io:format("Type for ~p: ~p = ~p~n",[Attribute, Type, Value]), + ?DEBUG("Type for ~p: ~p = ~p~n",[Attribute, Type, Value]), ValueL = tuple_to_list(Value), if length(ValueL) == size(Type) -> @@ -601,6 +619,14 @@ transaction(#tabdef{host = Host}, Fun) -> async_dirty(Tab, Fun) -> transaction(Tab, Fun). +%% Mnesia has sync_dirty, maybe ODBC has something similar: +%% "Call the Fun in a context which is not protected by a transaction." +%% "The difference [with async_dirty] is that the operations are performed +%% synchronously. The caller waits for the updates to be performed on all +%% active replicas before the Fun returns." +sync_dirty(Tab, Fun) -> + transaction(Tab, Fun). + tabdef_column_names(TabDef, Attribute) when is_atom(Attribute) -> tabdef_column_names(TabDef, atom_to_list(Attribute)); tabdef_column_names(#tabdef{column_names = ColumnNames}, Attribute) -> @@ -623,8 +649,11 @@ format(P) when is_pid(P) -> format({jid, _, _, _, _} = JID) -> format(exmpp_jid:to_list(JID)); -format({_, _, _} = LJID) -> - format(exmpp_jid:to_list(LJID)); +format({N, D, R}) when (R==undefined) or (not is_atom(R)) -> + format(exmpp_jid:to_list(N, D, R)); + +format(Xmlel) when is_record(Xmlel, xmlel) -> + format(exmpp_xml:document_to_list(Xmlel)); format(B) when is_binary(B) -> format(binary_to_list(B));