본문 바로가기
TechStudy/BigDataTools

Apache Spark

https://www.coursera.org/learn/spark-hadoop-snowflake-data-engineering

 

Spark, Hadoop, and Snowflake for Data Engineering

Duke University에서 제공합니다. e.g. This is primarily aimed at first- and second-year undergraduates interested in engineering or science, along with ... 무료로 등록하십시오.

www.coursera.org

여기서 들은 수업을 바탕으로 정리하는 내용이다. (1주차)

 

 

 

Spark: 빅데이터 분석 엔진. RAM 사용. Hadoop보다 빠름. 데이터 분배 저장 필요

장점: Speed(disk-based 하둡's MapReduce보다 빠름), Versatility(다재다능:= batch 처리, 쿼리, ML, streaming data 모두 可), Ease of Use(직관적, 개발편의적, 진입쉬움), Advanced Analytics(ML 可 -> 고도분석 可), Real-time Processing(streaming data에 적합)

 

** streaming data는 실시간으로 계속 변하는 정보이다. Ex) Server activity log

 

단점: Memory Requirement(메모리 사용 높음 -> 비용↑ 可), Learning Curve(복잡한 작업위해서는 공부 필요), Compatibility(몇몇 옛날 방식;legacy systems은 하둡이 적절)

 

[Spark vs Hadoop 선택 기준]

1. Use case(사용목적): batch processing은 Hadoop이, rea-time processing 및 다용도는 spark가 적ㅈㅓㄹ

2. Skill Set: Java, MapReduce가 친숙하면 Hadoop이 적절. 뉴비는 Spark가 easier API라서 적절

3. Infrastructure: 사용자 환경이 어디에 더 편의적으로 조성되어있는지도 중요.

4. Performance: 시간에 민감한 신속함이 필요하면 Spark, 특정 업무라면 Hadoop.

 

[RDD(Resilient Distributed Dataset)]: Abstraction of data and tasks partitioned across nodes. 스파크 근본 데이터

(resilient to node failure) Spark framework의 근본 데이터 구조이다. 

화살표: operation

주요 특징들::

Resilience: node failures로부터 회복 可 = recreate lost data partitions

Distributed: Spark cluster의 nodes에 분배됨. (Parallel processing of data 可)

Immutable: 한 번 생성시 수정 不可. 수정 대신 transformation으로 new RDD생성(data integrity 보존)

In-Memory Processing: RAM에 저장 -> disk-based(하둡)보다 빠른 연산 可

Transformation and Action Operations: RDD의 Operations은 2종류이다.
  1) Transformation = lazy (기존 존재하는 것에서 new RDD 생성) ex) map, filter, reduceByKey

   Transformations are lazy operations = Spark RDD에서 호출(action is called) 전까지는 수행이 안된다는 의미

   Transformations을 lazy(lazy operation)라고 부르기도 한다.
  2) actions(연산 및 출력) ex) count, collect, saveAsTextFile

Data Partitioning: data locality 최적화, 진행 효율 개선 도움

 


pip install pyspark해도 경로를 못찾을 때 ->  환경변수 SPARK_HOME 설정한다:

C:\Users\유저이름\anaconda3\Lib\site-packages\pyspark       으로 하면 된다. (윈도우 아나콘다 기준)

 

 

터미널에서 시작하기: pyspark   입력하고 엔터치면 아래와 같이 나옴

pyspark이므로 python 명령어가 인식된다.

 

주피터 노트북 터미널(windows powershell)에서 spark에서 count()가 python 불일치로 실행이 안될 때:

PYSPARK_PYTHON = "C:\Users\유저이름\anaconda3\python.exe"
PYSPARK_DRIVER_PYTHON = "C:\Users\유저이름\anaconda3\python.exe"

을 환경변수에 추가하고 주피터 노트북을 끄고 다시 시작해본다.

numbers = list(range(15))
numbers
rdd = sc.parallelize(numbers)
rdd
rdd2 = rdd.map(lambda x : x*2 )
threes = rdd.filter(lambda x : x % 3 == 0)
threes.count()

입력한 코드 다음과 같다. 여기서

PySpark에서 rdd를 생성하는 메서드

rdd = sc.parallelize(numbers)
  • sc: SparkContext 객체, RDD 생성 및 조작의 핵심 인터페이스.
  • parallelize: 데이터를 여러 개 분산된 파티션으로 나눠 RDD로 변환하는 메서드 
    (parallelize manner = 병렬처리)

결국 이 코드는 numbers를 parallelize 메서드로 RDD로 변환하는 것.

이 데이터를 여러 파티션으로 분리, 분산 저장함. 각 파티션은 클러스터의 여러 노드에 분배되어 병렬로 처리 可

즉, 정수 리스트를 분산된 형태로 저장하고 처리하기 위한 PySpark RDD로 변환하는 과정

 

 

 


Spark SQL

Spark의 RDD 특성을 활용하여 SQL 처리 可

Java, Scala, R에서도 可

Interoperability: 다른 Spark component(streaming, MLlib, GraphX)와 호환됨

UDFs(User-Defined Functions) 可

Dataframe 기능 有. 단, Pandas df와 다른 개념임!

 

 

 

실습

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("PySpark Example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

spark  # SparkSession 보기

 

 

 

 

accounts = spark.read.csv('./data/accounts.csv') # 데이터 기본 읽기
accounts # 데이터 컬럼 구성 출력
# 이렇게 호출하면 컬럼의 번호와 type만 출력한다.

DataFrame[_c0: string, c1: string, ... ,_c4: string]

 

 

 

accounts = spark.read.option('header', 'true').csv('./data/accounts.csv')
#호출할 때 option을 주어 호출하면
accounts
#_c0~_c4가 각각 무슨 컬럼인지 이름이 반영되어 출력된다.

DataFrame[account_number: string, aba: string, ... , balance: string]

 

 

accounts.printSchema() 
# 보통 이름 호출보단 Schema를 통해 데이터 구조를 파악한다.

 

root
|-- account_number: string (nullable = ture)
|-- aba: string (nullable = true)
|-- bic: string (nullable = true)
|-- opened: string (nullable = true)
|-- balance: string (nullable = true)

 

 

 

 

transactions = spark.read.option('header', True).parquet('./data/transaction.parquet')
# spark는 parquet파일 형태도 read 可
transactions.count()

1000000

 

 

 

account_transctions = transactions.groupby('account_number').sum()
# account_number 기준으로 groupby해서 sum을 구하고
with_sum = accounts.join(account_transactions, 'account_number', 'inner')
# account_number와 inner join
accounts = with_sum.withColumn('new_balance', sum([with_sum.balance, with_sum('sum(amount)')]))
# balance와 sum(amount)를 sum 한 것을 new_balance로 정의
accounts.printSchema()

root
|-- account_number: string (nullable = ture)
|-- aba: string (nullable = true)
|-- bic: string (nullable = true)
|-- opened: string (nullable = true)
|-- balance: string (nullable = true)
|-- sum(amount): long (nullable = true)        <-  새로운 컬럼1 형성!
|-- new_balance: double (nullable = true)    <-  새로운 컬럼 2 형성!

 

neg_balance = accounts.filter(accounts.new_balance < 0)
# 누가 마이너스 통장인지 필터링

clients = spark.read.json('./data/clients.json')
#json 파일도 읽을 수 있다.

clients

DataFrame[account_number: string, address: string, ... , last_name: string]

 

 

clients = clients.json(neg_balance, 'account_number', 'inner') # 이너조인
clients.select(['first_name', 'last_name', 'account_number', 'new_balane'])
# 말 그대로 해당 컬럼들만 select

 

 

clients.select(['first_name', 'last_name', 'account_number', 'new_balane']).show(5)
# 5개까지 보여주세요 라는 show(5)를 추가하면

이제 누가 마이너스 통장인지에 대해 5명을 출력하였다

 

728x90
반응형