JWT - elixir로 비트코인 시세 스크랩 (feat. 업비트) 2/3
- GenStage - elixir로 비트코인 시세 스크랩 (feat. 업비트) 1/3
- JWT - elixir로 비트코인 시세 스크랩 (feat. 업비트) 2/3
- prometheus, grafana - elixir로 비트코인 시세 스크랩 (feat. 업비트) 3/3
1편에서는 GenStage를 사용해 비트코인 시세를 조회할 날짜를 이벤트로 만들었다. 1초당 시세 조회 요청 개수 제한이 있어서 소비자가 1초마다 60개씩 생산자로부터 이벤트를 가져온다. 업비트 Open API를 호출하지 않는 동작하는 껍데기만 만들었는데, 이번에 알맹이를 채울 예정이다.
JWT?
REST API 요청시, 발급받은 access key와 secret key로 토큰을 생성하여 Authorization 헤더를 통해 전송합니다. 토큰은 JWT(nil) 형식을 따릅니다.
인증에 사용하는 토큰으로 JSON Web Tokens를 사용한다. JWT라길래, 대문자 J는 java인가? 했는데, JSON이다. 역시 웹알못.
JWTs can be signed using a secret (with the HMAC algorithm) or a public/private key pair using RSA or ECDSA.
Security-wise, SWT can only be symmetrically signed by a shared secret using the HMAC algorithm. However, JWT and SAML tokens can use a public/private key pair in the form of a X.509 certificate for signing.
JWT는 대칭키 암호(symmetric-key algorithm)와 공개키 암호 방식(Public-key cryptography)을 둘 다 지원한다. 그래서 대칭키 암호만 지원하는 SWT(Simple Web Tokens)보다 보안이 우수하다.
As JSON is less verbose than XML, when it is encoded its size is also smaller, making JWT more compact than SAML. This makes JWT a good choice to be passed in HTML and HTTP environments.
XML보다는 JSON이 간결하다. 그래서 SAML(Security Assertion Markup Language Tokens)보다 짱이라는 JWT 소개 페이지의 약 팔기 되겠다.
JWT 라이브러리로 Joken 선택
JSON Web Tokens 홈페이지를 방문하니 서명(Signing)과 검증(Verification)을 지원하는 언어별 라이브러리 소개가 있다. 친절하다. JWT 첫인상이 좋아졌다.
elixir 라이브러리로 Guardian과 Joken을 소개한다. 라이브러리가 많을 땐, 뭐다? 처음엔 별이 많은 Guardian을 사용하려고 했다. 하지만 좀 더 사용이 간편한 Joken을 선택했다.
업비트 Open API를 사용해 마켓 코드 조회와 1분 캔들 요청 commit b021eeb
defmodule BitcoinPriceScraper.Jwt do
use Joken.Config
def sign!(payload) when is_map(payload) do
generate_and_sign!(payload, signer())
end
def signer() do
# upbit에서 권장하는 HS256 알고리즘 사용
Joken.Signer.create("HS256", Application.get_env(:bitcoin_price_scraper, :upbit_secret_key))
end
end
시키는 대로 하자. 업비트에서 권장하는 알고리즘을 사용해서 서명한다. 업비트는 공개키 암호 방식을 사용한다. 발급받은 비밀 키를 사용한다.
defmodule BitcoinPriceScraper.Upbit do
use Tesla
alias BitcoinPriceScraper.Jwt
# ...
def candles(market, to, count \\ 200) do
query = %{
market: market,
to: to_string(NaiveDateTime.truncate(to, :second)),
count: min(count, @max_candle_count)
}
query_hash =
:crypto.hash(:sha256, Tesla.encode_query(query))
|> Base.encode16()
payload = %{
access_key: Application.get_env(:bitcoin_price_scraper, :upbit_access_key),
nonce: UUID.uuid4(),
query_hash: query_hash,
query_hash_alg: "SHA512"
}
jwt_token = Jwt.sign!(payload)
get("candles/minutes/1",
query: query,
headers: [{"authorization", "Bearer #{jwt_token}"}]
)
end
end
{
"access_key": "발급 받은 acccess key (필수)",
"nonce": "무작위의 UUID 문자열 (필수)",
"query_hash": "해싱된 query string (파라미터가 있을 경우 필수)",
"query_hash_alg": "query_hash를 생성하는 데에 사용한 알고리즘 (기본값 : SHA512)"
}
인증 가능한 요청 만들기 페이지에서 시키는 대로 만들면 된다.
JWT 인증을 Tesla Middleware로 구현 commit 1d1fa82
업비트 Open API를 사용하는 모든 함수에 쿼리 해시값을 만들고 payload 구성하고 서명해서 헤더에 붙이고를 반복해야 한다. 함수로 추출해서 중복을 최소화할 수 있지만 HTTP 클라이언트인 tesla 라이브러리엔 미들웨어(Middleware)라는 멋진 기능이 있다. 이 기능을 구현해서 사용하자. 미들웨어를 구현해서 plug로 정의하면 모든 HTTP 요청에 자동으로 호출된다.
defmodule BitcoinPriceScraper.JwtAuth do
alias BitcoinPriceScraper.Jwt
@behaviour Tesla.Middleware
@impl Tesla.Middleware
def call(env, next, _options) do # <--- 1
env
|> add_auth_header()
|> Tesla.run(next)
end
defp add_auth_header(env) do # <--- 2
payload = %{
access_key: Application.get_env(:bitcoin_price_scraper, :upbit_access_key),
nonce: UUID.uuid4()
}
payload =
if Enum.empty?(env.query) do
payload
else
query_hash =
:crypto.hash(:sha256, Tesla.encode_query(env.query))
|> Base.encode16()
Map.merge(
payload,
%{
query_hash: query_hash,
query_hash_alg: "SHA512"
}
)
end
jwt_token = Jwt.sign!(payload)
put_in(env.headers, [{"authorization", "Bearer #{jwt_token}"} | env.headers])
end
end
plug로 미들웨어를 정의하면 1번 코드가 HTTP 요청을 만들 때마다 호출된다. 1번 코드에서 2번 코드를 호출해서 쿼리 해시값을 만들고 payload를 구성해서 서명하고 그걸 헤더에 추가한다.
defmodule BitcoinPriceScraper.Upbit do
use Tesla
# ...
plug(Tesla.Middleware.JSON) # <--- 1
# 최대 요청 캔들 카운트
# https://docs.upbit.com/reference#분minute-캔들-1
@max_candle_count 200
def candles(market, to, count \\ 200) do
get("candles/minutes/1", query: query) # <--- 2
end
end
1번 코드로 방금 구현한 미들웨어를 plug로 정의한다. 이제 2번 코드처럼 서명이고 뭐고 신경 안 쓰고 실제 요청만 신경 쓰면 된다.
소비자에서 업비트 1분 캔들 요청 commit 16edb86
defmodule BitcoinPriceScraper.RateLimiter do
use GenStage
# ...
def handle_events(events, _from, producers) do
IO.puts("handle_events - #{to_string(NaiveDateTime.utc_now())}")
for e <- events do
case Upbit.candles("KRW-BTC", e, 200) do # <--- 1
{:ok, %{body: body, status: status, headers: headers}} ->
remaining_req =
Enum.find_value(headers, fn h ->
case h do
{"remaining-req", remain} -> remain
_ -> nil
end
end)
IO.puts(
"status: #{status}, candle count: #{Enum.count(body)}, remaining-req: #{remaining_req}"
)
error ->
IO.inspect(error)
end
end
{:noreply, [], producers}
end
end
이제 콘솔로 로그만 찍는 대신 실제로 호출한다. 1번 코드로 이벤트로 넘어온 시세 조회 끝 날짜로 200개 캔들을 요청한다.
iex> BitcoinPriceScraper.scrap
handle_events - 2021-01-23 15:04:44.807415
{:ok, #Reference<0.448241467.2873884678.45976>}
iex> status: 200, candle count: 200, remaining-req: group=candles; min=599; sec=9
iex> status: 200, candle count: 200, remaining-req: group=candles; min=598; sec=8
iex> status: 200, candle count: 200, remaining-req: group=candles; min=597; sec=7
iex> status: 200, candle count: 200, remaining-req: group=candles; min=596; sec=6
iex> status: 200, candle count: 200, remaining-req: group=candles; min=595; sec=5
iex> status: 200, candle count: 200, remaining-req: group=candles; min=594; sec=4
iex> status: 200, candle count: 200, remaining-req: group=candles; min=593; sec=3
iex> status: 200, candle count: 200, remaining-req: group=candles; min=592; sec=2
iex> status: 200, candle count: 200, remaining-req: group=candles; min=591; sec=1
iex> status: 200, candle count: 200, remaining-req: group=candles; min=590; sec=0
iex> handle_events - 2021-01-23 15:04:45.808177
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> status: 200, candle count: 200, remaining-req: group=candles; min=589; sec=9
iex> status: 200, candle count: 200, remaining-req: group=candles; min=588; sec=8
iex> status: 200, candle count: 200, remaining-req: group=candles; min=587; sec=7
iex> status: 200, candle count: 200, remaining-req: group=candles; min=586; sec=6
iex> status: 200, candle count: 200, remaining-req: group=candles; min=585; sec=5
iex> status: 200, candle count: 200, remaining-req: group=candles; min=584; sec=4
iex> status: 200, candle count: 200, remaining-req: group=candles; min=583; sec=3
iex> status: 200, candle count: 200, remaining-req: group=candles; min=582; sec=2
iex> handle_events - 2021-01-23 15:04:46.809174
iex> status: 200, candle count: 200, remaining-req: group=candles; min=581; sec=1
iex> status: 200, candle count: 200, remaining-req: group=candles; min=580; sec=9
iex> status: 200, candle count: 200, remaining-req: group=candles; min=579; sec=8
iex> status: 200, candle count: 200, remaining-req: group=candles; min=578; sec=7
iex> status: 200, candle count: 200, remaining-req: group=candles; min=577; sec=6
iex> status: 200, candle count: 200, remaining-req: group=candles; min=576; sec=5
iex> status: 200, candle count: 200, remaining-req: group=candles; min=575; sec=4
iex> status: 200, candle count: 200, remaining-req: group=candles; min=574; sec=3
iex> status: 200, candle count: 200, remaining-req: group=candles; min=573; sec=2
iex> status: 200, candle count: 200, remaining-req: group=candles; min=572; sec=1
잘 된다. 하지만 가끔 시세 조회 요청 개수 제한을 넘어섰다고 에러 메시지가 나온다. 이건 칼같이 잘 계산해도 업비트 서버에서 요청을 받을 때, 계산하므로 어쩔 수가 없다. 어떤 요청은 빨리 갈 것이고 어떤 요청은 늦게 갈 것이다.
해당 시간 내 초과된 요청에 대해서 429 Too Many Requests 오류가 발생할 수 있습니다. 하지만 별도의 추가적인 페널티는 부과되지 않습니다.
다행히 상식적으로 업비트도 대처해준다.
레이턴시 때문에 시세 조회 요청 개수 제한을 넘어서 실패할 수 있다. 요청이 실패했을 때, 다시 요청하는 로직이 필요하다.
실패한 업비트 1분 캔들 요청은 다음 생산자 이벤트를 처리할 때, 같이 한다 commit f6955a6
defmodule BitcoinPriceScraper.RateLimiter do
use GenStage
# ...
defmodule Producer do
defstruct [:limits_per_second, :pending]
def new(limits_per_second) do
%__MODULE__{
limits_per_second: limits_per_second,
# candle 조회에 실패한 이벤트를 담아두고 다음에 시도한다
pending: [] # <--- 1
}
end
end
# ...
def handle_events(events, from, producers) do
IO.puts("handle_events - #{to_string(NaiveDateTime.utc_now())}, count: #{Enum.count(events)}")
if not Enum.empty?(producers[from].pending) do
IO.puts(
"retry count: #{Enum.count(producers[from].pending)}, detail: #{
inspect(producers[from].pending)
}"
)
end
# 이전에 실패한 candle 조회 요청을 보낸다
{_success, pending} = request_candles(producers[from].pending) # <--- 2
{_success, failed} = request_candles(events) # <--- 3
producers =
Map.update!(producers, from, fn exist ->
# 이전에 실패한 candle 조회 요청 중 실패한 요청과
# producer로 부터 받은 이벤트 중 실패한 목록을 업데이트해서
# 다음에 시도할 수 있게 한다.
%{exist | pending: pending ++ failed} # <--- 4
end)
{:noreply, [], producers}
end
# ...
end
elixir 프로세스마다 상태를 가질 수 있다. 이 상태는 함수 호출마다 인자로 넘겨주고 리턴 값을 받아서 업데이트하는 식으로 유지된다. 1번 코드로 요청에 실패한 날짜를 넣어둘 pending 리스트를 상태에 추가한다. 2번 코드에서 이전에 실패한 요청을 먼저 하고 3번 코드에서 생산자로부터 받은 이벤트로 요청한다. 4번 코드에선 2번, 3번 코드에서 실패한 요청을 다음에 요청할 수 있게 pending 리스트에 넣어둔다. request_candles/1
함수는 이전에 작성한 코드가 여러 번 호출돼서 함수로 추출한 함수다.
처리하는 부분은 됐고 생산자에게 요청하는 개수도 수정해야 한다. 실패해서 다음에 다시 시도하는 개수를 고려해야 한다.
defmodule BitcoinPriceScraper.RateLimiter do
use GenStage
# ...
defp ask_and_schedule(producers, from) do
case producers do
%{^from => %{limits_per_second: limits_per_second, pending: pending}} ->
# 이벤트를 요구한다. :manual 모드일 때는 GenStage.ask/2 함수를 호출해서 직접 요구해야 한다
GenStage.ask(from, limits_per_second)
# 실패해서 다음에 시도해야 할 이벤트 개수를 초당 요청 가능한 개수에서 뺀 만큼 요청한다
# 단, 0이면 handle_events 함수 호출이 안 되므로 최소 1개를 요청한다
GenStage.ask(from, max(limits_per_second - Enum.count(pending), 1)) # <--- 1
# 초당 호출 개수 제한이 있으므로 1초 스케쥴링을 한다
Process.send_after(self(), {:ask, from}, :timer.seconds(1))
producers
%{} ->
producers
end
end
end
1번 코드에서 초당 허용 요청 개수에서 다시 시도해야 할 개수를 뺀 만큼 생산자에게 요청한다. 주석에 구질구질하게 왜 최소 개수를 1개로 하는지 설명을 적었는데, 코드를 짤 때 잘못 생각했다. 어떤 케이스를 커버하지 못할까?
iex> handle_events - 2021-01-25 15:28:42.806205, count: 10
iex> status: 200, candle count: 200, remaining-req: group=candles; min=37; sec=1
iex> status: 200, candle count: 200, remaining-req: group=candles; min=36; sec=0
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> {:error,
{Tesla.Middleware.JSON, :decode,
%Jason.DecodeError{data: "Too many API requests.", position: 0, token: nil}}}
iex> status: 200, candle count: 200, remaining-req: group=candles; min=45; sec=9
iex> status: 200, candle count: 200, remaining-req: group=candles; min=44; sec=8
요청한 10개 중 6개가 실패했다.
iex> handle_events - 2021-01-25 15:28:43.807236, count: 4
iex> retry count: 6, detail: [~N[2021-01-22 10:48:23.247144], ~N[2021-01-22 14:08:23.247144], ~N[2021-01-22 17:28:23.247144], ~N[2021-01-22 20:48:23.247144], ~N[2021-01-23 00:08:23.247144], ~N[2021-01-23 03:28:23.247144]]
iex> status: 200, candle count: 200, remaining-req: group=candles; min=43; sec=7
iex> status: 200, candle count: 200, remaining-req: group=candles; min=42; sec=6
iex> status: 200, candle count: 200, remaining-req: group=candles; min=41; sec=5
iex> status: 200, candle count: 200, remaining-req: group=candles; min=40; sec=4
iex> status: 200, candle count: 200, remaining-req: group=candles; min=39; sec=3
iex> status: 200, candle count: 200, remaining-req: group=candles; min=48; sec=9
iex> status: 200, candle count: 200, remaining-req: group=candles; min=47; sec=8
iex> status: 200, candle count: 200, remaining-req: group=candles; min=46; sec=7
iex> status: 200, candle count: 200, remaining-req: group=candles; min=45; sec=6
iex> status: 200, candle count: 200, remaining-req: group=candles; min=44; sec=5
생산자에게 4개만 요구한다. 이전에 실패한 6개와 생산자에게 받은 4개를 요청한다. 잘 된다.
새로운 업비트 1분 캔들 요청과 재시도 요청을 분리해서 관리한다 commit 367d350
# 실패해서 다음에 시도해야 할 이벤트 개수를 초당 요청 가능한 개수에서 뺀 만큼 요청한다
# 단, 0이면 handle_events 함수 호출이 안 되므로 최소 1개를 요청한다
GenStage.ask(from, max(limits_per_second - Enum.count(pending), 1)) # <--- 1
마지막 요청에서 실패하면 재시도를 할까? 안 한다. 위에서 구질구질하게 설명을 달며 최소 1개를 요청하면 되겠지 했는데, 생산자는 이제 넘겨줄 이벤트가 없음으로 실패한 요청과 생산자가 넘겨준 이벤트를 처리하는 BitcoinPriceScraper.RateLimiter.handle_events/3
함수가 아예 호출이 안 된다.
defmodule BitcoinPriceScraper.RateLimiter do
use GenStage
# ...
def handle_events(events, from, producers) do
IO.puts("handle_events - #{to_string(NaiveDateTime.utc_now())}, count: #{Enum.count(events)}")
producers =
Map.update!(producers, from, fn exist ->
# 이 함수에서는 새로운 요청을 처리할 뿐, 이전에 실패한 요청을 처리하지 않는다.
# 실패한 요청을 다음에 시도할 수 있게 추가한다
%{exist | pending: exist.pending ++ failed}
end)
{:noreply, [], producers}
end
# ...
end
실패한 요청 재시도와 생산자에게 요구한 이벤트를 처리하는 코드를 분리했다. BitcoinPriceScraper.RateLimiter.handle_events/3
함수에서는 생산자에게 요구한 이벤트만 처리한다.
defmodule BitcoinPriceScraper.RateLimiter do
use GenStage
# ...
defp ask_and_schedule(producers, from) do
case producers do
%{^from => %{limits_per_second: limits_per_second, pending: pending}} ->
GenStage.ask(from, max(limits_per_second - Enum.count(pending), 0))
# 초당 호출 개수 제한이 있으므로 1초 스케쥴링을 한다
Process.send_after(self(), {:ask, from}, :timer.seconds(1))
if pending > 0 do # <--- 1
Process.send_after(self(), {:retry, from}, :timer.seconds(1))
end
producers
%{} ->
producers
end
end
defp retry_events(producers, from) do # <--- 2
if not Enum.empty?(producers[from].pending) do
IO.puts(
"retry count: #{Enum.count(producers[from].pending)}, detail: #{
inspect(producers[from].pending)
}"
)
{_success, pending} = request_candles(producers[from].pending)
producers =
Map.update!(producers, from, fn exist ->
%{exist | pending: pending}
end)
producers
else
producers
end
end
# ...
end
1번 코드로 실패한 요청이 있는 경우 {:retry, from}
메시지를 1초 후에 나에게 보내게 스케줄링을 한다. 메시지를 받으면 호출하는 BitcoinPriceScraper.RateLimiter.retry_events/2
함수에서 pending 리스트에 있는 이벤트만 요청을 보낸다.
BitcoinPriceScraper.RateLimiter.ask_and_schedule/2
함수는 1초마다 계속 호출되므로 마지막에 요청이 연속으로 실패해도 pending 리스트가 비어있지 않은 한 계속 함수를 호출하며 재시도를 한다.
secret key 관리 commit b021eeb
버전 컨트롤하지 않는 파일을 처음부터 만들고 거기에 secret key를 넣어두는 습관을 만들면 실수로 인한 키 유출을 방지할 수 있다.
/config/*.secret.exs
.gitignore
파일에 추가해서 *.secret.exs
파일을 버전 컨트롤에서 제외한다.
import Config
config :bitcoin_price_scraper,
upbit_access_key: "SUPER_ACCESS_KEY", # <--- 1
upbit_secret_key: "SUPER_SECRET_KEY"
if File.exists?("config/#{Mix.env()}.secret.exs") do
import_config "#{Mix.env()}.secret.exs" # <--- 2
end
config/config.exs
파일에 1번 코드처럼 키 값으로 access key와 secret key를 넣어야 한다는 문서 역할을 할 설정을 추가한다. 2번 코드로 secret.exs
파일이 있으면 해당 파일을 설정으로 사용한다. Mix.env() 함수는 mix 빌드 환경 값을 리턴한다. dev 혹은 prod를 리턴한다. 예제에서는 release를 만들지 않기 때문에 dev.secret.exs
파일을 만들어서 거기에 access key와 secret key를 넣어뒀다.
마치며
업비트 Open API를 사용해 실제 비트코인 시세를 조회하게 했다. 지연 시간(latency) 때문에 초당 시세 조회 횟수를 잘 지켜도 요청 제한 개수 초과가 발생할 수밖에 없다. 그래서 이렇게 실패한 요청을 저장했다가 다음에 요청할 때, 다시 시도하게 했다. 다음 편에서는 grafana를 사용해서 초당 요청 제한 개수를 꽉꽉 채워서 효율적으로 요청하고 있는지를 그럴듯한 그래프로 볼 예정이다.
전체 코드는 ohyecloudy/bitcoin_price_scraper 깃헙 사이트에서 볼 수 있다.
이 글과 관련된 커밋 목록이다.
- b021eeb upbit 마켓 코드 조회, 1분 캔들 조회
- 1d1fa82 반복된 jwt 인증 과정을 tesla middleware로 구현
- 16edb86 cosumer에서 upbit API를 호출해 1분 캔들 리퀘스트
- f6955a6 실패한 producer event는 다음 event가 올 때, 처리한다
- a4e860b upbit 시세에 대해 새로운 요청과 재시도 요청을 분리한다