4  Chapter 4: Query Data

Note

Early draft release: This chapter hasn’t been fully written yet.

In the previous chapters, we’ve learned how to ingest, clean, and shape data within Microsoft Fabric. Now we’re ready to explore how to effectively query this data using PySpark. Querying data is where the true power of Spark becomes apparent, allowing us to filter, transform, and extract insights from our datasets efficiently at scale.

Microsoft Fabric provides an optimized environment for PySpark queries, with several Fabric-specific performance enhancements that we’ll explore in this chapter. Let’s dive into the various techniques available for querying data in PySpark within the Microsoft Fabric ecosystem.

Set up the environment

All code examples in this chapter assume that you have a “Sales” table, loaded from the sales_data.csv file. Please refer to Chapter 1 for instructions on how to set up the environment and load the data.

4.1 Query Data with DataFrame

PySpark DataFrames are the fundamental structure for working with structured data. Within Microsoft Fabric, DataFrames are optimized to work with the Fabric distributed storage and compute architecture.

Let’s start with a simple example using the Lakehouse architecture in Microsoft Fabric:

# Import necessary libraries
from pyspark.sql.functions import col  # explicit import; a wildcard would shadow Python built-ins such as sum, max, and min

# Load some sample data from a Lakehouse table
sales_df = spark.read.table("sales")

# Basic filtering operation
filtered_sales = sales_df.filter(sales_df.sale_amount > 100)

# Display the result
display(filtered_sales)

In Microsoft Fabric notebooks, you can use display() instead of show() to get a richer visualization of your results. This is one of several Fabric-specific enhancements to the PySpark experience.

Let’s look at some common DataFrame operations:

# Filter with a SQL-like expression
high_value_sales = sales_df.filter("sale_amount > 100 AND store_location = 'Houston'")

# Select specific columns
result = sales_df.select("date", "product_id", "sale_amount", "customer_id")

# Combine operations
final_result = sales_df.filter(sales_df.sale_amount > 100) \
                      .select("date", "product_id", "sale_amount", "customer_id") \
                      .filter(sales_df.date > "2023-01-01")

4.2 View Only the First Rows or Few Columns

When working with large datasets in Microsoft Fabric, it’s often useful to examine just a portion of the data to understand its structure and content.

# Display the first 5 rows (default is 20)
sales_df.show(5)

# Display the first 5 rows with more of each column visible
sales_df.show(5, truncate=False)

# In Microsoft Fabric notebooks, display() gives a richer visualization
display(sales_df.limit(5))

# Get just the first row as a Python object
first_row = sales_df.first()
print(first_row)

# Select only a few columns
sales_df.select("date", "product_id", "sale_amount").show(5)

Microsoft Fabric enhances the PySpark experience by providing optimized display functions and visualizations directly in the notebook interface, making data exploration more intuitive.

All of these methods peek at the beginning of your DataFrame. To see the end instead, use the tail() method. This can be useful for checking the last few entries in your dataset, especially if you’re dealing with time-series data or want to verify the most recent records. Unlike limit(), tail() is an action: it returns a Python list of Row objects collected to the driver, so keep the number of rows small.

# Get the last 5 rows of the DataFrame as a list of Row objects
last_rows = sales_df.tail(5)
for row in last_rows:
    print(row)

When you are working with large datasets, it’s common to develop your queries against only a subset of the data. The methods we just covered let you do that. You might also want to sample a random subset instead, which is particularly useful for testing queries on a smaller dataset before running them on the full one. A random sample may be better than limit() because it gives you a more representative subset of your data, especially if the data is not uniformly distributed.

# Sample a random fraction of the DataFrame (roughly 10% of the rows)
sampled_df = sales_df.sample(fraction=0.1, seed=42)

# Display the sampled DataFrame
display(sampled_df)

# Cap the sample at 5 rows (sample() alone returns an approximate fraction, not an exact count)
sampled_rows = sales_df.sample(withReplacement=False, fraction=0.1, seed=42).limit(5)
display(sampled_rows)

The sample() method accepts a few parameters:

  • fraction: the fraction of the data to sample. Each row is selected with this probability, so the size of the result is approximate rather than exact.
  • seed: a seed for reproducibility. If you don’t specify a seed, you will get a different sample each time you run the code. If you specify one, you will get the same sample every time you run the code on the same DataFrame.
  • withReplacement: whether to sample with replacement (default is False). Think of it like drawing balls from a bag: if you sample with replacement, you can draw the same ball multiple times; if you sample without replacement, each ball can only be drawn once.

4.3 Sort and Order

Sorting data is a fundamental operation when querying datasets. PySpark provides several ways to sort your data:

# Sort by a single column in ascending order
sorted_sales = sales_df.sort("sale_amount")

# Sort by a single column in descending order
sorted_sales = sales_df.sort(sales_df.sale_amount.desc())

# Sort by multiple columns
sorted_sales = sales_df.sort("store_location", col("sale_amount").desc())

# Alternative syntax using orderBy
sorted_sales = sales_df.orderBy(col("store_location"), col("sale_amount").desc())

# For clearer code, you can use the asc() function explicitly
from pyspark.sql.functions import asc, desc
sorted_sales = sales_df.orderBy(asc("store_location"), desc("sale_amount"))

display(sorted_sales)

4.3.1 Some performance considerations

Sorting can be an expensive operation, especially on large datasets. All the operations we cover in this chapter can be categorized as either narrow or wide transformations. Narrow transformations, such as filter() and select(), do not require shuffling data across partitions, which means that each cluster node can process its own partitions independently. Wide transformations, such as sort() and groupBy(), require shuffling data across partitions (and most likely across the network).

The first piece of advice is to place sorting operations as late as possible in your query chain. This way, you minimize the amount of data that needs to be sorted. For example, filter your data before sorting:

# Filter before sorting to reduce the amount of data to sort
filtered_sorted_sales = sales_df.filter(sales_df.sale_amount > 100) \
                                 .sort("sale_amount")
display(filtered_sorted_sales)

You can also avoid a global sort when you only need rows ordered within each group. sort() and orderBy() produce a total order and shuffle the whole dataset, whereas sortWithinPartitions() only orders the rows inside each partition. For example, if you repartition your data by store_location, you can sort within each partition:

df_partitioned = sales_df.repartition("store_location")
display(df_partitioned.sortWithinPartitions("sale_amount"))

Note: The physical partitioning of a Delta table is not necessarily preserved in the DataFrame’s in-memory partitioning, which is why the example repartitions the data before sorting. Keep in mind that repartitioning is itself a wide transformation, so it can be expensive. Use it judiciously.

The last piece of advice is to be aware of PySpark’s lazy evaluation model. Sorting operations are not executed until an action is called (such as show(), count(), or a write). This means you can easily end up with code that runs a wide transformation (like sorting) multiple times if you’re not careful. To avoid this, cache the result of a sort operation if you plan to use it multiple times. Note that cache() is itself lazy: the data is stored only when the first action runs.

sorted_sales = sales_df.sort("sale_amount")
sorted_sales.cache()  # Mark the DataFrame for caching; nothing is computed yet
sorted_sales.count()  # First action over the full data: sorts once and fills the cache
# Later actions reuse the cached sorted data instead of re-sorting
sorted_sales.show(10)
display(sorted_sales.filter("sale_amount > 200"))
sorted_sales.unpersist()  # When done, unpersist to free memory

4.4 Aggregations

Aggregations are essential for summarizing and analyzing data. PySpark provides comprehensive aggregation capabilities:

from pyspark.sql.functions import col, sum, avg, count, countDistinct, max, min

# Basic aggregations
summary = sales_df.groupBy("store_location").agg(
    sum("sale_amount").alias("total_sales"),
    avg("sale_amount").alias("average_sale"),
    count("*").alias("number_of_sales"),
    countDistinct("customer_id").alias("unique_customers"),
    max("sale_amount").alias("largest_sale"),
    min("sale_amount").alias("smallest_sale")
)

display(summary)

# Grouping by multiple columns
product_location_summary = sales_df.groupBy("product_id", "store_location").agg(
    sum("sale_amount").alias("total_sales")
)

display(product_location_summary)

# Filtering after aggregation
high_volume_stores = summary.filter(col("number_of_sales") > 100)
display(high_volume_stores)

# Pivot tables
pivot_table = sales_df.groupBy("product_id").pivot("store_location").sum("sale_amount")
display(pivot_table)

4.5 Query Plan and Performance Evaluation

Understanding how PySpark executes your queries is crucial for optimizing performance. Microsoft Fabric provides enhanced tools for examining query plans and evaluating performance:

# Examine the logical and physical plans for a query
query = sales_df.filter(col("store_location") == "Houston") \
                .groupBy("product_id") \
                .agg(sum("sale_amount").alias("total_sales")) \
                .filter(col("total_sales") > 10000)

# Display the execution plan
query.explain(extended=True)

When you run .explain(extended=True), you’ll see four plans (a plain .explain() prints only the physical plan):

  1. The Parsed Logical Plan: the initial interpretation of your query
  2. The Analyzed Logical Plan: after resolving references
  3. The Optimized Logical Plan: after applying optimization rules
  4. The Physical Plan: the actual execution strategy

Microsoft Fabric enhances these plans with Fabric-specific optimizations, such as integration with OneLake storage and Fabric’s distributed compute.

4.5.1 Predicate Pushdown

Predicate pushdown is a critical optimization where filtering conditions are “pushed down” to the data source, reducing the amount of data that needs to be processed.

# Example demonstrating predicate pushdown
# Notice in the explain plan how the filter condition is pushed to the file scan
filtered_data = sales_df.filter(col("store_location") == "Seattle").select("date", "sale_amount")
filtered_data.explain()

In Microsoft Fabric, predicate pushdown is optimized for OneLake storage, allowing filters to be applied directly at the storage layer for improved performance.

When you examine the physical plan output, look for these key indicators:

4.5.1.1 FileScan Operation Analysis

In the physical plan, the FileScan operator shows the most critical information:

FileScan parquet spark_catalog.pysparklakehouse.sales[date#758,store_location#760,sale_amount#761] 
Batched: true, 
DataFilters: [isnotnull(store_location#760), (store_location#760 = Seattle)], 
Format: Parquet, 
Location: PreparedDeltaFileIndex(1 paths)[abfss://...], 
PartitionFilters: [], 
PushedFilters: [IsNotNull(store_location), EqualTo(store_location,Seattle)], 
ReadSchema: struct<date:date,store_location:string,sale_amount:double>

Key Elements to Check:

  • PushedFilters: Filters handed to the Parquet reader, so that data can be skipped at the storage layer
  • DataFilters: Filters on non-partition columns that Spark applies to the data being read
  • ReadSchema: Only the columns needed are being read (column pruning)
  • PartitionFilters: Filters on partition columns, used to skip whole partitions (empty in this example)

4.5.1.2 Good vs. Poor Predicate Pushdown

✅ Good Predicate Pushdown:

# Filters appear in PushedFilters at the FileScan level
sales_df.filter(col("store_location") == "Seattle").explain()
# Look for: PushedFilters: [EqualTo(store_location,Seattle)]

❌ Poor Predicate Pushdown:

# Complex conditions that can't be pushed down
from pyspark.sql.functions import upper
sales_df.filter(upper(col("store_location")) == "SEATTLE").explain()
# Upper() function prevents pushdown - filter appears higher in the plan

4.6 Using Spark SQL

PySpark allows you to leverage SQL queries, which can be particularly intuitive for users familiar with SQL. Microsoft Fabric enhances this capability with integration to its broader SQL ecosystem:

4.6.1 Creating Temporary Views

You can create temporary views from DataFrames, allowing you to run SQL queries against them. This is particularly useful for complex queries or when you want to share results across different parts of your application.

# Create a temporary view of your DataFrame
sales_df.createOrReplaceTempView("sales_view")

# To share a view across Spark sessions within the same Spark application, create a global temp view
sales_df.createOrReplaceGlobalTempView("global_sales_view")

4.6.2 Basic SQL Querying

# Run a SQL query against the temporary view
result = spark.sql("""
    SELECT 
        product_id,
        store_location,
        SUM(sale_amount) as total_sales,
        AVG(sale_amount) as average_sale,
        COUNT(*) as transaction_count,
        COUNT(DISTINCT customer_id) as unique_customers
    FROM sales_view
    WHERE date >= '2023-01-01'
    GROUP BY product_id, store_location
    HAVING SUM(sale_amount) > 100
    ORDER BY total_sales DESC
""")

display(result)

# Access a global temporary view (note the database prefix)
global_result = spark.sql("""
    SELECT * FROM global_temp.global_sales_view
    WHERE store_location = 'Houston'
""")
display(global_result)

4.6.3 Using the %%sql Cell Magic

You can also use the %%sql cell magic to run SQL queries directly in Microsoft Fabric notebooks, which provides a more interactive experience:

%%sql
SELECT 
    product_id,
    store_location,
    SUM(sale_amount) AS total_sales,
    AVG(sale_amount) AS average_sale
FROM sales_view
GROUP BY product_id, store_location

4.7 Before you run a query on your largest dataset, make sure…

Running queries at scale in Microsoft Fabric (or any distributed Spark environment) requires care. Use this checklist to avoid common pitfalls and ensure your queries are performant, reliable, and safe.

| # | Checklist Item | Why It’s Important |
|---|----------------|--------------------|
| 1 | Test with a Small, Representative Sample First | Avoid costly mistakes that can tie up cluster resources, cause unnecessary compute costs, or lead to failed jobs. Test logic, verify results, and estimate performance using .sample(), .limit(), or smaller partitions. |
| 2 | Check Your Filter and Join Clauses | Predicate pushdown and partition pruning can drastically reduce the amount of data read. Review filters: are they applied as early as possible? Avoid complex expressions that block pushdown. |
| 3 | Select Only the Columns You Need | Explicitly list fields to minimize data transfer and serialization costs. Avoid select("*") unless necessary. |
| 4 | Assess Data Skew and Partitioning | Skewed keys in aggregations or joins can overload single nodes. Check value distributions with .groupBy(key).count(). Consider repartitioning large or skewed datasets with repartition() or bucketBy() for more even work distribution. |
| 5 | Use Caching Strategically | Cache only after expensive or frequently reused intermediate results, and always unpersist when done. Assess memory usage and cluster size in line with data volume. |
| 6 | Understand Table and File Partitioning | When querying Delta or Parquet tables, align your filters with partitioned columns (e.g., filter on date or region if those are partitions). This ensures only necessary files/partitions are read. |
| 7 | Review Query Plan | Use .explain() to confirm that pushdown, pruning, and broadcast hints are effective. Look for unexpected full table scans or shuffles and adjust your query if needed. |
| 8 | Avoid Collecting Large DataFrames to the Driver | .collect(), .toPandas(), and similar methods can overwhelm the driver with data. Use only when absolutely necessary, and only for small result sets. |
| 9 | Set Appropriate Resource Limits | Ensure your cluster or session has sufficient cores and memory for the expected data size and query complexity. Adjust Spark or Fabric session parameters as needed. |
| 10 | Document and Version Your Query Logic | For large-scale or production queries, use notebooks or scripts with comments, version control, and reproducible parameters. This aids troubleshooting and collaboration. |
| 11 | Monitor Long-Running Queries | Use Fabric monitoring tools to observe query execution, resource usage, and identify any bottlenecks or failures during the run. |

Best Practice:
Make it routine to review this checklist for new workflows, ETL jobs, or analytical queries—especially when working with evolving data volumes or schema changes.

4.9 Conclusion

In this chapter, we’ve explored the powerful querying capabilities of PySpark within Microsoft Fabric. We’ve learned how to filter and select data, sort results, use column expressions, perform aggregations, optimize performance, and leverage SQL queries.

Microsoft Fabric enhances the PySpark experience with optimized performance for OneLake storage, integrated visualizations, and seamless connections to the broader Fabric ecosystem. By understanding and applying these querying techniques, you can effectively analyze data at scale within your Microsoft Fabric environment.

In the next chapter, we’ll explore how to store the results of your PySpark queries in various formats and destinations within the Microsoft Fabric ecosystem.

Note

Possible future additions to this chapter:

  • Statistical functions (stddev, variance, corr, freqItems, etc.)
  • Query plan: add more documentation
  • Multiple value filtering (e.g. isin())
  • Window functions (row_number(), rank(), etc.)
  • Date and time functions
  • Anything related to SemanticLink?
  • Broadcast joins
  • Sample with stratification (sampleBy())
  • Window functions (window(), lead(), lag())
  • Null handling (na.fill(), na.drop(), etc.)
