Make it possible to import without cursor usage

This commit is contained in:
Evgeniy Khramtsov 2013-07-22 10:46:00 +10:00
parent ca6463ed78
commit f75d78d3f5
1 changed files with 38 additions and 22 deletions

View File

@ -103,21 +103,24 @@ import_file(Server, FileName) ->
end.
import(Server, Output) ->
import(Server, Output, [{fast, true}]).
import(Server, Output, Opts) ->
LServer = jlib:nameprep(iolist_to_binary(Server)),
Modules = modules(),
IO = prepare_output(Output, disk_log),
lists:foreach(
fun(Module) ->
import(LServer, IO, Module)
import(LServer, IO, Opts, Module)
end, Modules),
close_output(Output, IO).
import(Server, Output, Module) ->
import(Server, Output, Opts, Module) ->
LServer = jlib:nameprep(iolist_to_binary(Server)),
IO = prepare_output(Output, disk_log),
lists:foreach(
fun({SelectQuery, ConvertFun}) ->
import(LServer, SelectQuery, IO, ConvertFun)
import(LServer, SelectQuery, IO, ConvertFun, Opts)
end, Module:import(Server)),
close_output(Output, IO).
@ -157,31 +160,32 @@ output(_LServer, Table, Fd, SQLs) ->
file:write(Fd, ["-- \n-- Mnesia table: ", atom_to_list(Table),
"\n--\n", SQLs]).
import(LServer, SelectQuery, IO, ConvertFun) ->
F = fun() ->
ejabberd_odbc:sql_query_t(
iolist_to_binary(
[<<"declare c cursor for ">>, SelectQuery])),
fetch(IO, ConvertFun)
import(LServer, SelectQuery, IO, ConvertFun, Opts) ->
F = case proplists:get_bool(fast, Opts) of
true ->
fun() ->
case ejabberd_odbc:sql_query_t(SelectQuery) of
{selected, _, Rows} ->
lists:foldl(fun process_sql_row/2,
{IO, ConvertFun, undefined}, Rows);
Err ->
erlang:error(Err)
end
end;
false ->
fun() ->
ejabberd_odbc:sql_query_t(
iolist_to_binary(
[<<"declare c cursor for ">>, SelectQuery])),
fetch(IO, ConvertFun, undefined)
end
end,
ejabberd_odbc:sql_transaction(LServer, F).
fetch(IO, ConvertFun) ->
fetch(IO, ConvertFun, undefined).
fetch(IO, ConvertFun, PrevRow) ->
case ejabberd_odbc:sql_query_t([<<"fetch c;">>]) of
{selected, _, [Row]} when Row == PrevRow ->
%% Avoid calling ConvertFun with the same input
fetch(IO, ConvertFun, Row);
{selected, _, [Row]} ->
case catch ConvertFun(Row) of
{'EXIT', _} = Err ->
?ERROR_MSG("failed to convert ~p: ~p",
[Row, Err]);
Term ->
ok = disk_log:log(IO#dump.fd, Term)
end,
process_sql_row(Row, {IO, ConvertFun, PrevRow}),
fetch(IO, ConvertFun, Row);
{selected, _, []} ->
ok;
@ -189,6 +193,18 @@ fetch(IO, ConvertFun, PrevRow) ->
erlang:error(Err)
end.
process_sql_row(Row, {IO, ConvertFun, PrevRow}) when Row == PrevRow ->
%% Avoid calling ConvertFun with the same input
{IO, ConvertFun, Row};
process_sql_row(Row, {IO, ConvertFun, _PrevRow}) ->
case catch ConvertFun(Row) of
{'EXIT', _} = Err ->
?ERROR_MSG("failed to convert ~p: ~p", [Row, Err]);
Term ->
ok = disk_log:log(IO#dump.fd, Term)
end,
{IO, ConvertFun, Row}.
import_dump(LServer, Mods, #dump{fd = Fd, cont = Cont}) ->
case disk_log:chunk(Fd, Cont) of
{NewCont, Terms} ->