Spark RDD와 DataFrame 마스터하기
이번 글에서는 Spark의 핵심 데이터 구조인 RDD(Resilient Distributed Dataset)와 DataFrame에 대해 자세히 알아보겠습니다. 두 구조의 특징과 활용법을 실제 예제와 함께 살펴보겠습니다.
# RDD(Resilient Distributed Dataset) 기초
## RDD란?
RDD는 Spark의 기본 데이터 구조로, 다음과 같은 특징을 가집니다: - 불변성(Immutable) - 분산 처리(Distributed) - 탄력적 복구(Resilient) - 지연 실행(Lazy Evaluation) - 타입 안정성(Type-safe)
## RDD 생성 방법
1. 병렬화를 통한 생성: ```python from pyspark.sql import SparkSession
# SparkSession 생성 spark = SparkSession.builder \ .appName("RDDExample") \ .getOrCreate()
# 리스트로부터 RDD 생성 rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5]) # 텍스트 파일로부터 RDD 생성 text_rdd = spark.sparkContext.textFile("example.txt") ```
2. 외부 데이터 소스로부터 생성: ```python # HDFS에서 데이터 읽기 hdfs_rdd = spark.sparkContext.textFile("hdfs://...")
# S3에서 데이터 읽기 s3_rdd = spark.sparkContext.textFile("s3://...") ```
# RDD 연산
## 1. Transformation (변환)
주요 변환 연산: ```python # map: 각 요소에 함수 적용 mapped_rdd = rdd.map(lambda x: x * 2)
# filter: 조건에 맞는 요소만 선택 filtered_rdd = rdd.filter(lambda x: x > 2)
# flatMap: map 후 평탄화 flat_rdd = rdd.flatMap(lambda x: range(x))
# distinct: 중복 제거 unique_rdd = rdd.distinct()
# groupByKey: 키별 그룹화 grouped_rdd = rdd.groupByKey() ```
## 2. Action (액션)
실제 계산을 수행하는 연산: ```python # collect: 모든 요소 반환 result = rdd.collect()
# count: 요소 개수 반환 count = rdd.count()
# reduce: 요소들을 하나로 병합 sum_result = rdd.reduce(lambda a, b: a + b)
# saveAsTextFile: 파일로 저장 rdd.saveAsTextFile("output") ```
# DataFrame 이해하기
## DataFrame 소개
DataFrame은 테이블 형태의 데이터 구조로: - 스키마 정의 지원 - SQL 쿼리 가능 - 최적화된 실행 계획 - 다양한 포맷 지원
## DataFrame 생성
1. 기본 생성 방법: ```python # Dictionary로부터 생성 data = [ {'name': 'John', 'age': 30}, {'name': 'Alice', 'age': 25} ] df = spark.createDataFrame(data)
# CSV 파일로부터 생성 df = spark.read.csv("data.csv", header=True)
# JSON 파일로부터 생성 df = spark.read.json("data.json") ```
2. 스키마 정의: ```python from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# 스키마 정의 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True) ])
# 스키마 적용하여 DataFrame 생성 df = spark.createDataFrame(data, schema=schema) ```
# DataFrame 연산
## 1. 기본 연산
```python # 컬럼 선택 df.select("name", "age")
# 필터링 df.filter(df.age > 25)
# 정렬 df.orderBy("age")
# 그룹화 df.groupBy("age").count() ```
## 2. SQL 활용
```python # 임시 뷰 생성 df.createOrReplaceTempView("people")
# SQL 쿼리 실행 result = spark.sql(""" SELECT age, COUNT(*) as count FROM people GROUP BY age HAVING count > 1 """) ```
# RDD vs DataFrame
## 1. 주요 차이점
RDD: - 로우레벨 API - 타입 안정성 - 완전한 프로그래밍 제어 - 복잡한 데이터 처리에 유용
DataFrame: - 하이레벨 API - 최적화된 실행 계획 - SQL 쿼리 지원 - 구조화된 데이터 처리에 적합
## 2. 선택 가이드
RDD 사용이 좋은 경우: - 비구조화된 데이터 처리 - 복잡한 데이터 변환 - 세밀한 제어 필요 - 타입 안정성 중요
DataFrame 사용이 좋은 경우: - 구조화된 데이터 처리 - SQL 쿼리 활용 - 성능 최적화 중요 - 간단한 데이터 조작
# 성능 최적화
## 1. RDD 최적화
주요 최적화 기법: - 적절한 파티셔닝 - 캐싱 활용 - 셔플링 최소화 - 직렬화 최적화
예제: ```python # 파티션 수 조정 rdd = spark.sparkContext.parallelize(data, numSlices=10)
# 캐싱 rdd.cache()
# 파티셔닝 rdd.partitionBy(10) ```
## 2. DataFrame 최적화
최적화 전략: - 컬럼 선택 최적화 - 조인 최적화 - 필터 푸시다운 - 캐싱 전략
예제: ```python # 필요한 컬럼만 선택 df.select("name", "age")
# 조인 최적화 df1.join(df2, "key").hint("broadcast")
# 캐싱 df.cache() ```
# 실전 활용 예제
## 1. 로그 분석 예제
```python # 로그 파일 읽기 logs = spark.read.text("logs.txt")
# DataFrame을 사용한 로그 분석 parsed_logs = logs.select( regexp_extract('value', r'(d{4}-d{2}-d{2})', 1).alias('date'), regexp_extract('value', r'ERROR|WARN|INFO', 0).alias('level') )
# 일자별, 레벨별 집계 result = parsed_logs.groupBy('date', 'level').count() ```
## 2. 데이터 변환 예제
```python # CSV 데이터 읽기 data = spark.read.csv("data.csv", header=True)
# 데이터 변환 및 처리 transformed = data.withColumn( "amount_category", when(col("amount") > 1000, "high") .when(col("amount") > 500, "medium") .otherwise("low") )
# 결과 저장 transformed.write.parquet("output") ```
# 장애 처리 및 디버깅
## 1. 일반적인 문제
주의해야 할 점: - 메모리 부족 오류 - 셔플링 관련 문제 - 데이터 스큐 - 직렬화 오류
## 2. 해결 방안
문제 해결 전략: - 적절한 파티션 크기 설정 - 메모리 설정 최적화 - 데이터 스큐 해결 - 에러 로깅 강화
# 결론
RDD와 DataFrame은 각각의 장단점이 있으며, 사용 사례에 따라 적절히 선택하는 것이 중요합니다. 특히 최신 Spark에서는 DataFrame을 주로 사용하지만, RDD의 세밀한 제어가 필요한 경우도 여전히 존재합니다.
다음 글에서는 Spark SQL에 대해 더 자세히 다루면서, 실제 데이터 분석 사례를 살펴보도록 하겠습니다.