cdeclog

Spark 테스트 (Windows, Python 환경) 본문

Dev/Data

Spark 테스트 (Windows, Python 환경)

디클 2015.11.11 20:54


# Spark 어플리케이션 실행 방법

- Spark 어플리케이션을 실행 하기 위해서 3가지 방법을 제공

http://spark.apache.org/docs/latest/quick-start.html

 


1. Spark Shell 을 이용한 인터랙티브한 환경에서 실행 (scala : bin/spark-shell, python: bin/pyspark)

>> bin\pyspark Python 3.5.0 (v3.5.0:374f501f4567, Sep 13 2015, 02:16:59) [MSC v.1900 32 bit (Intel)] on win32 Type "help", "copyright", "credits" or "license" for more information. 15/11/11 20:05:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/11/11 20:05:57 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.5.1 /_/ Using Python version 3.5.0 (v3.5.0:374f501f4567, Sep 13 2015 02:16:59) SparkContext available as sc, HiveContext available as sqlContext. >>> rdd = sc.textFile('README.md') >>> rdd.count() 98




2. spark-submit 툴을 이용하여 소스나 패키지를 제출하여 실행

## # test.py from pyspark import SparkContext sc = SparkContext("local", "Simple App") rdd = sc.textFile('README.md') print(rdd.count())


%SPARK_HOME%\bin\spark-submit test.py

98



3. 단독 어플리케이션으로 실행 

PYTHONPATH를 설정하여 패키지 참조 하여야 사용 가능 

PYTHONPATH=%SPARK_HOME%/python/lib/pyspark.zip;%SPARK_HOME%/python/lib/py4j-0.8.2.1-src.zip

python test.py

98





# Spark 예제

영화의 평점 샘플 데이터 활용 - http://grouplens.org/datasets/movielens/

http://files.grouplens.org/datasets/movielens/ml-20m.zip  (MovieLens 20M Dataset 사용)

- 사용자에 의해 점수가 매겨진 파일(ratings.csv)을 영화별 평점을 구하여 정렬. 영화정보(movies.csv영화명)과 함께 조인하여 보여 주는 예제


- ratings.csv 

- 영화 평점 정보, 약 500MB, 20,000,264 rows

Ratings Data File Structure (ratings.csv)

-----------------------------------------

All ratings are contained in the file `ratings.csv`.

    userId,movieId,rating,timestamp


userId,movieId,rating,timestamp

138493,60816,4.5,1259865163

138493,61160,4.0,1258390537

138493,65682,4.5,1255816373

138493,66762,4.5,1255805408

138493,68319,4.5,1260209720


- movies.csv

- 영화 정보, 약 1MB , 27,279 rows

Movies Data File Structure (movies.csv)

---------------------------------------

Movie information is contained in the file `movies.csv`. Each line of this file after the header row represents one movie, and has the following format:

    movieId,title,genres


movieId,title,genres

131241,Ants in the Pants (2000),Comedy|Romance

131243,Werner - Gekotzt wird später (2003),Animation|Comedy

131248,Brother Bear 2 (2006),Adventure|Animation|Children|Comedy|Fantasy

131250,No More School (2000),Comedy

131252,Forklift Driver Klaus: The First Day on the Job (2001),Comedy|Horror


# spark_movelens.py

from pyspark import SparkContext from statistics import mean if __name__ == "__main__": sc = SparkContext("local[*]", "Simple App") ratings = "D:/hadoop/data/ml-20m/ratings.csv" movies = "D:/hadoop/data/ml-20m/movies.csv" movie = sc.textFile(movies) header = movie.first() dic = movie.filter(lambda x: x != header).map(lambda x: x.split(',')).map(lambda x: (x[0],x[1])) data = sc.textFile(ratings) header = data.first() rdd = data.filter(lambda x: x != header).map(lambda x: x.split(','))\ .map(lambda x: (x[1], float(x[2]))).groupByKey().mapValues(list)\ .map(lambda x: (x[0], round(mean(x[1]), 2)))\ .join(dic).sortBy(lambda x: x[1][0])\ .map(lambda x: (x[0], x[1][0], x[1][1])) for id, avg, title in rdd.collect(): print('{} {} - {}'.format(id, avg, title.encode('utf8')))

  

- 위의 내용을 Spark SQL로 구현 


# spark_sql.py

from pyspark.sql import SQLContext, Row from pyspark import SparkContext if __name__ == "__main__": sc = SparkContext("local[*]", "Simple SQL App") sqlContext = SQLContext(sc) ratings = "D:/hadoop/data/ml-20m/ratings.csv" moviepath = "D:/hadoop/data/ml-20m/movies.csv" movie = sc.textFile(moviepath) header = movie.first() rdd = movie.filter(lambda x: x != header).map(lambda x: x.split(','))\ .map(lambda x: Row(id=x[0], title=x[1])) data = sc.textFile(ratings) header = data.first() rddRating = data.filter(lambda x: x != header).map(lambda x: x.split(','))\ .map(lambda x: Row(id=x[1], rating=float(x[2]))) sqlContext.createDataFrame(rdd).registerTempTable('movies') sqlContext.createDataFrame(rddRating).registerTempTable('ratings') tbl = sqlContext.sql(""" SELECT a.id, b.title, avg(a.rating) as rating FROM ratings a join movies b WHERE a.id = b.id GROUP BY a.id, b.title ORDER BY avg(a.rating) """) tblMap = tbl.map(lambda x: (x.id, x.title, x.rating)) for a, b, c in tblMap.collect(): print("{}({}): {}".format(a, round(c, 2), b.encode('utf8')))



0 Comments
댓글쓰기 폼