25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-24 16:23:40 +01:00

Add alternate version of mysql upsert

This one works by issuing select and then insert or update or skip depending
on what select returns. We use this on mysql 5.7.26 and 8.0.20 where
previous implementation using 'replace' or 'on conflict update' can cause
excessive deadlocks.
This commit is contained in:
Paweł Chmielowski 2023-06-07 15:54:19 +02:00
parent 3eecf4ae8a
commit bb8e892323
2 changed files with 95 additions and 2 deletions

View File

@ -73,7 +73,7 @@
-record(state, -record(state,
{db_ref :: undefined | pid(), {db_ref :: undefined | pid(),
db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql, db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
db_version :: undefined | non_neg_integer(), db_version :: undefined | non_neg_integer() | {non_neg_integer(), atom(), non_neg_integer()},
reconnect_count = 0 :: non_neg_integer(), reconnect_count = 0 :: non_neg_integer(),
host :: binary(), host :: binary(),
pending_requests :: p1_queue:queue(), pending_requests :: p1_queue:queue(),
@ -1123,9 +1123,38 @@ get_db_version(#state{db_type = pgsql} = State) ->
?WARNING_MSG("Error getting pgsql version: ~p", [Res]), ?WARNING_MSG("Error getting pgsql version: ~p", [Res]),
State State
end; end;
get_db_version(#state{db_type = mysql} = State) ->
case mysql_to_odbc(p1_mysql_conn:squery(State#state.db_ref,
[<<"select version();">>], self(),
[{timeout, 5000},
{result_type, binary}])) of
{selected, _, [SVersion]} ->
case re:run(SVersion, <<"(\\d+)\\.(\\d+)(?:\\.(\\d+))?(?:-([^-]*))?">>,
[{capture, all_but_first, binary}]) of
{match, [V1, V2, V3, Type]} ->
V = ((bin_to_int(V1)*1000)+bin_to_int(V2))*1000+bin_to_int(V3),
TypeA = binary_to_atom(Type, utf8),
Flags = case TypeA of
'MariaDB' -> 0;
_ when V >= 5007026 andalso V < 8000000 -> 1;
_ when V >= 8000020 -> 1;
_ -> 0
end,
State#state{db_version = {V, TypeA, Flags}};
_ ->
?WARNING_MSG("Error parsing mysql version: ~p", [SVersion]),
State
end;
Res ->
?WARNING_MSG("Error getting mysql version: ~p", [Res]),
State
end;
get_db_version(State) -> get_db_version(State) ->
State. State.
bin_to_int(<<>>) -> 0;
bin_to_int(V) -> binary_to_integer(V).
log(Level, Format, Args) -> log(Level, Format, Args) ->
case Level of case Level of
debug -> ?DEBUG(Format, Args); debug -> ?DEBUG(Format, Args);

View File

@ -567,7 +567,6 @@ parse_upsert_field1([$= | S], Acc, ParamPos, Loc) ->
parse_upsert_field1([C | S], Acc, ParamPos, Loc) -> parse_upsert_field1([C | S], Acc, ParamPos, Loc) ->
parse_upsert_field1(S, [C | Acc], ParamPos, Loc). parse_upsert_field1(S, [C | Acc], ParamPos, Loc).
make_sql_upsert(Table, ParseRes, Pos) -> make_sql_upsert(Table, ParseRes, Pos) ->
check_upsert(ParseRes, Pos), check_upsert(ParseRes, Pos),
erl_syntax:fun_expr( erl_syntax:fun_expr(
@ -587,6 +586,11 @@ make_sql_upsert(Table, ParseRes, Pos) ->
erl_syntax:integer(90100))], erl_syntax:integer(90100))],
[make_sql_upsert_pgsql901(Table, ParseRes), [make_sql_upsert_pgsql901(Table, ParseRes),
erl_syntax:atom(ok)]), erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:atom(mysql), erl_syntax:tuple([erl_syntax:underscore(), erl_syntax:underscore(), erl_syntax:integer(1)])],
[],
[make_sql_upsert_mysql_select(Table, ParseRes),
erl_syntax:atom(ok)]),
erl_syntax:clause( erl_syntax:clause(
[erl_syntax:atom(mysql), erl_syntax:underscore()], [erl_syntax:atom(mysql), erl_syntax:underscore()],
[], [],
@ -682,6 +686,66 @@ make_sql_upsert_insert(Table, ParseRes) ->
]), ]),
State. State.
make_sql_upsert_select(Table, ParseRes) ->
{Fields0, Where0} =
lists:foldl(
fun({Field, key, ST}, {Fie, Whe}) ->
{Fie, [ST#state{
'query' = [{str, Field}, {str, "="}] ++ ST#state.'query'}] ++ Whe};
({Field, {true}, ST}, {Fie, Whe}) ->
{[ST#state{
'query' = [{str, Field}, {str, "="}] ++ ST#state.'query'}] ++ Fie, Whe};
(_, Acc) ->
Acc
end, {[], []}, ParseRes),
Fields = join_states(Fields0, " AND "),
Where = join_states(Where0, " AND "),
State =
concat_states(
[#state{'query' = [{str, "SELECT "}],
res_vars = [erl_syntax:variable("__VSel")],
res = [erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(to_bool),
[erl_syntax:variable("__VSel")])]},
Fields,
#state{'query' = [{str, " FROM "}, {str, Table}, {str, " WHERE "}]},
Where
]),
State.
make_sql_upsert_mysql_select(Table, ParseRes) ->
Select = make_sql_query(make_sql_upsert_select(Table, ParseRes)),
Insert = make_sql_query(make_sql_upsert_insert(Table, ParseRes)),
Update = make_sql_query(make_sql_upsert_update(Table, ParseRes)),
erl_syntax:case_expr(
erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(sql_query_t),
[Select]),
[erl_syntax:clause(
[erl_syntax:tuple([erl_syntax:atom(selected), erl_syntax:list([])])],
none,
[erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(sql_query_t),
[Insert])]),
erl_syntax:clause(
[erl_syntax:abstract({selected, [{true}]})],
[],
[erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:tuple([erl_syntax:atom(selected), erl_syntax:underscore()])],
none,
[erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(sql_query_t),
[Update])]),
erl_syntax:clause(
[erl_syntax:variable("__SelectRes")],
none,
[erl_syntax:variable("__SelectRes")])]).
make_sql_upsert_mysql(Table, ParseRes) -> make_sql_upsert_mysql(Table, ParseRes) ->
Vals = Vals =
lists:map( lists:map(