
- コトの発端
- 実現したい分析基盤
- dataformでやっていたこと
- 使ったdbtの機能は、incrimental modelのThe insert_overwrite strategy
- 実際にやったコト
- 感想
- サンプル
この記事Retty Advent Calendar 2023 - Adventarの24日目の記事です。
2023年4月に入社した土田です。
コトの発端
こんなメールが来た。

半年後にdeprecated。まじか。
実現したい分析基盤
前述の通りRettyではdataformを使って、DWH内の物理テーブルを更新しています。
GCPに統合されるということなので、そのままGoogle Cloud の Dataformに移行してもよかったのですが、Rettyの分析基盤はdbtでデータガバナンスをしていることからdbtでできるならdbtに移行しよう!となりました。
Rettyの分析基盤

dbtにまとめて、こうしたい

ふと思ったが、ツール構成はめっちゃシンプルだな。
dataformでやっていたこと
- 指定時刻での実行
- merge intoで任意のレコードをdelete、insertする
使ったdbtの機能は、incrimental modelのThe insert_overwrite strategy
<公式ドキュメント>
docs.getdbt.com docs.getdbt.com
<参考記事>
実際にやったコト
dbtでテーブルを作成するsqlファイルに以下を追加します。
merge intoで指定する任意のパーティション設定
{%
set partitions_to_replace = [
'timestamp(current_date)',
'timestamp(date_sub(current_date, interval 1 day))'
]
%}
この部分は、後述にあるincremental時のwhere句に展開されます。
注意点としては、merge文が ”in” でcompileされるのでそれに合わせた設定が必要ということ。
例)
between 'timestamp_sub(timestamp(date_sub(current_date("Asia/Tokyo"), interval 7 day)), interval 4 hour)' and 'timestamp(current_date)'
の様な場合は、
'timestamp(date_sub(current_date("Asia/Tokyo"), interval 7 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 6 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 5 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 4 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 3 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 2 day))', 'timestamp(date_sub(current_date("Asia/Tokyo"), interval 1 day))', 'timestamp(current_date("Asia/Tokyo"))'
で記載する必要があります。
incremental modelのinsert_overwriteの設定
{{
config(
schema="任意のデータセットID"
, alias="任意のテーブル名"
, materialized='incremental'
, incremental_strategy='insert_overwrite'
, partition_by={'field': '__partitiontime', 'data_type': 'timestamp'}
, partitions=partitions_to_replace
, tags=[" Tag名"]
, on_schema_change='fail'
)
}}
公式ドキュメントには、’dbt_project.yml’、もしくはsqlファイルのconfigに
+incremental_strategy: "insert_overwrite"
を設定すると記載があります。
今回は、挙動確認のためsqlファイルのconfigに設定することにしました。
(今後の運用次第で’dbt_project.yml’に定義することも検討しています)
Rettyではdbt cloudでスケジュール起動しているので、tagsにはスケジュールで設定しているTagを設定しています。
on_schema_changeを設定すると、schemaが変更されたことをエラーで教えてくれるようです。
incremental時のwhere句
{% if is_incremental() %}
-- recalculate yesterday + today
where timestamp_trunc(__partitiontime, day) in ({{ partitions_to_replace | join(',') }})
{% endif %}
ここの部分は、初回実行時には展開されず2回目以降(incremental時)に展開される部分です。
クエリ部分なので条件式 ”in” 以外のものに変更できそうに見えますが、merge intoの条件式がデフォルトで”in”になっているので、変更すると意図通りの挙動となりません。
感想
- 非エンジニアの僕でもできるくらいシンプルでした。
- まだdataformから乗り換えられていないので、そろそろ頑張ろうと思ってます。
- 記事書くのが初めてだったので、レビューボロボロでした。
- dbtはまだまだ使いこなせていない感があって、分析コストを下げるためにも整備していきたいです。
サンプル
コード
{%
set partitions_to_replace = [
'timestamp(current_date)',
'timestamp(date_sub(current_date, interval 1 day))'
]
%}
{{
config(
config(
schema="任意のデータセット"
, alias="任意のテーブル名"
, materialized='incremental'
, incremental_strategy='insert_overwrite'
, partition_by={'field': '__partitiontime', 'data_type': 'timestamp'}
, partitions=partitions_to_replace
, tags=[" Tag名"]
, on_schema_change='fail'
)
}}
with day_base as (
select
__partitiontime
from
unnest(
generate_timestamp_array(
timestamp(date_sub(current_date, interval 1 day))
, timestamp_trunc(timestamp(current_date('Asia/Tokyo')), day)
, interval 1 day
) as __partitiontime
)
select
__partitiontime
, timestamp(current_date) as updated_at
from
day_base
{% if is_incremental() %}
-- recalculate yesterday + today
where timestamp_trunc(__partitiontime, day) in ({{ partitions_to_replace | join(',') }})
{% endif %}
挙動
N日に実行
create table 任意のデータセット名が実行されます。
| partitiontime | updated_at |
|---|---|
| N-1 | N |
| N | N |
N+1日に実行
merge into 任意のデータセット名が実行されます。
| partitiontime | updated_at |
|---|---|
| N-1 | N |
| N | N+1 |
| N+1 | N+1 |
以上です。読んで頂きありがとうございました。