Working with Columns, Expressions, and User-Defined Functions in PySpark
In PySpark, data is often stored in the form of DataFrames. To extract insights and manipulate the data, we need to work with columns, expressions, and sometimes define our own logic using user-defined functions (UDFs).
Understanding Columns in PySpark
Think of a DataFrame as a table and columns as its individual fields, like "name", "age", or "salary". You can access, modify, or create columns using the col() function, or simply by referencing them as df["column_name"].
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("ColumnExample").getOrCreate()
data = [("Alice", 30), ("Bob", 25), ("Cathy", 40)]
df = spark.createDataFrame(data, ["name", "age"])
# Selecting a column
df.select("name").show()
# Using col() function
df.select(col("age") + 10).show()
+-----+
| name|
+-----+
|Alice|
|  Bob|
|Cathy|
+-----+

+----------+
|(age + 10)|
+----------+
|        40|
|        35|
|        50|
+----------+
Question: Why use col() instead of just passing column names as strings?
Answer: col() returns a Column object, which supports operators and can be combined into expressions. Plain strings are fine for simple selections, but once you need arithmetic, comparisons, or conditional logic inside withColumn(), filter(), or select(), you need a Column object.
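As a quick illustration, here is a sketch contrasting the two forms on the df created above:

from pyspark.sql.functions import col

# Selecting by name: either form works
df.select("age").show()
df.select(col("age")).show()

# Building an expression: col() returns a Column that supports operators
# like + and >; a plain Python string does not ("age" + 10 would be an error)
df.select((col("age") + 10).alias("age_plus_10")).show()
df.filter(col("age") > 28).show()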
Working with Expressions
Expressions are used to perform operations on columns. PySpark provides many built-in functions under pyspark.sql.functions, such as upper(), length(), concat(), when(), and the usual arithmetic operators.
from pyspark.sql.functions import upper, length, when
# Add new columns with expressions
df = df.withColumn("name_upper", upper(col("name")))
df = df.withColumn("name_length", length(col("name")))
# Conditional logic with when()
df = df.withColumn("age_group", when(col("age") < 30, "Young").otherwise("Senior"))
df.show()
+-----+---+----------+-----------+---------+
| name|age|name_upper|name_length|age_group|
+-----+---+----------+-----------+---------+
|Alice| 30|     ALICE|          5|   Senior|
|  Bob| 25|       BOB|          3|    Young|
|Cathy| 40|     CATHY|          5|   Senior|
+-----+---+----------+-----------+---------+
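concat() from the list above is not used in this example, so here is a small sketch of it, joining existing columns and literal text (via lit()) into one string column; the "label" output name is only illustrative:

from pyspark.sql.functions import col, concat, lit

# Build a single label string from existing columns
df.select(
    concat(col("name"), lit(" - "), col("age_group")).alias("label")
).show()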
Question: Can we use multiple expressions in a single operation?
Answer: Yes. You can chain multiple withColumn() calls, or pass several expressions to a single select().
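For instance, a single select() can carry several expressions at once, each given a readable alias (a sketch reusing the df from above):

from pyspark.sql.functions import col, upper, length

# Several expressions in one select(), each aliased for a readable header
df.select(
    col("name"),
    upper(col("name")).alias("name_upper"),
    length(col("name")).alias("name_length"),
    (col("age") + 10).alias("age_plus_10"),
).show()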
Creating User-Defined Functions (UDFs)
Sometimes the built-in functions are not enough. For custom logic, you can define your own Python function and wrap it as a Spark UDF with udf().
Example: Categorizing age
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define Python function
def age_category(age):
    if age < 18:
        return "Minor"
    elif age < 60:
        return "Adult"
    else:
        return "Senior"
# Convert to Spark UDF
age_udf = udf(age_category, StringType())
# Use in DataFrame
df = df.withColumn("category", age_udf(col("age")))
df.select("name", "age", "category").show()
+-----+---+--------+
| name|age|category|
+-----+---+--------+
|Alice| 30|   Adult|
|  Bob| 25|   Adult|
|Cathy| 40|   Adult|
+-----+---+--------+
Question: When should I use a UDF instead of built-in functions?
Answer: Use a UDF when your transformation logic can't be expressed with existing Spark functions, for example complex conditional logic or custom string transformations. Prefer built-in functions where they suffice, since Spark can optimize them better than opaque Python code.
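For comparison, here is a sketch of the same age_category() logic written with chained when() expressions, so no UDF is needed (the "category_expr" column name is only illustrative):

from pyspark.sql.functions import col, when

# Same categorization as the age_category() UDF, using built-in expressions
df = df.withColumn(
    "category_expr",
    when(col("age") < 18, "Minor")
    .when(col("age") < 60, "Adult")
    .otherwise("Senior"),
)
df.select("name", "age", "category_expr").show()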
Summary
- You can manipulate columns using col(), withColumn(), and select().
- Expressions like upper() and when() help apply transformations and conditions.
- UDFs allow custom Python logic when built-in functions fall short.
Mastering columns, expressions, and UDFs will help you manipulate and prepare your data effectively in PySpark.