Spark RDD와 DataFrame 마스터하기

2024-04-30Big Data

이번 글에서는 Spark의 핵심 데이터 구조인 RDD(Resilient Distributed Dataset)와 DataFrame에 대해 자세히 알아보겠습니다. 두 구조의 특징과 활용법을 실제 예제와 함께 살펴보겠습니다.

# RDD(Resilient Distributed Dataset) 기초

## RDD란?

RDD는 Spark의 기본 데이터 구조로, 다음과 같은 특징을 가집니다: - 불변성(Immutable) - 분산 처리(Distributed) - 탄력적 복구(Resilient) - 지연 실행(Lazy Evaluation) - 타입 안정성(Type-safe)

## RDD 생성 방법

1. 병렬화를 통한 생성: ```python from pyspark.sql import SparkSession

# SparkSession 생성 spark = SparkSession.builder \ .appName("RDDExample") \ .getOrCreate()

# 리스트로부터 RDD 생성 rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5]) # 텍스트 파일로부터 RDD 생성 text_rdd = spark.sparkContext.textFile("example.txt") ```

2. 외부 데이터 소스로부터 생성: ```python # HDFS에서 데이터 읽기 hdfs_rdd = spark.sparkContext.textFile("hdfs://...")

# S3에서 데이터 읽기 s3_rdd = spark.sparkContext.textFile("s3://...") ```

# RDD 연산

## 1. Transformation (변환)

주요 변환 연산: ```python # map: 각 요소에 함수 적용 mapped_rdd = rdd.map(lambda x: x * 2)

# filter: 조건에 맞는 요소만 선택 filtered_rdd = rdd.filter(lambda x: x > 2)

# flatMap: map 후 평탄화 flat_rdd = rdd.flatMap(lambda x: range(x))

# distinct: 중복 제거 unique_rdd = rdd.distinct()

# groupByKey: 키별 그룹화 grouped_rdd = rdd.groupByKey() ```

## 2. Action (액션)

실제 계산을 수행하는 연산: ```python # collect: 모든 요소 반환 result = rdd.collect()

# count: 요소 개수 반환 count = rdd.count()

# reduce: 요소들을 하나로 병합 sum_result = rdd.reduce(lambda a, b: a + b)

# saveAsTextFile: 파일로 저장 rdd.saveAsTextFile("output") ```

# DataFrame 이해하기

## DataFrame 소개

DataFrame은 테이블 형태의 데이터 구조로: - 스키마 정의 지원 - SQL 쿼리 가능 - 최적화된 실행 계획 - 다양한 포맷 지원

## DataFrame 생성

1. 기본 생성 방법: ```python # Dictionary로부터 생성 data = [ {'name': 'John', 'age': 30}, {'name': 'Alice', 'age': 25} ] df = spark.createDataFrame(data)

# CSV 파일로부터 생성 df = spark.read.csv("data.csv", header=True)

# JSON 파일로부터 생성 df = spark.read.json("data.json") ```

2. 스키마 정의: ```python from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# 스키마 정의 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True) ])

# 스키마 적용하여 DataFrame 생성 df = spark.createDataFrame(data, schema=schema) ```

# DataFrame 연산

## 1. 기본 연산

```python # 컬럼 선택 df.select("name", "age")

# 필터링 df.filter(df.age > 25)

# 정렬 df.orderBy("age")

# 그룹화 df.groupBy("age").count() ```

## 2. SQL 활용

```python # 임시 뷰 생성 df.createOrReplaceTempView("people")

# SQL 쿼리 실행 result = spark.sql(""" SELECT age, COUNT(*) as count FROM people GROUP BY age HAVING count > 1 """) ```

# RDD vs DataFrame

## 1. 주요 차이점

RDD: - 로우레벨 API - 타입 안정성 - 완전한 프로그래밍 제어 - 복잡한 데이터 처리에 유용

DataFrame: - 하이레벨 API - 최적화된 실행 계획 - SQL 쿼리 지원 - 구조화된 데이터 처리에 적합

## 2. 선택 가이드

RDD 사용이 좋은 경우: - 비구조화된 데이터 처리 - 복잡한 데이터 변환 - 세밀한 제어 필요 - 타입 안정성 중요

DataFrame 사용이 좋은 경우: - 구조화된 데이터 처리 - SQL 쿼리 활용 - 성능 최적화 중요 - 간단한 데이터 조작

# 성능 최적화

## 1. RDD 최적화

주요 최적화 기법: - 적절한 파티셔닝 - 캐싱 활용 - 셔플링 최소화 - 직렬화 최적화

예제: ```python # 파티션 수 조정 rdd = spark.sparkContext.parallelize(data, numSlices=10)

# 캐싱 rdd.cache()

# 파티셔닝 rdd.partitionBy(10) ```

## 2. DataFrame 최적화

최적화 전략: - 컬럼 선택 최적화 - 조인 최적화 - 필터 푸시다운 - 캐싱 전략

예제: ```python # 필요한 컬럼만 선택 df.select("name", "age")

# 조인 최적화 df1.join(df2, "key").hint("broadcast")

# 캐싱 df.cache() ```

# 실전 활용 예제

## 1. 로그 분석 예제

```python # 로그 파일 읽기 logs = spark.read.text("logs.txt")

# DataFrame을 사용한 로그 분석 parsed_logs = logs.select( regexp_extract('value', r'(d{4}-d{2}-d{2})', 1).alias('date'), regexp_extract('value', r'ERROR|WARN|INFO', 0).alias('level') )

# 일자별, 레벨별 집계 result = parsed_logs.groupBy('date', 'level').count() ```

## 2. 데이터 변환 예제

```python # CSV 데이터 읽기 data = spark.read.csv("data.csv", header=True)

# 데이터 변환 및 처리 transformed = data.withColumn( "amount_category", when(col("amount") > 1000, "high") .when(col("amount") > 500, "medium") .otherwise("low") )

# 결과 저장 transformed.write.parquet("output") ```

# 장애 처리 및 디버깅

## 1. 일반적인 문제

주의해야 할 점: - 메모리 부족 오류 - 셔플링 관련 문제 - 데이터 스큐 - 직렬화 오류

## 2. 해결 방안

문제 해결 전략: - 적절한 파티션 크기 설정 - 메모리 설정 최적화 - 데이터 스큐 해결 - 에러 로깅 강화

# 결론

RDD와 DataFrame은 각각의 장단점이 있으며, 사용 사례에 따라 적절히 선택하는 것이 중요합니다. 특히 최신 Spark에서는 DataFrame을 주로 사용하지만, RDD의 세밀한 제어가 필요한 경우도 여전히 존재합니다.

다음 글에서는 Spark SQL에 대해 더 자세히 다루면서, 실제 데이터 분석 사례를 살펴보도록 하겠습니다.