4 Chapter 4: Query Data
Early draft release: This chapter hasn’t been fully written yet.
In the previous chapters, we’ve learned how to ingest, clean, and shape data within Microsoft Fabric. Now we’re ready to explore how to effectively query this data using PySpark. Querying data is where the true power of Spark becomes apparent, allowing us to filter, transform, and extract insights from our datasets efficiently at scale.
Microsoft Fabric provides an optimized environment for PySpark queries, with several Fabric-specific performance enhancements that we’ll explore in this chapter. Let’s dive into the various techniques available for querying data in PySpark within the Microsoft Fabric ecosystem.
All code examples in this chapter assume that you have a “Sales” table loaded from the sales_data.csv file. Please refer to Chapter 1 for instructions on how to set up the environment and load the data.
4.1 Query Data with DataFrame
PySpark DataFrames are the fundamental structure for working with structured data. Within Microsoft Fabric, DataFrames are optimized to work with the Fabric distributed storage and compute architecture.
Let’s start with a simple example using the Lakehouse architecture in Microsoft Fabric:
# Import necessary libraries
from pyspark.sql.functions import *
# Load some sample data from a Lakehouse table
sales_df = spark.read.table("sales")

# Basic filtering operation
filtered_sales = sales_df.filter(sales_df.sale_amount > 100)

# Display the result
display(filtered_sales)
In Microsoft Fabric notebooks, you can use display()
instead of show()
to get a richer visualization of your results. This is one of several Fabric-specific enhancements to the PySpark experience.
Let’s look at some common DataFrame operations:
# Filter with a SQL-like expression
= sales_df.filter("sale_amount > 100 AND store_location = 'Houston'")
high_value_sales
# Select specific columns
= sales_df.select("date", "product_id", "sale_amount", "customer_id")
result
# Combine operations
= sales_df.filter(sales_df.sale_amount > 100) \
final_result "date", "product_id", "sale_amount", "customer_id") \
.select(filter(sales_df.date > "2023-01-01") .
4.2 View Only the First Rows or Few Columns
When working with large datasets in Microsoft Fabric, it’s often useful to examine just a portion of the data to understand its structure and content.
# Display the first 5 rows (default is 20)
sales_df.show(5)

# Display the first 5 rows with more of each column visible
sales_df.show(5, truncate=False)

# In Microsoft Fabric notebooks, display() gives a richer visualization
display(sales_df.limit(5))

# Get just the first row as a Python object
first_row = sales_df.first()
print(first_row)

# Select only a few columns
sales_df.select("date", "product_id", "sale_amount").show(5)
Microsoft Fabric enhances the PySpark experience by providing optimized display functions and visualizations directly in the notebook interface, making data exploration more intuitive.
All these methods are great for peeking at the beginning of your DataFrame. However, if you want to see the end of your DataFrame, you can use the tail()
method. This could be useful for checking the last few entries in your dataset, especially if you’re dealing with time-series data or want to verify the most recent records.
# Display the last 5 rows of the DataFrame
last_rows = sales_df.tail(5)
display(last_rows)
When you are working with large datasets, it is common to develop your queries against only a subset of the data. The methods we just covered allow you to do that. You might also want to sample a random subset of the data instead. This is particularly useful when you want to test your queries on a smaller dataset before running them on the full dataset. A random sample may be better than using limit() because it gives you a more representative subset of your data, especially if the data is not uniformly distributed.
# Sample a random fraction of the DataFrame
sampled_df = sales_df.sample(fraction=0.1, seed=42)  # 10% sample

# Display the sampled DataFrame
display(sampled_df)

# Sample a specific number of rows
sampled_rows = sales_df.sample(withReplacement=False, fraction=0.1, seed=42).limit(5)
display(sampled_rows)
The sample() method allows you to specify a few parameters:

- a fraction of the data to sample.
- a seed for reproducibility. If the seed is not specified, you will get a different sample each time you run the code. If you specify a seed, you will get the same sample every time you run the code.
- whether to sample with replacement (default is False). Think of it like drawing balls from a bag: if you sample with replacement, you can draw the same ball multiple times; if you sample without replacement, each ball can only be drawn once.
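To make the difference concrete, here is a minimal sketch contrasting the two modes, reusing the sales_df DataFrame from earlier. Note that fraction is only a target, not a guarantee, so the exact number of returned rows will vary slightly between runs.

# Sample with replacement: the same row may be drawn more than once
with_replacement = sales_df.sample(withReplacement=True, fraction=0.1, seed=42)

# Sample without replacement (the default): each row appears at most once
without_replacement = sales_df.sample(withReplacement=False, fraction=0.1, seed=42)

display(with_replacement)
display(without_replacement)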
4.3 Sort and Order
Sorting data is a fundamental operation when querying datasets. PySpark provides several ways to sort your data:
# Sort by a single column in ascending order
= sales_df.sort("sale_amount")
sorted_sales
# Sort by a single column in descending order
= sales_df.sort(sales_df.sale_amount.desc())
sorted_sales
# Sort by multiple columns
= sales_df.sort("store_location", col("sale_amount").desc())
sorted_sales
# Alternative syntax using orderBy
= sales_df.orderBy(col("store_location"), col("sale_amount").desc())
sorted_sales
# For clearer code, you can use the asc() function explicitly
from pyspark.sql.functions import asc, desc
= sales_df.orderBy(asc("store_location"), desc("sale_amount"))
sorted_sales
display(sorted_sales)
4.3.1 Some performance considerations
Sorting can be an expensive operation, especially on large datasets. All the operations we cover in this chapter can be categorized as either narrow or wide transformations. Narrow transformations do not require shuffling data across partitions, which means each cluster node can process its own partition independently. Wide transformations, on the other hand, require shuffling data across partitions (and most likely across the network).
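If you want to see the difference for yourself, a quick way is to compare physical plans: a shuffle shows up as an Exchange operator. Here is a minimal sketch reusing the sales_df DataFrame from earlier; the filter (narrow) plan has no Exchange, while the sort (wide) plan does.

# Narrow transformation: filter() processes each partition independently (no shuffle)
sales_df.filter(sales_df.sale_amount > 100).explain()

# Wide transformation: sort() shuffles rows across partitions
# Look for an Exchange step in this physical plan
sales_df.sort("sale_amount").explain()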
The first piece of advice is to place sorting operations as late as possible in your query chain. This way, you minimize the amount of data that needs to be sorted. For example, if possible, filter your data before sorting:
# Filter before sorting to reduce the amount of data to sort
filtered_sorted_sales = sales_df.filter(sales_df.sale_amount > 100) \
    .sort("sale_amount")
display(filtered_sorted_sales)
You can also leverage partitioning to optimize sorting operations. If you sort data from a partitioned Delta table, PySpark can take advantage of the partitioning scheme to perform the sort operation more efficiently. For example, if your data is partitioned by store_location
, you can sort within each partition:
df_partitioned = sales_df.repartition("store_location")
display(df_partitioned.sortWithinPartitions("sale_amount"))
Note: Physical (delta table) partitioning may not be preserved by the DataFrame’s in-memory partitioning. You can repartition the data before sorting, but keep in mind that repartitioning is a wide transformation, so it can be expensive. Use it judiciously.
The last piece of advice we could give is to be aware of the lazy evaluation model of PySpark. Sorting operations are not executed until an action is called (like show(), count(), or write()). This means you can easily end up with code that will run a wide transformation (like sorting) multiple times if you're not careful. To avoid this, you can cache the result of a sort operation if you plan to use it multiple times:
= sales_df.sort("sale_amount")
sorted_sales # Now the sorted data is cached in memory
sorted_sales.cache() # Use the cached sorted data multiple times
10)
sorted_sales.show(# This will not trigger a re-sort
sorted_sales.count() filter("sale_amount > 200"))
display(sorted_sales.# When done, unpersist to free memory sorted_sales.unpersist()
4.4 Aggregations
Aggregations are essential for summarizing and analyzing data. PySpark provides comprehensive aggregation capabilities:
from pyspark.sql.functions import sum, avg, count, countDistinct, max, min
# Basic aggregations
= sales_df.groupBy("store_location").agg(
summary sum("sale_amount").alias("total_sales"),
"sale_amount").alias("average_sale"),
avg("*").alias("number_of_sales"),
count("customer_id").alias("unique_customers"),
countDistinct(max("sale_amount").alias("largest_sale"),
min("sale_amount").alias("smallest_sale")
)
display(summary)
# Grouping by multiple columns
= sales_df.groupBy("product_id", "store_location").agg(
product_location_summary sum("sale_amount").alias("total_sales")
)
display(product_location_summary)
# Filtering after aggregation
= summary.filter(col("number_of_sales") > 100)
high_volume_stores
display(high_volume_stores)
# Pivot tables
= sales_df.groupBy("product_id").pivot("store_location").sum("sale_amount")
pivot_table display(pivot_table)
4.5 Query Plan and Performance Evaluation
Understanding how PySpark executes your queries is crucial for optimizing performance. Microsoft Fabric provides enhanced tools for examining query plans and evaluating performance:
# Examine the logical and physical plans for a query
= sales_df.filter(col("store_location") == "Houston") \
query "product_id") \
.groupBy(sum("sale_amount").alias("total_sales")) \
.agg(filter(col("total_sales") > 10000)
.
# Display the execution plan
=True) query.explain(extended
When you run .explain(), you'll see several plans:

1. The Parsed Logical Plan - the initial interpretation of your query
2. The Analyzed Logical Plan - after resolving references
3. The Optimized Logical Plan - after applying optimization rules
4. The Physical Plan - the actual execution strategy
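If you only want one of these views, Spark 3.0 and later also let you pass a mode argument to explain(). A brief sketch, reusing the query DataFrame defined above:

# Physical plan only
query.explain(mode="simple")

# All of the plans listed above
query.explain(mode="extended")

# Physical plan as an overview followed by per-operator details
query.explain(mode="formatted")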
Microsoft Fabric enhances these plans with Fabric-specific optimizations, such as integration with OneLake storage and Fabric’s distributed compute.
4.5.1 Predicate Pushdown
Predicate pushdown is a critical optimization where filtering conditions are “pushed down” to the data source, reducing the amount of data that needs to be processed.
# Example demonstrating predicate pushdown
# Notice in the explain plan how the filter condition is pushed to the file scan
= sales_df.filter(col("store_location") == "Seattle").select("date", "sale_amount")
filtered_data filtered_data.explain()
In Microsoft Fabric, predicate pushdown is optimized for OneLake storage, allowing filters to be applied directly at the storage layer for improved performance.
When you examine the physical plan output, look for these key indicators:
4.5.1.1 FileScan Operation Analysis
In the physical plan, the FileScan
operator shows the most critical information:
FileScan parquet spark_catalog.pysparklakehouse.sales[date#758,store_location#760,sale_amount#761]
Batched: true,
DataFilters: [isnotnull(store_location#760), (store_location#760 = Seattle)],
Format: Parquet,
Location: PreparedDeltaFileIndex(1 paths)[abfss://...],
PartitionFilters: [],
PushedFilters: [IsNotNull(store_location), EqualTo(store_location,Seattle)],
ReadSchema: struct<date:date,store_location:string,sale_amount:double>
Key Elements to Check:
- PushedFilters: Shows filters applied at the storage layer
- DataFilters: Filters that will be applied during data reading
- ReadSchema: Only the columns needed are being read (column pruning)
- PartitionFilters: Partition-level filters (if applicable)
4.5.1.2 Good vs. Poor Predicate Pushdown
✅ Good Predicate Pushdown:
# Filters appear in PushedFilters at the FileScan level
filter(col("store_location") == "Seattle").explain()
sales_df.# Look for: PushedFilters: [EqualTo(store_location,Seattle)]
❌ Poor Predicate Pushdown:
# Complex conditions that can't be pushed down
from pyspark.sql.functions import upper
filter(upper(col("store_location")) == "SEATTLE").explain()
sales_df.# Upper() function prevents pushdown - filter appears higher in the plan
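One possible workaround, sketched below, is to express the condition as plain comparisons or an IN list over literal values instead of wrapping the column in a function; Spark can translate these into pushed filters. This assumes you know the handful of spellings that actually occur in the data.

# Rewritten filter without a function call on the column
sales_df.filter(col("store_location").isin("Seattle", "SEATTLE")).explain()
# Look for an In(store_location, ...) entry under PushedFilters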
4.6 Using Spark SQL
PySpark allows you to leverage SQL queries, which can be particularly intuitive for users familiar with SQL. Microsoft Fabric enhances this capability with integration to its broader SQL ecosystem:
4.6.1 Creating Temporary Views
You can create temporary views from DataFrames, allowing you to run SQL queries against them. This is particularly useful for complex queries or when you want to share results across different parts of your application.
# Create a temporary view of your DataFrame
"sales_view")
sales_df.createOrReplaceTempView(
# For sharing across sessions in Microsoft Fabric, create a global temp view
"global_sales_view") sales_df.createOrReplaceGlobalTempView(
4.6.2 Basic SQL Querying
# Run a SQL query against the temporary view
= spark.sql("""
result SELECT
product_id,
store_location,
SUM(sale_amount) as total_sales,
AVG(sale_amount) as average_sale,
COUNT(*) as transaction_count,
COUNT(DISTINCT customer_id) as unique_customers
FROM sales_view
WHERE date >= '2023-01-01'
GROUP BY product_id, store_location
HAVING SUM(sale_amount) > 100
ORDER BY total_sales DESC
""")
display(result)
# Access a global temporary view (note the database prefix)
= spark.sql("""
global_result SELECT * FROM global_temp.global_sales_view
WHERE store_location = 'Houston'
""")
display(global_result)
4.6.3 Using the %%sql Cell Magic
You can also use the %%sql
cell magic to run SQL queries directly in Microsoft Fabric notebooks, which provides a more interactive experience:
%%sql
SELECT
product_id,
store_location,
SUM(sale_amount) AS total_sales,
AVG(sale_amount) AS average_sale
FROM sales_view
GROUP BY product_id, store_location
4.7 Before You Run a Query on Your Largest Dataset, Make Sure…
Running queries at scale in Microsoft Fabric (or any distributed Spark environment) requires care. Use this checklist to avoid common pitfalls and ensure your queries are performant, reliable, and safe.
| ✅ | Checklist Item | Why It's Important |
|---|---|---|
| 1. | Test with a Small, Representative Sample First | Avoid costly mistakes that can tie up cluster resources, cause unnecessary compute costs, or lead to failed jobs. Test logic, verify results, and estimate performance using .sample(), .limit(), or smaller partitions. |
| 2. | Check Your Filter and Join Clauses | Predicate pushdown and partition pruning can drastically reduce the amount of data read. Review filters: are they applied as early as possible? Avoid complex expressions that block pushdown. |
| 3. | Select Only the Columns You Need | Explicitly list fields to minimize data transfer and serialization costs. Avoid select("*") unless necessary. |
| 4. | Assess Data Skew and Partitioning | Skewed keys in aggregations or joins can overload single nodes. Check value distributions with .groupBy(key).count(), as shown in the sketch after this table. Consider repartitioning large or skewed datasets with repartition() or bucketBy() for more even work distribution. |
| 5. | Use Caching Strategically | Cache only expensive or frequently reused intermediate results, and always unpersist when done. Assess memory usage and cluster size in line with data volume. |
| 6. | Understand Table and File Partitioning | When querying Delta or Parquet tables, align your filters with partitioned columns (e.g., filter on date or region if those are partitions). This ensures only the necessary files/partitions are read. |
| 7. | Review the Query Plan | Use .explain() to confirm that pushdown, pruning, and broadcast hints are effective. Look for unexpected full table scans or shuffles and adjust your query if needed. |
| 8. | Avoid Collecting Large DataFrames to the Driver | .collect(), .toPandas(), and similar methods can overwhelm the driver with data. Use them only when absolutely necessary, and only for small result sets. |
| 9. | Set Appropriate Resource Limits | Ensure your cluster or session has sufficient cores and memory for the expected data size and query complexity. Adjust Spark or Fabric session parameters as needed. |
| 10. | Document and Version Your Query Logic | For large-scale or production queries, use notebooks or scripts with comments, version control, and reproducible parameters. This aids troubleshooting and collaboration. |
| 11. | Monitor Long-Running Queries | Use Fabric monitoring tools to observe query execution and resource usage, and to identify any bottlenecks or failures during the run. |
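As a starting point for item 4, here is a minimal sketch of a skew check on a candidate grouping or join key (store_location here, reusing sales_df); a few keys with row counts far above the rest are a warning sign for expensive shuffles and joins.

from pyspark.sql.functions import count, desc

# Count rows per key and look at the most frequent values first
key_counts = sales_df.groupBy("store_location").agg(count("*").alias("row_count"))
display(key_counts.orderBy(desc("row_count")))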
Best Practice:
Make it routine to review this checklist for new workflows, ETL jobs, or analytical queries—especially when working with evolving data volumes or schema changes.
4.8 Conclusion
In this chapter, we’ve explored the powerful querying capabilities of PySpark within Microsoft Fabric. We’ve learned how to filter and select data, sort results, use column expressions, perform aggregations, optimize performance, and leverage SQL queries.
Microsoft Fabric enhances the PySpark experience with optimized performance for OneLake storage, integrated visualizations, and seamless connections to the broader Fabric ecosystem. By understanding and applying these querying techniques, you can effectively analyze data at scale within your Microsoft Fabric environment.
In the next chapter, we’ll explore how to store the results of your PySpark queries in various formats and destinations within the Microsoft Fabric ecosystem.
Possible future additions to this chapter:

- Statistical functions (stddev, variance, corr, freqItems, etc.)
- Query plan - add more documentation
- Multiple-value filtering (e.g. isin())
- Window functions (row_number(), rank(), lead(), lag(), window(), etc.)
- Date and time functions
- Anything related to SemanticLink?
- Broadcast joins
- Sample with stratification (sampleBy())
- Null handling (na.fill(), na.drop(), etc.)