Support MQTT subscriptions spread over the cluster (#3750)
This commit is contained in:
parent
585afcbc77
commit
1b192987d2
|
@ -22,7 +22,7 @@
|
||||||
-export([init/2, publish/6, delete_published/2, lookup_published/2]).
|
-export([init/2, publish/6, delete_published/2, lookup_published/2]).
|
||||||
-export([list_topics/1, use_cache/1]).
|
-export([list_topics/1, use_cache/1]).
|
||||||
-export([init/0]).
|
-export([init/0]).
|
||||||
-export([subscribe/4, unsubscribe/2, find_subscriber/2]).
|
-export([subscribe/4, unsubscribe/2, find_subscriber/2, mqtree_match/1]).
|
||||||
-export([open_session/1, close_session/1, lookup_session/1, get_sessions/2]).
|
-export([open_session/1, close_session/1, lookup_session/1, get_sessions/2]).
|
||||||
|
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
@ -249,9 +249,16 @@ unsubscribe({U, S, R} = USR, Topic) ->
|
||||||
Reason, [jid:encode(USR), Topic])
|
Reason, [jid:encode(USR), Topic])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
find_subscriber(S, Topic) when is_binary(Topic) ->
|
mqtree_match(Topic) ->
|
||||||
Tree = mqtree:whereis(mqtt_sub_index),
|
Tree = mqtree:whereis(mqtt_sub_index),
|
||||||
case mqtree:match(Tree, Topic) of
|
mqtree:match(Tree, Topic).
|
||||||
|
|
||||||
|
mqtree_multi_match(Topic) ->
|
||||||
|
{Res, []} = ejabberd_cluster:multicall(?MODULE, mqtree_match, [Topic]),
|
||||||
|
lists:umerge(Res).
|
||||||
|
|
||||||
|
find_subscriber(S, Topic) when is_binary(Topic) ->
|
||||||
|
case mqtree_multi_match(Topic) of
|
||||||
[Filter|Filters] ->
|
[Filter|Filters] ->
|
||||||
find_subscriber(S, {Filters, {Filter, S, '_', '_'}});
|
find_subscriber(S, {Filters, {Filter, S, '_', '_'}});
|
||||||
[] ->
|
[] ->
|
||||||
|
|
Loading…
Reference in New Issue