mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-22 17:28:25 +01:00
Update mod_mqtt_mnesia.erl
This commit is contained in:
parent
daf9c99728
commit
0250826cf9
@ -21,12 +21,12 @@
|
||||
%% API
|
||||
-export([init/2, publish/6, delete_published/2, lookup_published/2]).
|
||||
-export([list_topics/1, use_cache/1]).
|
||||
%% Unsupported backend API
|
||||
-export([init/0]).
|
||||
-export([subscribe/4, unsubscribe/2, find_subscriber/2]).
|
||||
-export([open_session/1, close_session/1, lookup_session/1]).
|
||||
|
||||
-include("logger.hrl").
|
||||
-include("mqtt.hrl").
|
||||
|
||||
-record(mqtt_pub, {topic_server :: {binary(), binary()},
|
||||
user :: binary(),
|
||||
@ -40,6 +40,16 @@
|
||||
content_type = <<>> :: binary(),
|
||||
user_properties = [] :: [{binary(), binary()}]}).
|
||||
|
||||
-record(mqtt_sub, {topic :: {binary(), binary(), binary(), binary()},
|
||||
options :: sub_opts(),
|
||||
id :: non_neg_integer(),
|
||||
pid :: pid(),
|
||||
timestamp :: erlang:timestamp()}).
|
||||
|
||||
-record(mqtt_session, {usr :: jid:ljid(),
|
||||
pid :: pid(),
|
||||
timestamp :: erlang:timestamp()}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
@ -107,26 +117,183 @@ list_topics(S) ->
|
||||
{ok, [Topic || {Topic, S1} <- mnesia:dirty_all_keys(mqtt_pub), S1 == S]}.
|
||||
|
||||
init() ->
|
||||
erlang:nif_error(unsupported_db).
|
||||
case mqtree:whereis(mqtt_sub_index) of
|
||||
undefined ->
|
||||
T = mqtree:new(),
|
||||
mqtree:register(mqtt_sub_index, T);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
try
|
||||
{atomic, ok} = ejabberd_mnesia:create(
|
||||
?MODULE,
|
||||
mqtt_session,
|
||||
[{ram_copies, [node()]},
|
||||
{attributes, record_info(fields, mqtt_session)}]),
|
||||
{atomic, ok} = ejabberd_mnesia:create(
|
||||
?MODULE,
|
||||
mqtt_sub,
|
||||
[{ram_copies, [node()]},
|
||||
{type, ordered_set},
|
||||
{attributes, record_info(fields, mqtt_sub)}]),
|
||||
ok
|
||||
catch _:{badmatch, Err} ->
|
||||
{error, Err}
|
||||
end.
|
||||
|
||||
open_session(_) ->
|
||||
erlang:nif_error(unsupported_db).
|
||||
open_session(USR) ->
|
||||
TS1 = p1_time_compat:unique_timestamp(),
|
||||
P1 = self(),
|
||||
F = fun() ->
|
||||
case mnesia:read(mqtt_session, USR) of
|
||||
[#mqtt_session{pid = P2, timestamp = TS2}] ->
|
||||
if TS1 >= TS2 ->
|
||||
mod_mqtt_session:route(P2, {replaced, P1}),
|
||||
mnesia:write(
|
||||
#mqtt_session{usr = USR,
|
||||
pid = P1,
|
||||
timestamp = TS1});
|
||||
true ->
|
||||
case is_process_dead(P2) of
|
||||
true ->
|
||||
mnesia:write(
|
||||
#mqtt_session{usr = USR,
|
||||
pid = P1,
|
||||
timestamp = TS1});
|
||||
false ->
|
||||
mod_mqtt_session:route(P1, {replaced, P2})
|
||||
end
|
||||
end;
|
||||
[] ->
|
||||
mnesia:write(
|
||||
#mqtt_session{usr = USR,
|
||||
pid = P1,
|
||||
timestamp = TS1})
|
||||
end
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, _} -> ok;
|
||||
{aborted, Reason} ->
|
||||
db_fail("Failed to register MQTT session for ~s",
|
||||
Reason, [jid:encode(USR)])
|
||||
end.
|
||||
|
||||
close_session(_) ->
|
||||
erlang:nif_error(unsupported_db).
|
||||
close_session(USR) ->
|
||||
close_session(USR, self()).
|
||||
|
||||
lookup_session(_) ->
|
||||
erlang:nif_error(unsupported_db).
|
||||
lookup_session(USR) ->
|
||||
case mnesia:dirty_read(mqtt_session, USR) of
|
||||
[#mqtt_session{pid = Pid}] ->
|
||||
case is_process_dead(Pid) of
|
||||
true ->
|
||||
%% Read-Repair
|
||||
close_session(USR, Pid),
|
||||
{error, notfound};
|
||||
false ->
|
||||
{ok, Pid}
|
||||
end;
|
||||
[] ->
|
||||
{error, notfound}
|
||||
end.
|
||||
|
||||
subscribe(_, _, _, _) ->
|
||||
erlang:nif_error(unsupported_db).
|
||||
subscribe({U, S, R} = USR, TopicFilter, SubOpts, ID) ->
|
||||
T1 = p1_time_compat:unique_timestamp(),
|
||||
P1 = self(),
|
||||
Key = {TopicFilter, S, U, R},
|
||||
F = fun() ->
|
||||
case mnesia:read(mqtt_sub, Key) of
|
||||
[#mqtt_sub{timestamp = T2}] when T1 < T2 ->
|
||||
ok;
|
||||
_ ->
|
||||
Tree = mqtree:whereis(mqtt_sub_index),
|
||||
mqtree:insert(Tree, TopicFilter),
|
||||
mnesia:write(
|
||||
#mqtt_sub{topic = {TopicFilter, S, U, R},
|
||||
options = SubOpts,
|
||||
id = ID,
|
||||
pid = P1,
|
||||
timestamp = T1})
|
||||
end
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, _} -> ok;
|
||||
{abored, Reason} ->
|
||||
db_fail("Failed to subscribe ~s to ~s",
|
||||
Reason, [jid:encode(USR), TopicFilter])
|
||||
end.
|
||||
|
||||
unsubscribe(_, _) ->
|
||||
erlang:nif_error(unsupported_db).
|
||||
unsubscribe({U, S, R} = USR, Topic) ->
|
||||
Pid = self(),
|
||||
F = fun() ->
|
||||
Tree = mqtree:whereis(mqtt_sub_index),
|
||||
mqtree:delete(Tree, Topic),
|
||||
case mnesia:read(mqtt_sub, {Topic, S, U, R}) of
|
||||
[#mqtt_sub{pid = Pid} = Obj] ->
|
||||
mnesia:delete_object(Obj);
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, _} -> ok;
|
||||
{aborted, Reason} ->
|
||||
db_fail("Failed to unsubscribe ~s from ~s",
|
||||
Reason, [jid:encode(USR), Topic])
|
||||
end.
|
||||
|
||||
find_subscriber(_, _) ->
|
||||
erlang:nif_error(unsupported_db).
|
||||
find_subscriber(S, Topic) when is_binary(Topic) ->
|
||||
Tree = mqtree:whereis(mqtt_sub_index),
|
||||
case mqtree:match(Tree, Topic) of
|
||||
[Filter|Filters] ->
|
||||
find_subscriber(S, {Filters, {Filter, S, '_', '_'}});
|
||||
[] ->
|
||||
{error, notfound}
|
||||
end;
|
||||
find_subscriber(S, {Filters, {Filter, S, _, _} = Prev}) ->
|
||||
case mnesia:dirty_next(mqtt_sub, Prev) of
|
||||
{Filter, S, _, _} = Next ->
|
||||
case mnesia:dirty_read(mqtt_sub, Next) of
|
||||
[#mqtt_sub{options = SubOpts, id = ID, pid = Pid}] ->
|
||||
case is_process_dead(Pid) of
|
||||
true ->
|
||||
find_subscriber(S, {Filters, Next});
|
||||
false ->
|
||||
{ok, {Pid, SubOpts, ID}, {Filters, Next}}
|
||||
end;
|
||||
[] ->
|
||||
find_subscriber(S, {Filters, Next})
|
||||
end;
|
||||
_ ->
|
||||
case Filters of
|
||||
[] ->
|
||||
{error, notfound};
|
||||
[Filter1|Filters1] ->
|
||||
find_subscriber(S, {Filters1, {Filter1, S, '_', '_'}})
|
||||
end
|
||||
end.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
close_session(USR, Pid) ->
|
||||
F = fun() ->
|
||||
case mnesia:read(mqtt_session, USR) of
|
||||
[#mqtt_session{pid = Pid} = Obj] ->
|
||||
mnesia:delete_object(Obj);
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, _} -> ok;
|
||||
{aborted, Reason} ->
|
||||
db_fail("Failed to unregister MQTT session for ~s",
|
||||
Reason, [jid:encode(USR)])
|
||||
end.
|
||||
|
||||
is_process_dead(Pid) ->
|
||||
node(Pid) == node() andalso not is_process_alive(Pid).
|
||||
|
||||
db_fail(Format, Reason, Args) ->
|
||||
?ERROR_MSG(Format ++ ": ~p", Args ++ [Reason]),
|
||||
{error, db_failure}.
|
||||
|
Loading…
Reference in New Issue
Block a user