25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-22 17:28:25 +01:00

Implement reload in mqtt_bridge

This commit is contained in:
Paweł Chmielowski 2023-01-16 11:22:17 +01:00
parent f2cbe7f3c2
commit 89918865b0
2 changed files with 35 additions and 11 deletions

View File

@ -34,6 +34,15 @@
%%%===================================================================
start(Host, Opts) ->
User = mod_mqtt_bridge_opt:replication_user(Opts),
start_servers(User, element(1, mod_mqtt_bridge_opt:servers(Opts))),
ejabberd_hooks:add(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50).
stop(Host) ->
ejabberd_hooks:delete(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50),
stop_servers(element(1, mod_mqtt_bridge_opt:servers(Host))),
ok.
start_servers(User, Servers) ->
lists:foldl(
fun({Proc, Transport, HostAddr, Port, Path, Publish, Subscribe, Authentication}, Started) ->
case Started of
@ -52,31 +61,44 @@ start(Host, Opts) ->
?DEBUG("Starting ~p ~p", [Proc, Res]),
Started#{Proc => true}
end
end, #{}, element(1, mod_mqtt_bridge_opt:servers(Opts))),
ejabberd_hooks:add(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50).
end, #{}, Servers).
stop(Host) ->
lists:foldl(
fun({Proc, _Transport, _Host, _Port, _Publish, _Subscribe, _Authentication}, _) ->
stop_servers(Servers) ->
lists:foreach(
fun({Proc, _Transport, _Host, _Port, _Path, _Publish, _Subscribe, _Authentication}) ->
try p1_server:call(Proc, stop)
catch _:_ -> ok
end,
supervisor:terminate_child(ejabberd_gen_mod_sup, Proc),
supervisor:delete_child(ejabberd_gen_mod_sup, Proc)
end, #{}, element(1, mod_mqtt_bridge_opt:servers(Host))),
ejabberd_hooks:delete(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50).
end, Servers).
reload(_Host, _NewOpts, _OldOpts) ->
reload(_Host, NewOpts, OldOpts) ->
OldServers = element(1, mod_mqtt_bridge_opt:servers(OldOpts)),
NewServers = element(1, mod_mqtt_bridge_opt:servers(NewOpts)),
Deleted = lists:filter(
fun(E) -> not lists:keymember(element(1, E), 1, NewServers) end,
OldServers),
Added = lists:filter(
fun(E) -> not lists:keymember(element(1, E), 1, OldServers) end,
NewServers),
stop_servers(Deleted),
start_servers(mod_mqtt_bridge_opt:replication_user(NewOpts), Added),
ok.
depends(_Host, _Opts) ->
[{mod_mqtt, hard}].
proc_name(Proto, Host, Port) ->
proc_name(Proto, Host, Port, Path) ->
HostB = list_to_binary(Host),
TransportB = list_to_binary(Proto),
PathB = case Path of
V when is_list(V) ->
list_to_binary(V);
_ -> <<>>
end,
binary_to_atom(<<"mod_mqtt_bridge_", TransportB/binary, "_", HostB/binary,
"_", (integer_to_binary(Port))/binary>>, utf8).
"_", (integer_to_binary(Port))/binary, PathB/binary>>, utf8).
-spec mqtt_publish_hook(jid:ljid(), publish(), non_neg_integer()) -> ok.
mqtt_publish_hook({_, S, _}, #publish{topic = Topic} = Pkt, _ExpiryTime) ->
@ -136,7 +158,7 @@ mod_opt_type(servers) ->
Subscribe = maps:get(subscribe, Opts, #{}),
Authentication = maps:get(authentication, Opts, []),
Proto = list_to_atom(Scheme),
Proc = proc_name(Scheme, Host, Port),
Proc = proc_name(Scheme, Host, Port, Path),
PAcc2 = maps:fold(
fun(Topic, _RemoteTopic, Acc) ->
maps:update_with(Topic, fun(V) -> [Proc | V] end, [Proc], Acc)

View File

@ -123,6 +123,8 @@ init([_Proc, Proto, Host, Port, Path, Publish, Subscribe, Authentication, Replic
{stop, {error, <<"Certificate can be only used for encrypted connections">> }}
end.
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Request, From, State) ->
?WARNING_MSG("Unexpected call from ~p: ~p", [From, Request]),
{noreply, State}.