Add some methods

Signed-off-by: Thomas Citharel <tcit@tcit.fr>
This commit is contained in:
Thomas Citharel 2018-05-19 20:29:11 +02:00
parent 3495c8a323
commit 394549933d
3 changed files with 205 additions and 45 deletions

View File

@ -29,7 +29,7 @@ defmodule Eventos.Service.ActivityPub do
}
# Notification.create_notifications(activity)
#stream_out(activity)
# stream_out(activity)
{:ok, activity}
else
%Activity{} = activity -> {:ok, activity}
@ -37,6 +37,20 @@ defmodule Eventos.Service.ActivityPub do
end
end
# def stream_out(%Activity{} = activity) do
# if activity.data["type"] in ["Create", "Announce"] do
# Pleroma.Web.Streamer.stream("user", activity)
#
# if Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
# Pleroma.Web.Streamer.stream("public", activity)
#
# if activity.local do
# Pleroma.Web.Streamer.stream("public:local", activity)
# end
# end
# end
# end
def fetch_event_from_url(url) do
if object = Events.get_event_by_url!(url) do
{:ok, object}

View File

@ -150,18 +150,18 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
# end
# end
#
# def handle_incoming(
# %{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data
# ) do
# with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
# {:ok, object} <-
# get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
# {:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do
# {:ok, activity}
# else
# _e -> :error
# end
# end
def handle_incoming(
%{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data
) do
with %Actor{} = actor <- Actors.get_or_fetch_by_url(actor),
{:ok, object} <-
get_obj_helper(object_id) || ActivityPub.fetch_event_from_url(object_id),
{:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do
{:ok, activity}
else
_e -> :error
end
end
#
# def handle_incoming(
# %{"type" => "Update", "object" => %{"type" => "Person"} = object, "actor" => actor_id} =
@ -219,35 +219,35 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
# # Accept
# # Undo
#
# def handle_incoming(_), do: :error
#
# def get_obj_helper(id) do
# if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
# end
#
# def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
# with false <- String.starts_with?(inReplyTo, "http"),
# {:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
# Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
# else
# _e -> object
# end
# end
#
# def set_reply_to_uri(obj), do: obj
def handle_incoming(_), do: :error
def get_obj_helper(id) do
if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
end
def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
with false <- String.starts_with?(inReplyTo, "http"),
{:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
else
_e -> object
end
end
def set_reply_to_uri(obj), do: obj
#
# # Prepares the object of an outgoing create activity.
# def prepare_object(object) do
# object
def prepare_object(object) do
object
# |> set_sensitive
# |> add_hashtags
# |> add_mention_tags
# |> add_emoji_tags
# |> add_attributed_to
|> add_attributed_to
# |> prepare_attachments
# |> set_conversation
# |> set_reply_to_uri
# end
|> set_conversation
|> set_reply_to_uri
end
@doc
"""
@ -257,7 +257,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
def prepare_outgoing(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do
object =
object
#|> prepare_object
|> prepare_object
data =
data
@ -282,6 +282,7 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
|> Map.from_struct
|> Map.drop([:"__meta__"])
|> Map.put(:"@context", "https://www.w3.org/ns/activitystreams")
|> prepare_object
{:ok, event}
end
@ -360,21 +361,21 @@ defmodule Eventos.Service.ActivityPub.Transmogrifier do
# |> Map.put("tag", tags ++ out)
# end
#
# def set_conversation(object) do
# Map.put(object, "conversation", object["context"])
# end
def set_conversation(object) do
Map.put(object, "conversation", object["context"])
end
#
# def set_sensitive(object) do
# tags = object["tag"] || []
# Map.put(object, "sensitive", "nsfw" in tags)
# end
#
# def add_attributed_to(object) do
# attributedTo = object["attributedTo"] || object["actor"]
#
# object
# |> Map.put("attributedTo", attributedTo)
# end
def add_attributed_to(object) do
attributedTo = object["attributedTo"] || object["actor"]
object
|> Map.put("attributedTo", attributedTo)
end
#
# def prepare_attachments(object) do
# attachments =

145
lib/service/streamer.ex Normal file
View File

@ -0,0 +1,145 @@
defmodule Eventos.Service.Streamer do
use GenServer
require Logger
alias Eventos.Accounts.Actor
def init(args) do
{:ok, args}
end
def start_link do
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
GenServer.cast(__MODULE__, %{action: :ping})
end)
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def add_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
end
def remove_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
end
def stream(topic, item) do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end
def handle_cast(%{action: :ping}, topics) do
Map.values(topics)
|> List.flatten()
|> Enum.each(fn socket ->
Logger.debug("Sending keepalive ping")
send(socket.transport_pid, {:text, ""})
end)
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
GenServer.cast(__MODULE__, %{action: :ping})
end)
{:noreply, topics}
end
# def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
# topic = "user:#{item.user_id}"
#
# Enum.each(topics[topic] || [], fn socket ->
# json =
# %{
# event: "notification",
# payload:
# Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
# socket.assigns["user"],
# item
# )
# |> Jason.encode!()
# }
# |> Jason.encode!()
#
# send(socket.transport_pid, {:text, json})
# end)
#
# {:noreply, topics}
# end
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
Logger.debug("Trying to push to users")
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(topics, topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(topics, topic, item)
{:noreply, topics}
end
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Removed conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(m, state) do
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info["blocks"] || []
unless item.actor in blocks do
json =
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: item,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
send(socket.transport_pid, {:text, json})
end
end)
end
defp internal_topic("user", socket) do
"user:#{socket.assigns[:user].id}"
end
defp internal_topic(topic, _), do: topic
end