- コトの発端
- 実現したい分析基盤
- dataformでやっていたこと
- 使ったdbtの機能は、incrimental modelのThe insert_overwrite strategy
- 実際にやったコト
- 感想
- サンプル
この記事Retty Advent Calendar 2023 - Adventarの24日目の記事です。
2023年4月に入社した土田です。
コトの発端
こんなメールが来た。
半年後にdeprecated。まじか。
実現したい分析基盤
前述の通りRettyではdataformを使って、DWH内の物理テーブルを更新しています。
GCPに統合されるということなので、そのままGoogle Cloud の Dataformに移行してもよかったのですが、Rettyの分析基盤はdbtでデータガバナンスをしていることからdbtでできるならdbtに移行しよう!となりました。
Rettyの分析基盤
dbtにまとめて、こうしたい
ふと思ったが、ツール構成はめっちゃシンプルだな。
dataformでやっていたこと
- 指定時刻での実行
- merge intoで任意のレコードをdelete、insertする
使ったdbtの機能は、incrimental modelのThe insert_overwrite strategy
<公式ドキュメント>
docs.getdbt.com docs.getdbt.com
<参考記事>
実際にやったコト
dbtでテーブルを作成するsqlファイルに以下を追加します。
merge intoで指定する任意のパーティション設定
{% set partitions_to_replace = [ 'timestamp(current_date)', 'timestamp(date_sub(current_date, interval 1 day))' ] %}
この部分は、後述にあるincremental時のwhere句に展開されます。
注意点としては、merge文が ”in” でcompileされるのでそれに合わせた設定が必要ということ。
例)
between 'timestamp_sub(timestamp(date_sub(current_date("Asia/Tokyo"), interval 7 day)), interval 4 hour)' and 'timestamp(current_date)'
の様な場合は、
'timestamp(date_sub(current_date("Asia/Tokyo"), interval 7 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 6 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 5 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 4 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 3 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 2 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 1 day))', 'timestamp(current_date("Asia/Tokyo"))'
で記載する必要があります。
incremental modelのinsert_overwriteの設定
{{ config( schema="任意のデータセットID" , alias="任意のテーブル名" , materialized='incremental' , incremental_strategy='insert_overwrite' , partition_by={'field': '__partitiontime', 'data_type': 'timestamp'} , partitions=partitions_to_replace , tags=[" Tag名"] , on_schema_change='fail' ) }}
公式ドキュメントには、’dbt_project.yml’、もしくはsqlファイルのconfigに
+incremental_strategy: "insert_overwrite"
を設定すると記載があります。
今回は、挙動確認のためsqlファイルのconfigに設定することにしました。
(今後の運用次第で’dbt_project.yml’に定義することも検討しています)
Rettyではdbt cloudでスケジュール起動しているので、tagsにはスケジュールで設定しているTagを設定しています。
on_schema_changeを設定すると、schemaが変更されたことをエラーで教えてくれるようです。
incremental時のwhere句
{% if is_incremental() %} -- recalculate yesterday + today where timestamp_trunc(__partitiontime, day) in ({{ partitions_to_replace | join(',') }}) {% endif %}
ここの部分は、初回実行時には展開されず2回目以降(incremental時)に展開される部分です。
クエリ部分なので条件式 ”in” 以外のものに変更できそうに見えますが、merge intoの条件式がデフォルトで”in”になっているので、変更すると意図通りの挙動となりません。
感想
- 非エンジニアの僕でもできるくらいシンプルでした。
- まだdataformから乗り換えられていないので、そろそろ頑張ろうと思ってます。
- 記事書くのが初めてだったので、レビューボロボロでした。
- dbtはまだまだ使いこなせていない感があって、分析コストを下げるためにも整備していきたいです。
サンプル
コード
{% set partitions_to_replace = [ 'timestamp(current_date)', 'timestamp(date_sub(current_date, interval 1 day))' ] %} {{ config( config( schema="任意のデータセット" , alias="任意のテーブル名" , materialized='incremental' , incremental_strategy='insert_overwrite' , partition_by={'field': '__partitiontime', 'data_type': 'timestamp'} , partitions=partitions_to_replace , tags=[" Tag名"] , on_schema_change='fail' ) }} with day_base as ( select __partitiontime from unnest( generate_timestamp_array( timestamp(date_sub(current_date, interval 1 day)) , timestamp_trunc(timestamp(current_date('Asia/Tokyo')), day) , interval 1 day ) as __partitiontime ) select __partitiontime , timestamp(current_date) as updated_at from day_base {% if is_incremental() %} -- recalculate yesterday + today where timestamp_trunc(__partitiontime, day) in ({{ partitions_to_replace | join(',') }}) {% endif %}
挙動
N日に実行
create table 任意のデータセット名
が実行されます。
partitiontime | updated_at |
---|---|
N-1 | N |
N | N |
N+1日に実行
merge into 任意のデータセット名
が実行されます。
partitiontime | updated_at |
---|---|
N-1 | N |
N | N+1 |
N+1 | N+1 |
以上です。読んで頂きありがとうございました。