Apache Sparkでスタンドアローンクラスタを構築してscalaを用いて巨大テキスト処理をする

Apache SparkとはHadoopに変わる分散コンピューティングのミドルウエアです。

Hadoopに比べ、ディスクIOを抑えることができ、メモリ内で収まるデータ量の計算なら高速化できるようです。

詳しくはApache Spark で分散処理入門 – Qiitaがわかりやすかったです。

今回はsparkを使った2つサーバーがぶら下がっている分散処理クラスタを構築してscalaを用いてwikipediaの全データを処理してみたいと思います。

今回はこのような構成の上にクラスタを構築します。

masterとslave、両方に同一のユーザー名でログインできるようにしておいてください。

また、masterとslave両方にJDKをインストールしておいてください。

masterにsparkをインストールする

まずはmaster(192.168.1.8)から設定していきます。

masterにsshでログインしたあと、apache sparkをインストールします。

sparkは公式サイトからダウンロードできます。

# masterにログイン
ssh {user}@{master host}
# sparkダウンロード
wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
# 解凍
tar xvf spark-2.2.0-bin-hadoop2.7.tgz

sparkは実行可能バイナリとして配布されるのでビルドやインストールの必要はありません。

ただ、slaveとインストール先を統一させなければいけないのでルートに近い/opt以下に配置します。

mv spark-2.2.0-bin-hadoop2.7 spark
sudo mv spark /opt
# パーミッションを変更して読み込み書き込み実行ができるようにする
sudo chown {user}:{group} -R /opt/spark
sudo chmod 777 -R /opt/spark

masterの設定をする

masterのsparkの設定ファイルを書き、masterPCを指定します。

cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
nano /opt/spark/conf/spark-env.sh

# 以下を書き込み
# master hostは自身なのでmasterのIP
# portはデフォルト7077だが一応書き込み
SPARK_MASTER_HOST=192.168.1.8
SPARK_MASTER_PORT=7077

続いてslavePCを指定する設定を書きます。

cp /opt/spark/conf/slaves.template /opt/spark/conf/slaves
nano /opt/spark/conf/slaves

# localhostをコメントアウトして以下を書き込み
# localhost
192.168.1.8
192.168.1.9

masterからslaveに公開鍵認証sshできるようにする

パスワード認証でも行けるかも?

sparkではmasterからslaveにsshでログインをして指示をします。

とりあえずやりやすいようにslaveのauthorized_keysにmasterのid_rsa.pubを書き込んでmasterからslaveにsshできるようにします。

# master PCでssh keyを作成する
ssh-keygen
# 公開鍵をslaveに送る
scp ~/.ssh/id_rsa.pub {user}@192.168.1.9:~

# slaveにsshログイン、keyを登録
ssh {user}@192.168.1.9
ssh-keygen
cat ~/id_rsa.pub>>~/.ssh/authorized_keys

パスワードなしでmasterからslaveにsshできるようになったらOK。秘密鍵のパスフレーズを聞かれるかもしれないがここらへんをみて省略できるようにしてもいいかもしれないです。

Slaveにsparkをインストールする

つづいてslaveでの作業でになります。

slave上のmasterと全く同じパスにsparkをインストールします。

# slaveにログイン
ssh {user}@{slave host}
# sparkダウンロード
wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
# 解凍
tar xvf spark-2.2.0-bin-hadoop2.7.tgz
# 移動
mv spark-2.2.0-bin-hadoop2.7 spark
sudo mv spark /opt
# パーミッションを変更して読み込み書き込み実行ができるようにする
sudo chown {user}:{group} -R /opt/spark
sudo chmod 777 -R /opt/spark

MasterでSparkの起動

ここまでできたらSparkを起動できます。MasterにログインしてSparkを起動しましょう。

# masterにログイン
ssh {user}@192.168.1.8
# sparkの起動(master)
/opt/spark/sbin/start-master.sh

start-master.shを起動すると、http://{master host}:8080でWebUIを表示できるようになります。

さらにMasterからSlaveを起動します。

/opt/spark/sbin/start-slaves.sh

start-slaves.shを起動したあと、WebUIを見るとクラスタを構成するコンピュータが表示されるはずです。

ScalaでWikipediaを処理するプログラムを書く

Sparkのプログラムはscalaやpython、Rなどでかけます。とりあえずscalaを利用します。

wikipediaのデータをmaster PCにダウンロードしておきましょう。

ssh {user}@{master host}
cd /tmp
wget https://dumps.wikimedia.org/jawiki/latest/jawiki-latest-pages-articles.xml.bz2
tat jxf jawiki-latest-pages-articles.xml
exit

 

基本的には開発PCでscalaのプログラムを書き、spark-submitコマンドを利用してクラスタにジョブを投げます。

spark-submitコマンドを使うために開発PCにもsparkをインストールします。

# 開発PCでsparkダウンロード
wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
# 解凍
tar xvf spark-2.2.0-bin-hadoop2.7.tgz
# 移動
mv spark-2.2.0-bin-hadoop2.7 spark
sudo mv spark /opt
# パーミッションを変更して読み込み書き込み実行ができるようにする
sudo chown {user}:{group} -R /opt/spark
sudo chmod 777 -R /opt/spark

scalaプログラムについてはIntellij IDEAで作ることができます。

Intellijにscalaプラグインを入れ、sbtプロジェクトを作ります。

sbtプロジェクトのprojectフォルダ内にplugins.sbtファイルを作り、中身を以下のようにします。

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

Sparkは単体のjarにコンパイルするのですが上記の設定によっていろいろなリソースもすべて1つのjarになり、sparkで扱いやすくなるそうです。

さらに、プロジェクトのルートディレクトリにbuild.sbtファイルを作り、中身を以下のようにします。

name := "spark-test"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.0"

scalaのプログラムを書きます。今回はmain.scala.ConvetLtGtというクラス名にしました。

package main.scala

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ConvertLtGt {
  def main(args: Array[String]) {
    //wikipediaのデータ
    val wikiXmlPath = "/tmp/jawiki-latest-pages-articles.xml"
    val conf = new SparkConf()
    conf.setAppName("Convert Lt Gt")
    val sc = new SparkContext(conf)

    //テキストファイルを読み込み
    val wikiXmlRDD = sc.textFile(wikiXmlPath)
    //partitionにマップする。マップした単位で並列化できる
    val wikiXmlReplacedRDD = wikiXmlRDD.map{line=>
      //HTML特殊文字を置換する
      val replaced = line.replace("&amp;","&").replace("&quot;","\"").replace("&lt;","<").replace("&gt;",">").replace("&#39;","'");
      replaced
    }.repartition(1)
    //保存
    val outFile = "/tmp/jawiki-latest-pages-articles.replaced.xml"
    val file = FileSystem.get(new URI(outFile),new Configuration())
    //ファイルが既に存在していれば削除
    try { file.delete(new org.apache.hadoop.fs.Path(outFile), true) } catch { case _ : Throwable => { } }
    wikiXmlReplacedRDD.saveAsTextFile(outFile)

  }
}

できたら、build.sbtがあるフォルダでsbtコマンドを実行します。sbtコマンドがない場合はインストールしましょう。

sbt packageでコンパイルできます。

# sbtインストール
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
sudo apt-get update
sudo apt-get install sbt

# コンパイル
sbt pacakge

これでtargetディレクトリにjarファイルができます。

あとはこのようなコマンドでsparkにジョブを投げることができます。

{sparkのインストールディレクトリ}/bin/spark-submit --master spark://192.168.1.8:7077 --class {class name} {JAR}

このようなスクリプトを用意しておけばコンパイルと実行を同時にできます。

sbt package
JAR=`ls target/scala-2.11/*.jar|head -n 1`
/home/garicchi/app/spark/bin/spark-submit --master spark://192.168.1.8:7077 --class main.scala.ConvertLtGt $JAR

 

実行中のアプリケーションはWeb UIで確認できます。

 

sparkでクラスタ構築した結果、1台のマシンで7分かかる処理が2秒でおわりました。

参考

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください