logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

state.ex (2094B)


      1 # Pleroma: A lightweight social networking server
      2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
      3 # SPDX-License-Identifier: AGPL-3.0-only
      4 
      defmodule Pleroma.Web.Streamer.State do
        @moduledoc """
        GenServer that keeps track of the sockets subscribed to each streaming topic.

        The process is registered under this module's name and its state has the
        shape `%{sockets: %{internal_topic => [stream_socket]}}`, where every
        `stream_socket` is the value returned by `StreamerSocket.from_socket/1`.

        For the topics `user`, `user:notification` and `direct` the internal topic
        is the topic suffixed with the id of the user found in the socket assigns;
        every other topic is used as-is.
        """

        use GenServer
        require Logger

        alias Pleroma.Web.Streamer.StreamerSocket

        # Captured at compile time; picks the `do_remove_socket/3` clause below.
        @env Mix.env()

        # Topics whose internal key gets the subscribing user's id appended.
        @user_topics ~w[user user:notification direct]

        ## Client API

        @doc """
        Starts the state process, registered as `#{inspect(__MODULE__)}`, with no sockets.

        The argument is ignored.
        """
        @spec start_link(term()) :: GenServer.on_start()
        def start_link(_) do
          GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
        end

        @doc """
        Registers `socket` under `topic` and returns the updated server state.
        """
        @spec add_socket(term(), term()) :: map()
        def add_socket(topic, socket) do
          GenServer.call(__MODULE__, {:add, topic, socket})
        end

        @doc """
        Unregisters `socket` from `topic`.

        Returns the updated server state, or `:ok` without contacting the server
        when the module was compiled in the `:test` environment.
        """
        @spec remove_socket(term(), term()) :: :ok | map()
        def remove_socket(topic, socket) do
          do_remove_socket(@env, topic, socket)
        end

        @doc """
        Returns the map of internal topic to the list of sockets registered for it.
        """
        @spec get_sockets() :: map()
        def get_sockets do
          %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
          stream_sockets
        end

        ## Server callbacks

        @impl true
        def init(init_arg) do
          {:ok, init_arg}
        end

        # Every call replies with the full (updated) state.
        @impl true
        def handle_call(:get_state, _from, state) do
          {:reply, state, state}
        end

        def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
          internal_topic = internal_topic(topic, socket)
          stream_socket = StreamerSocket.from_socket(socket)

          # Newest socket goes first; `Enum.uniq/1` keeps the first occurrence, so a
          # socket is never listed twice for the same topic.
          sockets_for_topic = Enum.uniq([stream_socket | Map.get(sockets, internal_topic, [])])

          state = %{state | sockets: Map.put(sockets, internal_topic, sockets_for_topic)}
          Logger.debug("Got new conn for #{topic}")
          {:reply, state, state}
        end

        def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
          internal_topic = internal_topic(topic, socket)
          stream_socket = StreamerSocket.from_socket(socket)

          remaining =
            sockets
            |> Map.get(internal_topic, [])
            |> List.delete(stream_socket)

          # Drop the topic entirely once its last socket is gone. Keeping an empty
          # list would let per-user keys ("user:<id>", ...) pile up in the state for
          # as long as the process lives.
          sockets =
            case remaining do
              [] -> Map.delete(sockets, internal_topic)
              _ -> Map.put(sockets, internal_topic, remaining)
            end

          state = %{state | sockets: sockets}
          {:reply, state, state}
        end

        ## Helpers

        # In the :test environment removal is a no-op that never reaches the server.
        defp do_remove_socket(:test, _, _) do
          :ok
        end

        defp do_remove_socket(_env, topic, socket) do
          GenServer.call(__MODULE__, {:remove, topic, socket})
        end

        # Builds the key under which sockets are stored: user-scoped topics are
        # suffixed with the id of the user in the socket assigns (raises if the
        # socket has no user assigned); any other topic is returned unchanged.
        defp internal_topic(topic, socket) when topic in @user_topics do
          "#{topic}:#{socket.assigns[:user].id}"
        end

        defp internal_topic(topic, _) do
          topic
        end
      end