Retty Tech Blog

実名口コミグルメサービスRettyのエンジニアによるTech Blogです。プロダクト開発にまつわるナレッジをアウトプットして、世の中がHappyになっていくようなコンテンツを発信します。

マイクロサービスのデータぜんぶ抜く……gRPCで!

どうも、エンジニアの神 id:pikatenor です。書きかけの記事を下書きに突っ込んで放置していたらマネージャーの常松に目をつけられ、#Rettyマイクロサービス強化月間 第1週目の記事に祭り上げられることになりましたが無事に遅刻しました。記事の公開をお待ちいただいていた皆様には深くお詫び申し上げます。

engineer.retty.me

そういうわけで今回は自作OSSの宣伝とそいつをサービスに組み込むに至った背景のお話です。

マイクロサービスのDB分割と集約

さて、Retty がマイクロサービスアーキテクチャへの移行取り組んでいるという話は従前の通りですが、最近では共有DBの呪いから解き放たれるべくDBスキーマを完全に分割したマイクロサービスも現れました。

共有DBの呪いとはなんぞや、詳細は「モノリスからマイクロサービスへ」といった書籍に委ねますが、データの所有権が曖昧になる、独立デプロイ可能性が損なわれる、スキーママイグレーションが困難になる、etc... 共有DBはマイクロサービスにとって様々な角度から「敵」となりえる存在です。

www.oreilly.co.jp

一方、システム全体を横断する検索機能を設計する場合、ほとんどはデータを集約しインデックスを構築することとなります。Retty でも主たる検索機能には Elasticsearch を利用し、MySQL DB からデータを抜き出し Index を生成しています。これらの生成には Logstash および JDBC Input Plugin を用いて共有DBへクエリを発行してインデキシングを行っていました。

しかし、分割されたDBスキーマに対し、これまでと同様にマイクロサービスの垣根を超えて自由にクエリを投げてしまっては、結局共有DBと同様の問題を抱え込むこととなります。

DB そのものを外部から守りつつデータをインデキシングする方法について、いくつかのパターンを検討しましたが、今回はマイクロサービスが公開しているAPI上に、必要なデータを全件公開するAPIを新規に実装し、データをぶっこ抜くことにしました。

Logstash + gRPC という選択

Retty のマイクロサービスのほとんどはサービス間通信に gRPC を用いています。gRPC には Server Streaming RPC と呼ばれる種類の RPC があり、これを利用することで大量のデータを効率よく処理することができそうでした。

また、検索システム側では Logstash をバッチとして定期実行する基盤*1があり、つまり Logstash が gRPC を喋ってくれれば最高なんだよな〜と思い立ち、実験してみたところ案外いけそうだったので gRPC Stream を Input とするプラグインを実装することにしました。こうしてできたものが logstash-input-grpc です。

github.com

大雑把な説明

公式の hellostreamingworld.proto を例にすると、protoc で DescriptorSet 形式に変換して、

protoc --include_imports -o hellostreamingworld.protoset ./hellostreamingworld.proto

次のような Logstash の pipeline.conf を書いて実行すると

input {
  grpc {
   host => "localhost"
   port => 50051

   protoset_path => "/tmp/hellostreamingworld.protoset"

   rpc => "hellostreamingworld.MultiGreeter/sayHello"

   message => {
     name => "Logstash"
     num_greetings => 3
   }
  }
}
output {
    stdout {}
}

sayHello rpc が呼び出され、stream で返された HelloReply がそれぞれ Logstash の event として出力されます。

[2022-06-02T17:19:18,180][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{
    "@timestamp" => 2022-06-02T18:19:18.378Z,
       "message" => "Hello, Logstash",
      "@version" => "1"
}
{
    "@timestamp" => 2022-06-02T18:19:19.348Z,
       "message" => "Hello, Logstash",
      "@version" => "1"
}
{
    "@timestamp" => 2022-06-02T18:19:20.348Z,
       "message" => "Hello, Logstash",
      "@version" => "1"
}
[2022-06-02T17:19:21,359][INFO ][net.p1kachu.logstash.input.grpc.Grpc][main][32030a93d5b1a33b7b52a8dd01281c5942a1318c2462354ebde617c62d7031b8] rpc call ended successfully
[2022-06-03T03:19:21,571][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}

得られた event は煮るなり好きに filter をかましたりお気に入りの output に出力したり焼くなりできます。

gRPC Server 側の実装

gRPC マイクロサービス側では、Go 言語の Channel を活用することで repository 層の詳細を隠蔽しつつ Streaming を実装することができました。

repository 側 (SQL)

func (r *repository) ListAllEmployee(ctx context.Context) (<-chan entity.Employee, <-chan error) {
    dataChannel := make(chan entity.Employee)
    errChannel := make(chan error)
    go func() {
        defer close(dataChannel)
        defer close(errorChannel)
     
        rows, err := r.db.QueryContext(ctx, "SELECT ENAME FROM EMP;")
        if err != nil {
            errChannel <- err
            return
        }
        defer rows.Close()
     
        for rows.Next() {
            select {
            case <-ctx.Done():
                return
            default:
                var ename string;
                err = rows.Scan(&ename)
                if err != nil {
                    errorChannel <- err
                    return
                }

                dataChannel <- entity.Employee{
                    Name: ename,
                }
            }
        }
    }()
    return dataChannel, errChannel
}
 

service 側 (gRPC)

func (s *service) ListAllEmployee(query *proto.ListAllEmployeeRequest, server proto.Service_ListAllEmployeeServer) error {
    empChannel, errChannel := s.useCase.ListAllEmployee(server.Context())
    for {
        select {
        case emp, isOpen := <-empChannel:
            if !isOpen {
                return nil
            }
            err := server.Send(&proto.ListAllEmployeeResponse{
                Name: emp.Name, 
            })
            if err != nil {
                return status.Newf(codes.Aborted, "send:%v", err)
            }
        case err := <-errorChannel:
            if err != nil {
                return status.Newf(codes.Internal, "internal:%v", err)
            }
        }
    }
}

複数の Channel のような機能を管理する必要があると Close 忘れなどが怖いですが、Go 言語では defer を扱えるので少し気持ちが楽ですね。repository 側でも ctx.Done() を確認することで呼び出し側からのキャンセルをサポートしています。

良かったこと

これまで検索システムの基盤として利用していた Logstash のプラグインとして実装したことにより、一貫性を保って他のパイプラインと合わせて管理することができました。また、gRPC のメッセージから出力先の Elasticsearch の検索クエリに適した形に変換する処理も Logstash の filter として実装できるため、豊富な filter プラグインを活用することができました。

gRPC マイクロサービス側でも、大幅な拡張を伴うことなく RPC を1つ追加するに留めることができました。

Retty ではバッチ処理で Elasticsearch Index を作成し日次で切り替えるという構成を取っているため、今回は単発実行でデータ全部抜くというパイプラインの実装でしたが、gRPC Stream は単に複数メッセージを扱えるだけでなくイベント的にやり取りすることもできるため、リアルタイムに Document を更新するような構成にも活用できるのではないかとか考えています。

わりとみんなこういうの欲しいんじゃないの!?という思いから作成したプラグインなので、これを機にまた違った採用事例が聞かせてもらえれば嬉しいです。

おまけ: プラグインの実装についてあれこれ

Logstash は JRuby で実装されており、プラグインJRuby で実装するのが基本だったのですが、grpc-ruby が JRuby に対応していないという問題があり、grpc-javaバインディングで頑張るよりはいっそ最初から Java で書いたほうがいいなと考え Java Plugin を書くことを選択しました。テンプレートをベースにしつつ自分の好みで Kotlin に書き換えましたが特に問題はなかったのと、ぶっちゃけ開発環境の構築も Java オンリーのほうが楽だと思いました。

汎用的なプラグインとするためには Protobuf の定義を動的に読み込んで gRPC コールをする必要があったのですが、そのあたりのコードは Polyglot という Java 製の CLI クライアントから思いっきりコードをお借りしました。

github.com

実際のところ難しい部分はほとんど Polyglot 由来で、プラグイン本体部分は1ファイルに収まる程度しか書いてないです。

*1:この辺は検索システムのリニューアルに合わせて ECS Scheduled Task を基盤とするものがわりと最近整備されました。詳しくは 6/8 公開予定の記事「検索サービスの構築」をお楽しみに!