
Why Data Cleaning Matters

Data cleaning prepares raw inputs for production pipelines—fixing formats, computing derived values, and removing incomplete or garbage records. Without it, downstream systems fail or produce unreliable insights. Spark helps by offering:

  • Scalable processing power distributed across a cluster.
  • A rich DataFrame API for transforming data at scale.

Schemas & Types

Define explicit schemas to enforce structure and improve read performance:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

people_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
])

people_df = spark.read.csv("rawdata.csv", schema=people_schema)

Schemas validate data, ensure correct types, and let Spark skip costly inference passes.


Immutability & Lazy Evaluation

Spark DataFrames are immutable; each transformation builds a new logical plan rather than mutating in place:

voter_df = spark.read.csv("voterdata.csv", header=True)
voter_df = voter_df.withColumn("full_year", voter_df.year.cast("int") + 2000).drop("year")

No work occurs until you trigger an action (count(), show(), etc.). Lazy evaluation lets Spark reorder operations for optimal execution.
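
As a quick sketch building on voter_df above: the transformations only describe a plan, and nothing is read until the action fires:

# Still nothing has executed: filtered_df is just another logical plan.
filtered_df = voter_df.filter(voter_df.full_year >= 2010)

# The action below triggers the read, the withColumn, the drop, and the filter in one pass.
print(filtered_df.count())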


Handling CSVs

CSV pitfalls: missing schemas, mixed delimiters, encoding quirks, and no predicate pushdown. Parsing is slow, schema inference requires an extra pass over the data, and compressed CSV files cannot be split across workers during import. When possible, convert to more efficient columnar formats (Parquet, ORC) after cleaning.
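
A minimal sketch of that hand-off, reusing people_schema from above (file names are placeholders):

# Read the raw CSV once with an explicit schema (the slow pass).
raw_df = spark.read.csv("rawdata.csv", schema=people_schema, header=True)

# Write the cleaned data as Parquet; later reads are columnar and support predicate pushdown.
raw_df.write.mode("overwrite").parquet("cleaned_data.parquet")

# Downstream stages read the Parquet copy instead of re-parsing CSV.
clean_df = spark.read.parquet("cleaned_data.parquet")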


Array Columns & Conditionals

For ArrayType columns, use helpers such as the size() function and the .getItem(index) column method (see the array sketch after the example below). Avoid row-by-row Python if/else; use vectorized column expressions instead:

from pyspark.sql.functions import when

df = df.select("Name", "Age",
               when(df.Age >= 18, "Adult").otherwise("Minor").alias("Label"))
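
For the array side, a small sketch assuming a hypothetical ArrayType column named scores:

from pyspark.sql.functions import size, col

# size() counts the elements; .getItem(0) pulls the first entry of the array.
df = df.withColumn("score_count", size(col("scores"))) \
       .withColumn("first_score", col("scores").getItem(0))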

User Defined Functions (UDFs)

Wrap Python functions when built-ins fall short:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def reverse_str(s):
    # Guard against null values, which arrive in the UDF as None.
    return s[::-1] if s is not None else None

reverse_udf = udf(reverse_str, StringType())

df = df.withColumn("ReverseName", reverse_udf(df.Name))

Use UDFs sparingly; they bypass Spark's Catalyst optimizer and add Python serialization overhead.
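
In this case the built-in reverse() function from pyspark.sql.functions gives the same result without leaving the JVM (a sketch, assuming the same Name column as above):

from pyspark.sql.functions import reverse

# Same result as reverse_udf, but executed natively by Spark's engine.
df = df.withColumn("ReverseName", reverse(df.Name))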


Partitioning & IDs

Spark splits DataFrames into partitions automatically. Control the count with read options such as minPartitions (for RDD-based reads), or with repartition() and coalesce() (coalesce can only reduce the partition count). Truly sequential IDs would force all data through a single worker, so Spark provides monotonically_increasing_id() instead: the IDs are unique and increasing, but not guaranteed to be consecutive.
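
A short sketch combining both ideas, assuming the voter_df from earlier:

from pyspark.sql.functions import monotonically_increasing_id

# Check and adjust the partition count.
print(voter_df.rdd.getNumPartitions())
voter_df = voter_df.repartition(8)   # full shuffle into 8 partitions
voter_df = voter_df.coalesce(4)      # shrink to 4 partitions without a full shuffle

# Add unique (but not consecutive) row IDs.
voter_df = voter_df.withColumn("row_id", monotonically_increasing_id())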


Caching

Cache reuse-heavy DataFrames:

df.cache()           # marks the DataFrame for caching; materialized on the first action
print(df.is_cached)  # True
df.unpersist()       # release the cached data when it is no longer needed

Persisting avoids recomputation during iterative pipelines.
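
When memory is tight, persist() with an explicit storage level is a common alternative (a sketch using the standard StorageLevel API):

from pyspark import StorageLevel

# Spill to disk when the DataFrame does not fit in memory.
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()      # caching actually happens on this first action
df.unpersist()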


Execution Plans & Shuffling

Inspect plans with df.explain(). Shuffling (moving data between executors) is expensive—minimize it:

  • Avoid unnecessary repartition.
  • Use coalesce to shrink partitions.
  • Broadcast small tables before joins: df1.join(broadcast(df2)), using broadcast from pyspark.sql.functions (sketched below).
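
A minimal broadcast-join sketch (large_df and small_df are hypothetical DataFrames):

from pyspark.sql.functions import broadcast

# The small lookup table is shipped to every executor, avoiding a shuffle of the large side.
joined_df = large_df.join(broadcast(small_df), on="id")

# Inspect the physical plan; look for BroadcastHashJoin instead of SortMergeJoin.
joined_df.explain()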

Building Pipelines

Typical flow: input → transform → validate → output.

from pyspark.sql.functions import monotonically_increasing_id

schema = ...  # explicit schema, as in the earlier example
df = spark.read.csv("datafile", schema=schema)
df = df.withColumn("id", monotonically_increasing_id())
df = df.filter(...).withColumn(...)   # cleaning and validation steps
df.write.parquet("outdata.parquet")
df.write.json("outdata.json")

Validation may include filtering, joins against reference tables, or rule-based UDFs.
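
For instance, a reference-table check can be a plain inner join (a sketch; valid_cities.parquet is a hypothetical reference dataset):

# Keep only rows whose city appears in the reference table.
valid_cities = spark.read.parquet("valid_cities.parquet")
validated_df = df.join(valid_cities, on="city", how="inner")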


Parsing Real Data

CSV reader options help strip noise (a combined example follows the list):

  • comment='#' removes commented lines.
  • header=True uses the first row as column names.
  • sep="," sets the delimiter (you can use alternate delimiters to handle nested structures).

Validation techniques:

  • Joins against known-good datasets to discard unmatched rows.
  • Complex rules implemented with UDFs for calculations, API checks, etc.

Example UDF:

from pyspark.sql.types import DoubleType

def avg_sale(sales):
    # Average the two sale amounts (indexes 2 and 3) across all entries.
    if not sales:
        return None
    total = sum(item[2] + item[3] for item in sales)
    return total / (len(sales) * 2)

avg_sale_udf = udf(avg_sale, DoubleType())
df = df.withColumn("avg_sale", avg_sale_udf(df.sales_list))

Whenever possible, replace UDFs with built-in operations for better performance.


Cluster Basics

Spark clusters consist of a driver (task scheduling, result aggregation) and multiple workers (execute tasks). Best practices:

  • Prefer a larger number of moderately sized workers over a few very large ones.
  • Provide fast local storage for caching and shuffle spill.
  • Give the driver plenty of memory (often 2× a worker).

Import Performance

Importing many similarly sized files usually outperforms one giant file, because Spark can read them in parallel. Use glob patterns (spark.read.csv("airport-*.txt.gz")) and convert to columnar formats early. Keep partition sizes balanced so work is distributed evenly across workers.

Split large files using Unix tools (split -l 10000) or by reading once and writing out with Spark (df.write.parquet(...)).
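
A sketch of the Spark-side approach, reusing the glob pattern above (the output path is a placeholder):

# One DataFrame over many similarly sized files; Spark parallelizes across them.
airports_df = spark.read.csv("airport-*.txt.gz", header=True)

# Re-write as Parquet so later jobs skip CSV parsing entirely.
airports_df.write.mode("overwrite").parquet("airports.parquet")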


Configuration & Deployment

Adjust Spark configs via spark.conf.get / spark.conf.set. Deployment options include single-node, standalone clusters, or managed environments (YARN, Mesos, Kubernetes).
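
A quick sketch of reading and adjusting a runtime setting (the value shown is illustrative):

# Inspect a current setting.
print(spark.conf.get("spark.sql.shuffle.partitions"))

# Change it for this session, e.g. to better match the cluster's core count.
spark.conf.set("spark.sql.shuffle.partitions", "200")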


PySpark provides everything you need to clean, validate, and pipeline massive datasets—leverage schemas, vectorized operations, and cluster-aware optimizations to keep workloads reliable and fast.