スクリプト類とメニューボタン

トップ ソフト 雑記 日記 リンク

11月 02, 2013

Treasure Dataを使ってみました。(Upload編)

拙作ふぁぼるっくはPostgreSQLを使用しているのですが、データの蓄積量が増えてきたことから集計処理に時間が掛かるようになってきました。
しかし持っているサーバーはどれも高負荷状態。
なんとか手持ちのCPUリソースを割かずに集計する方法はないものかと考えていたところ、Treasure Dataを思い出したので使ってみました。

アカウントを作って、コマンドラインのツールをインストールします。
gem install td
で完了。
tdコマンドでアカウントを設定します。ちなみに、tdコマンドのバージョンは0.10.91です。

Treasure Dataのバルクインポート説明 http://docs.treasure-data.com/articles/bulk-import を見ると、データはまず加工(Prepare)し、行単位のデータベースへアップロード、そこから列単位のデータベースに変換(Perform)して、PlazmaというTreasure Dataのカラム型データベースへ投入(Commit)するという流れになります。

16個に分割された合計82GBのcsvファイルをアップロードします。
td import:upload --auto-create favlook.fav_old \
--auto-perform --auto-commit \
--parallel 8 \
--columns status_id,id,tp,user_id \
--time-value 0 \
-o prepared_parts /mnt/sdd/tmp/fav_*.csv

「--time-value 0」というオプションについてですが、Treasure Dataでは時間の追加カラムが必要なようで、これがないと時間書式のカラムがないというエラーになってしまいました。
もしデータに時間のカラムが存在するなら「--time-column ~」でカラムを指定すると良いようです。
今回使うデータには、時間カラムがなかったため、--time-value 0で全てゼロとして指定しました。

アップロードは回線速度次第です。気長に待ちましょう。
--auto-perform --auto-commitを付けているとアップロード完了時にそのままperformに進むと思うのですが、今回はいくつかのファイルが失敗してしまいました。
Next Steps:
  => check td-bulk-import.log and re-upload prepared_parts/fav_2012c_csv_66.msgpack.gz: Retry failed.
  => check td-bulk-import.log and re-upload prepared_parts/fav_2013d_csv_17.msgpack.gz: Retry failed.

端末に出てくるリトライ回数は0となっています。tdコマンドのオプション指定を見てみたのですが、リトライの指定方法が分かりませんでした。
td-bulk-import.logというログファイルも出力されるのですが、ローテーションするみたいで、直近のものしか残っておらず、該当ファイルの失敗理由などは分かりませんでした。

とりあえず失敗したファイルを再度アップロードします。
しかし1つ15GBも有るファイルをもう一度上げ直すのは大変。でも、失敗したファイルだけ再送信できます。
td import:upload favlook_fav_old_2013_10_25_1382661066 prepared_parts/fav_2012c_csv_66.msgpack.gz
td import:upload favlook_fav_old_2013_10_25_1382661066 prepared_parts/fav_2013d_csv_17.msgpack.gz

と、アップロードセッション名、対象ファイルを指定します。
prepared_partsというディレクトリは、最初のimport:upload時に指定した「-o prepared_parts」が効いています。
このディレクトリにPrepareしたデータが格納されています。
そして前に指定したセッション名、こちらはアップロード時の画面に出てくるので、それを見ていると分かるのですが、出力が多いと見逃してしまうかもしれません。
これはtdコマンドのimport:listで表示できます。
+---------------------------------------+-----------------+-----------+--------+-------+-------------+-------------+---------------+---------------+
| Name                                  | Table           | Status    | Frozen | JobID | Valid Parts | Error Parts | Valid Records | Error Records |
+---------------------------------------+-----------------+-----------+--------+-------+-------------+-------------+---------------+---------------+
| favlook_fav_old_2013_10_25_1382661066 | favlook.fav_old | Uploading |        |       |             |             |               |               |
+---------------------------------------+-----------------+-----------+--------+-------+-------------+-------------+---------------+---------------+
1 rows in set

アップロードが終わると

Next Steps:
  => execute 'td import:perform favlook_fav_old_2013_10_25_1382661066'.

と、次にやるべきことが表示されます。

td import:perform favlook_fav_old_2013_10_25_1382661066
Job 5451708 is queued.
Use 'td job:show [-w] 5451708' to show the status.

ジョブが開始されました。td job:showで状況が見られるようです。

td job:show -w 5451708
Organization :
JobID        : 5451708
Status       : running
Type         : bulk_import_perform
Priority     : NORMAL
Retry limit  : 0
Result       :
Database     : favlook
Query        :
queued...
  started at 2013-10-25T23:19:16Z
  13/10/25 23:19:23 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
  13/10/25 23:19:25 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  13/10/25 23:19:26 INFO storage.S3StorageBackend: listing s3 files with prefix e71b21f4-3b82-4d7b-9ce8-4fc1ca74bf94/ps/
  13/10/25 23:19:28 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
  13/10/25 23:19:28 INFO mapred.JobClient: Running job: job_201306191947_257352
  13/10/25 23:19:29 INFO mapred.JobClient:  map 0% reduce 0%
  13/10/25 23:19:53 INFO mapred.JobClient:  map 1% reduce 0%
  13/10/25 23:19:54 INFO mapred.JobClient:  map 4% reduce 0%

とmap & reduceの処理が流れていきます。気長に待ちましょう。
処理が終わったら、td import:listで状況を見てみましょう。

td import:list
+---------------------------------------+-----------------+-----------+--------+---------+-------------+-------------+---------------+---------------+
| Name                                  | Table           | Status    | Frozen | JobID   | Valid Parts | Error Parts | Valid Records | Error Records |
+---------------------------------------+-----------------+-----------+--------+---------+-------------+-------------+---------------+---------------+
| favlook_fav_old_2013_10_25_1382661066 | favlook.fav_old | Ready     |        | 5451708 | 1438        | 0           | 2062874026    | 0             |
+---------------------------------------+-----------------+-----------+--------+---------+-------------+-------------+---------------+---------------+
1 rows in set

StatusがReadyになっています。では、確定してcommitしましょう。

td import:commit favlook_fav_old_2013_10_25_1382661066

すぐに処理が帰ってきましたが、状況を見てみると

td import:list
+---------------------------------------+-----------------+------------+--------+---------+-------------+-------------+---------------+---------------+
| Name                                  | Table           | Status     | Frozen | JobID   | Valid Parts | Error Parts | Valid Records | Error Records |
+---------------------------------------+-----------------+------------+--------+---------+-------------+-------------+---------------+---------------+
| favlook_fav_old_2013_10_25_1382661066 | favlook.fav_old | Committing | Frozen | 5451708 | 1438        | 0           | 2062874026    | 0             |
+---------------------------------------+-----------------+------------+--------+---------+-------------+-------------+---------------+---------------+
1 row in set

という状態で、commitにもしばらく時間が掛かるようです。
Frozenというのがよく分からないのですが、処理が一時停止しているのかと思い、「td import:unfreeze」コマンドを実行してみたのですがエラーとなってしまいました。
ひとまず状況が分からないのですが、Committingなのでしばらく放っておこうかと放置。

td import:list
+---------------------------------------+-----------------+-----------+--------+---------+-------------+-------------+---------------+---------------+
| Name                                  | Table           | Status    | Frozen | JobID   | Valid Parts | Error Parts | Valid Records | Error Records |
+---------------------------------------+-----------------+-----------+--------+---------+-------------+-------------+---------------+---------------+
| favlook_fav_old_2013_10_25_1382661066 | favlook.fav_old | Committed | Frozen | 5451708 | 1438        | 0           | 2062874026    | 0             |
+---------------------------------------+-----------------+-----------+--------+---------+-------------+-------------+---------------+---------------+
1 row in set

見てみると完了していました。Frozen、謎です。

ブラウザでアクセスできる管理画面からも、データが確認できるようになりました。
レコード数が20億6287万4026件、サイズが17.3 GB。
CSVでは82GB有りましたが、項目名が「Compressed Size」となっているように、内部では圧縮して持っているようです。
無料で使える150GBというのは、この圧縮状態を指すのでしょうか?


こうしてデータの準備ができました。
次に集計をやってみます。
(Jobを走らせることはできたのですが、結果の取得方法などがよく分かっていないので、まだ実験中)

広告