# Scala 를 이용한 Spark Self-Contained Applications 테스트 


# 설치 

- Scala 다운로드 : http://www.scala-lang.org/download/all.html (2.10.6 버전)

- SBT(Scala Build Tool) 다운로드 : http://www.scala-sbt.org/download.html

- 두개의 프로그램 모두 공백이 없는 경로에 설치 하거나 mklink를 이용하여 공백이 없는 경로에 접근 가능하게 작업

- 각각의 "scala\bin" "sbt\bin" 디렉토리를 PATH 설정 


# 빌드 준비 

- 아래와 같은 Spark 빌드 환경 설정을 위해 .sbt 파일을 작성

- Scala 버전 2.10.6, Spark 버전 1.5.2 

name := "App"
version := "1.0"
scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"

- sbt run 을 실행하여 빌드 및 실행 (처음에는 의존성 파일 설정을 위해 오래 걸림


- sublime text 3 사용시 Scala 빌드 설정 파일

- scala.sublime-build

{

    "cmd": ["sbt_bin_path\\sbt.bat", "run"],

    "working_dir": "${project_path:${folder}}",

    "selector": "source.scala"

}




# Scala 코드

http://cdecl.tistory.com/306 의  spark_movelens.py 와 같은 기능을 하는 Scala 코드

- Scala의 Self-Contained Applications 환경에서는 sc.stop() 을 하지 않으면 "ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext" 이란 에러 발생함 

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.log4j.Logger
import org.apache.log4j.Level


object App {
	def main(args: Array[String]) {
		Logger.getLogger("org").setLevel(Level.WARN)
		Logger.getLogger("akka").setLevel(Level.WARN)

		val sc = new SparkContext("local[*]", "MyApp")
		RDDRun(sc)
		sc.stop()
	}


	def RDDRun(sc: SparkContext) {
		val ratings = "D:/hadoop/data/ml-20m/ratings.csv"
		
		val rddMovie = (sc: SparkContext) => {
			val movies =  "D:/hadoop/data/ml-20m/movies.csv"
			val rdd = sc.textFile(movies)
			val header = rdd.first()

			rdd.filter(_ != header).map(_.split(",")).map(x => (x(0), x(1)))
		}

		val rdd = sc.textFile(ratings)
		val header = rdd.first()

		val rddR = rdd.filter(_ != header).map(_.split(",")).map(x => (x(1), x(2).toFloat))
			.groupByKey().mapValues(x=> x.sum / x.size)
			.join(rddMovie(sc)).sortBy(_._2._1)
			.map(x=> (x._1, x._2._1, x._2._2))		

		for (t <- rddR.collect()) {
			println("%s, %f, %s".format(t._1, t._2, t._3))
		}

	}
}


# 비교

- Python 코드 : [Finished in 175.3s] http://cdecl.tistory.com/306 

- Scala 코드 : [Finished in 70.5s]

- Scala 쪽이 월등하게 빠르므로, 프로토타입 같은 작업시 Python이 간단해 보이지만 Staging 서비스의 경우 Scala로 작성하는 것이 성능상 이점이 있음


'Dev > Data' 카테고리의 다른 글

Hadoop Single Node 설치 (linux)  (0) 2016.07.23
Apache Tajo 테스트 (Windows)  (0) 2015.11.13
Apache Hadoop 2.7.1 (Windows)  (0) 2015.11.13
Spark 테스트 (Windows, Python 환경)  (0) 2015.11.11
Spark 설치 (Standalone)  (1) 2015.11.11


# Spark 어플리케이션 실행 방법

- Spark 어플리케이션을 실행 하기 위해서 3가지 방법을 제공

http://spark.apache.org/docs/latest/quick-start.html

 


1. Spark Shell 을 이용한 인터랙티브한 환경에서 실행 (scala : bin/spark-shell, python: bin/pyspark)

>> bin\pyspark Python 3.5.0 (v3.5.0:374f501f4567, Sep 13 2015, 02:16:59) [MSC v.1900 32 bit (Intel)] on win32 Type "help", "copyright", "credits" or "license" for more information. 15/11/11 20:05:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/11/11 20:05:57 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.5.1 /_/ Using Python version 3.5.0 (v3.5.0:374f501f4567, Sep 13 2015 02:16:59) SparkContext available as sc, HiveContext available as sqlContext. >>> rdd = sc.textFile('README.md') >>> rdd.count() 98




2. spark-submit 툴을 이용하여 소스나 패키지를 제출하여 실행

## # test.py from pyspark import SparkContext sc = SparkContext("local", "Simple App") rdd = sc.textFile('README.md') print(rdd.count())


%SPARK_HOME%\bin\spark-submit test.py

98



3. 단독 어플리케이션으로 실행 

PYTHONPATH를 설정하여 패키지 참조 하여야 사용 가능 

PYTHONPATH=%SPARK_HOME%/python/lib/pyspark.zip;%SPARK_HOME%/python/lib/py4j-0.8.2.1-src.zip

python test.py

98





# Spark 예제

영화의 평점 샘플 데이터 활용 - http://grouplens.org/datasets/movielens/

http://files.grouplens.org/datasets/movielens/ml-20m.zip  (MovieLens 20M Dataset 사용)

- 사용자에 의해 점수가 매겨진 파일(ratings.csv)을 영화별 평점을 구하여 정렬. 영화정보(movies.csv영화명)과 함께 조인하여 보여 주는 예제


- ratings.csv 

- 영화 평점 정보, 약 500MB, 20,000,264 rows

Ratings Data File Structure (ratings.csv)

-----------------------------------------

All ratings are contained in the file `ratings.csv`.

    userId,movieId,rating,timestamp


userId,movieId,rating,timestamp

138493,60816,4.5,1259865163

138493,61160,4.0,1258390537

138493,65682,4.5,1255816373

138493,66762,4.5,1255805408

138493,68319,4.5,1260209720


- movies.csv

- 영화 정보, 약 1MB , 27,279 rows

Movies Data File Structure (movies.csv)

---------------------------------------

Movie information is contained in the file `movies.csv`. Each line of this file after the header row represents one movie, and has the following format:

    movieId,title,genres


movieId,title,genres

131241,Ants in the Pants (2000),Comedy|Romance

131243,Werner - Gekotzt wird später (2003),Animation|Comedy

131248,Brother Bear 2 (2006),Adventure|Animation|Children|Comedy|Fantasy

131250,No More School (2000),Comedy

131252,Forklift Driver Klaus: The First Day on the Job (2001),Comedy|Horror


# spark_movelens.py

from pyspark import SparkContext from statistics import mean if __name__ == "__main__": sc = SparkContext("local[*]", "Simple App") ratings = "D:/hadoop/data/ml-20m/ratings.csv" movies = "D:/hadoop/data/ml-20m/movies.csv" movie = sc.textFile(movies) header = movie.first() dic = movie.filter(lambda x: x != header).map(lambda x: x.split(',')).map(lambda x: (x[0],x[1])) data = sc.textFile(ratings) header = data.first() rdd = data.filter(lambda x: x != header).map(lambda x: x.split(','))\ .map(lambda x: (x[1], float(x[2]))).groupByKey().mapValues(list)\ .map(lambda x: (x[0], round(mean(x[1]), 2)))\ .join(dic).sortBy(lambda x: x[1][0])\ .map(lambda x: (x[0], x[1][0], x[1][1])) for id, avg, title in rdd.collect(): print('{} {} - {}'.format(id, avg, title.encode('utf8')))

  

- 위의 내용을 Spark SQL로 구현 


# spark_sql.py

from pyspark.sql import SQLContext, Row from pyspark import SparkContext if __name__ == "__main__": sc = SparkContext("local[*]", "Simple SQL App") sqlContext = SQLContext(sc) ratings = "D:/hadoop/data/ml-20m/ratings.csv" moviepath = "D:/hadoop/data/ml-20m/movies.csv" movie = sc.textFile(moviepath) header = movie.first() rdd = movie.filter(lambda x: x != header).map(lambda x: x.split(','))\ .map(lambda x: Row(id=x[0], title=x[1])) data = sc.textFile(ratings) header = data.first() rddRating = data.filter(lambda x: x != header).map(lambda x: x.split(','))\ .map(lambda x: Row(id=x[1], rating=float(x[2]))) sqlContext.createDataFrame(rdd).registerTempTable('movies') sqlContext.createDataFrame(rddRating).registerTempTable('ratings') tbl = sqlContext.sql(""" SELECT a.id, b.title, avg(a.rating) as rating FROM ratings a join movies b WHERE a.id = b.id GROUP BY a.id, b.title ORDER BY avg(a.rating) """) tblMap = tbl.map(lambda x: (x.id, x.title, x.rating)) for a, b, c in tblMap.collect(): print("{}({}): {}".format(a, round(c, 2), b.encode('utf8')))



'Dev > Data' 카테고리의 다른 글

Hadoop Single Node 설치 (linux)  (0) 2016.07.23
Spark 테스트 (Windows, Scala, Self-Contained Applications)  (1) 2015.11.18
Apache Tajo 테스트 (Windows)  (0) 2015.11.13
Apache Hadoop 2.7.1 (Windows)  (0) 2015.11.13
Spark 설치 (Standalone)  (1) 2015.11.11


# Apache Spark 

- http://spark.apache.org/

- Apache Spark™ is a fast and general engine for large-scale data processing.


Spark 의 핵심은 무엇인가? RDD! : http://www.slideshare.net/yongho/rdd-paper-review

- Spark programming guide (번역) : http://www.raonbit.com/spark-programming-guide/



# Spark 설치 (Standalone, Python 기준)


- Java 설치 (1.8)

# Ubuntu 

sudo apt-get install python-software-properties

sudo add-apt-repository ppa:webupd8team/java

sudo apt-get update

sudo apt-get install oracle-java8-installer


# Windows 

http://www.java.com/ko/download/


- Apache Spark 다운로드

http://spark.apache.org/downloads.html (Spark 1.5.2 Pre-built for Hadoop 2.6 and later)


# Ubuntu 

wget http://apache.mirror.cdnetworks.com/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz

tar -zxvf spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz

sudo mv spark-1.5.2-bin-hadoop2.6 /usr/local/

sudo ln -s /usr/local/spark-1.5.2-bin-hadoop2.6 /usr/local/spark


- Python 설치 (3.5 기준, 버전은 개인 취향?)  

https://www.python.org/


Ubuntu 15.10 의 경우 3.4 버전까지 기본 설치 되어 있음


- Hadoop 다운로드

Hadoop을 저장소로 사용하지 않으면 설치는 옵션이나 Windows의 경우 최소 winutils.exe 이 필요 하므로 그냥 Windows용 Hadoop 설치

압축을 풀고 HADOOP_HOME 환경 변수만 잡아주면 됨

# Hadoop for Windows 


비공식 바이너리 : https://github.com/karthikj1/Hadoop-2.7.1-Windows-64-binaries/releases 





# Spark 설정


- Path 설정

PYTHONPATH는 단독 어플리케이션 작성 시 spark 패키지를 참조하기 위한 세팅 

Ubuntu 예제 (.profile)


export JAVA_HOME=/usr/lib/jvm/java-8-oracle

export SPARK_HOME=/usr/local/spark

export PYTHONPATH=$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip

export HADOOP_HOME=/usr/local/hadoop   # 옵션

Windows 환경변수 세팅 (공백이 없는 경로로 하며, 필요시 mklink를 이용하여 심볼릭 링크 설정을 하면 편함)


%JAVA_HOME%

%SPARK_HOME%

%PYTHONPATH%  

%HADOOP_HOME%



* Windows의 경우 %HADOOP_HOME% 세팅을 하지 않는 경우 아래와 같은 에러가 발생 


ERROR Shell: Failed to locate the winutils binary in the hadoop binary path

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.


* Windows 에서 hive 에러 날 경우 

%HADOOP_HOME%/bin/winutils.exe chmod 777 D:\tmp\hive


- conf 설정 

cp $SPARK_HOME/conf/log4j.properties.template log4j.properties


# log4j.properties 파일편집

log4j.rootCategory=WARN, console  # INFO->WARN, INFO정보가 많이 출력 되므로 수정   




# Spark 테스트 (Shell)




cdecl@ubuntu:/usr/local/spark$ bin/pyspark Python 2.7.10 (default, Oct 14 2015, 16:09:02)

[GCC 5.2.1 20151010] on linux2
Type "help", "copyright", "credits" or "license" for more information.
15/11/11 18:53:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/11 18:53:30 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.137.143 instead (on interface eth0)

15/11/11 18:53:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

15/11/11 18:53:35 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Welcome to
____ __


/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.5.2
/_/
Using Python version 2.7.10 (default, Oct 14 2015 16:09:02)
SparkContext available as sc, HiveContext available as sqlContext.
>>> rdd = sc.textFile('README.md')
>>> rdd.count()
98
>>>





+ Recent posts