diff --git a/src/mod_mqtt_bridge.erl b/src/mod_mqtt_bridge.erl index 696b781d5..268d2bed7 100644 --- a/src/mod_mqtt_bridge.erl +++ b/src/mod_mqtt_bridge.erl @@ -34,6 +34,15 @@ %%%=================================================================== start(Host, Opts) -> User = mod_mqtt_bridge_opt:replication_user(Opts), + start_servers(User, element(1, mod_mqtt_bridge_opt:servers(Opts))), + ejabberd_hooks:add(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50). + +stop(Host) -> + ejabberd_hooks:delete(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50), + stop_servers(element(1, mod_mqtt_bridge_opt:servers(Host))), + ok. + +start_servers(User, Servers) -> lists:foldl( fun({Proc, Transport, HostAddr, Port, Path, Publish, Subscribe, Authentication}, Started) -> case Started of @@ -52,31 +61,44 @@ start(Host, Opts) -> ?DEBUG("Starting ~p ~p", [Proc, Res]), Started#{Proc => true} end - end, #{}, element(1, mod_mqtt_bridge_opt:servers(Opts))), - ejabberd_hooks:add(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50). + end, #{}, Servers). -stop(Host) -> - lists:foldl( - fun({Proc, _Transport, _Host, _Port, _Publish, _Subscribe, _Authentication}, _) -> +stop_servers(Servers) -> + lists:foreach( + fun({Proc, _Transport, _Host, _Port, _Path, _Publish, _Subscribe, _Authentication}) -> try p1_server:call(Proc, stop) catch _:_ -> ok end, supervisor:terminate_child(ejabberd_gen_mod_sup, Proc), supervisor:delete_child(ejabberd_gen_mod_sup, Proc) - end, #{}, element(1, mod_mqtt_bridge_opt:servers(Host))), - ejabberd_hooks:delete(mqtt_publish, Host, ?MODULE, mqtt_publish_hook, 50). + end, Servers). -reload(_Host, _NewOpts, _OldOpts) -> +reload(_Host, NewOpts, OldOpts) -> + OldServers = element(1, mod_mqtt_bridge_opt:servers(OldOpts)), + NewServers = element(1, mod_mqtt_bridge_opt:servers(NewOpts)), + Deleted = lists:filter( + fun(E) -> not lists:keymember(element(1, E), 1, NewServers) end, + OldServers), + Added = lists:filter( + fun(E) -> not lists:keymember(element(1, E), 1, OldServers) end, + NewServers), + stop_servers(Deleted), + start_servers(mod_mqtt_bridge_opt:replication_user(NewOpts), Added), ok. depends(_Host, _Opts) -> [{mod_mqtt, hard}]. -proc_name(Proto, Host, Port) -> +proc_name(Proto, Host, Port, Path) -> HostB = list_to_binary(Host), TransportB = list_to_binary(Proto), + PathB = case Path of + V when is_list(V) -> + list_to_binary(V); + _ -> <<>> + end, binary_to_atom(<<"mod_mqtt_bridge_", TransportB/binary, "_", HostB/binary, - "_", (integer_to_binary(Port))/binary>>, utf8). + "_", (integer_to_binary(Port))/binary, PathB/binary>>, utf8). -spec mqtt_publish_hook(jid:ljid(), publish(), non_neg_integer()) -> ok. mqtt_publish_hook({_, S, _}, #publish{topic = Topic} = Pkt, _ExpiryTime) -> @@ -136,7 +158,7 @@ mod_opt_type(servers) -> Subscribe = maps:get(subscribe, Opts, #{}), Authentication = maps:get(authentication, Opts, []), Proto = list_to_atom(Scheme), - Proc = proc_name(Scheme, Host, Port), + Proc = proc_name(Scheme, Host, Port, Path), PAcc2 = maps:fold( fun(Topic, _RemoteTopic, Acc) -> maps:update_with(Topic, fun(V) -> [Proc | V] end, [Proc], Acc) diff --git a/src/mod_mqtt_bridge_session.erl b/src/mod_mqtt_bridge_session.erl index eb01f9fb7..d32772317 100644 --- a/src/mod_mqtt_bridge_session.erl +++ b/src/mod_mqtt_bridge_session.erl @@ -123,6 +123,8 @@ init([_Proc, Proto, Host, Port, Path, Publish, Subscribe, Authentication, Replic {stop, {error, <<"Certificate can be only used for encrypted connections">> }} end. +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; handle_call(Request, From, State) -> ?WARNING_MSG("Unexpected call from ~p: ~p", [From, Request]), {noreply, State}.