2 Chapter 2: Clean Data
Early draft release: This chapter hasn’t been fully edited yet.
In this chapter, we’ll explore how to clean data using PySpark within Microsoft Fabric. Data cleaning is one of the most critical phases in any data engineering workflow. Even the most sophisticated analytics will produce misleading results if built upon dirty data. Let’s learn how to prepare our data for reliable analysis using Microsoft Fabric’s PySpark capabilities.
2.1 Understanding Data Quality
Before diving into specific cleaning techniques, it’s important to understand what makes data “dirty” and why cleaning is essential. Within Microsoft Fabric’s lakehouse environment, you’ll often integrate data from multiple sources, each with its own quality issues.
Let’s first create a sample DataFrame to work with throughout this chapter:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import random
import datetime
# In Microsoft Fabric, SparkSession is already available as 'spark'
# No need to create a new one as you might in standalone Spark
# Create a sample DataFrame with quality issues for demonstration
data = [
    (1, "John Doe", None, "2023-01-15", 35000, "Sales"),
    (2, "Jane Smith", "jane.smith@example.com", "2023-02-20", 42000, "Marketing"),
    (3, None, "mark.johnson@example.com", "2023-03-10", -5000, "Engineering"),
    (4, "Sarah Williams", "sarah.williams@example.com", "2023-04-05", 38000, "Sales"),
    (5, "Michael Brown", "michael.brown@example.com", "2023-04-05", 38000, "Sales"),
    (6, "Michael Brown", "michael.brown@example.com", "2023-04-05", 38000, "Sales"),
    (7, "Lisa Davis", "invalid_email", "2023-05-20", 41000, "Marketing"),
    (8, "Robert Wilson", "robert.wilson@example.com", "not_a_date", 39500, "Engineering")
]

# Define the schema
schema = StructType([
    StructField("employee_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("hire_date", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("department", StringType(), True)
])

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Display the DataFrame
display(df)
2.2 Using Data Wrangler in Microsoft Fabric
Microsoft Fabric provides an interactive Data Wrangler feature within the notebook experience that allows you to visually inspect and clean your data. This is especially helpful for data engineers who are new to PySpark.
To access Data Wrangler in Fabric:
- Run a cell that displays your DataFrame (using display(df))
- Click on the “Data Wrangler” button that appears in the output
- Use the graphical interface to explore and transform your data
Data Wrangler allows you to perform many common cleaning operations without writing code:
- Filter rows
- Remove or fill null values
- Format strings
- Change data types
After making changes in Data Wrangler, you can generate the corresponding PySpark code, which you can then incorporate into your notebook.
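The generated code depends entirely on the steps you applied in the interface, so the snippet below is only a hypothetical illustration of its general shape (a small cleaning function you can paste into a cell), not the actual output Data Wrangler produces:

# Hypothetical illustration of Data Wrangler-style generated code
# (the real output depends on the operations you selected in the UI).
def clean_data(df):
    # Drop exact duplicate rows and rows with a missing 'name'
    df = df.dropDuplicates().dropna(subset=["name"])
    return df

df_wrangled = clean_data(df)
display(df_wrangled)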
Let’s continue with programmatic approaches, which give you more control and are essential for production data pipelines.
2.3 Identifying Data Quality Issues
2.3.1 Types of Dirty Data
Let’s examine common quality issues in our sample data.
For exploratory analysis, Data Wrangler is usually the better choice: its UI is friendlier than the raw output of this code. However, this code is useful when you want logs in your data pipelines. As we’ll see later, you can schedule the execution of a notebook, and for any previous run you can open the notebook with all of its results. Including these checks in your notebooks can help you pinpoint data quality issues on a specific notebook run.
# Check for nulls in each column
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
display(null_counts)

# Check for duplicates
duplicate_count = df.count() - df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count}")

# Validate email format using regex pattern
invalid_emails = df.filter(~col("email").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$") & ~col("email").isNull())
print("Invalid email formats:")
display(invalid_emails.select("employee_id", "name", "email"))

# Identify out-of-range values (negative salaries)
anomalies = df.filter(col("salary") < 0)
print("Anomalies in salary:")
display(anomalies.select("employee_id", "name", "salary"))

# Check for invalid dates
from pyspark.sql.functions import to_date
invalid_dates = df.filter(to_date(col("hire_date"), "yyyy-MM-dd").isNull() & ~col("hire_date").isNull())
print("Invalid date formats:")
display(invalid_dates.select("employee_id", "name", "hire_date"))
2.3.2 The Impact of Quality Issues
Data quality issues can significantly impact your analyses within Microsoft Fabric:
- Nulls can cause calculations to return unexpected results or errors
- Duplicates can skew aggregations and statistical analyses
- Invalid formats (emails, dates) can break downstream transformations
- Anomalies (like negative salaries) can produce misleading insights
This is especially important in Microsoft Fabric, where data may flow through multiple services like Data Factory pipelines, Power BI reports, or ML models.
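As a quick illustration using the sample DataFrame from earlier in this chapter: the duplicate Michael Brown row and the negative salary already shift a simple average, which is exactly the kind of silent skew described above.

# The raw average is skewed by the duplicate row and the -5000 salary;
# removing them changes the result noticeably.
print(df.select(avg("salary")).first()[0])
print(df.dropDuplicates().filter(col("salary") > 0).select(avg("salary")).first()[0])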
2.3.3 Understanding the Shape of Your Data
Let’s examine our data’s structure and distribution:
# Get basic DataFrame info
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")
print("\nSchema:")
df.printSchema()
# Get summary statistics for numeric columns
print("\nSummary Statistics for Numeric Columns:")
"employee_id", "salary").summary().show()
df.select(
# Distribution of categorical data
print("\nDepartment Distribution:")
"department").count().orderBy("count", ascending=False).show()
df.groupBy(
# Check for imbalance in the data
print("\nPercent of rows by department:")
= df.groupBy("department").count().withColumn(
dept_counts "percentage", round(col("count") * 100 / df.count(), 2)
) display(dept_counts)
Understanding the shape and distribution of your data helps you make informed decisions about which cleaning strategies to apply.
Doing this analysis on millions (or billions) of rows can be quite expensive. For very large datasets, we usually run it on a small portion of the data: either a percentage (such as 1%) or a fixed number of rows (such as the first 10,000).
Please be aware that, depending on your dataset, using only the first 1% or the last 1% may not give you an accurate picture.
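As a minimal sketch of this sampling approach (the 1% fraction and the 10,000-row cap are arbitrary examples, and they only become meaningful on a dataset much larger than our demo DataFrame):

# Profile a sample instead of the full dataset; the fraction and row limit
# below are arbitrary examples, not recommendations.
df_sample = df.sample(fraction=0.01, seed=42)  # random 1% sample
df_head = df.limit(10000)                      # or: just the first 10,000 rows

df_sample.select("employee_id", "salary").summary().show()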
2.4 Data Cleaning Techniques in PySpark
Now that we understand our data quality issues, let’s apply specific cleaning techniques using PySpark.
2.4.1 Handling Missing/Null Values
Microsoft Fabric’s PySpark offers several strategies for handling nulls:
# Strategy 1: Remove rows with nulls in specific columns
# This is appropriate when nulls make the record unusable
df_no_nulls = df.na.drop(subset=["name"])
print("After removing rows with null names:")
display(df_no_nulls)

# Strategy 2: Fill nulls with default values
# Good for cases where a reasonable default exists
defaults = {"name": "Unknown", "email": "no_email@company.com"}
df_filled = df.na.fill(defaults)
print("After filling nulls with defaults:")
display(df_filled)

# Strategy 3: Fill nulls with computed values
# Calculate median salary and use it to replace nulls
# In Microsoft Fabric, we can use SQL for complex aggregations
df.createOrReplaceTempView("employees")
median_salary = spark.sql("SELECT percentile_approx(salary, 0.5) as median FROM employees").collect()[0]["median"]

# Then apply the calculated value
df_computed = df.fillna({"salary": median_salary})
print(f"After filling null salaries with median ({median_salary}):")
display(df_computed.select("employee_id", "name", "salary"))
2.4.2 Removing Duplicates and Outliers
Duplicates and outliers can significantly skew your analysis results:
# Remove exact duplicates (all columns match)
df_no_dupes = df.dropDuplicates()
print("After removing exact duplicates:")
display(df_no_dupes)

# Remove duplicates based on subset of columns
# For example, same person (name and email) should not appear multiple times
df_unique_people = df.dropDuplicates(["name", "email"])
print("After removing duplicates based on name and email:")
display(df_unique_people)

# Identify and handle outliers
# Calculate statistics to identify outliers in salary
from pyspark.sql.functions import stddev, avg

# Get statistics
stats = df.select(
    avg("salary").alias("avg_salary"),
    stddev("salary").alias("stddev_salary")
).collect()[0]

avg_salary = stats["avg_salary"]
stddev_salary = stats["stddev_salary"]

# Define outlier threshold (common practice: outside 3 standard deviations)
lower_bound = avg_salary - 3 * stddev_salary
upper_bound = avg_salary + 3 * stddev_salary

# Filter out or cap outliers
df_no_outliers = df.filter((col("salary") >= lower_bound) & (col("salary") <= upper_bound))
print(f"After removing salary outliers (outside range {lower_bound:.2f} to {upper_bound:.2f}):")
display(df_no_outliers)

# Alternative approach: cap outliers instead of removing them
df_capped = df.withColumn(
    "salary_capped",
    when(col("salary") < lower_bound, lower_bound)
    .when(col("salary") > upper_bound, upper_bound)
    .otherwise(col("salary"))
)
print("After capping salary outliers:")
display(df_capped.select("employee_id", "name", "salary", "salary_capped"))
There are many cases where you’ll want to anonymize a dataset, and removing outliers is a good first step. However, data anonymization is a complex endeavor in its own right. In the Fabric world, some anonymization needs can instead be met with row-level or column-level security, which prevents certain users from accessing specific rows or columns, or with dynamic data masking, which hides part of a value for specific users, such as masking all but the last four digits of a credit card number. That lets customer support agents match a credit card while preventing them from reusing the full number.
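As a rough, hand-rolled illustration of the masking idea in PySpark (this mimics the effect of dynamic data masking in code; it is not Fabric’s built-in feature, and the df_cards DataFrame with its card_number column is purely hypothetical):

from pyspark.sql.functions import regexp_replace, col

# Hypothetical data, used only to illustrate masking.
df_cards = spark.createDataFrame(
    [(1, "4111111111111111"), (2, "5500005555555559")],
    ["customer_id", "card_number"]
)

# Replace every digit except the last four with '*'. Fabric's dynamic data
# masking enforces this per user at query time; this sketch simply rewrites
# the column for everyone.
df_masked = df_cards.withColumn(
    "card_number_masked",
    regexp_replace(col("card_number"), r"\d(?=\d{4})", "*")
)
display(df_masked)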
2.5 Transforming Data Types and Formats
Microsoft Fabric’s PySpark allows you to convert data types and standardize formats, which is crucial for consistent analysis.
2.5.1 Converting Columns to Appropriate Types
We already saw in Chapter 1 that PySpark can infer the data types of the files you load, and that you can check a DataFrame’s column data types with the printSchema() method. In the following code, we’ll look at two prime examples of type conversion, dates and decimal values, and see how to handle conversion errors.
# Convert string date to proper date type
df_typed = df.withColumn(
    "hire_date_formatted",
    to_date(col("hire_date"), "yyyy-MM-dd")
)

# Handle conversion errors by providing a default date for invalid formats
from datetime import datetime
default_date = datetime.strptime("1900-01-01", "%Y-%m-%d").date()

df_typed = df_typed.withColumn(
    "hire_date_clean",
    when(col("hire_date_formatted").isNotNull(), col("hire_date_formatted"))
    .otherwise(lit(default_date))
)

print("After converting date strings to date type:")
display(df_typed.select("employee_id", "name", "hire_date", "hire_date_formatted", "hire_date_clean"))

# Convert salary to decimal for more precise calculations
from pyspark.sql.types import DecimalType

df_typed = df_typed.withColumn(
    "salary_decimal",
    col("salary").cast(DecimalType(10, 2))
)

print("After converting salary to decimal type:")
display(df_typed.select("employee_id", "name", "salary", "salary_decimal"))
2.5.2 Validating and Standardizing Data
Standardizing data ensures consistency across your dataset:
# Standardize email addresses to lowercase
df_std = df.withColumn(
    "email_std",
    lower(col("email"))
)

# Validate email format and mark invalid ones
df_std = df_std.withColumn(
    "is_valid_email",
    when(col("email").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$"), True)
    .otherwise(False)
)

# Replace invalid emails with a placeholder
df_std = df_std.withColumn(
    "email_clean",
    when(col("is_valid_email") == True, col("email_std"))
    .otherwise(lit("invalid@placeholder.com"))
)

print("After standardizing and validating emails:")
display(df_std.select("employee_id", "name", "email", "email_std", "is_valid_email", "email_clean"))

# Standardize name format (Title Case)
df_std = df_std.withColumn(
    "name_std",
    initcap(col("name"))
)

print("After standardizing names:")
display(df_std.select("employee_id", "name", "name_std"))
2.6 Using Built-in Functions for Cleaning
Microsoft Fabric’s PySpark implementation includes a rich set of built-in functions specifically designed for data cleaning tasks.
2.6.1 Overview of Common PySpark Functions
Let’s explore some of the most useful functions for data cleaning:
# Data type validation and conversion
from pyspark.sql.functions import isnan, isnull, when, coalesce
# String manipulation and cleaning
from pyspark.sql.functions import trim, upper, lower, regexp_replace, regexp_extract
# Handling nulls and defaults
from pyspark.sql.functions import nvl, nullif
# Date and time functions
from pyspark.sql.functions import current_date, datediff, months_between, date_format
# Let's apply some of these functions to clean our data
df_clean = df.withColumn(
    # Trim whitespace from the name column
    "name_clean", trim(col("name"))
).withColumn(
    # Replace nulls with a default value
    "email_clean", coalesce(col("email"), lit("no_email@company.com"))
).withColumn(
    # Convert to date type and handle invalid formats
    "hire_date_clean",
    coalesce(to_date(col("hire_date"), "yyyy-MM-dd"), lit("1900-01-01").cast("date"))
).withColumn(
    # Make sure salary is not negative
    "salary_clean",
    when(col("salary") < 0, 0).otherwise(col("salary"))
).withColumn(
    # Standardize department names
    "department_clean",
    initcap(trim(col("department")))
)

print("After applying multiple cleaning functions:")
display(df_clean)
2.6.2 Examples and Best Practices
Let’s put together a comprehensive cleaning pipeline based on best practices:
def clean_employee_data(df):
    """
    Comprehensive cleaning function for employee data
    """
    from pyspark.sql.functions import (
        col, when, coalesce, to_date, trim, lower,
        initcap, regexp_replace, lit
    )

    # Step 1: Remove exact duplicate rows
    df_clean = df.dropDuplicates()

    # Step 2: Handle missing values
    df_clean = df_clean.na.fill({
        "name": "Unknown Employee",
        "department": "Unassigned"
    })

    # Step 3: Clean and standardize string columns
    df_clean = df_clean.withColumn("name_clean", initcap(trim(col("name"))))

    # Step 4: Validate and clean email addresses
    email_pattern = "^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$"
    df_clean = df_clean.withColumn(
        "email_clean",
        when(col("email").rlike(email_pattern), lower(trim(col("email"))))
        .otherwise(lit("invalid@placeholder.com"))
    )

    # Step 5: Convert and validate dates
    df_clean = df_clean.withColumn(
        "hire_date_clean",
        coalesce(
            to_date(col("hire_date"), "yyyy-MM-dd"),
            lit("1900-01-01").cast("date")
        )
    )

    # Step 6: Handle invalid numeric values
    df_clean = df_clean.withColumn(
        "salary_clean",
        when(col("salary").isNull() | (col("salary") < 0), 0)
        .otherwise(col("salary"))
    )

    # Step 7: Standardize categorical values
    df_clean = df_clean.withColumn(
        "department_clean",
        initcap(trim(col("department")))
    )

    return df_clean

# Apply the comprehensive cleaning function
df_final = clean_employee_data(df)
print("Final cleaned dataset:")
display(df_final)
2.6.3 Best Practices for Data Cleaning in Microsoft Fabric
When cleaning data in Microsoft Fabric using PySpark, consider these best practices:
- Document your cleaning transformations: Add detailed comments to your code to explain why certain cleaning decisions were made.
- Create reusable cleaning functions: As demonstrated above, package your cleaning logic into functions that can be reused across different notebooks.
- Validate after cleaning: Always validate that your cleaning operations achieved the expected results.
- Preserve raw data: In Microsoft Fabric, create separate tables or views for raw and cleaned data instead of overwriting (see the sketch after this list).
- Use Data Wrangler for exploration: Start with Data Wrangler to explore and understand your data before writing custom PySpark code.
- Monitor cleaning performance: For large datasets, monitor the performance of your cleaning operations within Fabric’s notebook environment.
- Incrementally clean data: For large datasets, consider cleaning data in batches using Fabric’s Delta tables.
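For instance, here is a minimal sketch of the “preserve raw data” practice, writing the raw and cleaned DataFrames to separate Delta tables in the lakehouse (the table names employees_raw and employees_clean are illustrative):

# Keep raw and cleaned data side by side as lakehouse Delta tables
# (table names are examples; follow your own naming conventions).
df.write.format("delta").mode("overwrite").saveAsTable("employees_raw")
df_final.write.format("delta").mode("overwrite").saveAsTable("employees_clean")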
Let’s validate our cleaning results:
# Check for any remaining nulls
remaining_nulls = df_final.select([count(when(col(c).isNull(), c)).alias(c) for c in df_final.columns])
display(remaining_nulls)
# Verify data types
print("Schema after cleaning:")
df_final.printSchema()
# Verify value distributions after cleaning
print("Distribution of departments after cleaning:")
"department_clean").count().orderBy("count", ascending=False).show()
df_final.groupBy(
# Compare row counts before and after cleaning
print(f"Original row count: {df.count()}")
print(f"Cleaned row count: {df_final.count()}")
print(f"Difference: {df.count() - df_final.count()} rows removed")
2.7 Summary
In this chapter, we’ve learned how to clean data effectively using PySpark within Microsoft Fabric. We explored:
- How to identify different types of data quality issues in your Fabric lakehouse
- Techniques for handling missing values, duplicates, and outliers
- Methods for transforming data types and standardizing formats
- Built-in PySpark functions that simplify the cleaning process
- Best practices for creating maintainable cleaning pipelines in Fabric
Clean data is the foundation of reliable analysis, and Microsoft Fabric provides powerful tools to perform this critical task efficiently. In the next chapter, we’ll build on these clean datasets to shape our data into forms suitable for specific analytical needs.
Remember that data cleaning is often an iterative process. As you discover more about your data in later analytics stages, you may need to revisit and enhance your cleaning processes. The flexible notebook environment in Microsoft Fabric makes this iterative approach straightforward and efficient.