source: https://www.coursera.org/projects/data-analysis-using-pyspark
[Project] Data Analysis Using Pyspark | Coursera
Complete this guided project in under 2 hours. One of the important topics that every data analyst should be familiar with is the distributed data processing technologies. As a ...
www.coursera.org
The course runs everything in Google Colab in case of local hardware constraints, but I ran it locally.
Import the Spark SQL functions and the visualization library.
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, col, max, struct  # note: this max shadows Python's built-in max
import matplotlib.pyplot as plts
Create a SparkSession to run Spark.
spark = SparkSession.builder.appName('spark_app').getOrCreate()
# The longer form I learned first (builder with an extra config option)
# spark = SparkSession \
# .builder \
# .appName("PySpark Example") \
# .config("spark.some.config.option", "some-value") \
# .getOrCreate()
# spark # evaluating the session in a notebook cell displays it
Data import and data check
listening_csv_path = './dataset/listenings.csv' # about 1 GB of data
listening_df = spark.read.format('csv') \
.option('inferSchema', True) \
.option('header', True) \
.load(listening_csv_path)
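# inferSchema=True: infer each column's type automatically (Spark makes an extra pass over the data to do this)
# header=True: use the first line of the file as column names
# show the data (20 rows by default)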
listening_df.show()
+-----------+-------------+--------------------+---------------+--------------------+
| user_id| date| track| artist| album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000| Price Tag| Jessie J| Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...| Jessie J| Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...| Robyn| Be Mine!|
|000Silenced|1299679200000| Acapella| Kelis| Acapella|
|000Silenced|1299675660000| I'm Not Invisible| The Tease| I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...| MSTRKRFT| Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...| Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000| ObZen| Meshuggah| ObZen|
|000Silenced|1292437740000| Yama's Messengers| Gojira|The Way of All Flesh|
|000Silenced|1292436360000|On the Brink of E...| Napalm Death|Time Waits For No...|
|000Silenced|1292436360000|On the Brink of E...| Napalm Death|Time Waits For No...|
|000Silenced|1292435940000| In Deference| Napalm Death| Smear Campaign|
|000Silenced|1292434920000| Post(?)organic| Decapitated|Organic Hallucinosis|
|000Silenced|1292434560000| Mind Feeders| Dom & Roland| No Strings Attached|
|000Silenced|1292434320000|Necrosadistic War...|Cannibal Corpse| Kill|
|000Silenced|1292365560000| Dance All Night| Dom & Roland| Chronology|
|000Silenced|1292365260000| Late Night| Dom & Roland| Chronology|
|000Silenced|1292365020000| Freak Seen| Dom & Roland| Chronology|
|000Silenced|1292364720000|Paradrenasite (Hi...| Dom & Roland| Chronology|
|000Silenced|1292364300000| Rhino| Dom & Roland| Chronology|
+-----------+-------------+--------------------+---------------+--------------------+
only showing top 20 rows
Dropping a column and missing values
listening_df = listening_df.drop('date') # remove a column with .drop(column_name)
listening_df = listening_df.na.drop() # drop every row that has a null in any column
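With no arguments, na.drop() uses how='any', so a row is removed as soon as any one of its columns is null. The plain-Python sketch below mimics that rule on a few hypothetical rows (not taken from listenings.csv), with None standing in for null, to show which rows survive.

```python
# Hypothetical rows standing in for (user_id, track, artist, album); None plays the role of null.
rows = [
    ("000Silenced", "Price Tag", "Jessie J", "Who You Are"),
    ("000Silenced", "Acapella", "Kelis", None),        # missing album
    ("00williamsl", None, "Rihanna", "Loud"),           # missing track
    ("0rdos", "Rush", "Rihanna", "Music of the Sun"),
]

def drop_any_null(rows):
    """Keep only rows without any None, like df.na.drop() with the default how='any'."""
    return [row for row in rows if all(value is not None for value in row)]

clean = drop_any_null(rows)
print(len(clean))  # 2
```

In PySpark, passing how='all' or a subset=[...] list of column names to na.drop() relaxes this rule.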
Checking the data structure
listening_df.printSchema() # print column names, types, and nullability
root
|-- user_id: string (nullable = true)
|-- track: string (nullable = true)
|-- artist: string (nullable = true)
|-- album: string (nullable = true)
PySpark DataFrames have no pandas-style shape attribute, so we build it ourselves.
shape = (listening_df.count(), len(listening_df.columns))
print(shape) # rows via count(), columns via len(df.columns)
(13758905, 4)
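The two calls can be wrapped in a small helper. This is only a sketch: spark_shape is a hypothetical name, and the FakeDataFrame stub imitates just the two DataFrame members the helper touches (count() and columns) so the snippet runs without Spark. On a real DataFrame, count() launches a job that scans all the data, so on roughly 1 GB it is worth calling it once and reusing the result.

```python
def spark_shape(df):
    """Return (rows, columns) for a DataFrame-like object, mimicking pandas' .shape."""
    # In Spark, count() is an action that scans the data, so it can be slow on large inputs
    return (df.count(), len(df.columns))

class FakeDataFrame:
    """Hypothetical stand-in exposing only count() and columns."""
    def __init__(self, rows, columns):
        self._rows = rows
        self.columns = columns

    def count(self):
        return len(self._rows)

fake = FakeDataFrame(
    rows=[("u1", "t1", "a1", "b1"), ("u2", "t2", "a2", "b2")],
    columns=["user_id", "track", "artist", "album"],
)
print(spark_shape(fake))  # (2, 4)
```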
Extracting data with SQL-style queries
Simple select
q0 = listening_df.select('artist', 'track')
q0.show()
+---------------+--------------------+
| artist| track|
+---------------+--------------------+
| Jessie J| Price Tag|
| Jessie J|Price Tag (Acoust...|
| Robyn|Be Mine! (Ballad ...|
| Kelis| Acapella|
| The Tease| I'm Not Invisible|
| MSTRKRFT|Bounce (Feat NORE...|
| Rihanna|Don't Stop The Mu...|
| Meshuggah| ObZen|
| Gojira| Yama's Messengers|
| Napalm Death|On the Brink of E...|
| Napalm Death|On the Brink of E...|
| Napalm Death| In Deference|
| Decapitated| Post(?)organic|
| Dom & Roland| Mind Feeders|
|Cannibal Corpse|Necrosadistic War...|
| Dom & Roland| Dance All Night|
| Dom & Roland| Late Night|
| Dom & Roland| Freak Seen|
| Dom & Roland|Paradrenasite (Hi...|
| Dom & Roland| Rhino|
+---------------+--------------------+
only showing top 20 rows
Rows for Rihanna only
q1 = listening_df.select('*').filter(listening_df.artist == "Rihanna")
q1.show()
+-----------+--------------------+-------+--------------------+
| user_id| track| artist| album|
+-----------+--------------------+-------+--------------------+
|000Silenced|Don't Stop The Mu...|Rihanna|Addicted 2 Bassli...|
|000Silenced| Disturbia|Rihanna|Good Girl Gone Ba...|
|00williamsl| Hatin On The Club|Rihanna| Random|
|00williamsl| Hatin On The Club|Rihanna| Random|
|00williamsl| Complicated|Rihanna| Loud|
|00williamsl|What's My Name (f...|Rihanna| Loud|
|00williamsl|Kanye West feat R...|Rihanna| Loud|
| 0502008|Only Girl (In the...|Rihanna| Loud|
| 0rdos|Pon De Replay (Re...|Rihanna| Music of the Sun|
| 0rdos| Now I Know|Rihanna| Music of the Sun|
| 0rdos|There's a Thug in...|Rihanna| Music of the Sun|
| 0rdos| Rush|Rihanna| Music of the Sun|
| 0rdos| Let Me|Rihanna| Music of the Sun|
| 0rdos| Music of the Sun|Rihanna| Music of the Sun|
| 0rdos| Willing to Wait|Rihanna| Music of the Sun|
| 0rdos| The Last Time|Rihanna| Music of the Sun|
| 0rdos|If It's Lovin' Th...|Rihanna| Music of the Sun|
| 0rdos| Here I Go Again|Rihanna| Music of the Sun|
| 0rdos| Pon de Replay|Rihanna| Music of the Sun|
| 0rdos| Cry|Rihanna| Good Girl Gone Bad|
+-----------+--------------------+-------+--------------------+
only showing top 20 rows
Top 10 Rihanna listeners, i.e. the 10 users with the most Rihanna plays?
q2 = listening_df.select('user_id').filter(listening_df.artist == 'Rihanna').groupby('user_id').agg(count('user_id').alias('count')).orderBy(desc('count')).limit(10)
q2.show() # alias(): name the column produced by the aggregate function
# agg(): apply aggregate functions to each group
+---------------+-----+
| user_id|count|
+---------------+-----+
| thiessu| 179|
| eyessetkyle| 166|
| adxx| 164|
|missnumberthree| 156|
|helloiamnatalie| 128|
| nmjnb| 124|
| AndyyyA| 123|
| BIGBANG186| 121|
| mixedvibes| 120|
| AndyKitt| 115|
+---------------+-----+
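The chain filter → groupby → count → orderBy(desc) → limit is a filtered top-N count. The plain-Python sketch below walks through the same steps with collections.Counter on a handful of hypothetical (user_id, artist) plays; it only makes the order of operations concrete and is not a replacement for Spark on 13 million rows.

```python
from collections import Counter

# Hypothetical (user_id, artist) plays
plays = [
    ("thiessu", "Rihanna"), ("thiessu", "Rihanna"), ("thiessu", "Adele"),
    ("adxx", "Rihanna"), ("adxx", "Rihanna"), ("adxx", "Rihanna"),
    ("nmjnb", "Rihanna"), ("nmjnb", "Muse"),
]

def top_listeners(plays, artist, n):
    # filter(artist == ...) followed by groupby('user_id') + count(...)
    counts = Counter(user for user, a in plays if a == artist)
    # orderBy(desc('count')) + limit(n)
    return counts.most_common(n)

print(top_listeners(plays, "Rihanna", 2))  # [('adxx', 3), ('thiessu', 2)]
```

One difference is ties: Counter.most_common keeps tied entries in first-seen order, while Spark does not promise any particular order among rows with equal counts.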
Top 10 tracks?
q3 = listening_df.select('artist','track').groupby('artist','track').agg(count('*').alias('count')).orderBy(desc('count')).limit(10)
q3.show()
+--------------+----------------+-----+
| artist| track|count|
+--------------+----------------+-----+
| Justin Bieber| Sorry| 3381|
|Arctic Monkeys|Do I Wanna Know?| 2865|
| Bon Iver| Skinny Love| 2836|
| Zayn| PILLOWTALK| 2701|
| The Killers| Mr Brightside| 2690|
| Rihanna| Work| 2646|
| Bastille| Pompeii| 2606|
|Mumford & Sons| Little Lion Man| 2520|
|Mumford & Sons| The Cave| 2485|
| Justin Bieber| Love Yourself| 2481|
+--------------+----------------+-----+
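Grouping by both artist and track matters because the same title can belong to different artists; grouping by track alone would merge them into one count. A small plain-Python illustration with hypothetical rows, using (artist, track) tuples as the group key ("Another Artist" is a made-up placeholder):

```python
from collections import Counter

# Hypothetical plays: two different songs share the title "Sorry"
plays = [
    ("Justin Bieber", "Sorry"), ("Justin Bieber", "Sorry"),
    ("Another Artist", "Sorry"),
    ("Rihanna", "Work"),
]

by_track = Counter(track for _, track in plays)  # like groupby('track')
by_artist_track = Counter(plays)                 # like groupby('artist', 'track')

print(by_track["Sorry"])                            # 3 (two different songs merged)
print(by_artist_track[("Justin Bieber", "Sorry")])  # 2
```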
Rihanna's top 10 tracks?
q4 = listening_df.select('artist','track').filter(listening_df.artist == "Rihanna").groupby('artist','track').agg(count('*').alias('count')).orderBy(desc('count')).limit(10)
q4.show()
+-------+--------------------+-----+
| artist| track|count|
+-------+--------------------+-----+
|Rihanna| Work| 2646|
|Rihanna|Only Girl (In the...| 1749|
|Rihanna|We Found Love (fe...| 1575|
|Rihanna| S&M| 1307|
|Rihanna| Rude Boy| 1303|
|Rihanna| Diamonds| 1224|
|Rihanna| Kiss it Better| 945|
|Rihanna| Where Have You Been| 844|
|Rihanna|Cheers (Drink to ...| 697|
|Rihanna| Needed Me| 679|
+-------+--------------------+-----+
Top 10 albums?
q5 = listening_df.select('artist','album').groupby('artist','album').agg(count('*').alias('count')).orderBy(desc('count')).limit(10)
q5.show()
+--------------------+--------------------+-----+
| artist| album|count|
+--------------------+--------------------+-----+
| Kanye West| The Life Of Pablo|22310|
| The xx| xx|14195|
| Arctic Monkeys| AM|14090|
| alt-J| An Awesome Wave|13635|
| Mumford & Sons| Sigh No More|13543|
| Arctic Monkeys|Whatever People S...|12731|
| Bon Iver| For Emma|11994|
| Grimes| Art Angels|11655|
|Florence + the Ma...| Lungs|11362|
| Adele| 21|11215|
+--------------------+--------------------+-----+
Combining two datasets
Load the genre data and check it.
genre_csv_path = './dataset/genre.csv'
genre_df = spark.read.format('csv').option('inferSchema',True).option('header',True).load(genre_csv_path)
genre_df.show()
+--------------------+-----+
| artist|genre|
+--------------------+-----+
| Muse| rock|
| Nirvana| rock|
| Bon Jovi| rock|
| The Police| rock|
| Kiss| rock|
| Guns N' Roses| rock|
| Rusted Root| rock|
|Katrina and the W...| pop|
| The Beatles| rock|
| Hall & Oates| pop|
| Otis Redding| soul|
| Marvin Gaye| soul|
| The Cranberries| rock|
| Survivor| rock|
| Fleetwood Mac|blues|
| Radiohead| rock|
| Toto| rock|
| U2| rock|
|Creedence Clearwa...| rock|
| REM| rock|
+--------------------+-----+
only showing top 20 rows
Now join them (inner join)
data = listening_df.join(genre_df, how = 'inner', on=['artist'])
data.show() # inner join on the artist column
Top 10 users who listen to pop?
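An inner join keeps only artists that appear in both DataFrames, and an artist listed more than once in the genre data yields one joined row per match. This post does not check whether genre.csv has such duplicates; the plain-Python sketch below uses hypothetical rows just to show both effects. Spark itself picks a hash or sort-merge strategy rather than nested loops; the loop is only for clarity.

```python
# Hypothetical listening rows: (user_id, artist)
listens = [("u1", "Muse"), ("u2", "Rihanna"), ("u3", "Unknown Band")]
# Hypothetical genre rows: (artist, genre); Rihanna appears twice
genres = [("Muse", "rock"), ("Rihanna", "pop"), ("Rihanna", "r&b")]

def inner_join_on_artist(listens, genres):
    """Nested-loop inner join on artist, emitting (artist, user_id, genre)."""
    return [
        (artist, user, genre)
        for user, artist in listens
        for g_artist, genre in genres
        if artist == g_artist
    ]

for row in inner_join_on_artist(listens, genres):
    print(row)
# ('Muse', 'u1', 'rock')
# ('Rihanna', 'u2', 'pop')
# ('Rihanna', 'u2', 'r&b')
```

"Unknown Band" has no genre row, so it disappears, while u2's single play turns into two rows.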
q6 = data.select('user_id').filter(data.genre == "pop").groupby('user_id').agg(count('*').alias('count')).orderBy(desc('count')).limit(10)
q6.show()
+---------------+-----+
| user_id|count|
+---------------+-----+
| 01Green| 496|
| momousagi| 400|
| mrpsb| 400|
| BlueKnockOut| 378|
| musicboy80s| 376|
| incultojurgis| 374|
| ElektricOrchid| 370|
|foreign_fanatic| 350|
| Kevin_Soutar| 346|
| landrover2171| 301|
+---------------+-----+
Top 10 genres?
q7 = data.select('genre').groupby('genre').agg(count('*').alias('count')).orderBy(desc('count')).limit(10)
q7.show()
+----------+-------+
| genre| count|
+----------+-------+
| rock|2691934|
| pop|1544747|
|electronic| 551509|
| hip hop| 532984|
| folk| 438174|
|indie rock| 431439|
| punk| 380915|
| r&b| 344101|
| metal| 208107|
| indie| 206726|
+----------+-------+
Data visualization
Which genre does each user listen to most?
q8_1 = data.select('user_id', 'genre').groupby('user_id','genre').agg(count('*').alias('count')).orderBy('user_id')
q8_1.show()
+-------+--------------------+-----+
|user_id| genre|count|
+-------+--------------------+-----+
| --Seph| folktronica| 2|
| --Seph| experimental| 3|
| --Seph| Nils Frahm| 1|
| --Seph| reggae| 3|
| --Seph| soul| 1|
| --Seph| The Lady of Rage| 4|
| --Seph| progressive trance| 1|
| --Seph| children music| 1|
| --Seph| hip hop| 7|
| --Seph| funk| 1|
| --Seph| Ruben| 4|
| --Seph| Capella Istropol...| 2|
| --Seph| romantic| 1|
| --Seph| Arnór Dan & Doug...| 1|
| --Seph| indie| 3|
| --Seph| Ne-Yo & Nicky Ro...| 1|
| --Seph| baroque| 1|
| --Seph| rock| 27|
| --Seph| electro| 2|
| --Seph| ninja tune| 1|
+-------+--------------------+-----+
only showing top 20 rows
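Because the result is ordered by user_id, each user's genre counts are listed together before the next user starts. One more step is needed: keep only the genre with the highest count for each user.
struct() combines several columns into a single struct column.
Group q8_1 from above by user_id again. max() compares struct values field by field from left to right, so max(struct(count, genre)) returns the (count, genre) pair with the largest count, and select('max.genre') then pulls out its genre.
# struct() packs count and genre into one value so max() can compare them together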
q8_2 = q8_1.groupby('user_id').agg(max(struct(col('count'),col('genre'))).alias('max')).select(col('user_id'),col('max.genre'))
q8_2.show()
+---------------+-----------+
| user_id| genre|
+---------------+-----------+
| --Shinn--| rock|
| -Gemma-|british pop|
| -wasp-| rock|
| -x-kelsey-x-| pop|
| 00fieldsy| rock|
| 01higginsr| hip hop|
| 0503611| rock|
| 0Chris0| hip hop|
| 12thmarquis| rock|
| 1936| pop|
| 1Marley| r&b|
| 1chris90| pop|
| 200percent| folk|
| 20113hz| pop|
|20thCenturyGirl| rock|
| 247flix| synthpop|
| 2AmcD7| rock|
| 2Gns| electronic|
| 2HB| pop|
| 2key| hip hop|
+---------------+-----------+
only showing top 20 rows
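The max(struct(...)) trick relies on field-by-field comparison, which behaves much like Python tuple comparison. The plain-Python sketch below applies the same idea to hypothetical per-user counts (the -Gemma- numbers are made up): count comes first so it decides the comparison, and genre is only consulted on ties, which is how the Spark version should behave too.

```python
# Hypothetical (user_id, genre, count) rows, shaped like q8_1
rows = [
    ("--Seph", "rock", 27), ("--Seph", "hip hop", 7), ("--Seph", "indie", 3),
    ("-Gemma-", "british pop", 12), ("-Gemma-", "pop", 5),
]

best = {}
for user, genre, cnt in rows:
    candidate = (cnt, genre)  # like struct(col('count'), col('genre'))
    if user not in best or candidate > best[user]:
        best[user] = candidate  # like max(...) within groupby('user_id')

top_genre = {user: pair[1] for user, pair in best.items()}  # like select('max.genre')
print(top_genre)  # {'--Seph': 'rock', '-Gemma-': 'british pop'}
```

Putting genre first in the struct would instead return the alphabetically largest genre, so the column order inside struct() matters.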
Count the artists in the pop, rock, metal, and hip hop genres, then visualize the result
q9 = genre_df.select("genre").filter((col('genre')=='pop')|(col('genre')=='rock')|(col('genre')=='metal')|(col('genre')=='hip hop')).groupby('genre').agg(count('genre').alias('count'))
q9.show()
+-------+-----+
| genre|count|
+-------+-----+
| pop| 6960|
|hip hop| 4288|
| metal| 1854|
| rock| 9066|
+-------+-----+
The counts are ready.
# collect(): bring q9's rows to the driver as a Python list of Row objects (fine here, since q9 has only 4 rows)
q9_list = q9.collect()
q9_list
[Row(genre='pop', count=6960),
Row(genre='hip hop', count=4288),
Row(genre='metal', count=1854),
Row(genre='rock', count=9066)]
Build the label and count lists for the chart
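labels = [row['genre'] for row in q9_list]
counts = [row['count'] for row in q9_list] # use row['count'], not row.count: Row is a tuple, so .count is tuple's count() method
print(labels)
print(counts)
['pop', 'hip hop', 'metal', 'rock']
[6960, 4288, 1854, 9066]
plts.bar(labels, counts)
plts.xlabel('genre')
plts.ylabel('number of artists')
plts.show()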
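q9 has no orderBy, so collect() returns the genres in whatever order Spark produced them (here pop, hip hop, metal, rock) and the bars come out unsorted. A small sketch that sorts the label/count pairs by count, largest first, before they go to plts.bar; the numbers are the q9 results shown above. Adding .orderBy(desc('count')) to q9 before collect() would achieve the same on the Spark side.

```python
labels = ['pop', 'hip hop', 'metal', 'rock']
counts = [6960, 4288, 1854, 9066]

# Pair each label with its count, sort by count (largest first), then split back into two lists
pairs = sorted(zip(labels, counts), key=lambda p: p[1], reverse=True)
sorted_labels = [label for label, _ in pairs]
sorted_counts = [cnt for _, cnt in pairs]

print(sorted_labels)  # ['rock', 'pop', 'hip hop', 'metal']
print(sorted_counts)  # [9066, 6960, 4288, 1854]
# plts.bar(sorted_labels, sorted_counts) would then draw the bars in descending order
```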