%% Generic storage front-end used by ejabberd modules. Table definitions
%% are kept per {Host, Table} in the mnesia table `table'; operations are
%% forwarded either to mnesia or to a backend callback module.
-module(gen_storage).
-author('stephan@spaceboyz.net').

-export([behaviour_info/1]).

-export([all_table_hosts/1,
         table_info/3,
         create_table/4,
         delete_table/2,
         add_table_copy/4,
         add_table_index/3,
         read/2,
         write/2,
         delete/2,
         delete_object/2,
         read/4,
         write/4,
         delete/4,
         delete_object/4,
         select/2,
         select/3,
         select/4,
         select/5,
         count_records/2,
         count_records/3,
         delete_where/3,
         dirty_read/2,
         dirty_write/2,
         dirty_delete/2,
         dirty_delete_object/2,
         dirty_read/3,
         dirty_write/3,
         dirty_delete/3,
         dirty_delete_object/3,
         dirty_select/3,
         dirty_count_records/2,
         dirty_count_records/3,
         dirty_delete_where/3,
         async_dirty/3,
         sync_dirty/3,
         transaction/3,
         write_lock_table/2]).

%% Behaviour definition: the {Function, Arity} pairs a storage backend
%% module has to export. Any other query yields `undefined'.
behaviour_info(callbacks) ->
    [{table_info, 1},
     {prepare_tabdef, 2},
     {create_table, 1},
     {delete_table, 1},
     {add_table_copy, 3},
     {add_table_index, 2},
     {dirty_read, 2},
     {read, 3},
     {dirty_select, 2},
     {select, 3},
     {dirty_count_records, 2},
     {count_records, 2},
     {dirty_write, 2},
     {write, 3},
     {dirty_delete, 2},
     {delete, 3},
     {dirty_delete_object, 2},
     {delete_object, 3},
     {delete_where, 2},
     {dirty_delete_where, 2},
     {async_dirty, 2},
     {sync_dirty, 2},
     {transaction, 2}];
behaviour_info(_) ->
    undefined.

-type(storage_host() :: binary()).
-type(storage_table() :: atom()).
-type(lock_kind() :: read | write | sticky_write).

%% One row of the mnesia table `table': which backend serves a given
%% {Host, Table} pair and the backend-specific definition.
-record(table, {host_name :: {storage_host(), storage_table()},
                backend :: atom(),
                def :: any()}).

%% Definition stored for tables served by mnesia itself.
-record(mnesia_def, {table :: atom(),
                     tabdef :: list()}).

-include("ejabberd.hrl"). % This is used for ERROR_MSG

%% Returns all hosts where the table Tab is defined.
%% Only binary hosts are returned; any failure of the underlying
%% dirty select (anything that is not a list) results in [].
-spec all_table_hosts(storage_table()) -> [storage_host()].
all_table_hosts(Tab) ->
    %% The head is built with setelement/3 so that the record literal
    %% itself carries a binary host (see the placeholder's text).
    Placeholder = {table, {<<"hidding_from_dialyzer">>, '$2'}, '_', '_'},
    MatchHead = setelement(2, Placeholder, {'$1', '$2'}),
    Guard = {'=:=', '$2', {const, Tab}},
    case catch mnesia:dirty_select(table, [{MatchHead, [Guard], ['$1']}]) of
        Hosts when is_list(Hosts) ->
            lists:filter(fun erlang:is_binary/1, Hosts);
        _ ->
            []
    end.

-spec table_info(storage_host(), storage_table(), atom()) -> any().
%% Returns one item (or, for InfoKey =:= all, the whole property list) of
%% the information known about Tab on Host. The list always starts with
%% a {backend, Name} entry.
table_info(Host, Tab, InfoKey) ->
    Info =
        case get_table(Host, Tab) of
            #table{backend = mnesia, def = #mnesia_def{tabdef = TabDef}} ->
                [{backend, mnesia} | TabDef];
            #table{backend = Backend, def = Def} ->
                BackendInfo = Backend:table_info(Def),
                %% Only gen_storage_odbc has a public backend name here;
                %% any other module fails this case.
                BackendName = case Backend of
                                  gen_storage_odbc -> odbc
                              end,
                [{backend, BackendName} | BackendInfo]
        end,
    table_info_value(InfoKey, Info, Tab).

%% Picks InfoKey out of Info. A missing record_name defaults to the
%% table name; any other missing key is a case_clause failure.
table_info_value(all, Info, _Tab) ->
    Info;
table_info_value(InfoKey, Info, Tab) ->
    case lists:keyfind(InfoKey, 1, Info) of
        {_, Value} -> Value;
        false when InfoKey =:= record_name -> Tab
    end.

%% @spec create_table(backend(), Host::binary(), Name::atom(), options()) -> {atomic, ok} | {aborted, Reason}
%% @type options() = [option()]
%% @type option() = {odbc_host, string()}
%%                | {Table::atom(), [tabdef()]}
%% @type tabdef() = {attributes, AtomList}
%%                | {record_name, atom()}
%%                | {types, attributedef()}
%% @type attributedef() = [{Column::atom(), columndef()}]
%% @type columndef() = text
%%                   | int
%%                   | tuple()
%% With an arbitrary number of columndef()
%% option() is any mnesia option
%% columndef() defaults to text for all unspecified attributes
-spec create_table(atom(), storage_host(), storage_table(), list()) -> tuple().
create_table(mnesia, Host, Tab, Def) ->
    %% mnesia only receives the options it understands.
    MnesiaOpts = filter_mnesia_tabdef(Def),
    define_table(mnesia, Host, Tab,
                 #mnesia_def{table = Tab, tabdef = MnesiaOpts}),
    mnesia:create_table(Tab, MnesiaOpts);
create_table(odbc, Host, Tab, Def) ->
    Prepared = gen_storage_odbc:prepare_tabdef(Tab, Def),
    define_table(gen_storage_odbc, Host, Tab, Prepared),
    gen_storage_odbc:create_table(Prepared).

%% Registers {Host, Name} -> {Backend, Def} in the mnesia table `table'.
%% The create_table call is issued on every invocation and its result
%% is ignored.
-spec define_table(atom(), storage_host(), storage_table(), #mnesia_def{} | tuple()) -> ok.
define_table(Backend, Host, Name, Def) ->
    mnesia:create_table(table, [{attributes, record_info(fields, table)}]),
    Entry = #table{host_name = {Host, Name},
                   backend = Backend,
                   def = Def},
    mnesia:dirty_write(Entry).

%% @spec (list()) -> [{atom(), any()}]
%% Keeps only the table options accepted by filter_mnesia_tabdef_/1.
-spec filter_mnesia_tabdef(list()) -> [any()].
filter_mnesia_tabdef(TabDef) ->
    [Option || Option <- TabDef, filter_mnesia_tabdef_(Option)].
%% True for the {Key, Value} table options that are handed on to mnesia;
%% everything else (including non-pairs) is rejected.
filter_mnesia_tabdef_({Key, _}) ->
    lists:member(Key, [access_mode,
                       attributes,
                       disc_copies,
                       disc_only_copies,
                       index,
                       load_order,
                       ram_copies,
                       record_name,
                       snmp,
                       type,
                       local_content]);
filter_mnesia_tabdef_(_) ->
    false.

%% The functions below forward to the backend owning Tab on Host; the
%% backend receives the table (or its definition) followed by the listed
%% extra arguments.

-spec delete_table(storage_host(), storage_table()) -> {atomic, ok}.
delete_table(Host, Tab) ->
    backend_apply(delete_table, Host, Tab).

-spec add_table_copy(storage_host(), storage_table(), node(), atom()) -> {atomic, ok}.
add_table_copy(Host, Tab, Node, Type) ->
    backend_apply(add_table_copy, Host, Tab, [Node, Type]).

-spec add_table_index(storage_host(), storage_table(), atom()) -> {atomic, ok}.
add_table_index(Host, Tab, Attribute) ->
    backend_apply(add_table_index, Host, Tab, [Attribute]).

%% Reads Key from Tab with a `read' lock.
-spec read(storage_host(), {storage_table(), any()}) -> [tuple()].
read(Host, {Tab, Key}) ->
    read(Host, Tab, Key, read).

%% Reads Key from Tab with the given lock kind.
-spec read(storage_host(), storage_table(), any(), lock_kind()) -> [tuple()].
read(Host, Tab, Key, LockKind) ->
    backend_apply(read, Host, Tab, [Key, LockKind]).

%% Same as dirty_read/3, taking table and key as one tuple.
-spec dirty_read(storage_host(), {storage_table(), any()}) -> [tuple()].
dirty_read(Host, {Tab, Key}) ->
    dirty_read(Host, Tab, Key).

-spec dirty_read(storage_host(), storage_table(), any()) -> [tuple()].
dirty_read(Host, Tab, Key) ->
    backend_apply(dirty_read, Host, Tab, [Key]).

%% select/3
%% If Matchvalue is a tuple, then its size must be == length(string:tokens(Matchrule, "_"))
-type(matchvalue() :: '_' | integer() | string() | tuple()).
%% | {matchvalue(), matchrule()}.
-type(matchrule() :: {'and', matchrule(), matchrule()} |
                     {'andalso', matchrule(), matchrule()} |
                     {'or', matchrule(), matchrule()} |
                     {'orelse', matchrule(), matchrule()} |
                     {'=', Attribute::atom(), matchvalue()} |
                     {'<', Attribute::atom(), matchvalue()} |
                     {'=/=', Attribute::atom(), matchvalue()} |
                     {like, Attribute::atom(), matchvalue()}).
%% For the like operator the last element (not the tail as in
%% matchspecs) may be '_'.

%% Selects all records matching MatchRules, using a `read' lock.
-spec select(storage_host(), storage_table(), [matchrule()]) -> [tuple()].
select(Host, Tab, MatchRules) ->
    select(Host, Tab, MatchRules, read).

%% Selects all records matching MatchRules. Lock is only handed to
%% mnesia; other backends are called with `undefined' in the position
%% where select/5 passes its limit N.
-spec select(storage_host(), storage_table(), [matchrule()], lock_kind()) -> [tuple()].
select(Host, Tab, MatchRules, Lock) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            Spec = matchrules_to_mnesia_matchspec(Tab, MatchRules),
            mnesia:select(Tab, Spec, Lock);
        #table{backend = Module, def = Def} ->
            Module:select(Def, MatchRules, undefined)
    end.

%% Limited select: N is passed to the backend as the result limit. The
%% returned continuation is to be fed to select/2. Lock is only handed
%% to mnesia.
-spec select(storage_host(), storage_table(), [matchrule()], integer(), lock_kind()) ->
                    {[tuple()], any()} | '$end_of_table'.
select(Host, Tab, MatchRules, N, Lock) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            Spec = matchrules_to_mnesia_matchspec(Tab, MatchRules),
            mnesia:select(Tab, Spec, N, Lock);
        #table{backend = Module, def = Def} ->
            Module:select(Def, MatchRules, N)
    end.

%% Continues a limited select with the continuation Cont.
-spec select({storage_host(), storage_table()}, any()) ->
                    {[tuple()], any()} | '$end_of_table'.
select({Host, Tab}, Cont) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            mnesia:select(Cont);
        #table{backend = Module} ->
            Module:select(Cont)
    end.

%% Selects all records matching MatchRules via the backend's dirty
%% select.
-spec dirty_select(storage_host(), storage_table(), [matchrule()]) -> [tuple()].
dirty_select(Host, Tab, MatchRules) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            Spec = matchrules_to_mnesia_matchspec(Tab, MatchRules),
            mnesia:dirty_select(Tab, Spec);
        #table{backend = Module, def = Def} ->
            Module:dirty_select(Def, MatchRules)
    end.
%% Translates a list of match rules into a one-clause mnesia match
%% specification for Tab. Each rule becomes one guard; the body always
%% returns the full record.
matchrules_to_mnesia_matchspec(Tab, MatchRules) ->
    RecordName = mnesia:table_info(Tab, record_name),
    Attributes = mnesia:table_info(Tab, attributes),
    %% Build up {record_name, '$1', '$2', ...}, one variable per attribute
    Vars = [list_to_atom([$$ | integer_to_list(Pos)])
            || Pos <- lists:seq(1, length(Attributes))],
    MatchHead = list_to_tuple([RecordName | Vars]),
    %% Transform conditions
    Guards = [matchrules_transform_conditions(Attributes, Rule)
              || Rule <- MatchRules],
    %% Always full records
    [{MatchHead, Guards, ['$_']}].

%% Turns one match rule into a match-spec guard expression.
%%
%% Comparison rules {Op, Attribute, Value} refer to the attribute through
%% its match variable; an unknown attribute exits with unknown_attribute.
%% A tuple Value is compared element by element, skipping '_' elements.
%% Any other tuple (e.g. 'and' / 'or' combinations) keeps its first
%% element and has all remaining elements transformed recursively.
%% TODO: special handling for '=='?
matchrules_transform_conditions(Attributes, {Op, Attribute, Value})
  when Op =:= '='; Op =:= '=='; Op =:= '=:='; Op =:= like;
       Op =:= '=/='; Op =:= '<'; Op =:= '>'; Op =:= '>='; Op =:= '=<' ->
    Var = case list_find(Attribute, Attributes) of
              false -> exit(unknown_attribute);
              Pos -> list_to_atom([$$ | integer_to_list(Pos)])
          end,
    case is_tuple(Value) of
        true ->
            Indexed = lists:zip(lists:seq(1, tuple_size(Value)),
                                tuple_to_list(Value)),
            Forward = [matchrules_transform_column_op(
                         Op, {element, Idx, Var}, {const, Elem})
                       || {Idx, Elem} <- Indexed, Elem =/= '_'],
            %% The per-element comparisons are emitted last element first.
            case lists:reverse(Forward) of
                [Single] -> Single;
                Ops -> list_to_tuple(['andalso' | Ops])
            end;
        false ->
            matchrules_transform_column_op(Op, Var, Value)
    end;
matchrules_transform_conditions(Attributes, T) when is_tuple(T) ->
    [Connective | SubRules] = tuple_to_list(T),
    Transformed = [matchrules_transform_conditions(Attributes, Sub)
                   || Sub <- SubRules],
    list_to_tuple([Connective | Transformed]).
%% Builds the guard expression comparing Expression with Pattern.
%%
%% `like' compares the list Expression element by element against
%% Pattern; a '_' in Pattern adds no comparison and does not advance
%% within Expression. '=' is spelled '=:=' in the guard; every other
%% operator is used as is.
matchrules_transform_column_op(like, Expression, Pattern) ->
    case like_comparisons(Pattern, Expression, []) of
        [Single] -> Single;
        Comparisons -> list_to_tuple(['andalso' | Comparisons])
    end;
matchrules_transform_column_op('=', Expression, Pattern) ->
    {'=:=', Expression, Pattern};
matchrules_transform_column_op(Op, Expression, Pattern) ->
    {Op, Expression, Pattern}.

%% Walks the like-pattern, returning the comparisons in pattern order.
%% Expr is the sub-expression addressing the not yet compared remainder.
like_comparisons([], _Expr, Acc) ->
    lists:reverse(Acc);
like_comparisons(['_' | Rest], Expr, Acc) ->
    like_comparisons(Rest, Expr, Acc);
like_comparisons([P | Rest], Expr, Acc) ->
    Comparison = {'=:=', {hd, Expr}, {const, P}},
    like_comparisons(Rest, {tl, Expr}, [Comparison | Acc]).

%% Finds the first occurrence of an element in a list.
%% Returns its 1-based position, or false when it is absent.
list_find(E, L) ->
    list_find(E, L, 1).

list_find(_, [], _) ->
    false;
list_find(E, [Head | Tail], N) ->
    case Head of
        E -> N;
        _ -> list_find(E, Tail, N + 1)
    end.

%% Counts all records of Tab without a transaction.
-spec dirty_count_records(storage_host(), storage_table()) -> integer().
dirty_count_records(Host, Tab) ->
    dirty_count_records(Host, Tab, []).

%% Counts the records of Tab matching MatchRules without a transaction.
-spec dirty_count_records(storage_host(), storage_table(), [matchrule()]) -> integer().
dirty_count_records(Host, Tab, MatchRules) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            [Clause] = matchrules_to_mnesia_matchspec(Tab, MatchRules),
            {_Head, _Guards, _Body} = Clause,
            %% Return [] per match instead of the record itself
            CountSpec = [setelement(3, Clause, [[]])],
            length(mnesia:dirty_select(Tab, CountSpec));
        #table{backend = Module, def = Def} ->
            Module:dirty_count_records(Def, MatchRules)
    end.

-define(COUNT_RECORDS_BATCHSIZE, 100).

%% Counts all records of Tab.
-spec count_records(storage_host(), storage_table()) -> integer().
count_records(Host, Tab) ->
    count_records(Host, Tab, []).

-spec count_records(storage_host(), storage_table(), [matchrule()]) -> integer().
%% Counts the records of Tab matching MatchRules. For mnesia the matches
%% are fetched with a `read' lock in batches of ?COUNT_RECORDS_BATCHSIZE
%% so the complete result never has to be held at once.
count_records(Host, Tab, MatchRules) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            [{MatchHead, MatchConditions, _}] =
                matchrules_to_mnesia_matchspec(Tab, MatchRules),
            %% Return [] per match instead of the record itself
            MatchSpec = [{MatchHead, MatchConditions, [[]]}],
            case mnesia:select(Tab, MatchSpec,
                               ?COUNT_RECORDS_BATCHSIZE, read) of
                {Result, Cont} ->
                    mnesia_count_records_cont(Cont, length(Result));
                '$end_of_table' ->
                    0
            end;
        #table{backend = Backend, def = Def} ->
            Backend:count_records(Def, MatchRules)
    end.

%% Adds the sizes of all remaining batches of the select to Count.
mnesia_count_records_cont(Cont, Count) ->
    case mnesia:select(Cont) of
        %% The continuation of the next batch needs a fresh variable:
        %% matching against the already bound Cont only succeeds when
        %% both continuations are equal and fails with case_clause
        %% otherwise.
        {Result, NextCont} ->
            mnesia_count_records_cont(NextCont, Count + length(Result));
        '$end_of_table' ->
            Count
    end.

%% Writes Rec with a `write' lock into the table named by its first
%% element.
-spec write(storage_host(), tuple()) -> ok.
write(Host, Rec) ->
    Tab = element(1, Rec),
    backend_apply(write, Host, Tab, [Rec, write]).

%% Writes Rec into Tab with the given lock kind.
-spec write(storage_host(), storage_table(), tuple(), lock_kind()) -> ok.
write(Host, Tab, Rec, LockKind) ->
    backend_apply(write, Host, Tab, [Rec, LockKind]).

%% Dirty write of Rec into the table named by its first element.
-spec dirty_write(storage_host(), tuple()) -> ok.
dirty_write(Host, Rec) ->
    Tab = element(1, Rec),
    backend_apply(dirty_write, Host, Tab, [Rec]).

-spec dirty_write(storage_host(), storage_table(), tuple()) -> ok.
dirty_write(Host, Tab, Rec) ->
    backend_apply(dirty_write, Host, Tab, [Rec]).

%% Deletes Key from Tab with a `write' lock.
-spec delete(storage_host(), {storage_table(), any()}) -> ok.
delete(Host, {Tab, Key}) ->
    backend_apply(delete, Host, Tab, [Key, write]).

%% Deletes Key from Tab with the given lock kind.
-spec delete(storage_host(), storage_table(), any(), lock_kind()) -> ok.
delete(Host, Tab, Key, LockKind) ->
    backend_apply(delete, Host, Tab, [Key, LockKind]).

-spec dirty_delete(storage_host(), {storage_table(), any()}) -> ok.
dirty_delete(Host, {Tab, Key}) ->
    backend_apply(dirty_delete, Host, Tab, [Key]).

-spec dirty_delete(storage_host(), storage_table(), any()) -> ok.
dirty_delete(Host, Tab, Key) ->
    backend_apply(dirty_delete, Host, Tab, [Key]).

-spec delete_object(storage_host(), tuple()) -> ok.
%% Deletes exactly the record Rec, with a `write' lock, from the table
%% named by its first element.
delete_object(Host, Rec) ->
    Tab = element(1, Rec),
    backend_apply(delete_object, Host, Tab, [Rec, write]).

%% Deletes exactly the record Rec from Tab with the given lock kind.
-spec delete_object(storage_host(), storage_table(), tuple(), lock_kind()) -> ok.
delete_object(Host, Tab, Rec, LockKind) ->
    backend_apply(delete_object, Host, Tab, [Rec, LockKind]).

%% Dirty variant of delete_object/2.
%% This has to dispatch to the backend's dirty_delete_object (arity 2
%% including the table): delete_object takes an additional lock kind, so
%% calling it with only the record hits an undefined function.
-spec dirty_delete_object(storage_host(), tuple()) -> ok.
dirty_delete_object(Host, Rec) ->
    Tab = element(1, Rec),
    backend_apply(dirty_delete_object, Host, Tab, [Rec]).

%% Dirty variant of delete_object/4 (no lock kind).
-spec dirty_delete_object(storage_host(), storage_table(), tuple()) -> ok.
dirty_delete_object(Host, Tab, Rec) ->
    backend_apply(dirty_delete_object, Host, Tab, [Rec]).

-define(DELETE_WHERE_BATCH_SIZE, 100).

%% Deletes all records of Tab matching MatchRules. For mnesia the table
%% is write-locked first and the matches are deleted in batches of
%% ?DELETE_WHERE_BATCH_SIZE.
-spec delete_where(storage_host(), storage_table(), [matchrule()]) -> ok.
delete_where(Host, Tab, MatchRules) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            MatchSpec = matchrules_to_mnesia_matchspec(Tab, MatchRules),
            mnesia:write_lock_table(Tab),
            SR = mnesia:select(Tab, MatchSpec,
                               ?DELETE_WHERE_BATCH_SIZE, write),
            delete_where_mnesia1(SR);
        #table{backend = Backend, def = Def} ->
            Backend:delete_where(Def, MatchRules)
    end.

%% Deletes the objects of one select batch and continues with the next
%% one until the select is exhausted.
delete_where_mnesia1('$end_of_table') ->
    ok;
delete_where_mnesia1({Objects, Cont}) ->
    lists:foreach(fun(Object) ->
                          mnesia:delete_object(Object)
                  end, Objects),
    delete_where_mnesia1(mnesia:select(Cont)).

%% Deletes all records of Tab matching MatchRules outside of a caller
%% supplied transaction. For mnesia this runs in its own transaction,
%% which must succeed.
%% NOTE(review): the mnesia branch returns the matched {atomic, _} tuple
%% rather than the `ok' promised by the spec - confirm what callers
%% expect before changing either.
-spec dirty_delete_where(storage_host(), storage_table(), [matchrule()]) -> ok.
dirty_delete_where(Host, Tab, MatchRules) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            MatchSpec = matchrules_to_mnesia_matchspec(Tab, MatchRules),
            F = fun() ->
                        mnesia:write_lock_table(Tab),
                        Objects = mnesia:select(Tab, MatchSpec, write),
                        lists:foreach(fun(Object) ->
                                              mnesia:delete_object(Object)
                                      end, Objects)
                end,
            {atomic, _} = mnesia:transaction(F);
        #table{backend = Backend, def = Def} ->
            Backend:dirty_delete_where(Def, MatchRules)
    end.

%% Returns `ignored' for tables that are not served by mnesia.
-spec write_lock_table(storage_host(), storage_table()) -> ok | ignored.
%% Write-locks Tab when it is served by mnesia; for any other backend
%% nothing is done and `ignored' is returned.
write_lock_table(Host, Tab) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            mnesia:write_lock_table(Tab);
        _ ->
            ignored
    end.

%% Runs Fun as a transaction of the backend owning Tab on Host and
%% returns the backend's result. An aborted transaction is additionally
%% reported through ?ERROR_MSG.
%% Warning: all tabs touched by the transaction must use the same
%% storage backend!
-spec transaction(storage_host(), storage_table(), fun()) ->
                         {atomic, any()} | {aborted, any()}.
transaction(Host, Tab, Fun) ->
    %% This is just to ensure an error is logged when error appears:
    case transaction2(Host, Tab, Fun) of
        {atomic, _} = Good ->
            Good;
        {aborted, Reason} = Bad ->
            %% No exception is being handled here, so the call stack of
            %% this process is logged. erlang:get_stacktrace/0 would only
            %% yield the trace of some earlier, unrelated exception and
            %% does not exist any more in current OTP releases.
            {current_stacktrace, Stacktrace} =
                erlang:process_info(self(), current_stacktrace),
            ?ERROR_MSG("Transaction failed for host ~p in tab ~p with fun ~p:"
                       "~nReason: ~p~nStacktrace: ~p",
                       [Host, Tab, Fun, Reason, Stacktrace]),
            Bad
    end.

%% Dispatches the transaction to mnesia or to the backend module.
transaction2(Host, Tab, Fun) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            mnesia:transaction(Fun);
        #table{backend = Backend, def = Def} ->
            Backend:transaction(Def, Fun)
    end.

%% Runs Fun in the sync_dirty context of the backend owning Tab.
%% mnesia:sync_dirty/1 returns the result of Fun itself (not an
%% {atomic, _} tuple), hence the unrestricted return type.
%% Warning: all tabs touched by the sync_dirty must use the same
%% storage backend!
-spec sync_dirty(storage_host(), storage_table(), fun()) -> any().
sync_dirty(Host, Tab, Fun) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            mnesia:sync_dirty(Fun);
        #table{backend = Backend, def = Def} ->
            Backend:sync_dirty(Def, Fun)
    end.

%% Runs Fun in the async_dirty context of the backend owning Tab.
%% mnesia:async_dirty/1 returns the result of Fun itself (not an
%% {atomic, _} tuple), hence the unrestricted return type.
%% Warning: all tabs touched by the async_dirty must use the same
%% storage backend!
-spec async_dirty(storage_host(), storage_table(), fun()) -> any().
async_dirty(Host, Tab, Fun) ->
    case get_table(Host, Tab) of
        #table{backend = mnesia} ->
            mnesia:async_dirty(Fun);
        #table{backend = Backend, def = Def} ->
            Backend:async_dirty(Def, Fun)
    end.
%% Looks up the #table{} entry for Tab on Host.
%%
%% When no entry exists for a binary host, the lookup is retried for
%% {global, Prefix}, Prefix being the part of the host name before the
%% first dot, and finally for the atom `global'. If that fails as well,
%% the problem is logged and the process exits with table_not_found.
%%
%% TODO: fix the calling code so this function clause isn't needed
get_table(Host, Tab) when is_list(Host) ->
    get_table(list_to_binary(Host), Tab);
get_table(Host, Tab) ->
    case {mnesia:dirty_read(table, {Host, Tab}), Host} of
        {[T], _} ->
            T;
        {_, Host} when is_binary(Host) ->
            case string:tokens(binary_to_list(Host), ".") of
                [Prefix | _] ->
                    get_table({global, Prefix}, Tab);
                [] ->
                    %% Host without any label (e.g. empty): there is no
                    %% prefix to try, go straight to the global entry
                    %% instead of crashing in lists:nth/2.
                    get_table(global, Tab)
            end;
        {_, {global, _Prefix}} ->
            get_table(global, Tab);
        {_, global} ->
            %% Log who asked for the missing table: the call stack of the
            %% current process, obtained without raising and catching a
            %% dummy exception.
            {current_stacktrace, Stacktrace} =
                erlang:process_info(self(), current_stacktrace),
            error_logger:error_msg("gen_storage: Table ~p not found on ~p~nStacktrace: ~p",
                                   [Tab, Host, Stacktrace]),
            exit(table_not_found)
    end.

%% Calls function F of the backend owning Tab on Host without extra
%% arguments.
backend_apply(F, Host, Tab) ->
    backend_apply(F, Host, Tab, []).

%% Calls function F of the backend owning Tab on Host. The argument list
%% A is prefixed with the table name when the stored definition is a
%% #mnesia_def{} for this very table, and with the backend specific
%% definition otherwise.
backend_apply(F, Host, Tab, A) ->
    #table{backend = Backend, def = Def} = get_table(Host, Tab),
    case Def of
        #mnesia_def{table = Tab} ->
            apply(Backend, F, [Tab | A]);
        _ ->
            apply(Backend, F, [Def | A])
    end.