Working with Columns, Expressions, and User-Defined Functions in PySpark

In PySpark, data is often stored in the form of DataFrames. To extract insights and manipulate the data, we need to work with columns, expressions, and sometimes define our own logic using user-defined functions (UDFs).

Understanding Columns in PySpark

Think of a DataFrame as a table and its columns as the individual fields, such as "name", "age", or "salary". You can access, modify, or create columns using the col() function or by referencing them directly, for example df["column_name"].


from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ColumnExample").getOrCreate()

data = [("Alice", 30), ("Bob", 25), ("Cathy", 40)]
df = spark.createDataFrame(data, ["name", "age"])

# Selecting a column
df.select("name").show()

# Using col() function
df.select(col("age") + 10).show()
    
+-----+
| name|
+-----+
|Alice|
|  Bob|
|Cathy|
+-----+

+----------+
|(age + 10)|
+----------+
|        40|
|        35|
|        50|
+----------+
    

Question:

Why use col() instead of just using column names directly?

Answer:

A plain string works where Spark only needs to know which column you mean, as in df.select("name"). col() (like df["column_name"]) returns a Column object, which is what you need to build expressions: arithmetic such as col("age") + 10, comparisons inside filter(), or the value passed as the second argument of withColumn().
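
As a quick illustration, here is a minimal sketch reusing the df defined above (the column name age_plus_5 is only an illustrative choice):

from pyspark.sql.functions import col

# A plain string is enough when you only need to name a column
df.select("name").show()

# Bracket notation also yields a Column object tied to this DataFrame
df.select(df["age"] + 10).show()

# col() builds a Column without referencing the DataFrame directly,
# which is convenient inside filter() and withColumn()
df.filter(col("age") >= 30).show()
df.withColumn("age_plus_5", col("age") + 5).show()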

Working with Expressions

Expressions are operations applied to columns. PySpark ships many built-in functions in pyspark.sql.functions, such as upper(), length(), concat(), and when(), and Column objects also support arithmetic operators directly.


from pyspark.sql.functions import upper, length, when

# Add new columns with expressions
df = df.withColumn("name_upper", upper(col("name")))
df = df.withColumn("name_length", length(col("name")))

# Conditional logic with when()
df = df.withColumn("age_group", when(col("age") < 30, "Young").otherwise("Senior"))

df.show()
    
+-----+---+----------+-----------+---------+
| name|age|name_upper|name_length|age_group|
+-----+---+----------+-----------+---------+
|Alice| 30|     ALICE|          5|   Senior|
|  Bob| 25|       BOB|          3|    Young|
|Cathy| 40|     CATHY|          5|   Senior|
+-----+---+----------+-----------+---------+
    

Question:

Can we use multiple expressions in a single operation?

Answer:

Yes. You can chain multiple withColumn calls or use select() with multiple expressions inside it.
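
For example, a single select() can evaluate several expressions at once. A minimal sketch using the df built above (the aliases such as age_next_year are only illustrative):

from pyspark.sql.functions import col, upper, length

# Several expressions evaluated in one select(); alias() names the result columns
df.select(
    col("name"),
    upper(col("name")).alias("name_upper"),
    length(col("name")).alias("name_length"),
    (col("age") + 1).alias("age_next_year")
).show()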

Creating User-Defined Functions (UDFs)

Sometimes, built-in functions are not enough. For custom logic, you can define your own Python functions and convert them into Spark UDFs using udf().

Example: Categorizing age


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define Python function
def age_category(age):
    if age < 18:
        return "Minor"
    elif age < 60:
        return "Adult"
    else:
        return "Senior"

# Convert to Spark UDF
age_udf = udf(age_category, StringType())

# Use in DataFrame
df = df.withColumn("category", age_udf(col("age")))
df.select("name", "age", "category").show()
    
+-----+---+--------+
| name|age|category|
+-----+---+--------+
|Alice| 30|   Adult|
|  Bob| 25|   Adult|
|Cathy| 40|   Adult|
+-----+---+--------+
    

Question:

When should I use a UDF instead of built-in functions?

Answer:

Use a UDF only when your transformation can't be expressed with existing Spark functions, for example intricate custom string handling or logic that relies on an external Python library. Prefer built-in functions where possible: Python UDFs run row by row in a separate Python process, so Spark cannot optimize them with Catalyst and they are usually slower.
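
For comparison, the age categorization from the example above can be written entirely with built-in functions. A sketch like the following avoids the UDF (the column name category_builtin is just illustrative):

from pyspark.sql.functions import col, when

# Same logic as age_category(), expressed with chained when() calls
df = df.withColumn(
    "category_builtin",
    when(col("age") < 18, "Minor")
    .when(col("age") < 60, "Adult")
    .otherwise("Senior")
)
df.select("name", "age", "category", "category_builtin").show()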

Summary

Mastering columns, expressions, and UDFs will help you manipulate and prepare your data effectively in PySpark.


