GenStage - elixir로 비트코인 시세 스크랩 (feat. 업비트) 1/3

6 minute read

elixir 라이브러리 GenStage와 업비트(Upbit) Open API를 사용해 비트코인 시세를 스크랩하는 간단한 프로그램을 작성하려고 한다. GenStage를 써보고 싶은데 어디에 써볼까 예제를 찾던 중에 전고점을 돌파한 비트코인 시세를 스크랩하는 예제가 재미있겠다 싶어서 골랐다. 미래에 비트코인 가격이 어떻게 될지 모르겠지만 현재 글을 쓰는 지금 가격은 5500만원이다.

GenStage?

Today we are glad to announce the official release of GenStage. GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes. In the short-term, we expect GenStage to replace the use cases for GenEvent as well as providing a composable abstraction for consuming data from third-party systems.

Announcing GenStage

생산자(producer)와 소비자(consumer) 사이에 이벤트 교환을 구현한 elixir 라이브러리다. 더는 사용하지 않는(deprecated) GenEvent로도 생산자, 소비자 패턴을 구현할 수 있는데, 모든 event handler가 하나의 erlang 내부 프로세스(이하 프로세스)에서 실행되기 때문에 병행성(concurrency)을 달성하기 힘들다.

Enum 모듈조급한 계산법(eager evaluation)으로 열거형을 조작할 수 있는 알고리즘을 제공한다. 이후 추가된 Stream 모듈에서 느긋한 계산법(Lazy evaluation)을 지원한다. 그다음은?

열거형을 병렬처리하는 게 다음 스테이지다.

CSV.parse(path)
|> Stream.async() #<-- 1
|> Stream.filter(fn b -> b.color == :red end)
|> Stream.async() #<-- 1
|> Stream.map(fn b -> {b.title, b.height} end)
|> Stream.async() #<-- 1
|> Stream.into(IO.stream(:stdio, :inspect))
|> Stream.run

1번 코드처럼 기존 Stream 모듈에 함수를 추가해 구현한다면 배압(back-pressure), async 함수가 스폰한 프로세스가 크래시날 때, 처리 등이 힘들다.

새로운 라이브러리가 필요하다. 그래서 나온 게 GenStage다.

rate limiter 기본 구현, commit b39fc08

업비트 Open API를 사용해 비트코인 시세를 스크랩하는 프로그램을 짠다. Open API는 과도한 사용으로 인한 시스템 부하를 방지하려고요청 수를 제한한다. 업비트 시세 조회는 초당 10회 요청으로 제한한다.그래서 소비자가 생산자에게 초당 10개씩만 시세를 조회할 날짜를 요청하는 방식으로 구현한다.

생산자 QuotationDemander 모듈

defmodule BitcoinPriceScraper.QuotationDemander do
  use GenStage

  def start_link(number) do
    GenStage.start_link(__MODULE__, number)
  end

  def init(counter) do
    {:producer, counter} # <--- 1
  end

  def handle_demand(demand, counter) when demand > 0 do
    # 이벤트 요구 개수 이하를 랜덤하게 생산한다
    # [1, demand]
    demand = :rand.uniform(demand)
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand} # <--- 2
  end
end

날짜를 만들어내는 세부 구현 전에 간단하게 생산자는 랜덤 숫자를 소비자가 요구한 개수 이하로 전달한다. 1번 코드에서 생산자로 프로세스를 등록한다. 2번 코드에서 소비자에게 전달할 이벤트를 tuple의 두 번째 인자(events)로 리턴한다.

소비자 RateLimiter 모듈

defmodule BitcoinPriceScraper.RateLimiter do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(_) do
    {:consumer, %{}} # <--- 1
  end

  def handle_subscribe(:producer, opts, from, producers) do
    limits_per_second = Keyword.fetch!(opts, :limits_per_second)

    producers =
      producers
      |> Map.put(from, limits_per_second)
      |> ask_and_schedule(from)

    # :manual을 리턴해 생산자(producer)에 요구(demand)를 보내는 걸 직접 컨트롤한다.
    {:manual, producers} # <--- 2
  end

  def handle_events(events, _from, producers) do
    # consume!
    IO.puts("#{inspect(NaiveDateTime.utc_now())}: #{inspect(events, charlists: true)}") # <--- 3

    {:noreply, [], producers}
  end

  def handle_info({:ask, from}, producers) do
    {:noreply, [], ask_and_schedule(producers, from)} # <--- 4
  end

  defp ask_and_schedule(producers, from) do
    case producers do
      %{^from => limits_per_second} ->
        # 이벤트를 요구한다. :manual 모드일 때는 GenStage.ask/2 함수를 호출해서 직접 요구해야 한다
        GenStage.ask(from, limits_per_second) # <--- 5
        # 초당 호출 개수 제한이 있으므로 1초 스케줄링을 한다
        Process.send_after(self(), {:ask, from}, :timer.seconds(1)) # <--- 6
        producers

      %{} ->
        producers
    end
  end
end

소비자 코드다. 1번 코드로 소비자 프로세스로 등록한다. 2번 코드로 GenStage.ask/2 함수를 사용해 직접 요구하겠다고 알린다. :automatic 옵션이 디폴트다. :automatic 옵션일 때는 handle_events/3 함수로 생산자로부터 넘어온 이벤트를 처리한 다음 자동으로 생산자에게 이벤트를 요구한다. 이렇게 자동으로 놔두면 수시로 초당 요청 개수 제한을 넘어서기 때문에 :manual 옵션을 사용해서 직접 생산자에게 이벤트를 요구한다.

이벤트는 handle_events/3 함수에서 처리한다. 지금은 할 일이 없음으로 3번 코드로 단순히 화면에 찍는다. 5번 코드에서 가능한 초당 요청 개수를 생산자에게 요구한다. 그 후 6번 코드로 1초 후에 {:ask, from} 메시지를 현재 프로세스에 전달하도록 스케줄링한다. 1초 후에 4번 코드가 실행되며 생산자에 이벤트를 요구하고 1초 후에 다시 함수가 호출되게 스케줄링한다.

구독과 시작

defmodule BitcoinPriceScraper do
  alias BitcoinPriceScraper.{QuotationDemander, RateLimiter}

  def scrap() do
    {:ok, producer} = QuotationDemander.start_link(1) # <--- 1
    {:ok, consumer} = RateLimiter.start_link() # <--- 2

    GenStage.sync_subscribe(consumer, # <--- 3
      to: producer,
      # 시세(quotation) API 요청수 제한
      # 초당 10, 분당 600
      # https://docs.upbit.com/docs/user-request-guide
      limits_per_second: 10
    )
  end
end

1, 2번 코드로 생산자 소비자 프로세스를 시작한다. 3번 코드로 소비자 프로세스가 생산자 프로세스를 구독하게 한다. 구독할 때, 추가적인 옵션을 넘길 수 있다. limits_per_second: 10 옵션은 handle_subscribe/4 콜백 함수 2번째 인자로 전달된다.

결과

iex> BitcoinPriceScraper.scrap
~N[2021-01-10 14:48:06.449649]: [1, 2, 3, 4, 5, 6, 7, 8]
{:ok, #Reference<0.1174726878.1313865735.193296>}
iex> ~N[2021-01-10 14:48:07.444601]: [9, 10, 11, 12, 13, 14, 15]
iex> ~N[2021-01-10 14:48:08.445519]: [16, 17, 18, 19, 20, 21, 22, 23]
iex> ~N[2021-01-10 14:48:09.446351]: [24, 25, 26, 27]
iex> ~N[2021-01-10 14:48:10.447406]: [28, 29, 30, 31, 32, 33, 34]
iex> ~N[2021-01-10 14:48:11.448330]: [35, 36, 37, 38, 39]
iex> ~N[2021-01-10 14:48:12.449366]: [40, 41, 42, 43, 44, 45]
iex> ~N[2021-01-10 14:48:13.450296]: [46, 47]
iex> ~N[2021-01-10 14:48:14.451452]: [48, 49, 50, 51, 52, 53, 54, 55, 56]
iex> ~N[2021-01-10 14:48:15.452408]: [57, 58, 59, 60, 61, 62]
iex> ~N[2021-01-10 14:48:16.453463]: [63, 64, 65, 66, 67, 68, 69, 70, 71]
iex> ~N[2021-01-10 14:48:17.454373]: [72, 73, 74, 75, 76, 77, 78, 79, 80, 81]
iex> ~N[2021-01-10 14:48:18.455462]: [82, 83, 84, 85, 86]
iex> ~N[2021-01-10 14:48:19.456467]: [87]

의도대로 잘 동작한다. 소비자가 10개씩 요구하지만 생산자는 랜덤을 굴려 최대 10개 이벤트를 만들어서 넘기고 있다.

1분봉을 가져올 기준 시간을 생산자가 이벤트로 생성, commit 69d4234

1분봉 업비트 Open API 문서를 보면 파라미터로 캔들 마지막 시간과 캔들 개수를 넘긴다. 최대 캔들 개수가 200개이므로 시세를 조회할 시간을 200분 단위로 쪼개서 시간을 만든다.

캔들 마지막 시간을 이벤트로 생성

defmodule BitcoinPriceScraper.QuotationDemander do
  use GenStage

  # ...

  def handle_demand(demand, state) when demand > 0 do
    {events, state} =
      Enum.reduce(1..demand, {[], state}, fn _, {events, state} ->
      # to_datetime 보다 작을 때만 event를 생산한다
      if NaiveDateTime.compare(state.current_datetime, state.to_datetime) == :lt do
        # step을 초로 변환해서 더하고 to_datetime을 넘지 않게 한다
        next =
          min_dt(NaiveDateTime.add(state.current_datetime, state.step * 60), state.to_datetime) # <--- 1

        {[next | events], %{state | current_datetime: next}}
      else
        {events, state}
      end
      end)

    {:noreply, Enum.reverse(events), state}
  end

  defp min_dt(lhs, rhs) when is_struct(lhs, NaiveDateTime) and is_struct(rhs, NaiveDateTime) do
    case NaiveDateTime.compare(lhs, rhs) do
      :gt -> rhs
      _ -> lhs
    end
  end
end

1번 코드에서 200분(state.step) 단위로 시간을 생성하게 했다. 1분봉 업비트 Open API 인자로 마지막 캔들 시간을 넘기므로 첫 시간부터 200분을 더해서 이벤트를 만들면 된다. 시세 조회 범위 마지막 시간(state.to_datetime)을 넘지 않게 했다. 이렇게 하면 마지막 시세 조회 범위가 겹칠 수 있는데, 이건 업비트 Open API 호출 결과를 받아서 가공하는 쪽 책임이라며 넘겨버렸다. 생산자에서는 신경 쓰지 않는다.

결과

iex> BitcoinPriceScraper.scrap
~N[2021-01-16 14:00:33.203082]: [~N[2020-12-17 17:20:33.194719], ~N[2020-12-17 20:40:33.194719], ~N[2020-12-18 00:00:33.194719], ~N[2020-12-18 03:20:33.194719], ~N[2020-12-18 06:40:33.194719], ~N[2020-12-18 10:00:33.194719], ~N[2020-12-18 13:20:33.194719], ~N[2020-12-18 16:40:33.194719], ~N[2020-12-18 20:00:33.194719], ~N[2020-12-18 23:20:33.194719]]
iex> ~N[2021-01-16 14:00:34.201242]: [~N[2020-12-19 02:40:33.194719], ~N[2020-12-19 06:00:33.194719], ~N[2020-12-19 09:20:33.194719], ~N[2020-12-19 12:40:33.194719], ~N[2020-12-19 16:00:33.194719], ~N[2020-12-19 19:20:33.194719], ~N[2020-12-19 22:40:33.194719], ~N[2020-12-20 02:00:33.194719], ~N[2020-12-20 05:20:33.194719], ~N[2020-12-20 08:40:33.194719]]
iex> ~N[2021-01-16 14:00:35.202203]: [~N[2020-12-20 12:00:33.194719], ~N[2020-12-20 15:20:33.194719], ~N[2020-12-20 18:40:33.194719], ~N[2020-12-20 22:00:33.194719], ~N[2020-12-21 01:20:33.194719], ~N[2020-12-21 04:40:33.194719], ~N[2020-12-21 08:00:33.194719], ~N[2020-12-21 11:20:33.194719], ~N[2020-12-21 14:40:33.194719], ~N[2020-12-21 18:00:33.194719]]
...

200분, 즉 3분 20초씩 증가한 시간을 이벤트로 만들어서 잘 넘기고 있다.

마치며

드디어 GenStage를 써봤다. 소비자인 RateLimiter 모듈에서는 생산자가 만든 이벤트를 단순히 화면에 찍기만 했는데, 다음 편에서 업비트 Open API를 사용해 실제 시세 조회 요청을 할 예정이다.

전체 코드는 ohyecloudy/bitcoin_price_scraper 깃헙 사이트에서 볼 수 있다.

이 글과 관련된 커밋 목록이다.

  • 5f1816f Initial commit
  • 35c8086 :tada: mix new bitcoin_price_scraper
  • b39fc08 :star: GenStage를 사용해 초당 10개씩 요청하는 rate limiter 소비자 추가
  • 69d4234 :star: 30일 전부터 지금까지 200분 단위로 끊어서 이벤트 생성

참고