https://www.coursera.org/learn/spark-hadoop-snowflake-data-engineering
여기서 들은 수업을 바탕으로 정리하는 내용이다. (1주차)
Spark: 빅데이터 분석 엔진. RAM 사용. Hadoop보다 빠름. 데이터 분배 저장 필요
장점: Speed(disk-based 하둡's MapReduce보다 빠름), Versatility(다재다능:= batch 처리, 쿼리, ML, streaming data 모두 可), Ease of Use(직관적, 개발편의적, 진입쉬움), Advanced Analytics(ML 可 -> 고도분석 可), Real-time Processing(streaming data에 적합)
** streaming data는 실시간으로 계속 변하는 정보이다. Ex) Server activity log
단점: Memory Requirement(메모리 사용 높음 -> 비용↑ 可), Learning Curve(복잡한 작업위해서는 공부 필요), Compatibility(몇몇 옛날 방식;legacy systems은 하둡이 적절)
[Spark vs Hadoop 선택 기준]
1. Use case(사용목적): batch processing은 Hadoop이, rea-time processing 및 다용도는 spark가 적ㅈㅓㄹ
2. Skill Set: Java, MapReduce가 친숙하면 Hadoop이 적절. 뉴비는 Spark가 easier API라서 적절
3. Infrastructure: 사용자 환경이 어디에 더 편의적으로 조성되어있는지도 중요.
4. Performance: 시간에 민감한 신속함이 필요하면 Spark, 특정 업무라면 Hadoop.
[RDD(Resilient Distributed Dataset)]: Abstraction of data and tasks partitioned across nodes. 스파크 근본 데이터
(resilient to node failure) Spark framework의 근본 데이터 구조이다.
화살표: operation
주요 특징들::
Resilience: node failures로부터 회복 可 = recreate lost data partitions
Distributed: Spark cluster의 nodes에 분배됨. (Parallel processing of data 可)
Immutable: 한 번 생성시 수정 不可. 수정 대신 transformation으로 new RDD생성(data integrity 보존)
In-Memory Processing: RAM에 저장 -> disk-based(하둡)보다 빠른 연산 可
Transformation and Action Operations: RDD의 Operations은 2종류이다.
1) Transformation = lazy (기존 존재하는 것에서 new RDD 생성) ex) map, filter, reduceByKey
Transformations are lazy operations = Spark RDD에서 호출(action is called) 전까지는 수행이 안된다는 의미
Transformations을 lazy(lazy operation)라고 부르기도 한다.
2) actions(연산 및 출력) ex) count, collect, saveAsTextFile
Data Partitioning: data locality 최적화, 진행 효율 개선 도움
pip install pyspark해도 경로를 못찾을 때 -> 환경변수 SPARK_HOME 설정한다:
C:\Users\유저이름\anaconda3\Lib\site-packages\pyspark 으로 하면 된다. (윈도우 아나콘다 기준)
터미널에서 시작하기: pyspark 입력하고 엔터치면 아래와 같이 나옴
주피터 노트북 터미널(windows powershell)에서 spark에서 count()가 python 불일치로 실행이 안될 때:
PYSPARK_PYTHON = "C:\Users\유저이름\anaconda3\python.exe"
PYSPARK_DRIVER_PYTHON = "C:\Users\유저이름\anaconda3\python.exe"
을 환경변수에 추가하고 주피터 노트북을 끄고 다시 시작해본다.
numbers = list(range(15))
numbers
rdd = sc.parallelize(numbers)
rdd
rdd2 = rdd.map(lambda x : x*2 )
threes = rdd.filter(lambda x : x % 3 == 0)
threes.count()
입력한 코드 다음과 같다. 여기서
PySpark에서 rdd를 생성하는 메서드
rdd = sc.parallelize(numbers)
- sc: SparkContext 객체, RDD 생성 및 조작의 핵심 인터페이스.
- parallelize: 데이터를 여러 개 분산된 파티션으로 나눠 RDD로 변환하는 메서드
(parallelize manner = 병렬처리)
결국 이 코드는 numbers를 parallelize 메서드로 RDD로 변환하는 것.
이 데이터를 여러 파티션으로 분리, 분산 저장함. 각 파티션은 클러스터의 여러 노드에 분배되어 병렬로 처리 可
즉, 정수 리스트를 분산된 형태로 저장하고 처리하기 위한 PySpark RDD로 변환하는 과정
Spark SQL
Spark의 RDD 특성을 활용하여 SQL 처리 可
Java, Scala, R에서도 可
Interoperability: 다른 Spark component(streaming, MLlib, GraphX)와 호환됨
UDFs(User-Defined Functions) 可
Dataframe 기능 有. 단, Pandas df와 다른 개념임!
실습
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("PySpark Example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
spark # SparkSession 보기
accounts = spark.read.csv('./data/accounts.csv') # 데이터 기본 읽기
accounts # 데이터 컬럼 구성 출력
# 이렇게 호출하면 컬럼의 번호와 type만 출력한다.
DataFrame[_c0: string, c1: string, ... ,_c4: string]
accounts = spark.read.option('header', 'true').csv('./data/accounts.csv')
#호출할 때 option을 주어 호출하면
accounts
#_c0~_c4가 각각 무슨 컬럼인지 이름이 반영되어 출력된다.
DataFrame[account_number: string, aba: string, ... , balance: string]
accounts.printSchema()
# 보통 이름 호출보단 Schema를 통해 데이터 구조를 파악한다.
root
|-- account_number: string (nullable = ture)
|-- aba: string (nullable = true)
|-- bic: string (nullable = true)
|-- opened: string (nullable = true)
|-- balance: string (nullable = true)
transactions = spark.read.option('header', True).parquet('./data/transaction.parquet')
# spark는 parquet파일 형태도 read 可
transactions.count()
1000000
account_transctions = transactions.groupby('account_number').sum()
# account_number 기준으로 groupby해서 sum을 구하고
with_sum = accounts.join(account_transactions, 'account_number', 'inner')
# account_number와 inner join
accounts = with_sum.withColumn('new_balance', sum([with_sum.balance, with_sum('sum(amount)')]))
# balance와 sum(amount)를 sum 한 것을 new_balance로 정의
accounts.printSchema()
root
|-- account_number: string (nullable = ture)
|-- aba: string (nullable = true)
|-- bic: string (nullable = true)
|-- opened: string (nullable = true)
|-- balance: string (nullable = true)
|-- sum(amount): long (nullable = true) <- 새로운 컬럼1 형성!
|-- new_balance: double (nullable = true) <- 새로운 컬럼 2 형성!
neg_balance = accounts.filter(accounts.new_balance < 0)
# 누가 마이너스 통장인지 필터링
clients = spark.read.json('./data/clients.json')
#json 파일도 읽을 수 있다.
clients
DataFrame[account_number: string, address: string, ... , last_name: string]
clients = clients.json(neg_balance, 'account_number', 'inner') # 이너조인
clients.select(['first_name', 'last_name', 'account_number', 'new_balane'])
# 말 그대로 해당 컬럼들만 select
clients.select(['first_name', 'last_name', 'account_number', 'new_balane']).show(5)
# 5개까지 보여주세요 라는 show(5)를 추가하면
'TechStudy > BigDataTools' 카테고리의 다른 글
AWS: instance 만들기, PuTTY로 해당 가상환경 터미널 키기 (0) | 2024.01.18 |
---|---|
Snowflake (1) | 2023.12.08 |
Spark and MLLIB (EDA와 머신러닝) (0) | 2023.12.04 |
Spark 실습(조작법, 쿼리, 시각화) (1) | 2023.12.04 |
Hadoop 기본 실행 (0) | 2023.11.20 |