From bb8e8923232ce2097fecd1b2a7d422a1debf2041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielowski?= Date: Wed, 7 Jun 2023 15:54:19 +0200 Subject: [PATCH] Add alternate version of mysql upsert This one works by issuing select and then insert or update or skip depending on what select returns. We use this on mysql 5.7.26 and 8.0.20 where previous implementation using 'replace' or 'on conflict update' can cause excessive deadlocks. --- src/ejabberd_sql.erl | 31 ++++++++++++++++++- src/ejabberd_sql_pt.erl | 66 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 95 insertions(+), 2 deletions(-) diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl index 4e92e0574..35b77d3d9 100644 --- a/src/ejabberd_sql.erl +++ b/src/ejabberd_sql.erl @@ -73,7 +73,7 @@ -record(state, {db_ref :: undefined | pid(), db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql, - db_version :: undefined | non_neg_integer(), + db_version :: undefined | non_neg_integer() | {non_neg_integer(), atom(), non_neg_integer()}, reconnect_count = 0 :: non_neg_integer(), host :: binary(), pending_requests :: p1_queue:queue(), @@ -1123,9 +1123,38 @@ get_db_version(#state{db_type = pgsql} = State) -> ?WARNING_MSG("Error getting pgsql version: ~p", [Res]), State end; +get_db_version(#state{db_type = mysql} = State) -> + case mysql_to_odbc(p1_mysql_conn:squery(State#state.db_ref, + [<<"select version();">>], self(), + [{timeout, 5000}, + {result_type, binary}])) of + {selected, _, [SVersion]} -> + case re:run(SVersion, <<"(\\d+)\\.(\\d+)(?:\\.(\\d+))?(?:-([^-]*))?">>, + [{capture, all_but_first, binary}]) of + {match, [V1, V2, V3, Type]} -> + V = ((bin_to_int(V1)*1000)+bin_to_int(V2))*1000+bin_to_int(V3), + TypeA = binary_to_atom(Type, utf8), + Flags = case TypeA of + 'MariaDB' -> 0; + _ when V >= 5007026 andalso V < 8000000 -> 1; + _ when V >= 8000020 -> 1; + _ -> 0 + end, + State#state{db_version = {V, TypeA, Flags}}; + _ -> + ?WARNING_MSG("Error parsing mysql version: ~p", [SVersion]), + State + end; + Res -> + ?WARNING_MSG("Error getting mysql version: ~p", [Res]), + State + end; get_db_version(State) -> State. +bin_to_int(<<>>) -> 0; +bin_to_int(V) -> binary_to_integer(V). + log(Level, Format, Args) -> case Level of debug -> ?DEBUG(Format, Args); diff --git a/src/ejabberd_sql_pt.erl b/src/ejabberd_sql_pt.erl index 303ac1503..5d72fde24 100644 --- a/src/ejabberd_sql_pt.erl +++ b/src/ejabberd_sql_pt.erl @@ -567,7 +567,6 @@ parse_upsert_field1([$= | S], Acc, ParamPos, Loc) -> parse_upsert_field1([C | S], Acc, ParamPos, Loc) -> parse_upsert_field1(S, [C | Acc], ParamPos, Loc). - make_sql_upsert(Table, ParseRes, Pos) -> check_upsert(ParseRes, Pos), erl_syntax:fun_expr( @@ -587,6 +586,11 @@ make_sql_upsert(Table, ParseRes, Pos) -> erl_syntax:integer(90100))], [make_sql_upsert_pgsql901(Table, ParseRes), erl_syntax:atom(ok)]), + erl_syntax:clause( + [erl_syntax:atom(mysql), erl_syntax:tuple([erl_syntax:underscore(), erl_syntax:underscore(), erl_syntax:integer(1)])], + [], + [make_sql_upsert_mysql_select(Table, ParseRes), + erl_syntax:atom(ok)]), erl_syntax:clause( [erl_syntax:atom(mysql), erl_syntax:underscore()], [], @@ -682,6 +686,66 @@ make_sql_upsert_insert(Table, ParseRes) -> ]), State. +make_sql_upsert_select(Table, ParseRes) -> + {Fields0, Where0} = + lists:foldl( + fun({Field, key, ST}, {Fie, Whe}) -> + {Fie, [ST#state{ + 'query' = [{str, Field}, {str, "="}] ++ ST#state.'query'}] ++ Whe}; + ({Field, {true}, ST}, {Fie, Whe}) -> + {[ST#state{ + 'query' = [{str, Field}, {str, "="}] ++ ST#state.'query'}] ++ Fie, Whe}; + (_, Acc) -> + Acc + end, {[], []}, ParseRes), + Fields = join_states(Fields0, " AND "), + Where = join_states(Where0, " AND "), + State = + concat_states( + [#state{'query' = [{str, "SELECT "}], + res_vars = [erl_syntax:variable("__VSel")], + res = [erl_syntax:application( + erl_syntax:atom(ejabberd_sql), + erl_syntax:atom(to_bool), + [erl_syntax:variable("__VSel")])]}, + Fields, + #state{'query' = [{str, " FROM "}, {str, Table}, {str, " WHERE "}]}, + Where + ]), + State. + +make_sql_upsert_mysql_select(Table, ParseRes) -> + Select = make_sql_query(make_sql_upsert_select(Table, ParseRes)), + Insert = make_sql_query(make_sql_upsert_insert(Table, ParseRes)), + Update = make_sql_query(make_sql_upsert_update(Table, ParseRes)), + erl_syntax:case_expr( + erl_syntax:application( + erl_syntax:atom(ejabberd_sql), + erl_syntax:atom(sql_query_t), + [Select]), + [erl_syntax:clause( + [erl_syntax:tuple([erl_syntax:atom(selected), erl_syntax:list([])])], + none, + [erl_syntax:application( + erl_syntax:atom(ejabberd_sql), + erl_syntax:atom(sql_query_t), + [Insert])]), + erl_syntax:clause( + [erl_syntax:abstract({selected, [{true}]})], + [], + [erl_syntax:atom(ok)]), + erl_syntax:clause( + [erl_syntax:tuple([erl_syntax:atom(selected), erl_syntax:underscore()])], + none, + [erl_syntax:application( + erl_syntax:atom(ejabberd_sql), + erl_syntax:atom(sql_query_t), + [Update])]), + erl_syntax:clause( + [erl_syntax:variable("__SelectRes")], + none, + [erl_syntax:variable("__SelectRes")])]). + make_sql_upsert_mysql(Table, ParseRes) -> Vals = lists:map(