https://www.coursera.org/projects/diabetes-prediction-with-pyspark-mllib
Let's do EDA and machine learning with Spark.
#create a sparksession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark').getOrCreate()
# getOrCreate() reuses an existing session if there is one, otherwise it creates a new one
Start by setting up the session.
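A quick way to see what getOrCreate() means in practice (a small sketch of my own, not in the original):
#sketch: getOrCreate() reuses the already-running session on repeated calls
spark2 = SparkSession.builder.getOrCreate()
print(spark is spark2)  # True: no second session is created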
#clone the diabetes dataset from the github repository
#fetch the dataset (source: Kaggle)
! git clone https://github.com/education454/diabetes_dataset
#check if the dataset exists
#check that the clone worked (on Linux, use ls instead of dir)
! dir diabetes_dataset
Volume in drive C has no label.
Volume Serial Number is D001-AFD7
Directory of C:\Users\Seward_Shin\Desktop\2023_study\PySpark\diabetes_dataset
12/04/2023 02:30 PM <DIR> .
12/04/2023 03:46 PM <DIR> ..
12/04/2023 02:30 PM 62,058 diabetes.csv
12/04/2023 02:30 PM 213 new_test.csv
2 File(s) 62,271 bytes
2 Dir(s) 216,771,551,232 bytes free
Load the data and inspect it (it has a target column indicating whether the patient has diabetes or not).
#create spark dataframe
df = spark.read.csv("./diabetes_dataset/diabetes.csv", header=True, inferSchema=True)
# options can be set in this one call, no separate .option() chaining needed
#display the dataframe
df.show() # in Outcome, 1 = diabetes, 0 = normal
#print the schema to get a quick look at the columns and their types
df.printSchema()
root
|-- Pregnancies: integer (nullable = true)
|-- Glucose: integer (nullable = true)
|-- BloodPressure: integer (nullable = true)
|-- SkinThickness: integer (nullable = true)
|-- Insulin: integer (nullable = true)
|-- BMI: double (nullable = true)
|-- DiabetesPedigreeFunction: double (nullable = true)
|-- Age: integer (nullable = true)
|-- Outcome: integer (nullable = true)
Spark also has a describe() function!
#get the summary statistics
df.describe().show()
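describe() reports count, mean, stddev, min and max; df.summary() also adds quartiles. A small sketch (not in the original) that makes the suspicious zeros discussed below easier to spot:
#sketch: summary() adds 25%/50%/75% percentiles on top of describe()
df.select('Glucose', 'Insulin', 'BMI').summary().show()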
Check for missing values
#check for null values: count the number of nulls in each column
for col in df.columns:
    print(col + ":", df[df[col].isNull()].count())
Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0
-> There are no missing values at all!
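The per-column loop works; an equivalent one-pass alternative (a sketch, not in the original) counts the nulls for every column in a single select:
#sketch: count nulls for all columns in one pass
from pyspark.sql.functions import col, count, when
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()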
However, there are many anomalous 0 values in columns that should never be 0!
#look for the unnecessary values present: check columns where a value of 0 is physically implausible
def count_zeros():
    columns_list = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
    for i in columns_list:
        print(i + ":", df[df[i] == 0].count())

count_zeros()
Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28
956 Insulin values are 0 -> roughly 50% of the rows are anomalous for that column.
-> We cannot drop ~50% of the data, so replace the 0s with the mean value instead.
# mean imputation is a safe, common choice that disturbs the overall data the least
# calculate and replace the unnecessary values by the mean value
from pyspark.sql.functions import *
for i in df.columns[1:6]:
    data = df.agg({i: 'mean'}).first()[0]
    print("mean value for {} is {}".format(i, int(data)))  # just to see the value
    df = df.withColumn(i, when(df[i] == 0, int(data)).otherwise(df[i]))  # replace the 0s
# after printing the mean, the 0 values are replaced with it (df is redefined)
#display the dataframe
df.show()
mean value for Glucose is 121
mean value for BloodPressure is 69
mean value for SkinThickness is 20
mean value for Insulin is 80
mean value for BMI is 32
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
| 2| 138| 62| 35| 80|33.6| 0.127| 47| 1|
| 0| 84| 82| 31| 125|38.2| 0.233| 23| 0|
| 0| 145| 69| 20| 80|44.2| 0.63| 31| 1|
| 0| 135| 68| 42| 250|42.3| 0.365| 24| 1|
| 1| 139| 62| 41| 480|40.7| 0.536| 21| 0|
| 0| 173| 78| 32| 265|46.5| 1.159| 58| 0|
| 4| 99| 72| 17| 80|25.6| 0.294| 28| 0|
| 8| 194| 80| 20| 80|26.1| 0.551| 67| 0|
| 2| 83| 65| 28| 66|36.8| 0.629| 24| 0|
| 2| 89| 90| 30| 80|33.5| 0.292| 42| 0|
| 4| 99| 68| 38| 80|32.8| 0.145| 33| 0|
| 4| 125| 70| 18| 122|28.9| 1.144| 45| 1|
| 3| 80| 69| 20| 80|32.0| 0.174| 22| 0|
| 6| 166| 74| 20| 80|26.6| 0.304| 66| 0|
| 5| 110| 68| 20| 80|26.0| 0.292| 30| 0|
| 2| 81| 72| 15| 76|30.1| 0.547| 25| 0|
| 7| 195| 70| 33| 145|25.1| 0.163| 55| 1|
| 6| 154| 74| 32| 193|29.3| 0.839| 39| 0|
| 2| 117| 90| 19| 71|25.2| 0.313| 21| 0|
| 3| 84| 72| 32| 80|37.2| 0.267| 28| 0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 20 rows
The 0 values have been replaced with the mean values.
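One caveat: the means above were computed over all rows, zeros included, so they are biased slightly downward. A possible refinement (a sketch, not what the course does, and meant to replace the loop above rather than run after it) computes each mean over the non-zero rows only:
#sketch: impute with the mean of the non-zero rows instead
from pyspark.sql.functions import col, mean as sql_mean, when
for c in ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']:
    nz_mean = df.filter(col(c) != 0).agg(sql_mean(col(c))).first()[0]
    df = df.withColumn(c, when(col(c) == 0, int(nz_mean)).otherwise(col(c)))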
Correlation analysis against the target (Outcome)
#find the correlation among the set of input & output variables
for i in df.columns:
    print("Correlation to outcome for {} is {}".format(i, df.stat.corr('Outcome', i)))
Correlation to outcome for Pregnancies is 0.22443699263363961
Correlation to outcome for Glucose is 0.48796646527321064
Correlation to outcome for BloodPressure is 0.17171333286446713
Correlation to outcome for SkinThickness is 0.1659010662889893
Correlation to outcome for Insulin is 0.1711763270226193
Correlation to outcome for BMI is 0.2827927569760082
Correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
Correlation to outcome for Age is 0.23650924717620253
Correlation to outcome for Outcome is 1.0
Check the correlation coefficients -> none exceed 0.5.
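df.stat.corr computes one pair at a time; pyspark.ml.stat.Correlation can return the whole matrix in a single pass. A sketch (not in the original) using a temporary vector column:
#sketch: full Pearson correlation matrix in one pass
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
tmp_assembler = VectorAssembler(inputCols=df.columns, outputCol='corr_features')
corr_matrix = Correlation.corr(tmp_assembler.transform(df), 'corr_features').head()[0]
print(corr_matrix.toArray())  # rows/columns follow df.columns order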
Select only the columns we need and analyse them.
features: each column's value becomes an element of the features vector, i.e. the separate columns (2-D) are packed into a single 1-D array per row.
#feature selection
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'],outputCol='features')
output_data = assembler.transform(df)
#print the schema
output_data.printSchema()
root
|-- Pregnancies: integer (nullable = true)
|-- Glucose: integer (nullable = true)
|-- BloodPressure: integer (nullable = true)
|-- SkinThickness: integer (nullable = true)
|-- Insulin: integer (nullable = true)
|-- BMI: double (nullable = true)
|-- DiabetesPedigreeFunction: double (nullable = true)
|-- Age: integer (nullable = true)
|-- Outcome: integer (nullable = true)
|-- features: vector (nullable = true)
output_data.show()
Build and evaluate a machine-learning model
#create final data
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','Outcome')
#print schema of final data
final_data.printSchema()
root
|-- features: vector (nullable = true)
|-- Outcome: integer (nullable = true)
With the 1-D features array, the data can now be fed into a machine-learning model.
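To see what these assembled rows actually look like, you can show a few of them without truncation (sketch, not in the original):
#sketch: inspect a few assembled rows in full
final_data.show(3, truncate=False)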
Now split the data:
#split the dataset ; build the model (split with a 70/30 ratio)
train, test = final_data.randomSplit([0.7, 0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)
#summary of the model
summary = model.summary
summary.predictions.describe().show()
+-------+-------------------+-------------------+
|summary| Outcome| prediction|
+-------+-------------------+-------------------+
| count| 1404| 1404|
| mean| 0.344017094017094|0.26994301994301995|
| stddev|0.47521593065491124|0.44408811278670485|
| min| 0.0| 0.0|
| max| 1.0| 1.0|
+-------+-------------------+-------------------+
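randomSplit is random, so these counts change between runs. If reproducibility matters, a seed can be passed and the class balance of each split checked before fitting (sketch, not done in the course version):
#sketch: reproducible split and class-balance check (would go before models.fit)
train, test = final_data.randomSplit([0.7, 0.3], seed=42)  # hypothetical seed
train.groupBy('Outcome').count().show()
test.groupBy('Outcome').count().show()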
Let's evaluate the model.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)
predictions.predictions.show(20)
# compare the actual Outcome with the predicted value in a table
# check the score
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='Outcome')
evaluator.evaluate(model.transform(test))
+--------------------+-------+--------------------+--------------------+----------+
| features|Outcome| rawPrediction| probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,67.0,76.0,20...| 0|[2.31416588388269...|[0.91004347572478...| 0.0|
|[0.0,73.0,69.0,20...| 0|[4.17901867012219...|[0.98491743939342...| 0.0|
|[0.0,78.0,88.0,29...| 0|[2.60132114101244...|[0.93094655766088...| 0.0|
|[0.0,78.0,88.0,29...| 0|[2.60132114101244...|[0.93094655766088...| 0.0|
|[0.0,84.0,64.0,22...| 0|[2.40000309974290...|[0.91682753987666...| 0.0|
|[0.0,86.0,68.0,32...| 0|[2.71523051885331...|[0.93791940243510...| 0.0|
|[0.0,91.0,68.0,32...| 0|[2.28501153532101...|[0.90762806998223...| 0.0|
|[0.0,91.0,68.0,32...| 0|[2.28501153532101...|[0.90762806998223...| 0.0|
|[0.0,91.0,80.0,20...| 0|[2.25960721697027...|[0.90547601841886...| 0.0|
|[0.0,93.0,60.0,20...| 0|[2.32571529425541...|[0.91098449377881...| 0.0|
|[0.0,93.0,60.0,25...| 0|[2.75947556299199...|[0.94044626861937...| 0.0|
|[0.0,93.0,100.0,3...| 0|[1.01089279002994...|[0.73319483333260...| 0.0|
|[0.0,93.0,100.0,3...| 0|[1.01089279002994...|[0.73319483333260...| 0.0|
|[0.0,94.0,69.0,20...| 0|[2.53819200609889...|[0.92677622703370...| 0.0|
|[0.0,94.0,69.0,20...| 0|[2.53819200609889...|[0.92677622703370...| 0.0|
|[0.0,95.0,80.0,45...| 0|[2.38741363531358...|[0.91586248122966...| 0.0|
|[0.0,95.0,85.0,25...| 1|[1.99796064120719...|[0.88058279205000...| 0.0|
|[0.0,97.0,64.0,36...| 0|[1.99975662149681...|[0.88077152242791...| 0.0|
|[0.0,98.0,82.0,15...| 0|[2.83113388521097...|[0.94433523614451...| 0.0|
|[0.0,99.0,69.0,20...| 0|[2.96155056622244...|[0.95080656994320...| 0.0|
+--------------------+-------+--------------------+--------------------+----------+
only showing top 20 rows
0.8290698406700673
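This value is areaUnderROC, the default metric of BinaryClassificationEvaluator. For a complementary number, accuracy on the test split can be computed with MulticlassClassificationEvaluator (sketch, not in the original):
#sketch: accuracy on the test split
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol='Outcome', predictionCol='prediction', metricName='accuracy')
print(acc_evaluator.evaluate(model.transform(test)))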
Saving and loading the model
# save model
#model.save("model")
model.write().overwrite().save('model')
# if saving fails with a Hadoop error, either install Hadoop,
# or add .config("spark.hadoop.validateOutputSpecs", "false") when building the SparkSession
# load saved model back to the environment
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('model')
If Hadoop is not installed, an error occurs complaining that HADOOP_HOME cannot be found, so installing Hadoop seems to be required after all.
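On Windows, a common workaround is to download winutils.exe and point HADOOP_HOME at its folder before the SparkSession is created. A sketch, assuming the binaries sit under C:\hadoop\bin:
#sketch: Windows workaround (assumption: winutils.exe extracted to C:\hadoop\bin)
import os
os.environ['HADOOP_HOME'] = r'C:\hadoop'
os.environ['PATH'] += r';C:\hadoop\bin'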
Predict on new data (using new_test.csv)
#create a new spark dataframe
test_df = spark.read.csv("./diabetes_dataset/new_test.csv", header=True, inferSchema=True)
#print the schema
test_df.printSchema()
root
|-- Pregnancies: integer (nullable = true)
|-- Glucose: integer (nullable = true)
|-- BloodPressure: integer (nullable = true)
|-- SkinThickness: integer (nullable = true)
|-- Insulin: integer (nullable = true)
|-- BMI: double (nullable = true)
|-- DiabetesPedigreeFunction: double (nullable = true)
|-- Age: integer (nullable = true)
Create the features column
#create an additional feature merged column
test_data = assembler.transform(test_df)
#print the schema
test_data.printSchema()
root
|-- Pregnancies: integer (nullable = true)
|-- Glucose: integer (nullable = true)
|-- BloodPressure: integer (nullable = true)
|-- SkinThickness: integer (nullable = true)
|-- Insulin: integer (nullable = true)
|-- BMI: double (nullable = true)
|-- DiabetesPedigreeFunction: double (nullable = true)
|-- Age: integer (nullable = true)
|-- features: vector (nullable = true)
#use model to make predictions
results = model.transform(test_data)
results.printSchema()
root
|-- Pregnancies: integer (nullable = true)
|-- Glucose: integer (nullable = true)
|-- BloodPressure: integer (nullable = true)
|-- SkinThickness: integer (nullable = true)
|-- Insulin: integer (nullable = true)
|-- BMI: double (nullable = true)
|-- DiabetesPedigreeFunction: double (nullable = true)
|-- Age: integer (nullable = true)
|-- features: vector (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)
Run predictions with the model
#display the predictions
results.select('features','prediction').show()
+--------------------+----------+
| features|prediction|
+--------------------+----------+
|[1.0,190.0,78.0,3...| 1.0|
|[0.0,80.0,84.0,36...| 0.0|
|[2.0,138.0,82.0,4...| 1.0|
|[1.0,110.0,63.0,4...| 1.0|
+--------------------+----------+
The model that predicts diabetes status from the data is complete.
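The same pipeline also works without a CSV. The sketch below (my own addition, with made-up values) scores a single hypothetical patient by reusing the assembler and the loaded model:
#sketch: score one hypothetical record (all values made up)
new_row = spark.createDataFrame(
    [(1, 150, 70, 25, 90, 31.5, 0.4, 45)],
    ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
     'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'])
model.transform(assembler.transform(new_row)).select('features', 'prediction').show()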