Spark SQL로 대규모 데이터 분석하기: 완벽 가이드

Apache Spark SQL을 사용한 효율적인 데이터 처리 방법부터 고급 최적화 기법까지 상세히 알아봅니다.

DataSketchers

Senior Data Engineer

2024-05-05Big Data12분 소요

Overview

Spark SQL은 구조화된 데이터 처리를 위한 Apache Spark의 핵심 모듈입니다. SQL의 직관적인 문법과 Spark의 강력한 분산 처리 능력을 결합하여, 대규모 데이터셋에 대한 효율적인 분석이 가능합니다. 이 가이드에서는 Spark SQL의 기본 개념부터 고급 최적화 기법까지 실전에서 바로 활용할 수 있는 내용을 다룹니다.

Spark SQL의 이해

Spark SQL은 정형 데이터 처리를 위한 Spark의 핵심 모듈입니다.

  • 주요 특징:
  • 구조화된 데이터 처리 - 스키마 기반의 안정적인 데이터 처리
  • SQL 문법 지원 - 친숙한 SQL 문법으로 복잡한 분석 수행
  • 고성능 쿼리 최적화 - Catalyst Optimizer를 통한 자동 최적화
  • 다양한 데이터 소스 지원 - CSV, JSON, Parquet, JDBC 등
  • DataFrame API 통합 - 프로그래밍 방식의 데이터 처리
  • 핵심 컴포넌트:
  • 1. Catalyst Optimizer
  • 논리적/물리적 실행 계획 최적화
  • 규칙 기반 최적화
  • 비용 기반 최적화
  • 2. DataFrame/Dataset API
  • 타입 안전성 보장
  • 구조화된 데이터 처리
  • 함수형 프로그래밍 지원
  • 3. 데이터 소스 API
  • 플러그인 방식의 확장
  • 다양한 포맷 지원
  • 푸시다운 최적화

DataFrame 실전 활용

DataFrame은 Spark SQL의 핵심 추상화입니다. 효율적인 데이터 처리를 위한 다양한 기능을 제공합니다.

1. DataFrame 생성 및 기본 조작

# SparkSession 초기화 spark = SparkSession.builder \ .appName("DataAnalysis") \ .config("spark.sql.adaptive.enabled", "true") \ .getOrCreate()

# 다양한 소스에서 데이터 로드 df_csv = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .csv("data.csv")

# JDBC 소스 최적화 로드 df_jdbc = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql://localhost:5432/db") \ .option("dbtable", "sales") \ .option("partitionColumn", "id") \ .option("numPartitions", "10") \ .option("fetchsize", "10000") \ .load() ```

2. 고급 데이터 변환

transformed_df = df.select(
    col("id"),
    concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
    when(col("age") < 20, "Young")
    .when(col("age") < 40, "Adult")
    .otherwise("Senior").alias("age_group"),
    datediff(current_date(), col("join_date")).alias("days_joined")

# 윈도우 함수 활용 window_spec = Window \ .partitionBy("department") \ .orderBy(col("salary").desc())

ranked_df = df.withColumn( "salary_rank", rank().over(window_spec) ).withColumn( "running_total", sum("salary").over(window_spec) ) ```

3. 성능 최적화 기법

df.repartition(col("date")) \
  .write \
  .partitionBy("date") \
  .format("parquet") \
  .mode("overwrite") \

# 조인 최적화 from pyspark.sql.functions import broadcast

optimized_join = large_df.join( broadcast(small_df), on="key", how="left" ) ```

4. 데이터 품질 검사

stats = df.select([
    count("*").alias("total_rows"),
    count(when(col("value").isNull(), True)).alias("null_count"),
    mean("value").alias("mean_value"),
    stddev("value").alias("std_value")

# 중복 데이터 확인 duplicates = df.groupBy(df.columns) \ .count() \ .filter(col("count") > 1) ```

SQL 쿼리 최적화

Spark SQL의 성능을 최대한 끌어내기 위한 쿼리 최적화 기법들을 알아봅니다.

1. 기본 최적화 원칙

  • 조기 필터링
  • ```sql
  • -- 좋은 예시
  • SELECT * FROM large_table
  • WHERE date_column >= '2024-01-01'
  • AND category = 'electronics'

-- 피해야 할 예시 SELECT * FROM ( SELECT * FROM large_table ) filtered WHERE date_column >= '2024-01-01' ```

2. 조인 최적화

SELECT /*+ BROADCAST(small_table) */ *
FROM large_table l

-- 정렬-병합 조인을 위한 버켓팅 CREATE TABLE bucketed_table USING parquet CLUSTERED BY (join_key) INTO 8 BUCKETS AS SELECT * FROM source_table ```

3. 윈도우 함수 최적화

WITH RankedData AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY department
            ORDER BY salary DESC
        ) as rank
    FROM employees
)
SELECT * FROM RankedData
WHERE rank <= 3

4. 메모리 관리

spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "2g")

결론

Spark SQL은 대규모 데이터 처리를 위한 강력한 도구입니다. 이 가이드에서 다룬 최적화 기법들을 적용하면, 더 효율적이고 안정적인 데이터 분석 파이프라인을 구축할 수 있습니다. 다음 글에서는 Spark Streaming을 활용한 실시간 데이터 처리 방법을 살펴보도록 하겠습니다.

이 글이 도움이 되었나요?

댓글

첫 번째 댓글을 작성해보세요!

더 많은 Spark 관련 글을 받아보세요

새로운 글이 발행되면 이메일로 알려드립니다.