This commit is contained in:
Badlop 2010-07-22 18:47:36 +02:00
parent eb2ad7e699
commit 13fad04d14
3 changed files with 162 additions and 92 deletions

View File

@ -15,7 +15,7 @@
dirty_read/3, dirty_write/3, dirty_delete/3, dirty_delete_object/3,
dirty_select/3,
dirty_count_records/2, dirty_count_records/3, dirty_delete_where/3,
async_dirty/3,
async_dirty/3, sync_dirty/3,
transaction/3,
write_lock_table/2]).
@ -41,11 +41,12 @@ behaviour_info(callbacks) ->
{delete_where, 2},
{dirty_delete_where, 2},
{async_dirty, 2},
{sync_dirty, 2},
{transaction, 2}];
behaviour_info(_) ->
undefined.
-type storage_host() :: string().
-type storage_host() :: binary().
-type storage_table() :: atom().
-type lock_kind() :: read | write | sticky_write.
-record(table, {host_name :: {storage_host(), storage_table()},
@ -54,6 +55,7 @@ behaviour_info(_) ->
-record(mnesia_def, {table :: atom(),
tabdef :: list()}).
-include("ejabberd.hrl"). % This is used for ERROR_MSG
%% Returns all hosts where the table Tab is defined
-spec all_table_hosts(atom()) ->
@ -92,7 +94,7 @@ table_info(Host, Tab, InfoKey) ->
end.
%% @spec create_table(backend(), Host::string(), Name::atom(), options()) -> {atomic, ok} | {aborted, Reason}
%% @spec create_table(backend(), Host::binary(), Name::atom(), options()) -> {atomic, ok} | {aborted, Reason}
%% @type options() = [option()]
%% @type option() = {odbc_host, string()}
%% | {Table::atom(), [tabdef()]}
@ -532,6 +534,15 @@ write_lock_table(Host, Tab) ->
%% Warning: all tabs touched by the transaction must use the same
%% storage backend!
transaction(Host, Tab, Fun) ->
%% This is just to ensure an error is logged when error appears:
case transaction2(Host, Tab, Fun) of
{atomic, _} = Good ->
Good;
{aborted, Reason} = Bad ->
?ERROR_MSG("Transaction failed for host ~p in tab ~p with fun ~p:~n~p", [Host, Tab, Fun, Reason]),
Bad
end.
transaction2(Host, Tab, Fun) ->
case get_table(Host, Tab) of
#table{backend = mnesia} ->
mnesia:transaction(Fun);
@ -540,6 +551,18 @@ transaction(Host, Tab, Fun) ->
Backend:transaction(Def, Fun)
end.
-spec sync_dirty(storage_host(), storage_table(), fun()) ->
{atomic, any()}.
%% Warning: all tabs touched by the sync_dirty must use the same
%% storage backend!
sync_dirty(Host, Tab, Fun) ->
case get_table(Host, Tab) of
#table{backend = mnesia} ->
mnesia:sync_dirty(Fun);
#table{backend = Backend,
def = Def} ->
Backend:sync_dirty(Def, Fun)
end.
-spec async_dirty(storage_host(), storage_table(), fun()) ->
{atomic, any()}.
@ -555,12 +578,17 @@ async_dirty(Host, Tab, Fun) ->
end.
%% TODO: fix the calling code so this function clause isn't needed
get_table(Host, Tab) when is_list(Host) ->
get_table(list_to_binary(Host), Tab);
get_table(Host, Tab) ->
case mnesia:dirty_read(table, {Host, Tab}) of
[T] ->
T;
_ ->
error_logger:error_msg("gen_storage: Table ~p not found on ~p~n", [Tab, Host]),
catch throw(error123),
Stacktrace = erlang:get_stacktrace(),
error_logger:error_msg("gen_storage: Table ~p not found on ~p~nStacktrace: ~p", [Tab, Host, Stacktrace]),
exit(table_not_found)
end.

View File

@ -98,7 +98,7 @@ migrate_mnesia1(Host, Table, {OldTable, OldAttributes, MigrateFun}) ->
end
end, ok, OldTable)
end,
{atomic, ok} = mnesia:transaction(F1),
{atomic, _} = mnesia:transaction(F1),
mnesia:delete_table(OldTable),
?INFO_MSG("Migration of mnesia table ~p successfully finished", [Table]),
ok
@ -135,85 +135,98 @@ migrate_odbc1(Host, Tables, {OldTablesColumns, MigrateFun}) ->
{[OldTable | _] = OldTables,
[OldColumns | _] = OldColumnsAll} = lists:unzip(OldTablesColumns),
OldTablesA = [list_to_atom(Table) || Table <- OldTables],
case [odbc_table_columns_t(OldTable1)
|| OldTable1 <- OldTables] of
OldColumnsAll ->
?INFO_MSG("Migrating ODBC table ~p to gen_storage tables ~p", [OldTable, Tables]),
ColumnsT = [odbc_table_columns_t(OldTable1) || OldTable1 <- OldTables],
migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, ColumnsT, MigrateFun).
%% rename old tables to *_old
lists:foreach(fun(OldTable1) ->
{updated, _} =
ejabberd_odbc:sql_query_t("alter table " ++ OldTable1 ++
" rename to " ++ OldTable1 ++ "_old")
end, OldTables),
%% recreate new tables
lists:foreach(fun(NewTable) ->
case lists:member(NewTable, OldTablesA) of
true ->
TableInfo =
gen_storage:table_info(Host, NewTable, all),
{value, {_, Backend}} =
lists:keysearch(backend, 1, TableInfo),
gen_storage:create_table(Backend, Host,
NewTable, TableInfo);
false -> ignored
end
end, Tables),
migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, ColumnsT, MigrateFun)
when ColumnsT == OldColumnsAll ->
?INFO_MSG("Migrating ODBC table ~p to gen_storage tables ~p", [OldTable, Tables]),
SELECT =
fun(Columns, Table, Keys) ->
Table1 = case lists:member(Table, OldTables) of
true -> Table ++ "_old";
false -> Table
end,
WherePart = case Keys of
[] -> "";
_ -> " WHERE " ++
string:join([K ++ "=" ++
if
is_list(V) ->
"\"" ++ ejabberd_odbc:escape(V) ++ "\"";
is_integer(V) ->
integer_to_list(V)
end
|| {K, V} <- Keys],
" AND ")
end,
{selected, _, Rows} =
ejabberd_odbc:sql_query_t("SELECT " ++ string:join(Columns, ", ") ++
" FROM " ++ Table1 ++
WherePart),
[tuple_to_list(Row) || Row <- Rows]
end,
%% rename old tables to *_old
lists:foreach(fun(OldTable1) ->
{updated, _} =
ejabberd_odbc:sql_query_t("alter table " ++ OldTable1 ++
" rename to " ++ OldTable1 ++ "_old")
end, OldTables),
HostB = list_to_binary(Host),
%% recreate new tables
lists:foreach(fun(NewTable) ->
case lists:member(NewTable, OldTablesA) of
true ->
TableInfo =
gen_storage:table_info(Host, NewTable, all),
{value, {_, Backend}} =
lists:keysearch(backend, 1, TableInfo),
gen_storage:create_table(Backend, HostB,
NewTable, TableInfo);
false -> ignored
end
end, Tables),
%% TODO: this will need lots of RAM, make it batched
OldRows = SELECT(OldColumns, OldTable, []),
NRows =
lists:foldl(fun(OldRow, NRow) ->
NewRecords = apply(MigrateFun, [SELECT | OldRow]),
if
is_list(NewRecords) ->
lists:foreach(
fun(NewRecord) ->
%% TODO: gen_storage transaction?
gen_storage:dirty_write(Host, NewRecord)
end, NewRecords);
is_tuple(NewRecords) ->
gen_storage:dirty_write(Host, NewRecords)
end,
NRow + 1
end, 0, OldRows),
SELECT =
fun(Columns, Table, Keys) ->
Table1 = case lists:member(Table, OldTables) of
true -> Table ++ "_old";
false -> Table
end,
WherePart = case Keys of
[] -> "";
_ -> " WHERE " ++
string:join([K ++ "=" ++
if
is_list(V) ->
"\"" ++ ejabberd_odbc:escape(V) ++ "\"";
is_integer(V) ->
integer_to_list(V)
end
|| {K, V} <- Keys],
" AND ")
end,
{selected, _, Rows} =
ejabberd_odbc:sql_query_t("SELECT " ++ string:join(Columns, ", ") ++
" FROM " ++ Table1 ++
WherePart),
[tuple_to_list(Row) || Row <- Rows]
end,
lists:foreach(fun(OldTable1) ->
{updated, _} = ejabberd_odbc:sql_query_t("drop table " ++ OldTable1 ++ "_old")
end, OldTables),
%% TODO: this will need lots of RAM, make it batched
OldRows = SELECT(OldColumns, OldTable, []),
NRows =
lists:foldl(fun(OldRow, NRow) ->
NewRecords = apply(MigrateFun, [SELECT | OldRow]),
if
is_list(NewRecords) ->
lists:foreach(
fun(NewRecord) ->
%% TODO: gen_storage transaction?
gen_storage:dirty_write(HostB, NewRecord)
end, NewRecords);
is_tuple(NewRecords) ->
gen_storage:dirty_write(HostB, NewRecords)
end,
NRow + 1
end, 0, OldRows),
?INFO_MSG("Migrated ODBC table ~p to gen_storage tables ~p (~p rows)", [OldTable, Tables, NRows]);
_ ->
lists:foreach(fun(OldTable1) ->
{updated, _} = ejabberd_odbc:sql_query_t("drop table " ++ OldTable1 ++ "_old")
end, OldTables),
?INFO_MSG("Migrated ODBC table ~p to gen_storage tables ~p (~p rows)", [OldTable, Tables, NRows]),
ok;
migrate_odbc2(_Host, _Tables, _OldTable, _OldTables, _OldColumns, _OldColumnsAll, _OldTablesA, [[]], _MigrateFun) ->
ignored;
migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, [ColumnsTAndCreatedat | MoreCTAC], MigrateFun)
when ColumnsTAndCreatedat /= OldColumnsAll ->
case lists:last(ColumnsTAndCreatedat) of
"created_at" ->
ColumnsT = ColumnsTAndCreatedat -- ["created_at"],
migrate_odbc2(Host, Tables, OldTable, OldTables, OldColumns, OldColumnsAll, OldTablesA, [ColumnsT | MoreCTAC], MigrateFun);
_ ->
ignored
end.
odbc_table_columns_t(Table) ->
case ejabberd_odbc:sql_query_t("select column_name from information_schema.columns where table_name='" ++ Table ++ "'") of
{selected, _, Columns1} ->

View File

@ -11,7 +11,7 @@
dirty_read/2, dirty_select/2, dirty_count_records/2, dirty_write/2,
dirty_delete/2, dirty_delete_object/2,
delete_where/2, dirty_delete_where/2,
async_dirty/2,
async_dirty/2, sync_dirty/2,
transaction/2]).
%% TODO: append 's' to table names in SQL?
@ -27,7 +27,8 @@
}).
-record(odbc_cont, {tabdef, sql, offset = 0, limit}).
-include_lib("exmpp/include/exmpp.hrl"). % for #jid{}
-include("ejabberd.hrl"). % for ?DEBUG macro
-include_lib("exmpp/include/exmpp.hrl"). % for #jid{} and #xmlel{}
table_info(#tabdef{record_name = RecordName,
@ -121,8 +122,11 @@ create_table(#tabdef{name = Tab,
fun(Attribute, {Q, K}) ->
IsKey = TableType =:= bag orelse
Attribute =:= KeyName,
NoTextKeys = IsKey andalso
ejabberd_odbc:db_type(Host) =:= mysql,
%% The "packet" column in the table offline_msg,
%% must be "text" in order to be large enough to
%% contain a full stanza, not limited to a small VARCHAR():
NoTextKeys = IsKey andalso ejabberd_odbc:db_type(Host) =:= mysql
andalso Attribute /= "packet",
KN = tabdef_column_names(TabDef, Attribute),
case lists:keysearch(Attribute, 1, Types) of
{value, {_, Tuple}} when is_tuple(Tuple) ->
@ -158,7 +162,12 @@ create_table(#tabdef{name = Tab,
end, {"", []}, Attributes),
TabS = atom_to_list(Tab),
PKey = case TableType of
set -> [", PRIMARY KEY (", string:join(K, ", "), ")"];
%% This 105 limits the size of fields in the primary key.
%% That prevents MySQL from complaining when setting the
%% last_activity key (text, text) with this error:
%% #42000Specified key was too long; max key length is 1000bytes"
%% Similarly for rosteritem and other tables, maybe also PgSQL.
set -> [", PRIMARY KEY (", string:join(K, "(105), "), "(105))"];
bag -> []
end,
case odbc_command(Host,
@ -169,7 +178,7 @@ create_table(#tabdef{name = Tab,
bag ->
KeyColumns = tabdef_column_names(TabDef, KeyName),
Q = ["CREATE INDEX ", TabS, "_bag ON ",
TabS, " USING (", string:join(KeyColumns, ", "), $)],
TabS, " (", string:join(KeyColumns, "(75), "), "(75))"],
case odbc_command(Host, Q) of
ok ->
{atomic, ok};
@ -193,9 +202,11 @@ type_to_sql_type(Type, true = _NoTextKeys) ->
end.
type_to_sql_type(pid) -> "TEXT";
type_to_sql_type(xmlel) -> "TEXT";
type_to_sql_type(jid) -> "TEXT";
type_to_sql_type(ljid) -> "TEXT";
type_to_sql_type(atom) -> "TEXT";
type_to_sql_type(binary) -> "TEXT";
type_to_sql_type(A) when is_atom(A) -> atom_to_list(A).
@ -215,7 +226,7 @@ add_table_index(#tabdef{name = Tab, host = Host} = TabDef, Attribute) ->
AttributeS = atom_to_list(Attribute),
A = tabdef_column_names(TabDef, AttributeS),
Q = ["CREATE INDEX ", TabS, $_, AttributeS,
" ON ", TabS, " USING (", string:join(A, ", "), ")"],
" ON ", TabS, " (", string:join(A, "(75), "), "(75))"],
case odbc_command(Host, Q) of
ok ->
{atomic, ok};
@ -365,8 +376,9 @@ prepare_match_op(Tab, Op, Column, Value) ->
io_lib:format("~s.~s ~s ~s", [Tab, Column, Op, format(Value)]).
make_pattern(S) ->
R = make_pattern(S, []),
lists:reverse(R).
make_pattern(S, []).
make_pattern([], R) ->
lists:reverse(R);
make_pattern(['_' | S], R) ->
make_pattern(S, [$% | R]);
make_pattern([C | S], R) ->
@ -411,15 +423,21 @@ row_to_result([Field | Row], [Type | Types], Result) ->
text ->
Row2 = Row,
R = Field;
binary ->
Row2 = Row,
R = list_to_binary(Field);
pid ->
Row2 = Row,
R = list_to_pid(Field);
xmlel ->
Row2 = Row,
[R] = exmpp_xml:parse_document(Field, [names_as_atom]);
jid ->
Row2 = Row,
R = jlib:string_to_jid(Field);
R = exmpp_jid:parse(Field);
ljid ->
Row2 = Row,
R = jlib:jid_tolower(jlib:string_to_jid(Field));
R = jlib:short_prepd_jid(exmpp_jid:parse(Field));
atom ->
Row2 = Row,
R = list_to_atom(Field);
@ -437,7 +455,7 @@ dirty_count_records(#tabdef{host = Host,
[Column | _] = tabdef_column_names(TabDef, KeyAttr),
Q = ["SELECT count(", Column, ") FROM ", atom_to_list(Tab),
WherePart],
{selected, [_], [{Count}]} = odbc_query(Host, Q),
[{Count}] = odbc_query(Host, Q),
list_to_integer(Count).
@ -447,7 +465,7 @@ count_records(#tabdef{attributes = [KeyAttr | _],
[Column | _] = tabdef_column_names(TabDef, KeyAttr),
Q = ["SELECT count(", Column, ") FROM ", atom_to_list(Tab),
WherePart],
{selected, [_], [{Count}]} = odbc_query_t(Q),
[{Count}] = odbc_query_t(Q),
list_to_integer(Count).
@ -570,7 +588,7 @@ prepare_insert_command(#tabdef{name = Tab,
fun(Attribute, {V, [Value | Values1]}) ->
case lists:keysearch(Attribute, 1, Types) of
{value, {_, Type}} when is_tuple(Type) ->
io:format("Type for ~p: ~p = ~p~n",[Attribute, Type, Value]),
?DEBUG("Type for ~p: ~p = ~p~n",[Attribute, Type, Value]),
ValueL = tuple_to_list(Value),
if
length(ValueL) == size(Type) ->
@ -601,6 +619,14 @@ transaction(#tabdef{host = Host}, Fun) ->
async_dirty(Tab, Fun) ->
transaction(Tab, Fun).
%% Mnesia has sync_dirty, maybe ODBC has something similar:
%% "Call the Fun in a context which is not protected by a transaction."
%% "The difference [with async_dirty] is that the operations are performed
%% synchronously. The caller waits for the updates to be performed on all
%% active replicas before the Fun returns."
sync_dirty(Tab, Fun) ->
transaction(Tab, Fun).
tabdef_column_names(TabDef, Attribute) when is_atom(Attribute) ->
tabdef_column_names(TabDef, atom_to_list(Attribute));
tabdef_column_names(#tabdef{column_names = ColumnNames}, Attribute) ->
@ -623,8 +649,11 @@ format(P) when is_pid(P) ->
format({jid, _, _, _, _} = JID) ->
format(exmpp_jid:to_list(JID));
format({_, _, _} = LJID) ->
format(exmpp_jid:to_list(LJID));
format({N, D, R}) when (R==undefined) or (not is_atom(R)) ->
format(exmpp_jid:to_list(N, D, R));
format(Xmlel) when is_record(Xmlel, xmlel) ->
format(exmpp_xml:document_to_list(Xmlel));
format(B) when is_binary(B) ->
format(binary_to_list(B));