3 Chapter 3: Shape Data
Early draft release: This chapter hasn’t been fully edited yet.
3.1 Introduction
In the previous chapters, we learned how to ingest and clean data in Microsoft Fabric. Now that our data is loaded and cleansed, we’re ready to shape it into the structures we need for our analysis. Think of data shaping as sculpting with clay: we take raw material and transform it into something more useful and meaningful.
In Microsoft Fabric, shaping data with PySpark gives us powerful tools to transform, filter, join, and restructure our datasets. This chapter explores how to manipulate your data’s format and structure to prepare it for deeper analysis. We’ll also explore how to leverage AI capabilities native to Microsoft Fabric to enhance your data transformations.
Let’s begin by understanding the fundamental concepts of transformations and actions in PySpark, and then work through increasingly sophisticated data shaping techniques.
3.2 Transformations and Actions in PySpark
PySpark operations fall into two categories: transformations and actions. Understanding this distinction is crucial for efficient data processing in Microsoft Fabric.
3.2.1 Transformations vs. Actions
Transformations create a new DataFrame from an existing one but don’t execute computations immediately. They are “lazy” operations that build up a transformation plan. Examples include select(), filter(), and join().
Actions trigger the execution of transformations and return results. They initiate the actual computation. Examples include count(), collect(), and show().
This lazy evaluation pattern is key to PySpark’s efficiency in Microsoft Fabric, as it allows the engine to optimize the entire execution plan before running anything.
# Load sample sales data
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/sales_data.csv")

# This is a transformation (lazy) - nothing happens yet
filtered_sales = sales_df.filter(sales_df.sale_amount > 150)

# This is an action - now computation happens
print(f"Number of high-value sales: {filtered_sales.count()}")

# Let's see what our data looks like
filtered_sales.show(5)
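Because transformations are lazy, you can also inspect the plan Spark has built before any action runs. As a minimal sketch, the standard explain() method prints the plans for the filtered DataFrame defined above without triggering a job:

# Print the analyzed, optimized, and physical plans; no data is read yet
filtered_sales.explain(True)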
3.2.2 Shaping Data Format
Let’s explore common operations for reshaping your DataFrame structure in Microsoft Fabric.
3.2.2.1 Selecting and Removing Columns
Selecting specific columns or excluding others helps focus your analysis on relevant data:
# Select specific columns
customer_sales = sales_df.select("customer_id", "product_id", "date")

# Drop unwanted columns
simplified_sales = sales_df.drop("transaction_id", "store_location")

# Show the results
print("Selected columns:")
customer_sales.show(3)

print("After dropping columns:")
simplified_sales.show(3)
3.2.2.2 Renaming Columns
Clear, descriptive column names make your data more understandable:
# Rename a single column
renamed_df = sales_df.withColumnRenamed("sale_amount", "sale_amount_without_tax")

# Rename multiple columns
from pyspark.sql.functions import col

# Method 1: Chain multiple withColumnRenamed calls
renamed_multiple = sales_df \
    .withColumnRenamed("sale_amount", "sale_amount_without_tax") \
    .withColumnRenamed("date", "sale_date")

# Method 2: Use select with alias for multiple columns
renamed_select = sales_df.select(
    col("customer_id"),
    col("sale_amount").alias("sale_amount_without_tax"),
    col("date").alias("sale_date"),
    col("product_id")
)

renamed_select.show(3)
3.2.2.3 Adding and Modifying Columns
Creating new columns based on existing data is a common transformation:
from pyspark.sql.functions import col, round, upper, current_date, datediff

# Add a new column
sales_with_tax = sales_df.withColumn("amount_with_tax", round(col("sale_amount") * 1.08, 2))

# Convert a string column to uppercase
sales_upper = sales_df.withColumn("product_id", upper(col("product_id")))

# Calculate days since purchase (using Fabric's handling of date functions)
sales_with_age = sales_df.withColumn("days_since_purchase",
    datediff(current_date(), col("date")))

# Show our modified DataFrame
sales_with_tax.show(3)
3.2.2.4 Changing Data Types
Ensuring columns have the correct data type is essential for accurate analysis:
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import to_date

# Convert string to integer
sales_df = sales_df.withColumn("customer_id", col("customer_id").cast(IntegerType()))

# Convert string to date
sales_df = sales_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Check our schema after the conversions
sales_df.printSchema()
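Casting after the fact works, but when you already know a file’s layout you can declare the schema up front and skip inferSchema entirely, which avoids an extra pass over the data. Here is a minimal sketch, assuming the sales_data.csv columns used throughout this chapter’s examples:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType

# Declare the expected schema instead of inferring it
sales_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("store_location", StringType(), True),
    StructField("sale_amount", DoubleType(), True)
])

typed_sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(sales_schema) \
    .load("Files/sales_data.csv")

typed_sales_df.printSchema()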
3.3 Filtering, Sorting, and Aggregating Data
These operations help narrow down your dataset to focus on specific insights and patterns.
These concepts will be explored in detail in Chapter 4, “Query Data”. Yet, because they are essential to shaping data, here is a primer on the topic.
3.3.1 Filtering Data
Filtering allows you to work with subsets of your data that meet specific criteria:
from pyspark.sql.functions import year

# Basic filtering
high_value_sales = sales_df.filter(col("sale_amount") > 100)

# Multiple conditions - sales over $100 made in 2024
sales_2024_high = sales_df.filter(
    (year(col("date")) == 2024) &
    (col("sale_amount") > 100)
)

# Using SQL expressions for filtering
sql_filtered = sales_df.filter("sale_amount > 100 AND year(date) = 2024")

print("High-value sales from 2024:")
sales_2024_high.show(5)
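Beyond plain comparisons, the Column API offers a few helpers that keep filters readable. A short sketch reusing the sales_df columns:

# Keep only a specific set of products
selected_products = sales_df.filter(col("product_id").isin(101, 102, 103))

# Sales within a price band (inclusive on both ends)
mid_range_sales = sales_df.filter(col("sale_amount").between(50, 150))

# Rows where the store location is missing
missing_location = sales_df.filter(col("store_location").isNull())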
3.3.2 Sorting Data
Sorting helps identify patterns and extremes, and it facilitates analysis:
# Sort by a single column (ascending by default)
sorted_by_amount = sales_df.orderBy("sale_amount")

# Sort by multiple columns (descending amount, ascending date)
sorted_multiple = sales_df.orderBy(col("sale_amount").desc(), col("date"))

# Alternative syntax
sorted_alt = sales_df.sort(col("sale_amount").desc())

print("Sales sorted by amount (highest first):")
sorted_alt.show(5)
3.3.3 Aggregating Data
Aggregation condenses your data into summary statistics and insights:
from pyspark.sql.functions import sum, avg, count, max, min, countDistinct

# Basic aggregations
summary_stats = sales_df.agg(
    sum("sale_amount").alias("total_sales"),
    avg("sale_amount").alias("average_sale"),
    max("sale_amount").alias("largest_sale"),
    min("sale_amount").alias("smallest_sale"),
    count("*").alias("transaction_count"),
    countDistinct("customer_id").alias("unique_customers")
)

# Group by aggregations
sales_by_product = sales_df.groupBy("product_id").agg(
    sum("sale_amount").alias("total_product_sales"),
    avg("sale_amount").alias("avg_product_price"),
    count("*").alias("units_sold")
).orderBy(col("total_product_sales").desc())

print("Sales summary statistics:")
summary_stats.show()

print("Top selling products:")
sales_by_product.show(5)
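When you want subtotals and a grand total in a single pass, rollup() works like groupBy() but adds the higher-level summary rows (marked with nulls in the grouping columns). A minimal sketch on the same DataFrame, assuming sales_df carries the store_location column shown earlier:

# Totals per store and product, per store, and overall
sales_rollup = sales_df.rollup("store_location", "product_id").agg(
    sum("sale_amount").alias("total_sales"),
    count("*").alias("transactions")
).orderBy("store_location", "product_id")

sales_rollup.show()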
3.4 Joining and Merging Datasets
In Microsoft Fabric, you’ll often need to combine data from multiple sources for comprehensive analysis.
Thanks to Fabric’s shortcuts feature and the ability to attach multiple lakehouses to the same notebook session, it’s quite easy to merge datasets coming from different sources.
3.4.1 Types of Joins
PySpark supports various join types to accommodate different data relationships:
# Let's create a sample customer dataset
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

customer_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("segment", StringType(), True)
])

customer_data = [
    (1, "Sarah Johnson", "Premium"),
    (2, "Michael Chen", "Standard"),
    (3, "Elena Rodriguez", "Premium"),
    (4, "David Smith", "Standard")
]

customer_df = spark.createDataFrame(customer_data, customer_schema)

# Inner join: only matches found in both datasets
inner_joined = sales_df.join(customer_df, "customer_id", "inner")

# Left join: all records from sales, matching from customers
left_joined = sales_df.join(customer_df, "customer_id", "left")

# Right join: all records from customers, matching from sales
right_joined = sales_df.join(customer_df, "customer_id", "right")

# Full outer join: all records from both datasets
full_joined = sales_df.join(customer_df, "customer_id", "full")

print("Inner join result (sales matched with customers):")
inner_joined.show(5)

# When columns are named differently
# Let's assume we have a products DataFrame with 'prod_id' instead of 'product_id'
products_df = spark.createDataFrame(
    [(101, "Laptop", "Electronics"),
     (102, "Desk Chair", "Furniture")],
    ["prod_id", "product_name", "category"]
)

# Join on different column names
sales_with_products = sales_df.join(
    products_df,
    sales_df.product_id == products_df.prod_id,
    "left"
).select(
    sales_df["*"],
    products_df["product_name"],
    products_df["category"]
)

print("Sales with product details:")
sales_with_products.show(5)
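Two further join types worth knowing are the semi join and the anti join: they filter one DataFrame by the presence or absence of matches in the other without adding any of its columns. A short sketch with the same sales_df and customer_df:

# Semi join: keep sales whose customer exists in customer_df (no customer columns added)
sales_with_known_customer = sales_df.join(customer_df, "customer_id", "left_semi")

# Anti join: keep sales with no matching customer at all
sales_without_customer = sales_df.join(customer_df, "customer_id", "left_anti")

print("Sales for unknown customers:")
sales_without_customer.show(5)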
3.4.2 Union Operations
When you need to combine datasets with identical schemas:
# Create a second sales dataset with the same schema as sales_df
sales_data2 = [
    (3, 103, "2024-01-07", "TXN100003", "San Francisco", 24.99),
    (4, 104, "2024-01-10", "TXN100004", "Chicago", 124.95),
    (5, 105, "2024-01-13", "TXN100005", "Miami", 33.98),
    (6, 108, "2024-01-15", "TXN100006", "Seattle", 96.00)
]

# Column names and order must line up with sales_df for union() to behave correctly
sales_df2 = spark.createDataFrame(
    sales_data2,
    ["customer_id", "product_id", "date", "transaction_id", "store_location", "sale_amount"]
)

# Union the two datasets
all_sales = sales_df.union(sales_df2)

print("Combined sales data:")
all_sales.show()
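Keep in mind that union() matches columns purely by position, so a different column order silently produces misaligned data. When the column names are reliable, unionByName() is the safer choice; a minimal sketch:

# Match columns by name rather than position;
# allowMissingColumns fills columns missing from one side with nulls (Spark 3.1+)
all_sales_by_name = sales_df.unionByName(sales_df2, allowMissingColumns=True)

all_sales_by_name.show()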
3.5 Advanced Data Reshaping Techniques
Now let’s explore more sophisticated reshaping operations available in Microsoft Fabric.
3.5.1 Pivoting and Unpivoting
Pivoting transforms rows into columns, while unpivoting does the reverse:
# Let's create a sample dataset for monthly sales
monthly_data = [
    ("Product A", "Jan", 1000),
    ("Product A", "Feb", 1200),
    ("Product A", "Mar", 900),
    ("Product B", "Jan", 800),
    ("Product B", "Feb", 950),
    ("Product B", "Mar", 1100)
]

monthly_df = spark.createDataFrame(monthly_data, ["product", "month", "sales"])

# Pivot: transform months from rows to columns
pivoted_df = monthly_df.groupBy("product").pivot("month").sum("sales")

print("Pivoted data (months as columns):")
pivoted_df.show()

# Unpivot: transform columns back to rows
from pyspark.sql.functions import expr

# First, let's create sample data with multiple columns
quarterly_sales = spark.createDataFrame([
    ("Product A", 1000, 1200, 900),
    ("Product B", 800, 950, 1100)
], ["product", "Q1", "Q2", "Q3"])

# Unpivot using stack
unpivoted_df = quarterly_sales.select(
    "product",
    expr("stack(3, 'Q1', `Q1`, 'Q2', `Q2`, 'Q3', `Q3`) as (quarter, sales)")
)

print("Unpivoted data (quarters as rows):")
unpivoted_df.show()
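The stack() expression works on any Spark version. If your Fabric runtime is based on Spark 3.4 or later, DataFrames also expose a dedicated unpivot() method; a sketch under that assumption:

# unpivot(ids, values, variableColumnName, valueColumnName) - Spark 3.4+
unpivoted_alt = quarterly_sales.unpivot(
    "product",               # identifier column kept as-is
    ["Q1", "Q2", "Q3"],      # columns to turn into rows
    "quarter",               # name of the new key column
    "sales"                  # name of the new value column
)

unpivoted_alt.show()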
3.5.2 Window Functions
Window functions perform calculations across a specified range of rows:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, lag, lead, sum

# Define a window specification
window_spec = Window.partitionBy("product_id").orderBy(col("date"))

# Calculate running total of sales by product
running_total = sales_df.withColumn(
    "running_total",
    sum("sale_amount").over(window_spec)
)

# Calculate the previous sale amount for each product
with_previous = sales_df.withColumn(
    "previous_sale",
    lag("sale_amount", 1).over(window_spec)
)

# Calculate rank of sales by amount within each product
ranking_spec = Window.partitionBy("product_id").orderBy(col("sale_amount").desc())
ranked_sales = sales_df.withColumn(
    "sale_rank",
    rank().over(ranking_spec)
)

print("Sales with running totals by product:")
running_total.show(5)

print("Top-ranked sales by product:")
ranked_sales.filter(col("sale_rank") <= 3).show(10)
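Window specifications can also carry an explicit frame, which turns a cumulative total into a moving calculation. A short sketch that averages each sale with the two preceding sales of the same product, ordered by date:

from pyspark.sql.functions import avg

# Frame covering the current row and the two rows before it, per product
moving_window = Window.partitionBy("product_id") \
    .orderBy(col("date")) \
    .rowsBetween(-2, 0)

moving_avg = sales_df.withColumn(
    "moving_avg_3_sales",
    avg("sale_amount").over(moving_window)
)

moving_avg.show(5)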
3.5.3 User-Defined Functions (UDFs)
UDFs allow you to apply custom transformations when built-in functions don’t meet your needs:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType

# Define a function to categorize sales
def categorize_sale(amount):
    if amount < 30:
        return "Small"
    elif amount < 100:
        return "Medium"
    else:
        return "Large"

# Register as UDF
categorize_udf = udf(categorize_sale, StringType())

# Apply UDF to create a new column
categorized_sales = sales_df.withColumn("sale_category", categorize_udf(col("sale_amount")))

# More complex UDF with multiple inputs
@udf(FloatType())
def calculate_discount(amount, customer_segment):
    if customer_segment == "Premium":
        return float(amount * 0.10)  # 10% discount
    else:
        return float(amount * 0.05)  # 5% discount

# Join and apply the UDF
sales_with_discount = sales_df.join(customer_df, "customer_id", "left") \
    .withColumn("discount_amount",
        calculate_discount(col("sale_amount"), col("segment")))

print("Sales with categories:")
categorized_sales.show(5)

print("Sales with calculated discounts:")
sales_with_discount.show(5)
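Row-at-a-time Python UDFs carry serialization overhead, so on larger datasets a vectorized pandas UDF is usually faster. Here is a minimal sketch of the same categorization logic, assuming pandas and PyArrow are available in your Spark environment (the default Fabric runtime includes them):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def categorize_sale_vec(amount: pd.Series) -> pd.Series:
    # Vectorized version of categorize_sale: operates on whole batches of rows
    result = pd.Series(["Large"] * len(amount), index=amount.index)
    result[amount < 100] = "Medium"
    result[amount < 30] = "Small"
    return result

categorized_fast = sales_df.withColumn("sale_category", categorize_sale_vec(col("sale_amount")))
categorized_fast.show(5)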
3.5.4 Working with Complex Data Types
PySpark handles complex data types effectively.
Working with array-typed columns in Spark gives you a powerful way to model and manipulate “one-to-many” relationships without leaving the DataFrame API. Imagine that each product can have zero or more tags, features, or categories. Rather than maintaining a separate “product_tags” table and joining it every time, you can keep an array of tags right on the product row. This is extremely common in catalogues, taxonomies, or any place where you want to associate a variable-length list of attributes with an entity.
from pyspark.sql.functions import struct, array, explode, map_keys, map_values

# Creating and working with structs (nested structures)
customer_detail = sales_df.join(customer_df, "customer_id") \
    .withColumn("customer_info",
        struct(col("name"), col("segment")))

# Working with arrays
product_tags = spark.createDataFrame([
    (101, ["electronics", "computer", "laptop"]),
    (102, ["furniture", "office", "chair"])
], ["product_id", "tags"])

# Explode arrays into rows
exploded_tags = product_tags.select(
    "product_id",
    explode("tags").alias("tag")
)

print("DataFrame with struct column:")
customer_detail.select("customer_id", "sale_amount", "customer_info").show(5)

print("Exploded array data:")
exploded_tags.show()
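The imports above also bring in map_keys and map_values, so for completeness here is a short sketch of a map-typed column: key/value attributes stored directly on the row (the attribute values are purely illustrative).

from pyspark.sql.functions import create_map, lit, map_keys, map_values

# Attach a map of free-form attributes to each product
product_attrs = product_tags.withColumn(
    "attributes",
    create_map(lit("source"), lit("catalog"), lit("status"), lit("active"))
)

product_attrs.select(
    "product_id",
    map_keys("attributes").alias("attribute_keys"),
    map_values("attributes").alias("attribute_values")
).show(truncate=False)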
This is also useful when you’re parsing nested JSON structures. When you read a JSON file that contains arrays, Spark automatically infers ArrayType(...) for them.
from pyspark.sql.functions import explode, col

json_data = [
    # Order with two line-items
    """
    {
        "order_id": "1001",
        "customer": {
            "id": 1,
            "name": "Alice"
        },
        "items": [
            { "sku": "A1", "qty": 2 },
            { "sku": "B3", "qty": 1 }
        ]
    }
    """,
    # Order with one line-item
    """
    {
        "order_id": "1002",
        "customer": {
            "id": 2,
            "name": "Bob"
        },
        "items": [
            { "sku": "C7", "qty": 5 }
        ]
    }
    """
]

# Parallelize the JSON strings into an RDD and read as JSON
rdd = spark.sparkContext.parallelize(json_data)
orders_df = spark.read.json(rdd)

# Inspect the inferred schema
orders_df.printSchema()

# Show the raw DataFrame
print("Raw nested DataFrame:")
orders_df.show(truncate=False)

# Explode the `items` array so each line-item is its own row
exploded = orders_df.select(
    col("order_id"),
    col("customer.id").alias("cust_id"),
    col("customer.name").alias("cust_name"),
    explode(col("items")).alias("item")
)

# Select nested fields out of the exploded struct
flat = exploded.select(
    "order_id",
    "cust_id",
    "cust_name",
    col("item.sku").alias("sku"),
    col("item.qty").alias("quantity")
)

print("Exploded & flattened orders:")
flat.show()
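The RDD route above is convenient for inline examples. For real files in your lakehouse you would point spark.read.json at the Files area instead; a sketch, assuming a hypothetical Files/orders.json containing pretty-printed JSON documents:

# multiLine lets Spark parse JSON documents that span several lines
orders_from_file = spark.read \
    .option("multiLine", "true") \
    .json("Files/orders.json")

orders_from_file.printSchema()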
3.6 Using AI in Microsoft Fabric
Microsoft Fabric integrates powerful AI capabilities directly into your data processing workflows. One of the key features is AI Services in Fabric, a series of prebuilt and customizable models for tasks like language understanding, image recognition, and data analysis. These models run within your Fabric capacity, which means you don’t need to deploy anything in Azure to use them. Let’s explore how to enhance your data transformations with these features.
3.6.1 Translating Text with Synapse ML
Microsoft Fabric includes Synapse ML, which provides easy access to translation services:
# First, let's import the required libraries
import synapse.ml.core
from synapse.ml.services import *
from pyspark.sql.functions import col, flatten

# Create sample data with text in different languages
multilingual_df = spark.createDataFrame([
    (1, "Hello, how are you today?", "en"),
    (2, "Bonjour, comment allez-vous aujourd'hui?", "fr"),
    (3, "Hola, ¿cómo estás hoy?", "es")
], ["id", "text", "source_language"])

# Initialize the translator - in Fabric, authentication is handled seamlessly
translator = (Translate()
    .setTextCol("text")
    .setToLanguage(["fr"])
    .setOutputCol("translation")
    .setConcurrency(5))

# Apply translation
result = translator.transform(multilingual_df)\
    .withColumn("translation_result", flatten(col("translation.translations")))\
    .withColumn("text_translated", col("translation_result.text"))

display(result.select("id", "text", "source_language", "text_translated"))
3.6.2 Extracting Insights with GPT
Microsoft Fabric allows you to leverage GPT models to extract structured insights from unstructured text. Here is an example where we ask GPT to extract not only the sentiment, but also the positive and negative aspects of product reviews.
As of writing, the OpenAI models available in AI Services for Fabric don’t support structured outputs. This may lead to inconsistencies on some runs.
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    lit, array, struct, col, from_json, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, ArrayType
)
from synapse.ml.services.openai import OpenAIChatCompletion

# The original customer feedback
feedback_df = spark.createDataFrame([
    (1, "The laptop works great but the battery life is disappointing. Screen quality is excellent!"),
    (2, "Delivery was fast, but the product arrived damaged. Customer service was helpful."),
    (3, "Great value for money, would definitely recommend to others.")
], ["id", "feedback"])

# Helper method to build the `messages` column
def with_chat_messages(df: DataFrame, system_prompt: str) -> DataFrame:
    """
    Adds a column `messages: Array<Struct{role,content,name}>`
    with one system message and one user message per row.
    """
    return df.withColumn(
        "messages",
        array(
            struct(
                lit("system").alias("role"),
                lit(system_prompt).alias("content"),
                lit("system").alias("name")
            ),
            struct(
                lit("user").alias("role"),
                col("feedback").alias("content"),
                lit("user").alias("name")
            )
        )
    )

system_prompt = """
Extract sentiment and aspects from customer feedback.
Return a JSON with the following structure:
{
    "sentiment": "positive|negative|mixed",
    "positive_aspects": ["aspect1", "aspect2"],
    "negative_aspects": ["aspect1", "aspect2"]
}
"""

msgs_df = with_chat_messages(feedback_df, system_prompt)

# Configure the OpenAI call
chat = (
    OpenAIChatCompletion()
    .setDeploymentName("gpt-35-turbo-0125")
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("response")
)

# Call the model
raw_df = chat.transform(msgs_df)

# Pull out the generated JSON
with_json_str = raw_df.withColumn(
    "json_str",
    expr("response.choices[0].message.content")
)

# Define a schema matching the JSON we asked for
schema = StructType([
    StructField("sentiment", StringType(), True),
    StructField("positive_aspects", ArrayType(StringType()), True),
    StructField("negative_aspects", ArrayType(StringType()), True)
])

# Parse the JSON string into columns
parsed = with_json_str.withColumn(
    "parsed",
    from_json(col("json_str"), schema)
)

# Flatten out the result
result_df = parsed.select(
    col("id"),
    col("parsed.sentiment").alias("sentiment"),
    col("parsed.positive_aspects").alias("positive_aspects"),
    col("parsed.negative_aspects").alias("negative_aspects")
)

result_df.show(truncate=False)
3.7 Chapter Summary
In this chapter, we explored various techniques for shaping data in Microsoft Fabric using PySpark:
We learned the fundamental distinction between transformations and actions, which is key to understanding PySpark’s execution model.
We covered essential data shaping operations:
- Selecting, removing, and renaming columns
- Adding and modifying columns
- Converting data types
We explored data manipulation techniques:
- Filtering data with conditions
- Sorting data in meaningful orders
- Aggregating data to derive insights
We learned different methods for joining and merging datasets:
- Various join types (inner, left, right, full)
- Union operations for combining similar datasets
We delved into advanced reshaping techniques:
- Pivoting and unpivoting data
- Window functions for relative calculations
- User-defined functions for custom transformations
- Working with complex data types
Finally, we leveraged Microsoft Fabric’s AI capabilities:
- Translating text with Synapse ML
- Extracting insights from unstructured data with GPT
In the next chapter, we’ll explore how to query this shaped data effectively to extract meaningful business insights.