Retty Tech Blog

実名口コミグルメサービスRettyのエンジニアによるTech Blogです。プロダクト開発にまつわるナレッジをアウトプットして、世の中がHappyになっていくようなコンテンツを発信します。

dataformからdbtに移行するために調べたこと

この記事Retty Advent Calendar 2023 - Adventarの24日目の記事です。

2023年4月に入社した土田です。

コトの発端

こんなメールが来た。

半年後にdeprecated。まじか。

実現したい分析基盤

前述の通りRettyではdataformを使って、DWH内の物理テーブルを更新しています。
GCPに統合されるということなので、そのままGoogle Cloud の Dataformに移行してもよかったのですが、Rettyの分析基盤はdbtでデータガバナンスをしていることからdbtでできるならdbtに移行しよう!となりました。

Rettyの分析基盤

dbtにまとめて、こうしたい

ふと思ったが、ツール構成はめっちゃシンプルだな。

dataformでやっていたこと

  1. 指定時刻での実行
  2. merge intoで任意のレコードをdelete、insertする

使ったdbtの機能は、incrimental modelのThe insert_overwrite strategy

<公式ドキュメント>

docs.getdbt.com docs.getdbt.com

<参考記事>

speakerdeck.com

実際にやったコト

dbtでテーブルを作成するsqlファイルに以下を追加します。

merge intoで指定する任意のパーティション設定

{%
  set partitions_to_replace = [
    'timestamp(current_date)',
    'timestamp(date_sub(current_date, interval 1 day))'
  ]
%}

この部分は、後述にあるincremental時のwhere句に展開されます。
注意点としては、merge文が ”in” でcompileされるのでそれに合わせた設定が必要ということ。

例)

between 'timestamp_sub(timestamp(date_sub(current_date("Asia/Tokyo"), interval 7 day)), interval 4 hour)' and 'timestamp(current_date)'

の様な場合は、

'timestamp(date_sub(current_date("Asia/Tokyo"), interval 7 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 6 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 5 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 4 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 3 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 2 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 1 day))', 'timestamp(current_date("Asia/Tokyo"))'

で記載する必要があります。

incremental modelのinsert_overwriteの設定

{{
  config(
    schema="任意のデータセットID"
    , alias="任意のテーブル名"
    , materialized='incremental'
    , incremental_strategy='insert_overwrite'
    , partition_by={'field': '__partitiontime', 'data_type': 'timestamp'}
    , partitions=partitions_to_replace
    , tags=[" Tag名"]
    , on_schema_change='fail'
  )
}}

公式ドキュメントには、’dbt_project.yml’、もしくはsqlファイルのconfigに
+incremental_strategy: "insert_overwrite"
を設定すると記載があります。

今回は、挙動確認のためsqlファイルのconfigに設定することにしました。
(今後の運用次第で’dbt_project.yml’に定義することも検討しています)
Rettyではdbt cloudでスケジュール起動しているので、tagsにはスケジュールで設定しているTagを設定しています。
on_schema_changeを設定すると、schemaが変更されたことをエラーで教えてくれるようです。

incremental時のwhere句

{% if is_incremental() %}
  -- recalculate yesterday + today
  where timestamp_trunc(__partitiontime, day) in ({{ partitions_to_replace | join(',') }})
{% endif %}

ここの部分は、初回実行時には展開されず2回目以降(incremental時)に展開される部分です。
クエリ部分なので条件式 ”in” 以外のものに変更できそうに見えますが、merge intoの条件式がデフォルトで”in”になっているので、変更すると意図通りの挙動となりません

感想

  • 非エンジニアの僕でもできるくらいシンプルでした。
  • まだdataformから乗り換えられていないので、そろそろ頑張ろうと思ってます。
  • 記事書くのが初めてだったので、レビューボロボロでした。
  • dbtはまだまだ使いこなせていない感があって、分析コストを下げるためにも整備していきたいです。

サンプル

コード

{%
  set partitions_to_replace = [
    'timestamp(current_date)',
    'timestamp(date_sub(current_date, interval 1 day))'
  ]
%}

{{
  config(
  config(
    schema="任意のデータセット"
    , alias="任意のテーブル名"
    , materialized='incremental'
    , incremental_strategy='insert_overwrite'
    , partition_by={'field': '__partitiontime', 'data_type': 'timestamp'}
    , partitions=partitions_to_replace
    , tags=[" Tag名"]
    , on_schema_change='fail'
  )
}}

with day_base as (
  select
    __partitiontime
  from
    unnest(
      generate_timestamp_array(
      timestamp(date_sub(current_date, interval 1 day))
      , timestamp_trunc(timestamp(current_date('Asia/Tokyo')), day)
      , interval 1 day
    ) as __partitiontime
)

select
  __partitiontime
  , timestamp(current_date) as updated_at
from
  day_base

{% if is_incremental() %}
  -- recalculate yesterday + today
  where timestamp_trunc(__partitiontime, day) in ({{ partitions_to_replace | join(',') }})
{% endif %}

挙動

N日に実行

create table 任意のデータセット名が実行されます。

partitiontime updated_at
N-1 N
N N

N+1日に実行

merge into 任意のデータセット名が実行されます。

partitiontime updated_at
N-1 N
N N+1
N+1 N+1

以上です。読んで頂きありがとうございました。