- 07 Mar 2024
- 印刷
- ダークライト
- PDF
DWH上のマスターテーブルを重複なく効率的に更新する
- 更新日 07 Mar 2024
- 印刷
- ダークライト
- PDF
概要
本ページでは、DWHのテーブルに対し、「データの重複なく」「効率的に」変更を反映する方法をご紹介します。
はじめに
転送先Google BigQueryには、UPSERT
やMERGE
といった転送モードがありません。
そのため、代替案として以下のような方法で対応されることが考えられますが、それぞれ懸念点があります。
追記によるデータの重複
UPSERT
やMERGE
の代わりに、データソース側で変更・追加された行を、追記(APPEND
やINSERT
)する方法があります。
一方でこの方法を用いると、DWH上のテーブルにおいてデータの重複が発生することがあります。
全件洗い替えによる転送時間の増化
データの重複を避けるためには、DWH上のテーブルに対して「データソース側の変更があるごとに、全件洗い替え(REPLACE
)を行う」といった方法も考えられます。
一方でこの方法を用いると、すでにDWH上のテーブルにあるレコードの大半を再転送することになるため、転送時間が膨大になることが考えられます。
本ページで紹介したいこと
転送モードとしてUPSERT
やMERGE
がない転送先BigQuery・Snowflakeのテーブルに対して、「データの重複なく」「効率的に」変更を反映する方法を、ユースケースを交えてご紹介します。
転送先Amazon Redshiftおよび転送先Snowflakeは、転送モードとしてUPSERT (MERGE)
に対応しています。
適宜ご利用ください。
ユースケース
ユニークなカラムを利用してデータの重複を避けたい
IDなどのユニークな値を持つカラムをマージキーとして利用することで、データの重複を排除する形でDWH上のマスターテーブルを更新できます。
データソース側の変更に自動で追従させたい
転送元広告系コネクタの中には、先に速報値が生成され、後日になって値が確定する仕様のものがあります。
先に速報値を転送し、後日確定した値を先に転送したテーブルに対してマージするワークフローを組むことで、工数をかけることなくDWH上のマスターテーブルを更新できます。
大容量データを効率的に反映させたい
転送元DB系コネクタの場合、転送対象のデータが大容量であることがあります。
この場合に、updated_at
カラムなどを利用して直近で更新された行だけを取得し、マスターテーブルに対してマージすることで、全件洗い替えと比較して効率的にDWH上のマスターテーブルを更新できます。
更新手順
大きく3つのステップで行います。
2つのテーブルをDWHに転送し、データマートを用いてマージすることで、テーブルを重複なく効率的に更新できます。
- マスターテーブルの作成
- 一時テーブルの作成
- 一時テーブルをマスターテーブルにマージ
1. マスターテーブルの作成
本ステップは、初回のみ実施します。
DWHに対して全件転送を行い、マスターテーブルを作成します。
以下のような転送設定を作成し、ジョブを実行します。
- 転送元側の設定にて、転送設定として全件転送を選択
- 転送元側の設定にて、全件転送となるようにクエリを記述
2. 一時テーブルの作成
DWHに対して差分レコードのみ転送を行い、一時テーブルを作成します。
以下のような転送設定を作成し、ジョブを実行します。
- 転送元側の設定にて、手順1実行後に更新・追加があったレコードのみ転送されるようにクエリを記述
- 以下のようなWHERE句を含める
WHERE updated_at >= "2023-04-01"
なお、WHERE句内に時刻・日時のカスタム変数を埋め込むことで、後述のワークフロー定義を利用した運用も可能となります。
3. 一時テーブルをマスターテーブルにマージ
データマートを用いて、一時テーブルをマスターテーブルにマージします。
データマート定義のクエリ実行モードにて自由記述モードを選択し、MERGE文をクエリに記述し、ジョブを実行します。
以下は、データマートBigQueryを用いる場合のクエリサンプルです。
MERGE
`<master_project>.<master_dataset>.<master_table>` AS master -- master table
USING
`<temp_project>.<temp_dataset>.<temp_table>` AS temp -- temp table
ON
master.id = temp.id
WHEN MATCHED THEN
UPDATE SET
-- Enumerate all columns
master.hoge = temp.hoge,
master.fuga = temp.fuga,
︙
master.piyo = temp.piyo
WHEN NOT MATCHED THEN
-- tips: For BigQuery, write only `INSERT ROW` to insert all columns
INSERT(
-- Enumerate all columns
hoge,
fuga,
︙
piyo
)
VALUES(
-- Enumerate all columns
hoge,
fuga,
︙
piyo
)
ワークフロー定義を利用した運用
実際の運用においては、上記ステップ2・3を定期的に繰り返すことで、マスターテーブルを常に最新の状態に保つことが求められます。
この要件に対し、更新手順のステップ2・3を順に組み込んだワークフロー定義を作成することで、工数をかけることなく運用できます。
加えて、以下の設定を行うことで、ステップ1の初回マスターテーブル作成以外の処理をすべて自動化することもできます。
- ステップ2にて記載したクエリのWHERE句に時刻・日時のカスタム変数を埋め込む
- ワークフロー定義に対してスケジュールを設定する
DELETE
文・INSERT
文を記載したクエリを順に実行するという方法もあります。
DELETE
`<master_project>.<master_dataset>.<master_table>` AS master -- master table
WHERE
master.id IN (
SELECT id
FROM `<temp_project>.<temp_dataset>.<temp_table>` AS temp -- temp table
)
INSERT
`<master_project>.<master_dataset>.<master_table>` (
-- Enumerate all columns
hoge,
fuga,
︙
piyo
)
SELECT
-- Enumerate all columns
hoge,
fuga,
︙
piyo
FROM
`<temp_project>.<temp_dataset>.<temp_table>` AS temp -- temp table