repo_streamer.ex (746B)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.RepoStreamer do
  @moduledoc """
  Lazily reads the results of an Ecto query in fixed-size chunks.

  Paging is done on the `id` column: every chunk is fetched with
  `id > last_seen_id`, ordered by ascending `id`, and limited to the
  requested chunk size.
  """

  alias Pleroma.Repo
  import Ecto.Query

  @doc """
  Returns a lazy stream of record chunks for `query`.

  Each element of the stream is a non-empty list holding at most `chunk_size`
  records, as returned by `Repo.all/1`. One query is issued per chunk (plus a
  final query that comes back empty and ends the stream), and only when the
  stream is actually enumerated.

  Paging starts from `id > 0`, so the queried source must expose an `id`
  field that can be compared with an integer, and every returned record must
  carry that `id`.

  NOTE(review): `order_by(asc: :id)` is added on top of whatever `query`
  already contains; a query that brings its own ordering is presumably not
  safe to page this way - confirm against callers.
  """
  @spec chunk_stream(Ecto.Queryable.t(), pos_integer()) :: Enumerable.t()
  def chunk_stream(query, chunk_size) do
    Stream.unfold(0, fn last_id ->
      query
      |> order_by(asc: :id)
      |> where([r], r.id > ^last_id)
      |> limit(^chunk_size)
      |> Repo.all()
      |> case do
        # Returning nil is how Stream.unfold/2 is told to stop; no sentinel
        # accumulator or trailing take_while is needed.
        [] -> nil
        # The highest id of this chunk becomes the cursor for the next one.
        records -> {records, List.last(records).id}
      end
    end)
  end
end