Apache Sparkを試す

CDH5の環境を手元に作ったので、Apache Sparkを使ってみる。
依存はHDFSのみ。Masterが1台でWorkerが複数台の構成なのかな。
ClouderaManagerで設定して起動するとWebUIがデフォルトでは18080ポートになっていた。

Pythonから使う場合はpysparkコマンドとなる。pysparkコマンドをそのまま実行するとスタンドアロンサーバーが起動してしまう(この場合はWebUIが4040ポートらしい)ので、環境変数でMasterサーバーを指定する。

$ MASTER=spark://master1.hadoop.nullpobug.com:7077 pyspark

pysparkコマンドを実行するとPythonシェルが立ち上がる。scという名前の変数でSparkContextのインスタンスを参照できるようになっている。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Python version 2.6.6 (r266:84292, Jan 22 2014 09:42:36)
Spark context available as sc.
>>> sc
<pyspark.context.SparkContext object at 0x7fea2a41efd0>

100万行、500MB程度のCSVファイルをHDFSに置いて、読み込ませてみた。

$ hdfs dfs -ls /tmp/
Found 1 items
-rw-r--r--   3 root supergroup  511888890 2014-03-22 03:33 /tmp/test.csv
>>> testdata = sc.textFile('/tmp/test.csv')
>>> testdata.count()  # 行数カウント
1000000
>>> # カンマ区切りで1つ目のカラムを数値にして、3で割りきれるものをカウント(対象のデータの1カラム目は0〜999999の数値の連番になっている)
>>> testdata.filter(lambda w: int(w.split(',', 1)[0]) % 3 == 0).count()
333334

絞り込みをPythonのコードで書けるのが面白いですね。ipythonにも対応していて、ipython notebookから操作もできるのでかなり遊べるのでは。