読者です 読者をやめる 読者になる 読者になる

mixi engineer blog

ミクシィ・グループで、実際に開発に携わっているエンジニア達が執筆している公式ブログです。様々なサービスの開発や運用を行っていく際に得た技術情報から採用情報まで、有益な情報を幅広く取り扱っています。

Apache HiveにおけるJSON連想配列処理の最適化

Hadoop Hive

あけましておめでとうございます. 平野啓一郎著「葬送」がようやく読み終わった技術部の石川有です.ショパンの死を題材とした内容が難解で重く,すべて読み終えるのに都合5ヶ月ぐらい掛かっていたのではないかと思います.本当にとても重い内容ですが,濃厚で至福な時間を過ごせました.

さて「重い」と言えば,「大規模データ解析」という言葉が頭に思い浮かびますよね.以前の記事「mixi の解析基盤とApache Hive での JSON パーサの活用の紹介」の通り,ミクシィの解析基盤として Apache Hive を利用しています. また Apache Hive で"No More 「刺身の上にタンポポをのせる仕事」 - 単純作業の繰り返しで開発者の時間を浪費しないために。"を実現するための Hive の JSON パーサを活用しています.

新年最初のエントリーは,Apache Hive で JSON 連想配列を扱うときのパフォーマンス最適化について書きます. なぜこの記事を書くかと問われれば,JSON の連想配列を扱うときと通常のカラムベースを扱うときでは異なり,それに関する記事があまりなかったためです. また fluentd 経由のデータの JSON 連想配列のデータを Hive で扱う機会も,世間的に増えてくるのではないかと考えています.

本文章の構成

本文章の構成は,まず今回の記事を書くに至った問題点の説明をします. つぎに,Apache Hive における圧縮に関する知識について述べます. 最後に問題点を解決する方法の検証について述べます.

  • ミクシィにおけるJSON 連想配列のログを扱うときの問題点
    • データに対して HDFS のブロック数に無駄がある
    • MapReduce の Map 処理で無駄が多い
    • データを Hadoop に最適化
  • Apache Hive のデータ圧縮
    • Apache Hive におけるデータ圧縮パラメータ
      • ファイルフォーマット
      • 圧縮方式
      • 圧縮単位
  • JSON 連想配列を扱うのに最適な設定
    • 検証
      • 検証環境
      • 検証パラメータ
      • 検証結果の判断基準
      • 検証結果の詳細
  • まとめ

ミクシィにおけるJSON 連想配列のログを扱うときの問題点

ミクシィでは日々のサービス改善のために,日記やボイスをつぶやいたときあるいはコメントやイイネ!をしたというユーザの行動ログをデータ解析に利用しています. その各々のログは,JSON 連想配列としてそれぞれ出力しています. その出力されたログを1時間毎に rotate して,gzip 圧縮しています. そしてログを受けるサーバが2台あるので,1種類の行動ログで2台x 24 時間で 48 の gzip ファイルができます. つまり,n 種類 x 48 ファイルが1日分のファイル数になってしまっていました.

これまでのミクシィの Apache Hive 環境では,このログをそのままロードして利用していました. Hive では,MySQL のようにデータをロードするといっても,データが再整形されて定義した Hive テーブルに適した状態になるわけではありません. 実際には,元データのファイルがそのまま HDFS にあり,Hive で SELECT 文を実行するたびにそれらのデータを読み取るということを行なっています. つまり,SELECT 文を実行するたびに JSON 連想配列をパースするビルドイン関数が実行されます. そのため Hadoop, Hive の特性に対して,大きく2つの問題点がありました.

  1. HDFS のブロック数に無駄がある
  2. MapReduce の Map 処理で無駄が多い

データに対して HDFS のブロック数に無駄がある

上記で説明した通りローカルのファイルをそのまま利用するだけだと,1日で 48 n の gzip ファイルが HDFS 上に増えていきます. Hadoop では,HDFS 上のファイルを NameNode (いわゆるマスターサーバ)と DataNode (いわゆるスレーブサーバ)のそれぞれ管理しています. とても大雑把に言うと管理すべきファイル数が増えると,それだけ管理に利用するメモリが増えます. この問題は,「無駄がある」という程度で時間が相当経過しないとクリティカルな問題にはなりません.またログの種類によっては,1つの gzip ファイルが HDFS のブロックサイズより小さなものもあります.そのため,少なくとも小さなファイルをまとめる必要がありました.

クリティカルな問題となったのは,新規で小規模のクラスタを構築するときにクラスタ間でデータ転送をするときでした. Hadoop には distcp という機能があって,別々の Hadoop クラスタの HDFS 間で rsync のようにデータ転送することができます. 新規クラスタを再構築するときに,約 180 万の gzip ファイルを Hive にロードする必要がありました. ローカルからもう一度すべてのデータを Hive にロードするのは,時間的に現実的ではありませんでした. そこで HDFS 間でデータ転送をして,HDFS から Hive にデータを載せるということを行いました. 完全な原因特定が出来なかったのですが扱うファイル数が多すぎるため,distcp でデータ転送をしているときに転送されている側のクラスタの DataNode がどんどん外れていき,最終的に全台のスレーブが死んだと見なされるということが発生しました. daemon を再起動してもすぐにすべての DataNode が認識されなくなってしまい,結局 HDFS のオペレーションを掛けてもファイルを削除することも出来ず,まさしくつんだ状態になりました. そのため HDFS 上でのブロック数を下げることが要求されます.

MapReduce の Map 処理で無駄が多い

Cloudera Hadoop, including Apache Hadoop 3 update 2 の時点では,ローカルから取り込んだ gzip ファイルはファイルサイズにかかわらず MapReduce の Map 数が gzip ファイル数分になります. つまり,48 個の gzip ファイルのデータを処理するときに,サイズにかかわらず 48 Map が必要になります .どんなに大きなデータでも 48 Map が実行され,どんなに小さなデータでも 48 Map が実行されます. これは gzip ファイルの,分割不可能という性質によるためです. そして Map 数が占有する CPU コア数になるので,小さなデータのときにも 48 という大量のコア数を専有してしまうのは非常に無駄が多いです.

データを Hadoop に最適化

このように元データが HDFS の面からも MapReduce 処理の面からも,無駄が多かったです. この問題を解決する比較的簡単な方法は,ローカルからロードしたデータを Hive を利用してデータ変換することでした. データ変換を行うことで,HDFS 上のブロック数を減らし,MapReduce での Map 数つまり利用する CPU のコア数を減らすことが行えます.

Apache Hive のデータ圧縮

Apache Hive におけるデータ圧縮パラメータ

扱っていた元データが Hadoop の性質に向いていなかったので,Hadoop に適したデータに変換するというアプローチをとります. Apache Hive では,データを変換するときに大きく3つのパラメータがあります.

  1. ファイルフォーマット
  2. 圧縮方式
  3. 圧縮単位
ファイルフォーマット

Apache Hive の LanguageManual DDL にある通り,Hive のテーブルを保存するファイルフォーマットとして TEXTFILE,SEQUENCEFILE,RCFILE があります.

Hive テーブルを CREATE するとき STORED AS になにも指定しないと,TEXTFILE になります.ローカルにある単純なテキストファイルを Hive テーブルにロードするならば,TEXTFILE にしなければならないです.先に述べた通り,Hive の LOAD DATA コマンドはデータを変換するわけではなく,ローカルにあったデータを単純に HDFS に取り込み SELECT 文でそれらが操作されます.これに対して,Hive 上などでデータ変換をするときには,経験則として RCFILE がよいと思われます.

圧縮方式

Hive では,zlib, Gzip, BZip2, Snappy, lz4, lzo など様々な圧縮/展開コーデックを利用することができます. BZip2 は圧縮率がほかに比べて高いが,その分圧縮展開に掛かる処理コストが高いです. Snappy は圧縮率がほかに比べてあまり良くないが,その分圧縮展開にかかる処理コストは低いです. このような性質を比較検討して,扱っているデータに対して向いているものを採用したほうが良いでしょう.

圧縮単位

最後に圧縮単位について説明します. Apache Hive のデータの単位を RECORD ベースにするか BLOCK ベースにするか選べます. しかし基本的には,BLOCK ベースとします.

JSON 連想配列を扱うのに最適な設定

結論から言うと,JSON 連想配列を扱うときに最適な圧縮まわりの設定の組み合わせは,ファイルフォーマットを SEQUENCEFILE,圧縮方式を Snappy,データ単位を BLOCK にするとよさそうです.

検証

JSON 連想配列のデータに適したパラメータを探す検証を行いました. 検証環境,検証パラメータ,検証結果の詳細と説明してきます.

検証環境
Hadoop

検証に用いた Hadoop の環境はつぎになります.

  • Hadoop のディストリビューション
  • Cloudera's Distribution, including Apache Hadoop 4.1.2
  • HDFS のブロックサイズ
  • 128 MB
  • # DataNode(s)
  • 18 台
  • DataNode に利用しているサーバのスペック
    • CPU:Xeon X5650 @ 2.67GHz
    • メモリ:32 GB
    • HDD:RAID 1 TB * 6
データ

検証に用いたデータの説明をします. 元データは,任意の日のログの1部を実際に利用しています. 前述のとおり,2台のサーバで1時間毎に Log Rotate されたログを gzip 圧縮してあります. このような4種類の JSON 連想配列のデータに対して INSERT 文でまとめてデータ圧縮を掛けて,そのうちの2つのデータのSELECT を検証しました.

TestedData Size [B]LinesHDFS Blocks
* 16993442 425503 48
  144907 3310 48
  3100516 82827 48
* 564695271 15530545 48

今回の検証では,つぎのようにデータがわかれます. まず上で説明した gzip された検証用データがあります.このファイルを Hive の LOAD DATA コマンドを利用して,Hive テーブルにロードします.Hive の LOAD DATA コマンドは元のデータファイルを単純に HDFS にコピーするだけなので,元のデータと何ら変わらない状態で HDFS 上にファイルが存在します.つぎに Hive の INSERT 文を利用して,別の Hive テーブルにデータを変換して格納します.こうすることでデータ変換を行ない Hadoop / Hive に適した状態にします.

flow_converting.jpg

検証パラメータ

上記のデータに対して,3種のパラメータのすべての組み合わせを検証しました. org.apache.hadoop.io.compress.BZip2Codec を利用した BZip2 についても検証を行ったのですが,圧縮展開に時間が掛かり過ぎて断念しました. また LZO や LZ4 も考えましたが,今回は検証に手を付けませんでした. そのため LZO や LZ4 の方が,よりよいパフォーマンスが出るという可能性はあります.

  • ファイルフォーマット
    • TEXTFILE
    • SEQUENCEFILE
    • RCFILE
  • 圧縮方式
    • zlib: org.apache.hadoop.io.compress.DefaultCodec (Deflate)
    • Gzip: org.apache.hadoop.io.compress.GzipCodec
    • Snappy: org.apache.hadoop.io.compress.SnappyCodec
  • 圧縮データ単位
    • RECORD
    • BLOCK
検証結果の判断基準

検証結果の判断としては,3つの指標があるかと思います. これらの指標は,トレードオフの関係にあります. 利用されている環境や状況にあわせて,何を重要とするかを選択されると良いと思います. HDFS の容量を重視して処理時間をあまり重要しないのであれば,HDFS 上でのデータサイズを重視するとよいです. 逆に,Hive での処理時間を重視するのであれば,HDFS 上でのデータサイズが大きくなることをあきらめる必要があるでしょう.

  1. HDFS 上でのデータサイズ
  2. データの圧縮効率がよいと,単純に HDFS 上でのファイルサイズも抑えられるので,時系列のデータであればより長い期間のデータが保持できます
  3. HDFS 上でのブロック数
  4. 先に説明した通り,HDFS のブロックサイズを抑えると HDFS のメタデータのサイズが抑えられ NameNode のメモリが抑えられます
  5. Hive での処理時間
  6. 処理時間がより短いと,当然ながらデータ解析の効率があがります
検証結果の詳細

これまでに述べたデータと検証に用いるパラメータの,すべての組み合わせで検証をおこないました. その結果,JSON 連想配列をパースする機能を利用するときは,SEQUENCEFILE + Snappy + BLOCK の組み合わせがよいと思われます. JSON 連想配列ではなく単純にデータを扱うときは,RCFILE + {Gzip,Snappy} というのが経験則として一般的です.

表の見方は,左3つのカラムがそれぞれパラメータを表します. INSERT TIME は上図であげた,データを変換するときの INSERT 文の処理時間です. SELECT TIME は,同時に変換した4種類の実データのうちの変換した2種類のデータに対して SELECT 文を実行した時間です. SELECT TIME 1 は 425503 行のデータで gzip 圧縮の元データでは 16 MB 程度です.そのため HDFS のブロックサイズに設定してある 128 MB には,圧縮状態では達していません.もうひとつの SELECT TIME 2 は 15530545 行の元データの gzip ファイルでは 538 MB 程度と HDFS のブロックサイズを超えたデータとなっています.Data Size は,データ変換後の HDFS 上のサイズです. # Files は,変換した4種類のデータの HDFS 上でのファイル数であり,# Blocks は HDFS 上でのブロック数です. 各セルを指標に対して色を変えています,緑色ほど指標に対して良く,赤色ほど指標に対して悪いことを示します.

a04688_dynamic_partition_json_test_CDH4.1.2.result.jpg

まとめ

軽い記事にしたいと思っていたのに結局重くなってしまった今回の記事では,Apache Hive で JSON 連想配列のままデータを扱いたいときに最適と思われる設定について述べました. 検証したなかでは,SEQUENCEFILE + Snappy (+BLOCK) の組み合わせが良いと思われます. 2013 年も,大規模データ解析の分野はますますの発展を遂げていくと思います.今年も1年がんばって行きたいと思います.

蛇足

最初に検証をしたときは,Cloudera's Distribution, including Apache Hadoop 4.1.1 を利用していました. そうすると,gzip で圧縮ができるけど SELECT でデータを引けなかったり,そもそも圧縮ができないという問題が発生しました. Cloudera 社の Release History を見てみると," HADOOP-8901 - HDFS fails to read files compressed with zlib or Snappy" というものが 4.1.2 でたされているのが確認できます. その影響か圧縮形式によってデータを適切に読み込めず,Map 処理が 100 % に達したかと思うとまた 70 % ぐらいに戻るということを延々と繰り返す現象が確認できました. CDH 4.1.2 をアプリ運用の方に依頼してアップデートしたら,無事データが読み取れました. こういう言い方には語弊がありますが,Hadoop エコシステムのプロジェクトは「ドキュメントに書いてあることができなかったり,逆に書いていないことができたり」することがあります. 特に新しいバージョンのものは,このように網羅的に検証をしてみるのも良いかもしれません.