# How to Run a Spark Application
- Spark provides three ways to run an application
- http://spark.apache.org/docs/latest/quick-start.html
1. Run interactively in the Spark shell (Scala: bin/spark-shell, Python: bin/pyspark)
```
>> bin\pyspark
Python 3.5.0 (v3.5.0:374f501f4567, Sep 13 2015, 02:16:59) [MSC v.1900 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
15/11/11 20:05:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/11 20:05:57 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 3.5.0 (v3.5.0:374f501f4567, Sep 13 2015 02:16:59)
SparkContext available as sc, HiveContext available as sqlContext.
>>> rdd = sc.textFile('README.md')
>>> rdd.count()
98
```
2. Submit a source file or package with the spark-submit tool
```python
# test.py
from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd = sc.textFile('README.md')
print(rdd.count())
```
```
%SPARK_HOME%\bin\spark-submit test.py
98
```
3. Run as a standalone application
- PYTHONPATH must be set so the Spark packages can be imported
- PYTHONPATH=%SPARK_HOME%/python/lib/pyspark.zip;%SPARK_HOME%/python/lib/py4j-0.8.2.1-src.zip
```
python test.py
98
```
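Instead of setting PYTHONPATH in the environment, the same two zip files can be prepended to `sys.path` before `import pyspark`. A minimal sketch, assuming the SPARK_HOME environment variable is set and that the py4j zip name (py4j-0.8.2.1 here, matching the path above) matches the version bundled with your Spark install:

```python
import os
import sys

# Prepend the Spark Python libraries to sys.path so that a later
# "import pyspark" succeeds without an external PYTHONPATH setting.
# Assumes SPARK_HOME is set; the py4j zip name must match your install.
def add_spark_to_path(spark_home=None):
    spark_home = spark_home or os.environ['SPARK_HOME']
    libs = [
        os.path.join(spark_home, 'python', 'lib', 'pyspark.zip'),
        os.path.join(spark_home, 'python', 'lib', 'py4j-0.8.2.1-src.zip'),
    ]
    for lib in libs:
        if lib not in sys.path:
            sys.path.insert(0, lib)
    return libs
```

Call `add_spark_to_path()` at the top of the script, before any pyspark import.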
# Spark Example
- Uses the MovieLens movie-rating sample data - http://grouplens.org/datasets/movielens/
- http://files.grouplens.org/datasets/movielens/ml-20m.zip (uses the MovieLens 20M Dataset)
- An example that computes the average rating per movie from the user ratings file (ratings.csv), sorts by that average, and joins with the movie information (movies.csv, movie titles) to display the results
- ratings.csv
- Rating data, about 500 MB, 20,000,264 rows
```
Ratings Data File Structure (ratings.csv)
-----------------------------------------
All ratings are contained in the file `ratings.csv`.

userId,movieId,rating,timestamp
138493,60816,4.5,1259865163
138493,61160,4.0,1258390537
138493,65682,4.5,1255816373
138493,66762,4.5,1255805408
138493,68319,4.5,1260209720
```
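The per-movie average that the Spark jobs below compute can be sketched in plain Python first; this is exactly the grouping-and-averaging shape that Spark distributes across partitions. The rows are the sample lines above:

```python
from collections import defaultdict
from statistics import mean

# Group ratings by movieId and average them - the same computation
# the Spark jobs below perform in parallel over the full file.
lines = [
    "138493,60816,4.5,1259865163",
    "138493,61160,4.0,1258390537",
    "138493,65682,4.5,1255816373",
]

by_movie = defaultdict(list)
for line in lines:
    user, movie, rating, ts = line.split(',')
    by_movie[movie].append(float(rating))

averages = {movie: round(mean(vals), 2) for movie, vals in by_movie.items()}
print(averages)  # {'60816': 4.5, '61160': 4.0, '65682': 4.5}
```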
- movies.csv
- Movie information, about 1 MB, 27,279 rows
```
Movies Data File Structure (movies.csv)
---------------------------------------
Movie information is contained in the file `movies.csv`. Each line of this
file after the header row represents one movie, and has the following format:

movieId,title,genres
131241,Ants in the Pants (2000),Comedy|Romance
131243,Werner - Gekotzt wird später (2003),Animation|Comedy
131248,Brother Bear 2 (2006),Adventure|Animation|Children|Comedy|Fantasy
131250,No More School (2000),Comedy
131252,Forklift Driver Klaus: The First Day on the Job (2001),Comedy|Horror
```
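One caveat: some MovieLens titles contain commas and are therefore quoted in the CSV, so the plain `split(',')` used in the scripts below would break such rows. The stdlib `csv` module parses quoted fields correctly; the row here is illustrative, not copied from the file:

```python
import csv
import io

# A quoted title containing a comma: split(',') would cut it in two,
# while csv.reader returns it as a single field.
sample = 'movieId,title,genres\n11,"American President, The (1995)",Comedy|Drama|Romance\n'

reader = csv.reader(io.StringIO(sample))
header = next(reader)
rows = list(reader)
print(rows[0][1])  # American President, The (1995)
```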
# spark_movelens.py
```python
from pyspark import SparkContext
from statistics import mean

if __name__ == "__main__":
    sc = SparkContext("local[*]", "Simple App")
    ratings = "D:/hadoop/data/ml-20m/ratings.csv"
    movies = "D:/hadoop/data/ml-20m/movies.csv"

    # movies.csv -> (movieId, title) pairs, skipping the header row
    movie = sc.textFile(movies)
    header = movie.first()
    dic = movie.filter(lambda x: x != header).map(lambda x: x.split(','))\
        .map(lambda x: (x[0], x[1]))

    # ratings.csv -> average rating per movieId, joined with the titles
    # and sorted by the average rating
    data = sc.textFile(ratings)
    header = data.first()
    rdd = data.filter(lambda x: x != header).map(lambda x: x.split(','))\
        .map(lambda x: (x[1], float(x[2]))).groupByKey().mapValues(list)\
        .map(lambda x: (x[0], round(mean(x[1]), 2)))\
        .join(dic).sortBy(lambda x: x[1][0])\
        .map(lambda x: (x[0], x[1][0], x[1][1]))

    for id, avg, title in rdd.collect():
        print('{} {} - {}'.format(id, avg, title.encode('utf8')))
```
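The `groupByKey().mapValues(list)` step in spark_movelens.py materializes every rating list in memory; Spark's `aggregateByKey` can carry a `(sum, count)` pair instead. The two combine functions can be checked in plain Python before handing them to the RDD - a sketch, under the assumption that they would be passed as `rdd.aggregateByKey((0.0, 0), seq_op, comb_op)`:

```python
from functools import reduce

# (sum, count) accumulators that could replace groupByKey + mean:
# seq_op folds one rating into a partition-local accumulator,
# comb_op merges accumulators coming from different partitions.
def seq_op(acc, rating):
    return (acc[0] + rating, acc[1] + 1)

def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1])

ratings = [4.5, 4.0, 4.5]
total = reduce(seq_op, ratings, (0.0, 0))
print(round(total[0] / total[1], 2))  # 4.33
```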
- The same logic implemented with Spark SQL
# spark_sql.py
```python
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == "__main__":
    sc = SparkContext("local[*]", "Simple SQL App")
    sqlContext = SQLContext(sc)
    ratings = "D:/hadoop/data/ml-20m/ratings.csv"
    moviepath = "D:/hadoop/data/ml-20m/movies.csv"

    # movies.csv -> Row(id, title), skipping the header row
    movie = sc.textFile(moviepath)
    header = movie.first()
    rdd = movie.filter(lambda x: x != header).map(lambda x: x.split(','))\
        .map(lambda x: Row(id=x[0], title=x[1]))

    # ratings.csv -> Row(id, rating), skipping the header row
    data = sc.textFile(ratings)
    header = data.first()
    rddRating = data.filter(lambda x: x != header).map(lambda x: x.split(','))\
        .map(lambda x: Row(id=x[1], rating=float(x[2])))

    sqlContext.createDataFrame(rdd).registerTempTable('movies')
    sqlContext.createDataFrame(rddRating).registerTempTable('ratings')

    tbl = sqlContext.sql("""
        SELECT a.id, b.title, avg(a.rating) AS rating
        FROM ratings a JOIN movies b ON a.id = b.id
        GROUP BY a.id, b.title
        ORDER BY avg(a.rating)
    """)
    tblMap = tbl.map(lambda x: (x.id, x.title, x.rating))
    for a, b, c in tblMap.collect():
        print("{}({}): {}".format(a, round(c, 2), b.encode('utf8')))
```
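The join-and-aggregate query in spark_sql.py is standard SQL, so its logic can be verified against a few rows with the stdlib sqlite3 module. The ids and titles below are hypothetical, chosen only to make the join visible:

```python
import sqlite3

# The same join/aggregate shape as the Spark SQL query, run against
# a few hypothetical rows in an in-memory SQLite database.
con = sqlite3.connect(':memory:')
con.execute('CREATE TABLE movies (id TEXT, title TEXT)')
con.execute('CREATE TABLE ratings (id TEXT, rating REAL)')
con.executemany('INSERT INTO movies VALUES (?, ?)',
                [('1', 'Movie A'), ('2', 'Movie B')])
con.executemany('INSERT INTO ratings VALUES (?, ?)',
                [('1', 4.0), ('1', 5.0), ('2', 3.0)])

rows = con.execute("""
    SELECT a.id, b.title, avg(a.rating) AS rating
    FROM ratings a JOIN movies b ON a.id = b.id
    GROUP BY a.id, b.title
    ORDER BY avg(a.rating)
""").fetchall()
print(rows)  # [('2', 'Movie B', 3.0), ('1', 'Movie A', 4.5)]
```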