背景(例)
- 1億レコードのテーブルがある。
- テーブルに対してクエリを実行したい。
- 実行してみたら重たかったので並列処理したい。
- 100万レコードごとにテーブルを分割して、各テーブルごとにクエリを実行する。
Treasure Data のテーブルインデックスについて
Treasure Data のテーブルは、time というlong 型の列を必ず持つ仕様となっている。このtimeが、標準ではTreasure Data のテーブルの唯一のインデックスにもなっている。time には unix timestamp が数値で入る想定で、Treasure Data が時系列データのビッグデータ分析基盤から始まったことの名残である、と思われる。
このtime,実はlong値であれば、unixtime スタンプでない値もいれることができる。テーブルを任意のレコード件数で分割したいとき、このtime列に分割の基準となる値をいれることで、分割処理でインデックスが効くようになり、処理時間を早めることができる。
こんな感じ
#main.dig ############################################################### # 環境変数設定 ############################################################### _export: td: database: hoge table: fuga split_size: 1000000 ############################################################### # 処理 ############################################################### +create_indexed_view: # time を row_num 列として利用し、テーブルのレコードを分割する。 # https://qiita.com/mumuma/items/faa76b50ca653d85d522 td>: create_indexed_view.sql create_table: ${td.database}.${td.table}_view_${session_date.replaceAll('-', '_')} engine: hive +count_records: td>: query: "select count(time) as count from ${td.database}.${td.table}" store_last_results: true +caluculate_loop: # ここで、レコード件数と、分割サイズをもとに、必要なloopの回数を計算する。 td>: query: "select cast(ceiling( ${td.last_results.count} / (${td.split_size} * 1.0)) AS INTEGER) as loop;" store_last_results: true +p4: loop>: ${td.last_results.loop} _parallel: limit: 3 _do: +table_split: td>: 1.sql
-- create_indexed_view.sql SELECT row_number() over(order by order_no)as time ,column1 ,column2 FROM ${td.database}.${td.table}
-- split.sql SELECT * FROM ${td.database}.${td.table} WHERE time BETWEEN (1+${i}*${td.split_size}) AND (${i}+1)*${td.split_size}