Apache Spark 입문
Apache Spark란?
- 빅데이터 처리와 분석을 위해 설계된 오픈 소스 분산 컴퓨팅 시스템
- 클러스터 환경에서 대규모 데이터 세트를 처리하기 위한 통합 엔진을 제공
- 배치 처리부터 실시간 스트림 처리 및 기계 학습까지 다양하게 활용
Apache Spark의 주요 특징:
빠른 처리 속도: 메모리 내 처리 엔진을 사용하여 매우 빠른 데이터 처리 가능
다양한 기능: 다양한 데이터 처리 작업을 위한 API와 라이브러리 제공
사용 편의성: Java, Scala, Python 및 R을 지원
- 내결함성 보장: 장애 내성을 보장하기 위해 resilient distributed datasets (RDDs) 사용
RDD(Resilient Distributed Dataset): 변경할 수 없는 분산된 객체 컬렉션
- 클러스터에서 병렬 처리될 수 있음
- 장애에서 회복할 수 있는 회복력을 가지고 있음
- 여러 노드에 파티션으로 나뉘어 있어 분산
- 데이터셋과 유사한 형태를 가지고 있으며, 모든 유형의 요소를 포함할 수 있음
- 기존 RDD에서 새로운 RDD를 생성하는 변환(transformations) 및 변환을 실행하고 결과를 반환하는 액션(actions)
- 확장성: Spark의 분산 아키텍처는 대규모 하드웨어 클러스터 확장할 수 있어, 데이터 처리 작업을 효율적으로 확장 가능
실습
https://github.com/yehoon17/Spark-with-MovieLens-Dataset
데이터
MovieLens에서 다운로드
movies.csv
1
2
3
4
5
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
ratings.csv
1
2
3
4
5
userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Initialize SparkSession
spark = SparkSession.builder \
.appName("SparkProject") \
.getOrCreate()
# Load dataset
ratings_df = spark.read.csv(
"data/ratings.csv",
header=True,
inferSchema=True
)
movies_df = spark.read.csv(
"data/movies.csv",
header=True,
inferSchema=True
)
# Analysis
analyze_top_movies(ratings_df, movies_df)
analyze_user_trands(ratings_df)
# Stop SparkSession
spark.stop()
PySpark의 SparkSession
은 Python에서 Apache Spark를 사용하기 위한 시작점이다.
- 구조화된 데이터 작업, SQL 쿼리 실행 및 Spark 속성 구성과 같은 작업을 수행하는 통합된 인터페이스를 제공
- Spark 리소스(예: Executor 및 메모리)를 관리하고 작업 진행 상황을 모니터링하기 위한 Spark UI에 액세스를 제공
- Spark 애플리케이션의 라이프사이클을 처리하여 초기화에서 종료까지의 과정을 관리
상위 영화 분석
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def analyze_top_movies(ratings_df, movies_df):
# Join ratings_df with movies_df to get movie titles and genres
movie_ratings_df = ratings_df.join(movies_df, "movieId")
# Calculate the average rating and number of ratings for each movie
movie_stats_df = (
movie_ratings_df
.groupBy("movieId", "title", "genres")
.agg(
count("rating").alias("num_ratings"),
avg("rating").alias("avg_rating")
)
)
# Calculate a weighted average rating based on the number of ratings
expr_ = "avg_rating * num_ratings / (num_ratings + 10) + 5 * 10 / (num_ratings + 10)"
weighted_avg_df = movie_stats_df.withColumn("weighted_avg_rating", expr(expr_))
# Order the movies by weighted average rating and number of ratings
top_movies_df = weighted_avg_df.orderBy(
col("weighted_avg_rating").desc(),
col("num_ratings").desc()
)
# Display top N movies
print("Top 10 Movies by Weighted Average Rating and Number of Ratings:")
cols = ["title", "genres", "weighted_avg_rating", "num_ratings"]
top_movies_df.select(cols).show(10, truncate=False)
결과
1
2
3
4
5
6
7
8
9
10
11
12
13
14
+---------------------------+--------------------------------+-------------------+-----------+
|title |genres |weighted_avg_rating|num_ratings|
+---------------------------+--------------------------------+-------------------+-----------+
|Lonesome Dove Church (2014)|Western |5.0 |3 |
|Sound of Christmas (2016) |Drama |5.0 |3 |
|Borrowed Time (2012) |Drama |5.0 |3 |
|Awaken (2013) |Drama|Romance|Sci-Fi |5.0 |3 |
|The Memory Book (2014) |Drama|Romance |5.0 |2 |
|The Ties That Bind (2015) |(no genres listed) |5.0 |2 |
|Joy Road (2011) |Crime|Drama |5.0 |2 |
|Genius on Hold (2013) |(no genres listed) |5.0 |2 |
|FB: Fighting Beat (2007) |Action |5.0 |2 |
|Windstorm 2 (2015) |Adventure|Children|Drama|Romance|5.0 |2 |
+---------------------------+--------------------------------+-------------------+-----------+
사용자별 평점 추이
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def analyze_user_trands(ratings_df):
# Convert timestamp to a timestamp type
ratings_df = ratings_df.withColumn("timestamp", from_unixtime("timestamp"))
# Group ratings by user and time window
user_trends_df = (
ratings_df
.groupBy("userId", window("timestamp", "1 week"))
.avg("rating")
)
# Order the user trends by userId and window
user_trends_df = user_trends_df.orderBy("userId", "window")
# Display user trends
print("User Rating Trends:")
user_trends_df.show(truncate=False)
결과
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
+------+------------------------------------------+------------------+
|userId|window |avg(rating) |
+------+------------------------------------------+------------------+
|1 |{2006-05-11 09:00:00, 2006-05-18 09:00:00}|3.8142857142857145|
|2 |{2006-03-02 09:00:00, 2006-03-09 09:00:00}|3.630434782608696 |
|3 |{2015-08-13 09:00:00, 2015-08-20 09:00:00}|3.7450248756218905|
|3 |{2016-01-21 09:00:00, 2016-01-28 09:00:00}|3.769230769230769 |
|3 |{2017-01-12 09:00:00, 2017-01-19 09:00:00}|3.734375 |
|3 |{2017-04-20 09:00:00, 2017-04-27 09:00:00}|4.055555555555555 |
|3 |{2019-08-15 09:00:00, 2019-08-22 09:00:00}|3.5258064516129033|
|4 |{2019-11-14 09:00:00, 2019-11-21 09:00:00}|3.378099173553719 |
|5 |{1996-04-25 09:00:00, 1996-05-02 09:00:00}|3.6216216216216215|
|5 |{1996-05-09 09:00:00, 1996-05-16 09:00:00}|4.5 |
|5 |{1996-05-23 09:00:00, 1996-05-30 09:00:00}|4.0 |
|5 |{1996-06-20 09:00:00, 1996-06-27 09:00:00}|2.0 |
|5 |{1997-03-13 09:00:00, 1997-03-20 09:00:00}|3.7735849056603774|
|6 |{1999-12-09 09:00:00, 1999-12-16 09:00:00}|4.153846153846154 |
|7 |{1996-06-20 09:00:00, 1996-06-27 09:00:00}|3.64 |
|8 |{1998-03-19 09:00:00, 1998-03-26 09:00:00}|3.625 |
|8 |{1998-03-26 09:00:00, 1998-04-02 09:00:00}|3.0 |
|9 |{1997-03-20 09:00:00, 1997-03-27 09:00:00}|3.865168539325843 |
|10 |{2008-11-20 09:00:00, 2008-11-27 09:00:00}|3.452830188679245 |
|11 |{2008-04-10 09:00:00, 2008-04-17 09:00:00}|3.1458333333333335|
+------+------------------------------------------+------------------+
only showing top 20 rows
This post is licensed under CC BY 4.0 by the author.