Chapter 3: Shape Data

Note

Early draft release: This chapter hasn’t been fully edited yet.

3.1 Introduction

In the previous chapters, we learned how to ingest and clean data in Microsoft Fabric. Now that our data is loaded and cleansed, we’re ready to shape it into the structures we need for analysis. Think of data shaping as sculpting clay: we take raw material and transform it into something more useful and meaningful.

In Microsoft Fabric, shaping data with PySpark gives us powerful tools to transform, filter, join, and restructure our datasets. This chapter explores how to manipulate your data’s format and structure to prepare it for deeper analysis. We’ll also explore how to leverage AI capabilities native to Microsoft Fabric to enhance your data transformations.

Let’s begin by understanding the fundamental concepts of transformations and actions in PySpark, and then work through increasingly sophisticated data shaping techniques.

3.2 Transformations and Actions in PySpark

PySpark operations fall into two categories: transformations and actions. Understanding this distinction is crucial for efficient data processing in Microsoft Fabric.

3.2.1 Transformations vs. Actions

Transformations create a new DataFrame from an existing one but don’t execute computations immediately. They are “lazy” operations that build up a transformation plan. Examples include select(), filter(), and join().

Actions trigger the execution of transformations and return results. They initiate the actual computation. Examples include count(), collect(), and show().

This lazy evaluation pattern is key to PySpark’s efficiency in Microsoft Fabric, as it allows the engine to optimize the entire execution plan before running anything.

# Load sample sales data
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/sales_data.csv")

# This is a transformation (lazy) - nothing happens yet
filtered_sales = sales_df.filter(sales_df.sale_amount > 150)

# This is an action - now computation happens
print(f"Number of high-value sales: {filtered_sales.count()}")

# Let's see what our data looks like
filtered_sales.show(5)
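
To see the plan that lazy evaluation builds up, you can call explain() on a DataFrame. It prints the plan Spark intends to run without triggering any computation; here is a quick sketch using the DataFrame defined above:

# Inspect the execution plan - still no computation happens
filtered_sales.explain()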

3.2.2 Shaping Data Format

Let’s explore common operations for reshaping your DataFrame structure in Microsoft Fabric.

3.2.2.1 Selecting and Removing Columns

Selecting specific columns or excluding others helps focus your analysis on relevant data:

# Select specific columns
customer_sales = sales_df.select("customer_id", "product_id", "date")

# Drop unwanted columns
simplified_sales = sales_df.drop("transaction_id", "store_location")

# Show the results
print("Selected columns:")
customer_sales.show(3)
print("After dropping columns:")
simplified_sales.show(3)

3.2.2.2 Renaming Columns

Clear, descriptive column names make your data more understandable:

# Rename single column
renamed_df = sales_df.withColumnRenamed("sale_amount", "sale_amount_without_tax")

# Rename multiple columns
from pyspark.sql.functions import col

# Method 1: Chain multiple withColumnRenamed calls
renamed_multiple = sales_df \
    .withColumnRenamed("sale_amount", "sale_amount_without_tax") \
    .withColumnRenamed("date", "sale_date")

# Method 2: Use select with alias for multiple columns
renamed_select = sales_df.select(
    col("customer_id"),
    col("sale_amount").alias("sale_amount_without_tax"),
    col("date").alias("sale_date"),
    col("product_id")
)

renamed_select.show(3)

3.2.2.3 Adding and Modifying Columns

Creating new columns based on existing data is a common transformation:

from pyspark.sql.functions import col, round, upper, current_date, datediff

# Add a new column
sales_with_tax = sales_df.withColumn("amount_with_tax", round(col("sale_amount") * 1.08, 2))

# Convert string column to uppercase
sales_upper = sales_df.withColumn("product_id", upper(col("product_id")))

# Calculate days since purchase (using Fabric's handling of date functions)
sales_with_age = sales_df.withColumn("days_since_purchase", 
                                     datediff(current_date(), col("date")))

# Show our modified DataFrame
sales_with_tax.show(3)

3.2.2.4 Changing Data Types

Ensuring columns have the correct data type is essential for accurate analysis:

from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import to_date

# Convert string to integer
sales_df = sales_df.withColumn("customer_id", col("customer_id").cast(IntegerType()))

# Convert string to date
sales_df = sales_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Check our schema after conversions
sales_df.printSchema()
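
As a side note, cast() also accepts SQL-style type-name strings, so you can skip importing the type classes. A minimal sketch, assuming sale_amount arrived as a string:

# Equivalent cast using a type-name string instead of a type class
sales_df = sales_df.withColumn("sale_amount", col("sale_amount").cast("double"))
sales_df.printSchema()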

3.3 Filtering, Sorting, and Aggregating Data

These operations help narrow down your dataset to focus on specific insights and patterns.

These concepts will be explored in detail in Chapter 4, “Query Data”. Yet, because they’re essential to shaping data, here is a primer on the topic.

3.3.1 Filtering Data

Filtering allows you to work with subsets of your data that meet specific criteria:

from pyspark.sql.functions import year

# Basic filtering
high_value_sales = sales_df.filter(col("sale_amount") > 100)

# Multiple conditions - sales over $100 in 2024
sales_2024_high = sales_df.filter(
    (year(col("date")) == 2024) & 
    (col("sale_amount") > 100)
)

# Using SQL expressions for filtering
sql_filtered = sales_df.filter("sale_amount > 100 AND year(date) = 2024")

print("High-value sales from 2024:")
sales_2024_high.show(5)
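
Two other filtering helpers worth knowing are isin(), which matches a column against a list of values, and between(), which checks an inclusive range. A quick sketch (the product IDs below are illustrative values assumed to exist in the sample data):

# Filter against a list of values
selected_products = sales_df.filter(col("product_id").isin(101, 102, 103))

# Filter on an inclusive range
mid_range_sales = sales_df.filter(col("sale_amount").between(50, 150))

print("Sales between $50 and $150:")
mid_range_sales.show(5)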

3.3.2 Sorting Data

Sorting helps identify patterns and extremes and facilitates analysis:

# Sort by a single column (ascending by default)
sorted_by_amount = sales_df.orderBy("sale_amount")

# Sort by multiple columns (descending amount, ascending date)
sorted_multiple = sales_df.orderBy(col("sale_amount").desc(), col("date"))

# Alternative syntax
sorted_alt = sales_df.sort(col("sale_amount").desc())

print("Sales sorted by amount (highest first):")
sorted_by_amount.show(5)

3.3.3 Aggregating Data

Aggregation condenses your data into summary statistics and insights:

from pyspark.sql.functions import sum, avg, count, max, min, countDistinct

# Basic aggregations
summary_stats = sales_df.agg(
    sum("sale_amount").alias("total_sales"),
    avg("sale_amount").alias("average_sale"),
    max("sale_amount").alias("largest_sale"),
    min("sale_amount").alias("smallest_sale"),
    count("*").alias("transaction_count"),
    countDistinct("customer_id").alias("unique_customers")
)

# Group by aggregations
sales_by_product = sales_df.groupBy("product_id").agg(
    sum("sale_amount").alias("total_product_sales"),
    avg("sale_amount").alias("avg_product_price"),
    count("*").alias("units_sold")
).orderBy(col("total_product_sales").desc())

print("Sales summary statistics:")
summary_stats.show()

print("Top selling products:")
sales_by_product.show(5)

3.4 Joining and Merging Datasets

In Microsoft Fabric, you’ll often need to combine data from multiple sources for comprehensive analysis.

Tip

Thanks to Fabric’s Shortcuts feature and the ability to include multiple lakehouses in the same notebook session, it’s quite easy to merge datasets coming from different sources.

3.4.1 Types of Joins

PySpark supports various join types to accommodate different data relationships:

# Let's create a sample customer dataset
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

customer_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("segment", StringType(), True)
])

customer_data = [
    (1, "Sarah Johnson", "Premium"),
    (2, "Michael Chen", "Standard"),
    (3, "Elena Rodriguez", "Premium"),
    (4, "David Smith", "Standard")
]

customer_df = spark.createDataFrame(customer_data, customer_schema)

# Inner join: only matches found in both datasets
inner_joined = sales_df.join(customer_df, "customer_id", "inner")

# Left join: all records from sales, matching from customers
left_joined = sales_df.join(customer_df, "customer_id", "left")

# Right join: all records from customers, matching from sales
right_joined = sales_df.join(customer_df, "customer_id", "right")

# Full outer join: all records from both datasets
full_joined = sales_df.join(customer_df, "customer_id", "full")

print("Inner join result (sales matched with customers):")
inner_joined.show(5)

# When columns are named differently
# Let's assume we have a products DataFrame with 'prod_id' instead of 'product_id'
products_df = spark.createDataFrame(
    [(101, "Laptop", "Electronics"), 
     (102, "Desk Chair", "Furniture")],
    ["prod_id", "product_name", "category"]
)

# Join on different column names
sales_with_products = sales_df.join(
    products_df,
    sales_df.product_id == products_df.prod_id,
    "left"
).select(
    sales_df["*"], 
    products_df["product_name"], 
    products_df["category"]
)

print("Sales with product details:")
sales_with_products.show(5)
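
PySpark also supports semi and anti joins, which filter the left DataFrame based on whether a match exists, without adding any columns from the right side. A brief sketch using the same sample DataFrames:

# left_semi: keep only sales that have a matching customer record
sales_with_known_customers = sales_df.join(customer_df, "customer_id", "left_semi")

# left_anti: keep only sales whose customer_id does not appear in customer_df
sales_with_unknown_customers = sales_df.join(customer_df, "customer_id", "left_anti")

print("Sales without a matching customer record:")
sales_with_unknown_customers.show(5)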

3.4.2 Union Operations

When you need to combine datasets with identical schemas:

# Create a second sales dataset
sales_data2 = [
    (3, 103, "2024-01-07", "TXN100003", "San Francisco", 24.99),
    (4, 104, "2024-01-10", "TXN100004", "Chicago", 124.95),
    (5, 105, "2024-01-13", "TXN100005", "Miami", 33.98),
    (6, 108, "2024-01-15", "TXN100006", "Seattle", 96.00)
]
# Column names and order must line up with sales_df for the union below
sales_df2 = spark.createDataFrame(
    sales_data2,
    ["customer_id", "product_id", "date", "transaction_id", "store_location", "sale_amount"]
)

# Union the two datasets
all_sales = sales_df.union(sales_df2)

print("Combined sales data:")
all_sales.show()
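
Keep in mind that union() matches columns purely by position. When the two DataFrames share the same columns but in a different order, unionByName() is the safer option; a short sketch:

# unionByName matches columns by name rather than by position
all_sales_by_name = sales_df.unionByName(sales_df2)

# On Spark 3.1+, allowMissingColumns=True fills columns that exist on only
# one side with nulls instead of raising an error
# all_sales_by_name = sales_df.unionByName(sales_df2, allowMissingColumns=True)

all_sales_by_name.show()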

3.5 Advanced Data Reshaping Techniques

Now let’s explore more sophisticated reshaping operations available in Microsoft Fabric.

3.5.1 Pivoting and Unpivoting

Pivoting transforms rows into columns, while unpivoting does the reverse:

# Let's create a sample dataset for monthly sales
monthly_data = [
    ("Product A", "Jan", 1000),
    ("Product A", "Feb", 1200),
    ("Product A", "Mar", 900),
    ("Product B", "Jan", 800),
    ("Product B", "Feb", 950),
    ("Product B", "Mar", 1100)
]

monthly_df = spark.createDataFrame(monthly_data, ["product", "month", "sales"])

# Pivot: transform months from rows to columns
pivoted_df = monthly_df.groupBy("product").pivot("month").sum("sales")

print("Pivoted data (months as columns):")
pivoted_df.show()

# Unpivot: transform columns back to rows
from pyspark.sql.functions import expr

# First, let's create sample data with multiple columns
quarterly_sales = spark.createDataFrame([
    ("Product A", 1000, 1200, 900),
    ("Product B", 800, 950, 1100)
], ["product", "Q1", "Q2", "Q3"])

# Unpivot using stack
unpivoted_df = quarterly_sales.select(
    "product",
    expr("stack(3, 'Q1', `Q1`, 'Q2', `Q2`, 'Q3', `Q3`) as (quarter, sales)")
)

print("Unpivoted data (quarters as rows):")
unpivoted_df.show()
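
If your Fabric runtime ships Spark 3.4 or later, the built-in DataFrame.unpivot() method (also available as melt()) expresses the same operation as the stack() expression above with named parameters. A sketch under that assumption:

# Built-in unpivot (Spark 3.4+), equivalent to the stack() expression above
unpivoted_builtin = quarterly_sales.unpivot(
    ids=["product"],
    values=["Q1", "Q2", "Q3"],
    variableColumnName="quarter",
    valueColumnName="sales"
)

unpivoted_builtin.show()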

3.5.2 Window Functions

Window functions perform calculations across a specified range of rows:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, lag, lead, sum

# Define a window specification
window_spec = Window.partitionBy("product_id").orderBy(col("date"))

# Calculate running total of sales by product
running_total = sales_df.withColumn(
    "running_total",
    sum("sale_amount").over(window_spec)
)

# Calculate the previous sale amount for each product
with_previous = sales_df.withColumn(
    "previous_sale",
    lag("sale_amount", 1).over(window_spec)
)

# Calculate rank of sales by amount within each product
ranking_spec = Window.partitionBy("product_id").orderBy(col("sale_amount").desc())
ranked_sales = sales_df.withColumn(
    "sale_rank",
    rank().over(ranking_spec)
)

print("Sales with running totals by product:")
running_total.show(5)

print("Top-ranked sales by product:")
ranked_sales.filter(col("sale_rank") <= 3).show(10)
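
Window frames let you restrict a calculation to a sliding range of rows rather than the whole partition. As a sketch, here is a three-row moving average of sale_amount per product, covering the current row and the two preceding rows ordered by date:

from pyspark.sql.functions import avg

# Frame of the current row plus the two rows before it, per product
moving_avg_spec = Window.partitionBy("product_id") \
    .orderBy(col("date")) \
    .rowsBetween(-2, Window.currentRow)

moving_avg = sales_df.withColumn(
    "moving_avg_3_sales",
    avg("sale_amount").over(moving_avg_spec)
)

print("Three-row moving average of sales by product:")
moving_avg.show(5)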

3.5.3 User-Defined Functions (UDFs)

UDFs allow you to apply custom transformations when built-in functions don’t meet your needs:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType

# Define a function to categorize sales
def categorize_sale(amount):
    if amount < 30:
        return "Small"
    elif amount < 100:
        return "Medium"
    else:
        return "Large"

# Register as UDF
categorize_udf = udf(categorize_sale, StringType())

# Apply UDF to create a new column
categorized_sales = sales_df.withColumn("sale_category", categorize_udf(col("sale_amount")))

# More complex UDF with multiple inputs
@udf(FloatType())
def calculate_discount(amount, customer_segment):
    if customer_segment == "Premium":
        return float(amount * 0.10)  # 10% discount
    else:
        return float(amount * 0.05)  # 5% discount

# Join and apply the UDF
sales_with_discount = sales_df.join(customer_df, "customer_id", "left") \
    .withColumn("discount_amount", 
                calculate_discount(col("sale_amount"), col("segment")))

print("Sales with categories:")
categorized_sales.show(5)

print("Sales with calculated discounts:")
sales_with_discount.show(5)
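
Plain Python UDFs run row by row, which can be slow on large datasets. Where performance matters, a vectorized (pandas) UDF processes whole batches at once; below is a sketch of the same categorization logic rewritten as a pandas UDF:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def categorize_sale_vectorized(amount: pd.Series) -> pd.Series:
    # right=False keeps the same < 30 / < 100 boundaries as the Python UDF above
    return pd.cut(
        amount,
        bins=[float("-inf"), 30, 100, float("inf")],
        labels=["Small", "Medium", "Large"],
        right=False
    ).astype(str)

sales_df.withColumn("sale_category", categorize_sale_vectorized(col("sale_amount"))).show(5)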

3.5.4 Working with Complex Data Types

PySpark handles complex data types effectively.

Working with array‐typed columns in Spark gives you a powerful way to model and manipulate “one-to-many” relationships without leaving the DataFrame API. Imagine each product can have zero or more tags, features or categories. Rather than maintaining a separate “product_tags” table and having to join every time, you can keep an array of tags right on the product row. This is extremely common in catalogues, taxonomies, or any place where you want to associate a variable-length list of attributes with an entity.

from pyspark.sql.functions import struct, array, explode, map_keys, map_values

# Creating and working with structs (nested structures)
customer_detail = sales_df.join(customer_df, "customer_id") \
    .withColumn("customer_info", 
                struct(col("name"), col("segment")))

# Working with arrays
product_tags = spark.createDataFrame([
    (101, ["electronics", "computer", "laptop"]),
    (102, ["furniture", "office", "chair"])
], ["product_id", "tags"])

# Explode arrays into rows
exploded_tags = product_tags.select(
    "product_id", 
    explode("tags").alias("tag")
)

print("DataFrame with struct column:")
customer_detail.select("customer_id", "sale_amount", "customer_info").show(5)

print("Exploded array data:")
exploded_tags.show()
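
You don’t always need to explode an array to work with it; functions such as array_contains() and size() query the array column in place. A quick sketch on the same product_tags DataFrame:

from pyspark.sql.functions import array_contains, size

# Keep only products whose tags array contains a given value
laptop_products = product_tags.filter(array_contains(col("tags"), "laptop"))

# Count the number of tags per product
tag_counts = product_tags.withColumn("tag_count", size(col("tags")))

print("Products tagged 'laptop':")
laptop_products.show()

print("Tag counts per product:")
tag_counts.show()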

The explode() pattern is also useful when you’re parsing nested JSON structures. When you read a JSON file that contains arrays, Spark automatically infers an ArrayType(...) for them.

from pyspark.sql.functions import explode, col

json_data = [
    # Order with two line-items
    """
    {
      "order_id": "1001",
      "customer": {
        "id": 1,
        "name": "Alice"
      },
      "items": [
        { "sku": "A1", "qty": 2 },
        { "sku": "B3", "qty": 1 }
      ]
    }
    """,
    # Order with one line-item
    """
    {
      "order_id": "1002",
      "customer": {
        "id": 2,
        "name": "Bob"
      },
      "items": [
        { "sku": "C7", "qty": 5 }
      ]
    }
    """
]

# Parallelize the JSON strings into an RDD and read as JSON
rdd = spark.sparkContext.parallelize(json_data)
orders_df = spark.read.json(rdd)

# Inspect the inferred schema
orders_df.printSchema()

# Show the raw DataFrame
print("Raw nested DataFrame:")
orders_df.show(truncate=False)

# Explode the `items` array so each line-item is its own row
exploded = orders_df.select(
    col("order_id"),
    col("customer.id").alias("cust_id"),
    col("customer.name").alias("cust_name"),
    explode(col("items")).alias("item")
)

# Select nested fields out of the exploded struct
flat = exploded.select(
    "order_id",
    "cust_id",
    "cust_name",
    col("item.sku").alias("sku"),
    col("item.qty").alias("quantity")
)

print("Exploded & flattened orders:")
flat.show()

3.6 Using AI in Microsoft Fabric

Microsoft Fabric integrates powerful AI capabilities directly into your data processing workflows. One of the key features is AI Services in Fabric, a set of prebuilt and customizable models for tasks like language understanding, image recognition, and data analysis. These models run within your Fabric capacity, which means you don’t need to deploy anything in Azure to use them. Let’s explore how to enhance your data transformations with these features.

3.6.1 Translating Text with Synapse ML

Microsoft Fabric includes Synapse ML, which provides easy access to translation services:

# First, let's import the required libraries
import synapse.ml.core
from synapse.ml.services import *
from pyspark.sql.functions import col, flatten

# Create sample data with text in different languages
multilingual_df = spark.createDataFrame([
    (1, "Hello, how are you today?", "en"),
    (2, "Bonjour, comment allez-vous aujourd'hui?", "fr"),
    (3, "Hola, ¿cómo estás hoy?", "es")
], ["id", "text", "source_language"])

# Initialize the translator - in Fabric, authentication is handled seamlessly
translator = (Translate()
    .setTextCol("text")
    .setToLanguage(["fr"])
    .setOutputCol("translation")
    .setConcurrency(5))

# Apply translation
result = translator.transform(multilingual_df)\
        .withColumn("translation_result", flatten(col("translation.translations")))\
        .withColumn("text_translated", col("translation_result.text"))

display(result.select("id","text","source_language","text_translated"))

3.6.2 Extracting Insights with GPT

Microsoft Fabric allows you to leverage GPT models to extract structured insights from unstructured text. Here is an example where we ask GPT to extract not only the sentiment, but also the positive and negative aspects of product reviews.

Note

As of writing, the OpenAI models available in AI Services for Fabric don’t support structured outputs. This may lead to inconsistencies on some runs.

from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    lit, array, struct, col, from_json, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, ArrayType
)
from synapse.ml.services.openai import OpenAIChatCompletion

# The original customer feedback
feedback_df = spark.createDataFrame([
    (1, "The laptop works great but the battery life is disappointing. Screen quality is excellent!"),
    (2, "Delivery was fast, but the product arrived damaged. Customer service was helpful."),
    (3, "Great value for money, would definitely recommend to others.")
], ["id", "feedback"])

# Helper method to build the `messages` column
def with_chat_messages(df: DataFrame, system_prompt: str) -> DataFrame:
    """
    Adds a column `messages: Array<Struct{role,content,name}>` 
    with one system message and one user message per row.
    """
    return df.withColumn(
        "messages",
        array(
            struct(
                lit("system").alias("role"),
                lit(system_prompt).alias("content"),
                lit("system").alias("name")
            ),
            struct(
                lit("user").alias("role"),
                col("feedback").alias("content"),
                lit("user").alias("name")
            )
        )
    )

system_prompt = """
Extract sentiment and aspects from customer feedback.
Return a JSON with the following structure:
{
  "sentiment": "positive|negative|mixed",
  "positive_aspects": ["aspect1", "aspect2"],
  "negative_aspects": ["aspect1", "aspect2"]
}
"""

msgs_df = with_chat_messages(feedback_df, system_prompt)

# configure the OpenAI call
chat = (
    OpenAIChatCompletion()
      .setDeploymentName("gpt-35-turbo-0125")
      .setMessagesCol("messages")
      .setErrorCol("error")
      .setOutputCol("response")
)

# Call the model
raw_df = chat.transform(msgs_df)

# Pull out the generated JSON
with_json_str = raw_df.withColumn(
    "json_str",
    expr("response.choices[0].message.content")
)

# Define a schema matching the JSON you asked for
schema = StructType([
    StructField("sentiment",        StringType(),             True),
    StructField("positive_aspects", ArrayType(StringType()),  True),
    StructField("negative_aspects", ArrayType(StringType()),  True)
])

# Parse the JSON string into columns
parsed = with_json_str.withColumn(
    "parsed",
    from_json(col("json_str"), schema)
)

# Flatten out the result
result_df = parsed.select(
    col("id"),
    col("parsed.sentiment").alias("sentiment"),
    col("parsed.positive_aspects").alias("positive_aspects"),
    col("parsed.negative_aspects").alias("negative_aspects")
)

result_df.show(truncate=False)

3.7 Chapter Summary

In this chapter, we explored various techniques for shaping data in Microsoft Fabric using PySpark:

  1. We learned the fundamental distinction between transformations and actions, which is key to understanding PySpark’s execution model.

  2. We covered essential data shaping operations:

    • Selecting, removing, and renaming columns
    • Adding and modifying columns
    • Converting data types

  3. We explored data manipulation techniques:

    • Filtering data with conditions
    • Sorting data in meaningful orders
    • Aggregating data to derive insights

  4. We learned different methods for joining and merging datasets:

    • Various join types (inner, left, right, full)
    • Union operations for combining similar datasets

  5. We delved into advanced reshaping techniques:

    • Pivoting and unpivoting data
    • Window functions for relative calculations
    • User-defined functions for custom transformations
    • Working with complex data types

  6. Finally, we leveraged Microsoft Fabric’s AI capabilities:

    • Translating text with Synapse ML
    • Extracting insights from unstructured data with GPT

In the next chapter, we’ll explore how to query this shaped data effectively to extract meaningful business insights.
