25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-26 17:38:45 +01:00

several pubsub fixes and improvements. also fixes EJAB-913 and EJAB-871

SVN Revision: 2061
This commit is contained in:
Christophe Romain 2009-05-07 00:54:44 +00:00
parent 1d3947c0da
commit e40e4298ca
15 changed files with 184 additions and 118 deletions

View File

@ -1,3 +1,38 @@
2009-05-07 Christophe Romain <christophe.romain@process-one.net>
* src/mod_caps.erl: Set debug message to DEBUG (from debian patch,
thanks to Sergei Golovan)
* src/mod_pubsub/mod_pubsub.erl: Remove subscriptions when anonymous
user removed (EJAB-913) (thanks to Andy Skelton)
* src/mod_pubsub/mod_pubsub.erl: Fix disco#items bug on root node,
and fix other minor typo from previous patch.
* src/mod_pubsub/mod_pubsub.erl: Avoid calling get_user_resources
on non local domain when pep_sendlast_offline is enabled
* src/mod_pubsub/mod_pubsub.erl: Reduce send_last_item load and number
of calls to get_caps
* src/mod_pubsub/mod_pubsub.erl: Fix get_entity_* not returning node
* src/mod_pubsub/node_default.erl: Likewise
* src/mod_pubsub/nodetree_default.erl: Likewise
* src/mod_pubsub/mod_pubsub.erl: Retract policy should obey
pubsub#publish_model (EJAB-871) (thanks to Matthew Baron)
* src/mod_pubsub/node_default.erl: Likewise
* src/mod_pubsub/node_mb.erl: Likewise
* src/mod_pubsub/node_dispatch.erl: Likewise
* src/mod_pubsub/node_buddy.erl: Likewise
* src/mod_pubsub/node_private.erl: Likewise
* src/mod_pubsub/node_public.erl: Likewise
* src/mod_pubsub/node_default.erl: Likewise
* src/mod_pubsub/node_pep.erl: Likewise
* src/mod_pubsub/node_club.erl: Likewise
* src/mod_pubsub/node_flat.erl: Likewise
* src/mod_pubsub/node.template: Likewise
2009-05-06 Badlop <badlop@process-one.net>
* src/ejabberd_c2s.erl: Replace TYPE/1 with is_TYPE/1 (EJAB-922)

View File

@ -192,9 +192,6 @@ receive_packet(_, _, _) ->
receive_packet(_JID, From, To, Packet) ->
receive_packet(From, To, Packet).
jid_to_binary(JID) ->
list_to_binary(jlib:jid_to_string(JID)).
caps_to_binary(#caps{node = Node, version = Version, exts = Exts}) ->
BExts = [list_to_binary(Ext) || Ext <- Exts],
#caps{node = list_to_binary(Node), version = list_to_binary(Version), exts = BExts}.
@ -330,7 +327,7 @@ handle_cast({disco_response, From, _To, #iq{id = ID, type = Type, payload = Payl
mnesia:dirty_write(#caps_features{node_pair = BinaryNode, features = features_to_binary(Features)}),
gen_server:cast(self(), visit_feature_queries);
error ->
?ERROR_MSG("ID '~s' matches no query", [ID])
?DEBUG("ID '~s' matches no query", [ID])
end;
{error, _} ->
%% XXX: if we get error, we cache empty feature not to probe the client continuously
@ -339,7 +336,7 @@ handle_cast({disco_response, From, _To, #iq{id = ID, type = Type, payload = Payl
mnesia:dirty_write(#caps_features{node_pair = BinaryNode}),
gen_server:cast(self(), visit_feature_queries);
error ->
?ERROR_MSG("ID '~s' matches no query", [ID])
?DEBUG("ID '~s' matches no query", [ID])
end;
%gen_server:cast(self(), visit_feature_queries),
%?DEBUG("Error IQ reponse from ~s:~n~p", [exmpp_jid:jid_to_list(From), SubEls]);

View File

@ -49,7 +49,7 @@ behaviour_info(callbacks) ->
{subscribe_node, 7},
{unsubscribe_node, 4},
{publish_item, 6},
{delete_item, 3},
{delete_item, 4},
{remove_extra_items, 3},
{get_node_affiliations, 1},
{get_entity_affiliations, 2},

View File

@ -181,6 +181,7 @@ init([ServerHost, Opts]) ->
ejabberd_hooks:add(presence_probe_hook, ServerHostB, ?MODULE, presence_probe, 50),
ejabberd_hooks:add(roster_out_subscription, ServerHostB, ?MODULE, out_subscription, 50),
ejabberd_hooks:add(remove_user, ServerHostB, ?MODULE, remove_user, 50),
ejabberd_hooks:add(anonymous_purge_hook, ServerHostB, ?MODULE, remove_user, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB, ?MODULE, iq_sm, IQDisc),
gen_iq_handler:add_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB_OWNER, ?MODULE, iq_sm, IQDisc),
ejabberd_router:register_route(Host),
@ -411,69 +412,89 @@ send_loop(State) ->
%% and is not able to "store" events of remote users (via s2s)
%% this makes that hack only work for local domain by now
if State#state.pep_sendlast_offline ->
case catch ejabberd_c2s:get_subscribed(Pid) of
Contacts when is_list(Contacts) ->
{User, Server, Resource} = LJID,
lists:foreach(
fun({U, S, R}) -> %% local contacts
case ejabberd_sm:get_user_resources(U, S) of
[] -> %% offline
case S of
ServerHost -> %% local contact, so we may have pep items
PeerJID = exmpp_jlib:make_jid(U, S, R),
self() ! {presence, User, Server, [Resource], PeerJID};
_ -> %% remote contact, no items available
ok
end;
_ -> %% online
% this is already handled by presence probe
ok
end;
(_) -> %% remote contacts
% we can not do anything in any cases
ok
end, Contacts);
{User, Server, Resource} = LJID,
case mod_caps:get_caps({User, Server, Resource}) of
nothing ->
%% we don't have caps, no need to handle PEP items
ok;
_ ->
ok
case catch ejabberd_c2s:get_subscribed(Pid) of
Contacts when is_list(Contacts) ->
lists:foreach(
fun({U, S, R}) ->
case S of
ServerHost -> %% local contacts
case ejabberd_sm:get_user_resources(U, S) of
[] -> %% offline
PeerJID = exmpp_jlib:make_jid(U, S, R),
self() ! {presence, User, Server, [Resource], PeerJID};
_ -> %% online
% this is already handled by presence probe
ok
end;
_ -> %% remote contacts
% we can not do anything in any cases
ok
end
end, Contacts);
_ ->
ok
end
end;
true ->
ok
end,
send_loop(State);
{presence, User, Server, Resources, JID} ->
Owner = jlib:short_prepd_bare_jid(JID),
Host = State#state.host,
ServerHost = State#state.server_host,
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = NodeId, options = Options}) ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
lists:foreach(fun(Resource) ->
LJID = {User, Server, Resource},
case is_caps_notify(ServerHost, Node, LJID) of
true ->
Subscribed = case get_option(Options, access_model) of
open -> true;
presence -> true;
whitelist -> false; % subscribers are added manually
authorize -> false; % likewise
roster ->
Grps = get_option(Options, roster_groups_allowed, []),
{OU, OS, _} = Owner,
element(2, get_roster_info(OU, OS, LJID, Grps))
end,
if Subscribed ->
send_items(Owner, Node, NodeId, Type, LJID, last);
true ->
ok
end;
false ->
ok
end
end, Resources);
_ ->
ok
end
end, tree_action(Host, get_nodes, [Owner, JID])),
%% get resources caps and check if processing is needed
{HasCaps, ResourcesCaps} = lists:foldl(fun(Resource, {R, L}) ->
case mod_caps:get_caps({User, Server, Resource}) of
nothing -> {R, L};
Caps -> {true, [{Resource, Caps} | L]}
end
end, {false, []}, Resources),
case HasCaps of
true ->
Host = State#state.host,
ServerHost = State#state.server_host,
Owner = jlib:short_prepd_bare_jid(JID),
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = NodeId, options = Options}) ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
lists:foreach(fun({Resource, Caps}) ->
CapsNotify = case catch mod_caps:get_features(ServerHost, Caps) of
Features when is_list(Features) -> lists:member(Node ++ "+notify", Features);
_ -> false
end,
case CapsNotify of
true ->
LJID = {User, Server, Resource},
Subscribed = case get_option(Options, access_model) of
open -> true;
presence -> true;
whitelist -> false; % subscribers are added manually
authorize -> false; % likewise
roster ->
Grps = get_option(Options, roster_groups_allowed, []),
{OU, OS, _} = Owner,
element(2, get_roster_info(OU, OS, LJID, Grps))
end,
if Subscribed ->
send_items(Owner, Node, NodeId, Type, LJID, last);
true ->
ok
end;
false ->
ok
end
end, ResourcesCaps);
_ ->
ok
end
end, tree_action(Host, get_nodes, [Owner, JID]));
false ->
ok
end,
send_loop(State);
stop ->
ok
@ -539,7 +560,7 @@ disco_sm_items(Acc, From, To, <<>>, _Lang) ->
%% TODO, use iq_disco_items(Host, [], From)
Host = exmpp_jid:ldomain_as_list(To),
LJID = jlib:short_prepd_bare_jid(To),
case tree_action(Host, get_nodes, [Host, From]) of
case tree_action(Host, get_subnodes, [Host, [], From]) of
[] ->
Acc;
Nodes ->
@ -684,7 +705,7 @@ handle_cast({remove_user, LUser, LServer}, State) ->
%% remove user's PEP nodes
lists:foreach(fun(#pubsub_node{nodeid={NodeKey, NodeName}}) ->
delete_node(NodeKey, NodeName, Owner)
end, tree_action(Host, get_nodes, [jlib:short_prepd_bare_jid(Owner)])),
end, tree_action(Host, get_nodes, [jlib:short_prepd_bare_jid(Owner), Owner])),
%% remove user's nodes
delete_node(Host, ["home", LServer, LUser], Owner),
{noreply, State};
@ -724,7 +745,6 @@ terminate(_Reason, #state{host = Host,
nodetree = TreePlugin,
plugins = Plugins,
send_loop = SendLoop}) ->
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
ejabberd_router:unregister_route(Host),
ServerHostB = list_to_binary(ServerHost),
case lists:member(?PEPNODE, Plugins) of
@ -746,11 +766,12 @@ terminate(_Reason, #state{host = Host,
ejabberd_hooks:delete(presence_probe_hook, ServerHostB, ?MODULE, presence_probe, 50),
ejabberd_hooks:delete(roster_out_subscription, ServerHostB, ?MODULE, out_subscription, 50),
ejabberd_hooks:delete(remove_user, ServerHostB, ?MODULE, remove_user, 50),
ejabberd_hooks:delete(anonymous_purge_hook, ServerHostB, ?MODULE, remove_user, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB),
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB_OWNER),
mod_disco:unregister_feature(ServerHost, ?NS_PUBSUB_s),
SendLoop ! stop,
ok.
terminate_plugins(Host, ServerHost, Plugins, TreePlugin).
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
@ -1757,23 +1778,24 @@ delete_item(_, "", _, _, _) ->
%% Request does not specify a node
{error, extended_error('bad-request', "node-required")};
delete_item(Host, Node, Publisher, ItemId, ForceNotify) ->
Action = fun(#pubsub_node{type = Type, id = NodeId}) ->
Features = features(Type),
PersistentFeature = lists:member("persistent-items", Features),
DeleteFeature = lists:member("delete-items", Features),
if
%%-> iq_pubsub just does that matchs
%% %% Request does not specify an item
%% {error, extended_error('bad-request', "item-required")};
not PersistentFeature ->
%% Node does not support persistent items
{error, extended_error('feature-not-implemented', unsupported, "persistent-items")};
not DeleteFeature ->
%% Service does not support item deletion
{error, extended_error('feature-not-implemented', unsupported, "delete-items")};
true ->
node_call(Type, delete_item, [NodeId, Publisher, ItemId])
end
Action = fun(#pubsub_node{options = Options, type = Type, id = NodeId}) ->
Features = features(Type),
PersistentFeature = lists:member("persistent-items", Features),
DeleteFeature = lists:member("delete-items", Features),
PublishModel = get_option(Options, publish_model),
if
%%-> iq_pubsub just does that matchs
%% %% Request does not specify an item
%% {error, extended_error('bad-request', "item-required")};
not PersistentFeature ->
%% Node does not support persistent items
{error, extended_error('feature-not-implemented', unsupported, "persistent-items")};
not DeleteFeature ->
%% Service does not support item deletion
{error, extended_error('feature-not-implemented', unsupported, "delete-items")};
true ->
node_call(Type, delete_item, [NodeId, Publisher, PublishModel, ItemId])
end
end,
Reply = [],
case transaction(Host, Node, Action, sync_dirty) of
@ -1916,6 +1938,7 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) ->
%% @spec (Host, Node, NodeId, Type, LJID, Number) -> any()
%% Host = host()
%% Node = pubsubNode()
%% NodeId = pubsubNodeId()
%% Type = pubsubNodeType()
%% LJID = {U, S, []}

View File

@ -47,7 +47,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -129,8 +129,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_default:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_default:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_default:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_default:purge_node(NodeId, Owner).

View File

@ -50,7 +50,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -134,8 +134,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_default:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_default:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_default:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_default:purge_node(NodeId, Owner).

View File

@ -50,7 +50,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -133,8 +133,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_default:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_default:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_default:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_default:purge_node(NodeId, Owner).

View File

@ -57,7 +57,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -484,22 +484,24 @@ remove_extra_items(NodeId, MaxItems, ItemIds) ->
%% Return the new items list:
{result, {NewItems, OldItems}}.
%% @spec (NodeId, JID, ItemId) ->
%% @spec (NodeId, Publisher, PublishModel, ItemId) ->
%% {error, Reason::stanzaError()} |
%% {result, []}
%% NodeId = mod_pubsub:pubsubNodeId()
%% JID = mod_pubsub:jid()
%% Publisher = mod_pubsub:jid()
%% PublishModel = atom()
%% ItemId = string()
%% @doc <p>Triggers item deletion.</p>
%% <p>Default plugin: The user performing the deletion must be the node owner
%% or a publisher.</p>
delete_item(NodeId, Publisher, ItemId) ->
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
GenKey = jlib:short_prepd_bare_jid(Publisher),
GenState = get_state(NodeId, GenKey),
#pubsub_state{affiliation = Affiliation, items = Items} = GenState,
Allowed = (Affiliation == publisher) orelse (Affiliation == owner)
orelse (PublishModel == open)
orelse case get_item(NodeId, ItemId) of
{result, #pubsub_item{creation = {GenKey, _}}} -> true;
{result, #pubsub_item{creation = {_, GenKey}}} -> true;
_ -> false
end,
if
@ -552,14 +554,14 @@ get_entity_affiliations(_Host, Owner) ->
States = mnesia:match_object(
#pubsub_state{stateid = {GenKey, '_'}, _ = '_'}),
Tr = fun(#pubsub_state{stateid = {_, N}, affiliation = A}) ->
{N, A}
{get_nodename(N), A}
end,
{result, lists:map(Tr, States)}.
get_node_affiliations(NodeId) ->
{result, States} = get_states(NodeId),
Tr = fun(#pubsub_state{stateid = {J, {_, _}}, affiliation = A}) ->
{J, A}
{J, A}
end,
{result, lists:map(Tr, States)}.
@ -600,14 +602,14 @@ get_entity_subscriptions(_Host, Owner) ->
#pubsub_state{stateid = {SubKey, '_'}, _ = '_'})
end,
Tr = fun(#pubsub_state{stateid = {J, N}, subscription = S}) ->
{N, S, J}
{get_nodename(N), S, J}
end,
{result, lists:map(Tr, States)}.
get_node_subscriptions(NodeId) ->
{result, States} = get_states(NodeId),
Tr = fun(#pubsub_state{stateid = {J, _}, subscription = S}) ->
{J, S}
{J, S}
end,
{result, lists:map(Tr, States)}.
@ -805,3 +807,11 @@ can_fetch_item(outcast, _) -> false;
can_fetch_item(none, subscribed) -> true;
can_fetch_item(none, none) -> false;
can_fetch_item(_Affiliation, _Subscription) -> false.
%% @spec (NodeId) -> Node
%% @doc retreive pubsubNode() representation giving a NodeId
get_nodename(NodeId) ->
case mnesia:index_read(pubsub_node, NodeId, #pubsub_node.id) of
[#pubsub_node{nodeid = {_, Node}}] -> Node;
_ -> []
end.

View File

@ -48,7 +48,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -135,7 +135,7 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(_NodeId, _MaxItems, ItemIds) ->
{result, {ItemIds, []}}.
delete_item(_NodeId, _JID, _ItemId) ->
delete_item(_NodeId, _Publisher, _PublishModel, _ItemId) ->
{error, 'item-not-found'}.
purge_node(_NodeId, _Owner) ->

View File

@ -41,7 +41,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -121,8 +121,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_default:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_default:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_default:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_default:purge_node(NodeId, Owner).

View File

@ -53,7 +53,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -141,8 +141,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_pep:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_pep:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_pep:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_pep:purge_node(NodeId, Owner).

View File

@ -46,7 +46,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -163,8 +163,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_default:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_default:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_default:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_default:purge_node(NodeId, Owner).

View File

@ -50,7 +50,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -136,8 +136,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_default:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_default:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_default:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_default:purge_node(NodeId, Owner).

View File

@ -50,7 +50,7 @@
subscribe_node/7,
unsubscribe_node/4,
publish_item/6,
delete_item/3,
delete_item/4,
remove_extra_items/3,
get_entity_affiliations/2,
get_node_affiliations/1,
@ -133,8 +133,8 @@ publish_item(NodeId, Publisher, Model, MaxItems, ItemId, Payload) ->
remove_extra_items(NodeId, MaxItems, ItemIds) ->
node_default:remove_extra_items(NodeId, MaxItems, ItemIds).
delete_item(NodeId, JID, ItemId) ->
node_default:delete_item(NodeId, JID, ItemId).
delete_item(NodeId, Publisher, PublishModel, ItemId) ->
node_default:delete_item(NodeId, Publisher, PublishModel, ItemId).
purge_node(NodeId, Owner) ->
node_default:purge_node(NodeId, Owner).

View File

@ -74,6 +74,7 @@ init(_Host, _ServerHost, _Opts) ->
mnesia:create_table(pubsub_node,
[{disc_copies, [node()]},
{attributes, record_info(fields, pubsub_node)}]),
mnesia:add_table_index(pubsub_node, id),
NodesFields = record_info(fields, pubsub_node),
case mnesia:table_info(pubsub_node, attributes) of
NodesFields -> ok;