1 Chapter 1: Ingesting Data
1.1 Introduction to Data Ingestion in Microsoft Fabric
In the world of data engineering, the journey begins with data ingestion. Before we can analyze, transform, or derive insights from our data, we need to bring it into our processing environment. In Microsoft Fabric’s Spark environment, PySpark offers powerful capabilities for loading data from various sources and formats.
Microsoft Fabric provides an integrated environment where PySpark can seamlessly access data stored in OneLake (Fabric’s unified data storage layer), external databases, and various file formats. This integration simplifies the data ingestion process for Fabric users compared to traditional Spark deployments.
Let’s explore the various methods to ingest data using PySpark in Microsoft Fabric.
1.2 Setting Up Your Spark Session
Before we begin loading data, we need a SparkSession, which is your entry point to PySpark functionality in Microsoft Fabric. In a Fabric notebook, a SparkSession is automatically created for you, but it’s good practice to understand how to reference it:
from pyspark.sql import SparkSession
# In Microsoft Fabric notebooks, the spark session is already initialized
# This line just references the existing session
spark = SparkSession.builder.getOrCreate()
# Display Spark version to confirm everything is working
print(f"Spark version: {spark.version}")
1.3 Loading Data from Files in OneLake
One of Microsoft Fabric’s key advantages is its tight integration with OneLake, which stores files in various formats. Let’s explore how to load different file formats from OneLake.
1.3.1 CSV Files
CSV (Comma-Separated Values) is one of the most common formats for tabular data. Here’s how to load a CSV file from OneLake:
# Basic CSV loading from OneLake
csv_df = spark.read.csv("Files/data.csv", header=True, inferSchema=True)

# Display the first few rows
csv_df.show(5)
The inferSchema=True parameter tells PySpark to automatically detect data types. While convenient, inferred schemas may not always be accurate for complex datasets. For more control, you can define the schema explicitly:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Define custom schema
sales_schema = StructType([
    StructField("OrderID", IntegerType(), False),
    StructField("Product", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", DoubleType(), True),
    StructField("SaleDate", StringType(), True)
])

# Load CSV with custom schema
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(sales_schema) \
    .load("Files/sales.csv")
# Show schema
sales_df.printSchema()
Additional CSV options you'll commonly use in Fabric include the following (check out the documentation for more):
# CSV with more options
csv_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", ",") \
    .option("nullValue", "NA") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("inferSchema", "true") \
    .load("Files/file.csv")
1.3.2 JSON and JSONL Files
JSON (JavaScript Object Notation) is popular for semi-structured data. There are two common formats: single-object JSON and JSON Lines (JSONL), where each line contains a valid JSON object. You can find more options and samples in the documentation.
# Loading regular JSON
json_df = spark.read.option("multiline", "true").json("Files/order.json")

# Loading JSONL (JSON Lines)
jsonl_df = spark.read.json("Files/orders.jsonl")
Why does the first line use .option("multiline", "true")? By default, Spark expects one JSON object per line, which is exactly the JSONL layout, so the JSONL file needs no extra option. Setting multiline=true lets you read a file that's a single JSON array or that has objects spanning multiple lines—ideal for “pretty-printed” JSON.
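To make the difference concrete, here are hypothetical contents for the two files referenced above (illustrative only, not actual sample data):

Files/order.json (a single pretty-printed object, hence multiline=true):

{
  "OrderID": 1,
  "Product": "Keyboard"
}

Files/orders.jsonl (one compact object per line, matching Spark's default expectation):

{"OrderID": 1, "Product": "Keyboard"}
{"OrderID": 2, "Product": "Mouse"}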
By default, Spark will also try to infer the type of each JSON property. While it's useful in some scenarios, for complex types - or if you want to validate the data later in the process - it's better to read every property as a string, without inferring the type. You can achieve this with the primitivesAsString option.
# JSON with options
# This asks Spark to read all values as strings, preventing any type inference
complex_json_df = spark.read.format("json") \
    .option("multiline", "true") \
    .option("primitivesAsString", "true") \
    .load("Files/complex.json")
1.3.3 Parquet Files
Parquet is a columnar storage format that offers excellent compression and performance. It’s the recommended format for data processing in Microsoft Fabric:
# Loading Parquet
parquet_df = spark.read.parquet("Files/data.parquet")

# With specific options
parquet_df = spark.read.format("parquet") \
    .option("mergeSchema", "true") \
    .load("Files/data.parquet")
1.3.4 Delta Tables
Delta Lake, which powers Delta tables in Microsoft Fabric, extends Parquet with transaction logs for ACID compliance and time travel capabilities:
# Loading from a managed Delta table
delta_df = spark.read.format("delta") \
    .load("Tables/delta_table")

# Reading a specific version (time travel)
delta_df_v2 = spark.read.format("delta") \
    .option("versionAsOf", "2") \
    .load("Tables/delta_table")
In these examples, we load Delta tables from the "Tables" folder of a lakehouse. They can be either managed tables, where Fabric manages the data lifecycle, or external tables, where another system manages the lifecycle of the table and its data, but the table is added to the lakehouse.
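Relative paths like "Tables/delta_table" resolve against the notebook's default lakehouse. If you need to read a Delta table sitting in another lakehouse, you can also point the reader at its OneLake path. A minimal sketch, where the workspace, lakehouse, and table names are placeholders to replace:

# Hypothetical OneLake (ABFSS) path to a Delta table in another lakehouse
table_path = "abfss://<workspace name>@onelake.dfs.fabric.microsoft.com/<lakehouse name>.Lakehouse/Tables/<table name>"

other_delta_df = spark.read.format("delta").load(table_path)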
1.3.5 XML Files
XML files can be loaded with the appropriate package, which is pre-installed in Microsoft Fabric:
# Loading XML
xml_df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "book") \
    .load("Files/books.xml")
1.3.6 Text Files
For plain text files, you can use read.text. By default, each line will be treated as a single row. You can use the wholetext option to treat each document as a row.
# Loading text files
text_df = spark.read.text("Files/feedbacks.txt")

# With options for multi-line records
text_df = spark.read.format("text") \
    .option("wholetext", "true") \
    .load("Files/feedbacks.txt")
The wholetext option doesn't add much when you're loading a single document; it becomes useful once we learn how to load multiple documents.
1.3.7 Load Multiple Documents
When working with multiple documents, PySpark provides flexible options to load files using glob patterns and filters based on file properties like modified time. This is particularly useful when dealing with large datasets spread across multiple files.
1.3.7.1 Using Glob Patterns
Glob patterns allow you to specify file paths with wildcards, making it easy to load multiple files that match a specific pattern:
# Load all CSV files in a directory
csv_df = spark.read.csv("Files/feedbacks/*.csv", header=True, inferSchema=True)

# Load specific files based on a pattern
json_df = spark.read.option("multiline", "true").json("Files/feedbacks/feedback_*.json")
In the above examples:
- *.csv matches all files with a .csv extension in the Files/feedbacks directory.
- feedback_*.json matches all JSON files whose names start with feedback_ in the Files/feedbacks directory.
1.3.7.2 Filtering by Modified Time
You can filter files based on their last modified time using the modifiedBefore and modifiedAfter options. This is useful for incremental data processing:
# Load files modified after a specific date
recent_files_df = spark.read \
    .option("multiline", "true") \
    .load("Files/feedbacks/", format="json", modifiedAfter="2025-04-29T09:00:00Z")

# Load files modified before a specific date
old_files_df = spark.read \
    .option("multiline", "true") \
    .load("Files/feedbacks/", format="json", modifiedBefore="2025-04-29T09:00:00Z")
1.3.7.3 Recursive File Loading
To load files from subdirectories, you can use the ** wildcard in the file path:
# Load all JSON files in a directory and its subdirectories
json_df = spark.read.format("json").load("Files/feedbacks/**/*.json")
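Alternatively, Spark's standard recursiveFileLookup reader option walks every subfolder under a path without requiring a glob pattern. A minimal sketch against the same hypothetical Files/feedbacks folder (note that this option disables partition discovery based on folder names):

# Recursively pick up all JSON files under Files/feedbacks
json_df = spark.read.format("json") \
    .option("recursiveFileLookup", "true") \
    .load("Files/feedbacks/")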
1.4 Loading Data from OneLake Tables
Microsoft Fabric introduces OneLake Tables, which are managed tables in the Fabric environment. These can be easily accessed with PySpark.
# Reading from a OneLake table
table_df = spark.read.table("DatabaseName.TableName")

# Alternative syntax
table_df = spark.table("DatabaseName.TableName")
1.5 Loading Data from External SQL Databases
Microsoft Fabric allows connecting to external SQL databases using JDBC:
# Load data from SQL Server
jdbc_df = spark.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://servername.database.windows.net:1433") \
    .option("databaseName", "database_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("query", "SELECT * FROM Sales.Customers") \
    .load()
1.6 Loading Data from Warehouse and Lakehouse
Microsoft Fabric provides a Spark connector to seamlessly access data from warehouses and the SQL analytics endpoint of lakehouses. This connector simplifies the process of reading and writing data, while maintaining security models like object-level, row-level, and column-level security.
Before using it, you need to import the following:
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
1.6.1 Reading Data from a Warehouse or Lakehouse
To read data from a warehouse or lakehouse, you can use the synapsesql method. The method requires a three-part table name in the format <warehouse/lakehouse name>.<schema name>.<table or view name>.
# Reading data from a table or view
warehouse_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

# Reading data with a row count limit
limited_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

# Reading data with a filter
filtered_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column_name == 'value'")

# Selecting specific columns
selected_columns_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("columnA", "columnB")
1.6.2 Reading Data Across Workspaces
To access data from a warehouse or lakehouse in a different workspace, specify the workspace ID and the lakehouse or warehouse item ID:
# Reading data from a lakehouse in another workspace
lakehouse_df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")

# Reading data from a warehouse in another workspace
warehouse_df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
1.7 Loading Data from APIs
To load data from REST APIs, you'll usually need to make HTTP requests and then parse the results in PySpark. Here's a pattern using the requests library (available in Fabric notebooks) to query Star Wars movies:
import requests
import json
from pyspark.sql import Row
# Make API request
response = requests.get("https://swapi.info/api/films")
data = response.json()

# Convert to PySpark DataFrame
# For simple flat JSON data:
df_api = spark.read.json(spark.sparkContext.parallelize([json.dumps(data)]))

# For array of objects:
if isinstance(data, list):
    df_api = spark.read.json(spark.sparkContext.parallelize([json.dumps(item) for item in data]))
When working with APIs, data is often paginated to limit the size of each response. Pagination can be implemented in three common ways:
- Simple Index Pagination: The API uses a query parameter like page or offset to indicate the page number or starting index for the next set of results.
- Next URL in JSON Response Body: The API includes a next field in the JSON response body, which provides the URL for the next page of results (a sketch of this approach follows the Deezer example below).
- Next URL in HTTP Headers: The API includes a Link header in the HTTP response, which contains the URL for the next page of results.
In this example, we will query the Deezer API to fetch the list of tracks in a playlist. The API uses a simple index query parameter for pagination.
import requests
import json
from pyspark.sql import SparkSession
import time
# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Base URL for the Deezer API
base_url = "https://api.deezer.com/playlist/1083721131/tracks"

# Function to fetch paginated data
def fetch_paginated_data(url, limit=5):
    all_data = []
    index = 0
    while True:
        response = requests.get(f"{url}?index={index}&limit={limit}")
        if response.status_code != 200:
            raise Exception(f"Failed to fetch data: {response.status_code}, {response.text}")

        # Parse JSON response
        data = response.json()
        all_data.extend(data['data'])  # Deezer API returns data in the 'data' key

        # Check if there are more results
        if not data.get('next') or len(data['data']) < limit:
            break

        # Increment index for the next page
        index += limit

        # Optional: add a delay to avoid hitting rate limits
        time.sleep(1)

    return all_data

# Fetch data
tracks_data = fetch_paginated_data(base_url)

# Convert to PySpark DataFrame
tracks_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(track) for track in tracks_data]))

# Show the data
tracks_df.show()
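The Deezer example relies on index pagination. For an API that returns the next page's URL in the JSON response body, the loop can simply follow that link until none is left. Below is a minimal sketch, assuming a hypothetical endpoint whose responses expose the records under a results key and the next page URL under a next key (adjust the key names to your API):

import requests
import json

# Follow 'next' links until the API stops returning one (hypothetical endpoint and keys)
def fetch_next_url_pages(start_url):
    all_records = []
    url = start_url
    while url:
        response = requests.get(url)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch data: {response.status_code}, {response.text}")
        payload = response.json()
        all_records.extend(payload["results"])  # hypothetical key holding the records
        url = payload.get("next")  # hypothetical key holding the next page URL, or None
    return all_records

# Convert the accumulated records to a DataFrame, same approach as before
records = fetch_next_url_pages("https://api.example.com/items?page_size=100")
records_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(r) for r in records]))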
1.8 Working with Schema
As we’ve seen with CSV files, there are two approaches to handling schema in PySpark:
- Inferred Schema: PySpark attempts to determine data types automatically
- Explicit Schema: You define the data structure
Let’s look at inferred schema more closely:
# With inferred schema
inferred_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/data.csv")
# Check inferred types
inferred_df.printSchema()
Schema inference is convenient but has limitations:
- Adds processing overhead during loading
- May incorrectly infer types with mixed data (see the sketch below)
- Can't handle complex nested structures well
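For example, a numeric-looking column that contains a stray placeholder value is typically inferred as a string. Here's a minimal sketch illustrating this, using a small hypothetical dataset written to a temporary Files/tmp_mixed_csv folder:

# Hypothetical data: the "amount" column mixes numbers with an "N/A" placeholder
mixed_df = spark.createDataFrame(
    [("1", "19.99"), ("2", "N/A"), ("3", "24.50")],
    ["order_id", "amount"],
)
mixed_df.write.mode("overwrite").option("header", "true").csv("Files/tmp_mixed_csv")

# "amount" is inferred as string because "N/A" isn't numeric
inferred = spark.read.option("header", "true").option("inferSchema", "true").csv("Files/tmp_mixed_csv")
inferred.printSchema()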
For production workloads in Microsoft Fabric, defining explicit schemas is recommended:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, ArrayType, MapType
# Define a complex schema with nested structures
customer_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("registration_date", TimestampType(), True),
    StructField("purchase_history", ArrayType(
        StructType([
            StructField("product_id", StringType(), True),
            StructField("purchase_date", TimestampType(), True),
            StructField("amount", DoubleType(), True)
        ])
    ), True),
    StructField("preferences", MapType(StringType(), StringType()), True)
])

# Load with explicit schema
customer_df = spark.read.format("json") \
    .schema(customer_schema) \
    .load("Files/customers.json")
1.9 Common Reader Options
PySpark data readers support various options to handle special cases.
These options can be handy for quick analysis. In a medallion architecture, however, we tend to load data as is, missing values and unformatted data included, and clean it up with the techniques we'll see in Chapter 2.
1.9.1 Null Value Handling
# Define custom null values
# nullValue: strings that should be treated as null
# nanValue: strings that should be treated as NaN
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("nullValue", "NULL") \
    .option("nanValue", "NaN") \
    .load("Files/data.csv")
1.9.2 Date Format Handling
# Define date format
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .load("Files/data.csv")
1.9.3 Handling Bad Records
# Set mode for handling corrupted records
# mode options: PERMISSIVE, DROPMALFORMED, FAILFAST
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .load("Files/data.csv")
1.10 Creating Temporary Views
After loading data into DataFrames, you can create temporary views to query them using SQL:
# Register DataFrame as a temporary view
"sales")
sales_df.createOrReplaceTempView(
# Query using SQL
= spark.sql("""
result_df SELECT
Product,
SUM(Quantity) as TotalQuantity,
AVG(Price) as AveragePrice
FROM sales
GROUP BY Product
ORDER BY TotalQuantity DESC
""")
result_df.show()
Temporary views exist only within the current SparkSession and are ideal for interactive analysis in Microsoft Fabric notebooks.
For more persistent access, you can create global temporary views:
# Create global temporary view (available across sessions in same Spark application)
"global_sales")
sales_df.createOrReplaceGlobalTempView(
# Query global temp view (note the global_temp prefix)
= spark.sql("SELECT * FROM global_temp.global_sales") global_sales_df
1.11 Practical Example: Combining Multiple Data Sources
Let’s bring everything together with a practical example that combines data from different sources in Microsoft Fabric:
# Load customer data from CSV in OneLake
customers_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/customers.csv")

# Load orders from Parquet
orders_df = spark.read.parquet("Files/orders.parquet")

# Load product information from Delta table
products_df = spark.read.format("delta") \
    .load("Tables/products_delta")

# Register as temporary views
customers_df.createOrReplaceTempView("customers")
orders_df.createOrReplaceTempView("orders")
products_df.createOrReplaceTempView("products")

# Use SQL to join data
report_df = spark.sql("""
    SELECT
        c.customer_id,
        c.name,
        p.product_name,
        p.category,
        o.quantity,
        o.order_date,
        o.quantity * p.price as total_amount
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    JOIN products p ON o.product_id = p.product_id
    WHERE o.order_date >= '2024-01-01'
    ORDER BY total_amount DESC
""")

# Show the combined data
report_df.show(10)
1.12 Summary
In this chapter, we explored the various ways to ingest data into PySpark within Microsoft Fabric:
File Formats: We learned to load CSV, JSON, JSONL, Parquet, Delta, XML, and text files from OneLake.
Data Sources: We covered OneLake files, OneLake Tables, external SQL databases, warehouses and lakehouses via the Fabric Spark connector, and APIs.
Schema Handling: We explored both inferred schemas for quick exploration and explicit schemas for production workloads.
Reader Options: We looked at common options for handling nulls, dates, and bad records.
Temporary Views: We learned how to create temp views for SQL access to our DataFrames.
Microsoft Fabric’s integration with OneLake provides a seamless data ingestion experience, reducing the complexity typically associated with data loading in traditional Spark environments.
In the next chapter, we’ll explore how to clean and prepare this data for analysis using PySpark’s powerful transformation capabilities.