#elixirlang 지연 열거(lazy enumeration)가 필요할 땐, Stream 모듈

2 minute read

Enum 모듈과 다르게 Stream 모듈은 지연 열거를 지원한다. Stream 모듈 설명 페이지에 있는 비교 예제를 보면 차이가 명확하다.

1..3
|> Enum.map(&IO.inspect(&1)) # A
|> Enum.map(&(&1 * 2))
|> Enum.map(&IO.inspect(&1)) # B
1 # A
2 # A
3 # A
2 # B
4 # B
6 # B
#=> [2, 4, 6]

Enum 모듈은 끝까지 열거한다.

stream = 1..3
|> Stream.map(&IO.inspect(&1)) # A
|> Stream.map(&(&1 * 2))
|> Stream.map(&IO.inspect(&1)) # B
Enum.to_list(stream)
1 # A
2 # B
2 # A
4 # B
3 # A
6 # B
#=> [2, 4, 6]

반면 Stream 모듈은 요청할 때만 열거를 한다. Enum.to_list(stream) 표현식을 썼기 때문에 하나씩 열거가 됐다. 안 썼으면 생성한 Stream을 리턴한다.

최근 slab을 짜면서 Stream을 쓰면 딱인데, 대충 땜질하고 넘어간 기능이 있다. slab에는 gitlab 파이프라인 상태를 보기 좋게 가공해서 slack으로 알리는 기능이 있다. 정보를 가공하려면 gitlab 파이프라인이 깨졌을 때, 마지막으로 성공한 파이프라인 정보가 필요하다. 성공한 파이프라인 정보가 있을 때까지 gitlab API로 파이프라인 정보를 요청해야 한다. 하지만 요청할 수 있는 최대 개수인 100개를 요청해서 그 안에 성공한 파이프라인 정보가 있을 때만 가공해서 알렸다. 다행히 100번 연속으로 파이프라인이 깨진 일이 없어서 알림 기능에 문제가 생긴 적은 없다.

defmodule Gitlab.PaginationStream do
  require Logger

  @per_page "100"

  @spec create((map() -> %{headers: map(), body: [any()]}), map()) :: Enumerable.t()
  def create(fun, param) do
    Stream.resource(
      fn ->
        param = Map.merge(param, %{"per_page" => @per_page, "page" => "1"})
        %{headers: headers, body: body} = fun.(param)

        with {total, ""} <- Integer.parse(headers["X-Total-Pages"]) do
          {1, total, body}
        else
          _ -> {1, 1, body}
        end
      end,
      fn
        {current, total, body} -> # A
          {body, {current + 1, total}}

        {current, total} -> # B
      if current > total do
        {:halt, {current, total}}
      else
        param = Map.merge(param, %{"per_page" => @per_page, "page" => "#{current}"})
        %{body: body} = fun.(param)
        {body, {current + 1, total}}
      end
      end,
      fn {_current, _total} -> :ok end
    )
  end
end

요즘은 slab 작업 후기를 남기고 마무리 작업을 하고 있다. 마침 찝찝했던 부분을 수정했다. Stream 모듈에 있는 Stream 생성 함수 중 가장 잘 맞는 Stream.resource/3 함수를 사용했다. 두 번째 인자는 {:halt, acc} 튜플을 리턴할 때까지 호출하는 Stream의 다음 값을 생성하는 함수이다. Stream을 생성할 때, 페이지 정보를 받으면서 실제 데이터도 받는다. 이 데이터를 A 튜플 패턴 매칭으로 넘겨주게 했다. B 튜플 패턴 매칭은 다음 값을 생성하는 함수가 연속으로 불리면서 이루어진다.

def pipeline_status(branch) do
-  %{body: pipelines} = Gitlab.pipelines(%{"per_page" => "100", "ref" => branch})
-  pipelines
+  Gitlab.PaginationStream.create(&Gitlab.pipelines/1, %{"ref" => branch})
   |> Stream.map(fn %{"id" => id} -> Gitlab.pipeline(id) end)
   |> pipelines_custom_filter
   |> take_until_last_suceess
   |> build_pipeline_status
   |> Map.put(:branch, branch)
   end

대충 100개 가져와서 처리하던 코드를 지웠다. 187f90a25d, 4e53e59f3a 커밋 참고.

이제 파이프라인이 100번 연속으로 깨져도 상태를 제대로 보여줄 수 있다. 고치길 잘했단 생각이 들 일이 없었으면 좋겠다.

참고