In data analysis, aggregations and joins are two of the most commonly used operations. Spark SQL makes these tasks simple and efficient, even when working with very large datasets. In this lesson, you'll learn how to perform common aggregation functions and join operations using PySpark.
Aggregations allow us to summarize data. For example, we can calculate totals, averages, minimums, and maximums across a dataset. In Spark SQL, we group rows with groupBy and then apply built-in aggregate functions such as count, avg, sum, min, and max inside agg.
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg, count
# Start a Spark session
spark = SparkSession.builder.appName("AggregationsExample").getOrCreate()
# Sample data
data = [
("North", "Product A", 100),
("North", "Product B", 200),
("South", "Product A", 150),
("South", "Product B", 50),
("East", "Product C", 300)
]
columns = ["region", "product", "sales"]
df = spark.createDataFrame(data, columns)
# Group by region and calculate total sales
agg_df = df.groupBy("region").agg(sum("sales").alias("total_sales"))
agg_df.show()
+------+-----------+
|region|total_sales|
+------+-----------+
| North|        300|
| South|        200|
|  East|        300|
+------+-----------+
Why do we use alias("total_sales") in the aggregation?
By default, the aggregated column would have a name like sum(sales). The alias() function renames it to a more readable column name like total_sales.
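You can verify this yourself. The following is a minimal sketch that reuses the df created above and prints the schema with and without alias():
# Without alias(), the aggregated column is named after the expression, e.g. "sum(sales)"
df.groupBy("region").agg(sum("sales")).printSchema()
# With alias(), the same column appears under the readable name "total_sales"
df.groupBy("region").agg(sum("sales").alias("total_sales")).printSchema()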
We can calculate more than one aggregation at a time.
agg_df = df.groupBy("region").agg(
sum("sales").alias("total_sales"),
avg("sales").alias("average_sales"),
count("product").alias("product_count")
)
agg_df.show()
+------+-----------+-------------+-------------+
|region|total_sales|average_sales|product_count|
+------+-----------+-------------+-------------+
| North|        300|        150.0|            2|
| South|        200|        100.0|            2|
|  East|        300|        300.0|            1|
+------+-----------+-------------+-------------+
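The min and max functions mentioned earlier work the same way. Here is a small sketch, reusing the same df, that computes the sales range per region:
from pyspark.sql.functions import min, max

# Smallest and largest sale recorded in each region
range_df = df.groupBy("region").agg(
    min("sales").alias("min_sales"),
    max("sales").alias("max_sales")
)
range_df.show()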
Joins allow you to combine two DataFrames based on a common column (similar to SQL joins). This is useful when your data is split across multiple sources or tables.
# Sales data
sales_data = [
("North", "Product A", 100),
("South", "Product B", 50),
("East", "Product C", 300)
]
sales_columns = ["region", "product", "sales"]
# Manager data
manager_data = [
("North", "Alice"),
("South", "Bob"),
("West", "Charlie")
]
manager_columns = ["region", "manager"]
sales_df = spark.createDataFrame(sales_data, sales_columns)
manager_df = spark.createDataFrame(manager_data, manager_columns)
# Perform inner join
joined_df = sales_df.join(manager_df, on="region", how="inner")
joined_df.show()
+------+---------+-----+-------+
|region|  product|sales|manager|
+------+---------+-----+-------+
| North|Product A|  100|  Alice|
| South|Product B|   50|    Bob|
+------+---------+-----+-------+
Why is "East" region not included in the result?
Because it's an inner join, only regions that exist in both DataFrames are included. The "East" region doesn't have a corresponding manager entry. To keep all sales rows even when there is no matching manager, use a left join instead.
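# Perform left join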
left_join_df = sales_df.join(manager_df, on="region", how="left")
left_join_df.show()
+------+---------+-----+-------+
|region|  product|sales|manager|
+------+---------+-----+-------+
| North|Product A|  100|  Alice|
| South|Product B|   50|    Bob|
|  East|Product C|  300|   null|
+------+---------+-----+-------+
Notice how the "East" row is still present, but the manager column is null. That’s the behavior of a left join — all rows from the left DataFrame are kept.
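Other join types follow the same pattern. As a minimal sketch (not shown in the original output), a full outer join keeps unmatched rows from both sides, so the "West" manager from manager_df would also appear, with null product and sales:
# Full outer join: keeps unmatched rows from both DataFrames
outer_join_df = sales_df.join(manager_df, on="region", how="outer")
outer_join_df.show()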
In this lesson, you learned how to use groupBy with aggregation functions to summarize data in Spark SQL, and how to combine DataFrames with inner and left joins.