state.ex (2094B)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Streamer.State do
  @moduledoc """
  Named `GenServer` that keeps track of streamer sockets grouped by topic.

  The server state is a map of the form `%{sockets: %{topic => [stream_socket]}}`.
  For the topics `"user"`, `"user:notification"` and `"direct"` the key is
  suffixed with the id of the user found in `socket.assigns[:user]`, so each
  user gets a separate entry.
  """

  use GenServer
  require Logger

  alias Pleroma.Web.Streamer.StreamerSocket

  # Evaluated once, when this module is compiled. It selects which
  # `do_remove_socket/3` clause `remove_socket/2` dispatches to.
  @env Mix.env()

  # Topics whose internal key is scoped to the socket's user.
  @user_scoped_topics ~w[user user:notification direct]

  ## Client API

  @doc """
  Starts the server, registered under the module name, with no sockets.
  """
  def start_link(_) do
    GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
  end

  @doc """
  Registers `socket` under `topic` and returns the updated server state.
  """
  def add_socket(topic, socket) do
    GenServer.call(__MODULE__, {:add, topic, socket})
  end

  @doc """
  Unregisters `socket` from `topic` and returns the updated server state.

  When the module was compiled with `Mix.env() == :test` this does nothing
  and returns `:ok`.
  """
  def remove_socket(topic, socket) do
    do_remove_socket(@env, topic, socket)
  end

  @doc """
  Returns the map of internal topic keys to their lists of stream sockets.
  """
  def get_sockets do
    %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
    stream_sockets
  end

  ## Server callbacks

  @impl true
  def init(init_arg) do
    {:ok, init_arg}
  end

  @impl true
  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
    internal_topic = internal_topic(topic, socket)
    stream_socket = StreamerSocket.from_socket(socket)

    # Prepend, then de-duplicate: re-adding a socket that is already present
    # keeps a single copy of it, at the head of the list.
    sockets_for_topic = Enum.uniq([stream_socket | Map.get(sockets, internal_topic, [])])

    state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
    Logger.debug("Got new conn for #{topic}")
    {:reply, state, state}
  end

  def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
    internal_topic = internal_topic(topic, socket)
    stream_socket = StreamerSocket.from_socket(socket)

    # The topic key is kept (with an empty list) even after its last socket
    # has been removed.
    sockets_for_topic =
      sockets
      |> Map.get(internal_topic, [])
      |> List.delete(stream_socket)

    state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
    {:reply, state, state}
  end

  ## Helpers

  # In the :test environment socket removal is skipped entirely.
  defp do_remove_socket(:test, _topic, _socket) do
    :ok
  end

  defp do_remove_socket(_env, topic, socket) do
    GenServer.call(__MODULE__, {:remove, topic, socket})
  end

  # Builds the key used in the sockets map: user-scoped topics get the id of
  # the assigned user appended, every other topic is used unchanged.
  defp internal_topic(topic, socket) when topic in @user_scoped_topics do
    "#{topic}:#{socket.assigns[:user].id}"
  end

  defp internal_topic(topic, _socket) do
    topic
  end
end