25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-24 16:23:40 +01:00

Add option update_sql_schema_timeout to allow schema update use longer timeouts

This also makes batch of schema updates to single table use transaction,
which should help in not leaving table in inconsistent state if some update
steps fails (unless you use mysql where you can't rollback changes to
table schemas).
This commit is contained in:
Paweł Chmielowski 2024-07-16 15:36:14 +02:00
parent f56739fd9f
commit ead87e3727
4 changed files with 254 additions and 237 deletions

View File

@ -164,6 +164,7 @@
-export([sql_username/0, sql_username/1]). -export([sql_username/0, sql_username/1]).
-export([trusted_proxies/0]). -export([trusted_proxies/0]).
-export([update_sql_schema/0]). -export([update_sql_schema/0]).
-export([update_sql_schema_timeout/0, update_sql_schema_timeout/1]).
-export([use_cache/0, use_cache/1]). -export([use_cache/0, use_cache/1]).
-export([validate_stream/0]). -export([validate_stream/0]).
-export([version/0]). -export([version/0]).
@ -1109,6 +1110,13 @@ trusted_proxies() ->
update_sql_schema() -> update_sql_schema() ->
ejabberd_config:get_option({update_sql_schema, global}). ejabberd_config:get_option({update_sql_schema, global}).
-spec update_sql_schema_timeout() -> 'infinity' | pos_integer().
update_sql_schema_timeout() ->
update_sql_schema_timeout(global).
-spec update_sql_schema_timeout(global | binary()) -> 'infinity' | pos_integer().
update_sql_schema_timeout(Host) ->
ejabberd_config:get_option({update_sql_schema_timeout, Host}).
-spec use_cache() -> boolean(). -spec use_cache() -> boolean().
use_cache() -> use_cache() ->
use_cache(global). use_cache(global).

View File

@ -264,6 +264,8 @@ opt_type(new_sql_schema) ->
econf:bool(); econf:bool();
opt_type(update_sql_schema) -> opt_type(update_sql_schema) ->
econf:bool(); econf:bool();
opt_type(update_sql_schema_timeout) ->
econf:timeout(second, infinity);
opt_type(oauth_access) -> opt_type(oauth_access) ->
econf:acl(); econf:acl();
opt_type(oauth_cache_life_time) -> opt_type(oauth_cache_life_time) ->
@ -613,6 +615,7 @@ options() ->
{net_ticktime, timer:seconds(60)}, {net_ticktime, timer:seconds(60)},
{new_sql_schema, ?USE_NEW_SQL_SCHEMA_DEFAULT}, {new_sql_schema, ?USE_NEW_SQL_SCHEMA_DEFAULT},
{update_sql_schema, true}, {update_sql_schema, true},
{update_sql_schema_timeout, timer:minutes(5)},
{oauth_access, none}, {oauth_access, none},
{oauth_cache_life_time, {oauth_cache_life_time,
fun(Host) -> ejabberd_config:get_option({cache_life_time, Host}) end}, fun(Host) -> ejabberd_config:get_option({cache_life_time, Host}) end},

View File

@ -929,6 +929,12 @@ doc() ->
"This option was added in ejabberd 23.10, " "This option was added in ejabberd 23.10, "
"and enabled by default since 24.06. " "and enabled by default since 24.06. "
"The default value is 'true'.")}}, "The default value is 'true'.")}},
{update_sql_schema_timeout,
#{value => "timeout()",
note => "added in 24.07",
desc =>
?T("Time allocated to SQL schema update queries. "
"The default value is set to 5 minutes.")}},
{oauth_access, {oauth_access,
#{value => ?T("AccessName"), #{value => ?T("AccessName"),
desc => ?T("By default creating OAuth tokens is not allowed. " desc => ?T("By default creating OAuth tokens is not allowed. "

View File

@ -226,6 +226,13 @@ store_version(Host, Module, Version) ->
["!module=%(SModule)s", ["!module=%(SModule)s",
"version=%(Version)d"]). "version=%(Version)d"]).
store_version_t(Module, Version) ->
SModule = misc:atom_to_binary(Module),
?SQL_UPSERT_T(
"schema_version",
["!module=%(SModule)s",
"version=%(Version)d"]).
table_exists(Host, Table) -> table_exists(Host, Table) ->
ejabberd_sql:sql_query( ejabberd_sql:sql_query(
Host, Host,
@ -398,28 +405,25 @@ get_current_version(Host, Module, Schemas) ->
Version Version
end. end.
sqlite_table_copy(Host, SchemaInfo, Table) -> sqlite_table_copy_t(SchemaInfo, Table) ->
ejabberd_sql:sql_transaction(Host, TableName = Table#sql_table.name,
fun() -> NewTableName = <<"new_", TableName/binary>>,
TableName = Table#sql_table.name, NewTable = Table#sql_table{name = NewTableName},
NewTableName = <<"new_", TableName/binary>>, create_table_t(SchemaInfo, NewTable),
NewTable = Table#sql_table{name = NewTableName}, SQL2 = <<"INSERT INTO ", NewTableName/binary,
create_table_t(SchemaInfo, NewTable), " SELECT * FROM ", TableName/binary>>,
SQL2 = <<"INSERT INTO ", NewTableName/binary, ?INFO_MSG("Copying table ~s to ~s:~n~s~n",
" SELECT * FROM ", TableName/binary>>, [TableName, NewTableName, SQL2]),
?INFO_MSG("Copying table ~s to ~s:~n~s~n", ejabberd_sql:sql_query_t(SQL2),
[TableName, NewTableName, SQL2]), SQL3 = <<"DROP TABLE ", TableName/binary>>,
ejabberd_sql:sql_query_t(SQL2), ?INFO_MSG("Droping old table ~s:~n~s~n",
SQL3 = <<"DROP TABLE ", TableName/binary>>, [TableName, SQL2]),
?INFO_MSG("Droping old table ~s:~n~s~n", ejabberd_sql:sql_query_t(SQL3),
[TableName, SQL2]), SQL4 = <<"ALTER TABLE ", NewTableName/binary,
ejabberd_sql:sql_query_t(SQL3), " RENAME TO ", TableName/binary>>,
SQL4 = <<"ALTER TABLE ", NewTableName/binary, ?INFO_MSG("Renameing table ~s to ~s:~n~s~n",
" RENAME TO ", TableName/binary>>, [NewTableName, TableName, SQL4]),
?INFO_MSG("Renameing table ~s to ~s:~n~s~n", ejabberd_sql:sql_query_t(SQL4).
[NewTableName, TableName, SQL4]),
ejabberd_sql:sql_query_t(SQL4)
end).
format_type(#sql_schema_info{db_type = pgsql}, Column) -> format_type(#sql_schema_info{db_type = pgsql}, Column) ->
case Column#sql_column.type of case Column#sql_column.type of
@ -875,18 +879,18 @@ update_schema(Host, Module, RawSchemas) ->
end. end.
do_update_schema(Host, Module, SchemaInfo, Schema) -> do_update_schema(Host, Module, SchemaInfo, Schema) ->
lists:foreach( F = fun() ->
fun({add_column, TableName, ColumnName}) -> lists:foreach(
{value, Table} = fun({add_column, TableName, ColumnName}) ->
lists:keysearch( {value, Table} =
TableName, #sql_table.name, Schema#sql_schema.tables), lists:keysearch(
{value, Column} = TableName, #sql_table.name, Schema#sql_schema.tables),
lists:keysearch( {value, Column} =
ColumnName, #sql_column.name, Table#sql_table.columns), lists:keysearch(
Res = ColumnName, #sql_column.name, Table#sql_table.columns),
ejabberd_sql:sql_query( Res =
Host, ejabberd_sql:sql_query_t(
fun(DBType, _DBVersion) -> fun(DBType, _DBVersion) ->
Def = format_column_def(SchemaInfo, Column), Def = format_column_def(SchemaInfo, Column),
Default = format_default(SchemaInfo, Column), Default = format_default(SchemaInfo, Column),
SQLs = SQLs =
@ -911,209 +915,205 @@ do_update_schema(Host, Module, SchemaInfo, Schema) ->
ColumnName, ColumnName,
SQLs]), SQLs]),
lists:foreach( lists:foreach(
fun(SQL) -> ejabberd_sql:sql_query_t(SQL) end, fun(SQL) -> ejabberd_sql:sql_query_t(SQL) end,
SQLs) SQLs)
end), end),
case Res of case Res of
{error, Error} -> {error, Error} ->
?ERROR_MSG("Failed to update table ~s: ~p", ?ERROR_MSG("Failed to update table ~s: ~p",
[TableName, Error]), [TableName, Error]),
error(Error); error(Error);
_ -> _ ->
ok ok
end; end;
({drop_column, TableName, ColumnName}) -> ({drop_column, TableName, ColumnName}) ->
Res = Res =
ejabberd_sql:sql_query( ejabberd_sql:sql_query_t(
Host, fun(_DBType, _DBVersion) ->
fun(_DBType, _DBVersion) -> SQL = [<<"ALTER TABLE ">>,
SQL = [<<"ALTER TABLE ">>, TableName,
TableName, <<" DROP COLUMN ">>,
<<" DROP COLUMN ">>, ColumnName,
ColumnName, <<";">>],
<<";">>], ?INFO_MSG("Drop column ~s/~s:~n~s~n",
?INFO_MSG("Drop column ~s/~s:~n~s~n", [TableName,
[TableName, ColumnName,
ColumnName, SQL]),
SQL]), ejabberd_sql:sql_query_t(SQL)
ejabberd_sql:sql_query_t(SQL) end),
end), case Res of
case Res of {error, Error} ->
{error, Error} -> ?ERROR_MSG("Failed to update table ~s: ~p",
?ERROR_MSG("Failed to update table ~s: ~p", [TableName, Error]),
[TableName, Error]), error(Error);
error(Error); _ ->
_ -> ok
ok end;
end; ({create_index, TableName, Columns1}) ->
({create_index, TableName, Columns1}) -> Columns =
Columns = case ejabberd_sql:use_new_schema() of
case ejabberd_sql:use_new_schema() of true ->
true -> Columns1;
Columns1; false ->
false -> lists:delete(
lists:delete( <<"server_host">>, Columns1)
<<"server_host">>, Columns1) end,
end, {value, Table} =
{value, Table} = lists:keysearch(
lists:keysearch( TableName, #sql_table.name, Schema#sql_schema.tables),
TableName, #sql_table.name, Schema#sql_schema.tables), {value, Index} =
{value, Index} = lists:keysearch(
lists:keysearch( Columns, #sql_index.columns, Table#sql_table.indices),
Columns, #sql_index.columns, Table#sql_table.indices), case Index#sql_index.meta of
case Index#sql_index.meta of #{ignore := true} -> ok;
#{ignore := true} -> ok; _ ->
_ -> Res =
Res = ejabberd_sql:sql_query_t(
ejabberd_sql:sql_query( fun() ->
Host, case Index#sql_index.meta of
fun() -> #{primary_key := true} ->
case Index#sql_index.meta of SQL1 = format_add_primary_key(
#{primary_key := true} -> SchemaInfo, Table, Index),
SQL1 = format_add_primary_key( SQL = iolist_to_binary(SQL1),
SchemaInfo, Table, Index), ?INFO_MSG("Add primary key ~s/~p:~n~s~n",
SQL = iolist_to_binary(SQL1), [Table#sql_table.name,
?INFO_MSG("Add primary key ~s/~p:~n~s~n", Index#sql_index.columns,
[Table#sql_table.name, SQL]),
Index#sql_index.columns, ejabberd_sql:sql_query_t(SQL);
SQL]), _ ->
ejabberd_sql:sql_query_t(SQL); SQL1 = format_create_index(
_ -> SchemaInfo, Table, Index),
SQL1 = format_create_index( SQL = iolist_to_binary(SQL1),
SchemaInfo, Table, Index), ?INFO_MSG("Create index ~s/~p:~n~s~n",
SQL = iolist_to_binary(SQL1), [Table#sql_table.name,
?INFO_MSG("Create index ~s/~p:~n~s~n", Index#sql_index.columns,
[Table#sql_table.name, SQL]),
Index#sql_index.columns, ejabberd_sql:sql_query_t(SQL)
SQL]), end
ejabberd_sql:sql_query_t(SQL) end),
end case Res of
end), {error, Error} ->
case Res of ?ERROR_MSG("Failed to update table ~s: ~p",
{error, Error} -> [TableName, Error]),
?ERROR_MSG("Failed to update table ~s: ~p", error(Error);
[TableName, Error]), _ ->
error(Error); ok
_ -> end
ok end;
end ({update_primary_key, TableName, Columns1}) ->
end; Columns =
({update_primary_key, TableName, Columns1}) -> case ejabberd_sql:use_new_schema() of
Columns = true ->
case ejabberd_sql:use_new_schema() of Columns1;
true -> false ->
Columns1; lists:delete(
false -> <<"server_host">>, Columns1)
lists:delete( end,
<<"server_host">>, Columns1) {value, Table} =
end, lists:keysearch(
{value, Table} = TableName, #sql_table.name, Schema#sql_schema.tables),
lists:keysearch( {value, Index} =
TableName, #sql_table.name, Schema#sql_schema.tables), lists:keysearch(
{value, Index} = Columns, #sql_index.columns, Table#sql_table.indices),
lists:keysearch( Res =
Columns, #sql_index.columns, Table#sql_table.indices), case SchemaInfo#sql_schema_info.db_type of
Res = sqlite ->
case SchemaInfo#sql_schema_info.db_type of sqlite_table_copy_t(SchemaInfo, Table);
sqlite -> pgsql ->
sqlite_table_copy(Host, SchemaInfo, Table); TableName = Table#sql_table.name,
pgsql -> SQL1 = [<<"ALTER TABLE ">>, TableName, <<" DROP CONSTRAINT ",
TableName = Table#sql_table.name, TableName/binary, "_pkey, ",
SQL1 = [<<"ALTER TABLE ">>, TableName, <<" DROP CONSTRAINT ", "ADD PRIMARY KEY (">>,
TableName/binary,"_pkey, ", lists:join(
"ADD PRIMARY KEY (">>, <<", ">>,
lists:join( Index#sql_index.columns),
<<", ">>, <<");">>],
Index#sql_index.columns), SQL = iolist_to_binary(SQL1),
<<");">>], ?INFO_MSG("Update primary key ~s/~p:~n~s~n",
SQL = iolist_to_binary(SQL1), [Table#sql_table.name,
?INFO_MSG("Update primary key ~s/~p:~n~s~n", Index#sql_index.columns,
[Table#sql_table.name, SQL]),
Index#sql_index.columns, ejabberd_sql:sql_query_t(
SQL]), fun(_DBType, _DBVersion) ->
ejabberd_sql:sql_query( ejabberd_sql:sql_query_t(SQL)
Host, end);
fun(_DBType, _DBVersion) -> mysql ->
ejabberd_sql:sql_query_t(SQL) TableName = Table#sql_table.name,
end); SQL1 = [<<"ALTER TABLE ">>, TableName, <<" DROP PRIMARY KEY, "
mysql -> "ADD PRIMARY KEY (">>,
TableName = Table#sql_table.name, lists:join(
SQL1 = [<<"ALTER TABLE ">>, TableName, <<" DROP PRIMARY KEY, " <<", ">>,
"ADD PRIMARY KEY (">>, lists:map(
lists:join( fun(Col) ->
<<", ">>, format_mysql_index_column(Table, Col)
lists:map( end, Index#sql_index.columns)),
fun(Col) -> <<");">>],
format_mysql_index_column(Table, Col) SQL = iolist_to_binary(SQL1),
end, Index#sql_index.columns)), ?INFO_MSG("Update primary key ~s/~p:~n~s~n",
<<");">>], [Table#sql_table.name,
SQL = iolist_to_binary(SQL1), Index#sql_index.columns,
?INFO_MSG("Update primary key ~s/~p:~n~s~n", SQL]),
[Table#sql_table.name, ejabberd_sql:sql_query_t(
Index#sql_index.columns, fun(_DBType, _DBVersion) ->
SQL]), ejabberd_sql:sql_query_t(SQL)
ejabberd_sql:sql_query( end)
Host, end,
fun(_DBType, _DBVersion) -> case Res of
ejabberd_sql:sql_query_t(SQL) {error, Error} ->
end) ?ERROR_MSG("Failed to update table ~s: ~p",
end, [TableName, Error]),
case Res of error(Error);
{error, Error} -> _ ->
?ERROR_MSG("Failed to update table ~s: ~p", ok
[TableName, Error]), end;
error(Error); ({drop_index, TableName, Columns1}) ->
_ -> Columns =
ok case ejabberd_sql:use_new_schema() of
end; true ->
({drop_index, TableName, Columns1}) -> Columns1;
Columns = false ->
case ejabberd_sql:use_new_schema() of lists:delete(
true -> <<"server_host">>, Columns1)
Columns1; end,
false -> case find_index_name(Host, TableName, Columns) of
lists:delete( false ->
<<"server_host">>, Columns1) ?ERROR_MSG("Can't find an index to drop for ~s/~p",
end, [TableName, Columns]);
case find_index_name(Host, TableName, Columns) of {ok, IndexName} ->
false -> Res =
?ERROR_MSG("Can't find an index to drop for ~s/~p", ejabberd_sql:sql_query_t(
[TableName, Columns]); fun(DBType, _DBVersion) ->
{ok, IndexName} -> SQL =
Res = case DBType of
ejabberd_sql:sql_query( mysql ->
Host, [<<"DROP INDEX ">>,
fun(DBType, _DBVersion) -> IndexName,
SQL = <<" ON ">>,
case DBType of TableName,
mysql -> <<";">>];
[<<"DROP INDEX ">>, _ ->
IndexName, [<<"DROP INDEX ">>,
<<" ON ">>, IndexName, <<";">>]
TableName, end,
<<";">>]; ?INFO_MSG("Drop index ~s/~p:~n~s~n",
_ -> [TableName,
[<<"DROP INDEX ">>, Columns,
IndexName, <<";">>] SQL]),
end, ejabberd_sql:sql_query_t(SQL)
?INFO_MSG("Drop index ~s/~p:~n~s~n", end),
[TableName, case Res of
Columns, {error, Error} ->
SQL]), ?ERROR_MSG("Failed to update table ~s: ~p",
ejabberd_sql:sql_query_t(SQL) [TableName, Error]),
end), error(Error);
case Res of _ ->
{error, Error} -> ok
?ERROR_MSG("Failed to update table ~s: ~p", end
[TableName, Error]), end
error(Error); end, Schema#sql_schema.update),
_ -> store_version_t(Module, Schema#sql_schema.version)
ok end,
end ejabberd_sql:sql_transaction(Host, F, ejabberd_option:update_sql_schema_timeout(), 1).
end
end, Schema#sql_schema.update),
store_version(Host, Module, Schema#sql_schema.version).
print_schema(SDBType, SDBVersion, SNewSchema) -> print_schema(SDBType, SDBVersion, SNewSchema) ->
{DBType, DBVersion} = {DBType, DBVersion} =