この記事はRetty Advent Calendarの3日目の記事です。
ハロー! software engineerのTakato Fukuiです。最近バッチアプリケーションを開発しました。この記事ではその際に使用したバッチ実装のテクニックを説明します。
バッチの品質を高めるためには考慮することがたくさんあります。
- 冪等にする
- 同時実行できる
- パラメータ指定などしてrerunし易い
- 処理失敗しにくいようにAPIアクセスなどをretryする
- 途中でエラーが起きても結果に問題なければ処理継続する
- ...
(Web APIでもある程度同じことは言えます)
このバッチ実装における設計やGo実装のいくつかのテクニックを説明します。 今回開発したバッチの概要です。
前提となる技術スタックです。
アプリケーションアーキテクチャ
(特にGoに限った話ではないです)
Goで書かれたバッチは他にもいくつか存在しています。ですが現在はバッチのロジック実装と起動時間を設定するだけでバッチを動かせるようなGoのバッチ基盤は存在しません。今回開発したバッチは運用後に頻繁に改修が入るかわからなかったですし、大体の開発は将来のことは見えにくいと思います。 なのでゼロからバッチ開発するのはコストが高く、ある程度既存のコードなりを流用できないかと考えました。
今回開発したバッチは飲食店の基本情報(営業時間や予算など)を扱います。他にWeb APIとして飲食店の基本情報を扱うgRPCサービスがあります。このgRPCサービスのアプリケーションアーキテクチャです。一般的なWebアプリケーションの構成に近いと思います。
- bootstrap
エントリーポイントやgRPCサービス登録などの処理 - entity
ビジネスロジックを持つ構造体など - service
gRPCの知識を持ち、Protocol Buffersに定義されたrpcの実装などする - usecase
repositoryやentityを操作しserviceに返す - repository
DBやgRPCなどの技術基盤を隠蔽しentityを操作する
既存のgRPCサービスとバッチで似たような情報を扱うという意味もあり、このgRPCサービスに相乗りするようにしました。
バッチ用のbootstrapとusecaseを作り、他は既存のものを流用しました。
- バッチ用bootstrap
バッチ起動するエントリーポイント含む - entity
既存gRPCサービスと同じものを扱う - バッチ用usecase
既存usecaseはgRPCから使われるので実行時間が短いが、バッチ用は実行時間が長い - repository
バッチで使用する外部API操作は既存gRPCと同じrepositoryの扱いにした
あくまでgRPCからでも使えるような短い時間で処理できる単位にしている
相乗りにより過剰な開発を避けられました。将来的にこのバッチが大きくなったり他にもGoのバッチが増えてきたら、バッチのみを切り出したり、バッチ基盤構築と移行が検討されると思います。
可用性向上のためのretry
失敗したバッチを手動でrerunするのは手間がかかります。失敗率をなるべく減らすべく、ネットワークアクセスにretryを入れています。 gRPC clientにはgrpc-ecosystem/go-grpc-middlewareのretryを、HTTP clientにはhashicorp/go-retryablehttpを使用しました。
今回はバッチ相乗り元のgRPCサービスの方でも同じretry設定を共通で使うようにしています。ただバッチとWeb APIではretryの温度感が異なるケースがあります。例えば日次バッチを時間内に処理完了しなければいけないのに対し、高リクエストレートの読み込みAPIでは若干の失敗は許容できるケースです。絶えずリクエストがあるWeb API(gRPC)でcircuit breakerなしでretryすると相手serverの負荷を高める可能性があります。
バッチ失敗監視
バッチ自体の失敗を判定できるようにするには、どのようにGoで実装すればよいでしょうか。 main関数でエラーが返ってきた場合はGoを終了コード1で終了します。そしてGo実行元で終了コードを監視します。これによりバッチ自体の失敗を判定できます。
func main() { // ... if err := run(); err != nil { slog.ErrorContext(ctx, "failed to batch", "err", err) os.Exit(1) } }
os.Exitはdeferが呼ばれません。そのためmain関数外でdeferを必要とするbootstrap処理をし、main関数内の処理は最小限にしています。
エラー処理継続
このバッチは複数飲食店を処理しますが、例えばある1つの飲食店が処理失敗しても、他の飲食店は処理中断しないようにしたいです。これにはGo 1.20から導入されたerrors.Joinを有効活用できます。アメイジング!
var joinedError error // 複数飲食店のループ for i, restaurant := range restaurants { // 飲食店の処理 if err != nil { joinedError = errors.Join(joinedError, fmt.Errorf("failed to process a restaurant: %w", err)) continue } // ... } return joinedError
errors.Joinの返り値(joinedError)が複数errorを保持するerror型になります。
またerrors.Joinの返り値を出力するなどしてstringにすると、joinされたerrorごとに改行されます。例えば以下のようにwrapしたerrors.Joinの返り値を出力すると、やや見づらくなります。
func foo() { err := bar() if err != nil { fmt.Println(err) } } func bar() error { var joinedError error for i := 0; i < 2; i++ { joinedError = errors.Join(joinedError, errors.New(fmt.Sprintf("error %d", i))) } return fmt.Errorf("failed: %w", joinedError) }
⇩出力結果
failed: error 0 error 1
より見やすくしたい場合は工夫が必要でしょう。
ライブラリ活用
バッチにおいて遭遇しやすい処理パターンで活用できるライブラリを説明します。
複数アイテムのchunk
バッチでは複数アイテムをchunkして処理したいケースが多いと思います。例えば100件のデータを10件ずつ処理する、などです。このバッチでは複数飲食店のchunkにsamber/loのChunk関数を使っており便利です。
HTTP requestのrate limit
このバッチは処理の中で連続して何回も外部APIを叩きます。外部APIのQPSに引っかからないように、HTTP requestにrate limitを入れています。
rate limitにはgolang.org/x/time/rateを使っています。例えば以下のようにhttp.RoundTripperを実装することで、これを実現できます。
import ( // ... "golang.org/x/time/rate" ) func (r *rateLimitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { err := r.limiter.Wait(req.Context()) if err != nil { return nil, err } return r.delegate.RoundTrip(req) } func newRateLimitedClient() *http.Client { return &http.Client{ Transport: &rateLimitRoundTripper{ limiter: rate.NewLimiter(rate.Every(1*time.Second), 1), // 1秒ごとに1回のrequest delegate: http.DefaultTransport, }, } }
バッチ実装でのいくつかのテクニックを見てきました。過剰な開発を避け、可用性や信頼性の高いバッチアプリケーションを開発できたと思います。
ハッピーコーディング!