はじまる

適当な事を適当に書く

Treasure Data Workflow (digdag) でテーブルを任意の行数単位で分割して、動的にループする

背景(例)

  • 1億レコードのテーブルがある。
  • テーブルに対してクエリを実行したい。
  • 実行してみたら重たかったので並列処理したい。
  • 100万レコードごとにテーブルを分割して、各テーブルごとにクエリを実行する。

Treasure Data のテーブルインデックスについて

Treasure Data のテーブルは、time というlong 型の列を必ず持つ仕様となっている。このtimeが、標準ではTreasure Data のテーブルの唯一のインデックスにもなっている。time には unix timestamp が数値で入る想定で、Treasure Data が時系列データのビッグデータ分析基盤から始まったことの名残である、と思われる。

このtime,実はlong値であれば、unixtime スタンプでない値もいれることができる。テーブルを任意のレコード件数で分割したいとき、このtime列に分割の基準となる値をいれることで、分割処理でインデックスが効くようになり、処理時間を早めることができる。

こんな感じ

#main.dig

###############################################################
# 環境変数設定
###############################################################

_export:
  td:
    database: hoge
    table: fuga
    split_size: 1000000
  

###############################################################
# 処理
###############################################################


+create_indexed_view:
# time を row_num 列として利用し、テーブルのレコードを分割する。
# https://qiita.com/mumuma/items/faa76b50ca653d85d522
  td>: create_indexed_view.sql
  create_table: ${td.database}.${td.table}_view_${session_date.replaceAll('-', '_')}
  engine: hive

+count_records:
  td>:
  query: "select count(time) as count from ${td.database}.${td.table}"
  store_last_results: true

+caluculate_loop:
# ここで、レコード件数と、分割サイズをもとに、必要なloopの回数を計算する。
  td>: 
  query: "select cast(ceiling( ${td.last_results.count} / (${td.split_size} * 1.0)) AS INTEGER) as loop;"
  store_last_results: true

+p4:
  loop>: ${td.last_results.loop} 
  _parallel:
    limit: 3
  _do:
    +table_split:
      td>: 1.sql
-- create_indexed_view.sql
SELECT 
  row_number() over(order by order_no)as time
  ,column1
  ,column2
FROM
  ${td.database}.${td.table}
-- split.sql
SELECT * FROM ${td.database}.${td.table}
WHERE time BETWEEN (1+${i}*${td.split_size}) AND (${i}+1)*${td.split_size}