1  Chapter 1: Ingesting Data

Early draft release: This chapter hasn’t been fully edited yet.

1.1 Introduction to Data Ingestion in Microsoft Fabric

In the world of data engineering, the journey begins with data ingestion. Before we can analyze, transform, or derive insights from our data, we need to bring it into our processing environment. In Microsoft Fabric’s Spark environment, PySpark offers powerful capabilities for loading data from various sources and formats.

Microsoft Fabric provides an integrated environment where PySpark can seamlessly access data stored in OneLake (Fabric’s unified data storage layer), external databases, and various file formats. This integration simplifies the data ingestion process for Fabric users compared to traditional Spark deployments.

Let’s explore the various methods to ingest data using PySpark in Microsoft Fabric.

1.2 Setting Up Your Spark Session

Before we begin loading data, we need a SparkSession, which is your entry point to PySpark functionality in Microsoft Fabric. In a Fabric notebook, a SparkSession is automatically created for you, but it’s good practice to understand how to reference it:

from pyspark.sql import SparkSession

# In Microsoft Fabric notebooks, the spark session is already initialized
# This line just references the existing session
spark = SparkSession.builder.getOrCreate()

# Display Spark version to confirm everything is working
print(f"Spark version: {spark.version}")

1.3 Loading Data from Files in OneLake

One of Microsoft Fabric’s key advantages is its tight integration with OneLake, which stores files in various formats. Let’s explore how to load different file formats from OneLake.

1.3.1 CSV Files

CSV (Comma-Separated Values) is one of the most common formats for tabular data. Here’s how to load a CSV file from OneLake:

# Basic CSV loading from OneLake
csv_df = spark.read.csv("Files/data.csv", header=True, inferSchema=True)

# Display the first few rows
csv_df.show(5)

The inferSchema=True parameter tells PySpark to automatically detect data types. While convenient, inferred schemas may not always be accurate for complex datasets. For more control, you can define the schema explicitly:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define custom schema
sales_schema = StructType([
    StructField("OrderID", IntegerType(), False),
    StructField("Product", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", DoubleType(), True),
    StructField("SaleDate", StringType(), True)
])

# Load CSV with custom schema
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(sales_schema) \
    .load("Files/sales.csv")

# Show schema
sales_df.printSchema()

Additional CSV options you’ll commonly use in Fabric include the following (check out the docs for more):

# CSV with more options
csv_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", ",") \
    .option("nullValue", "NA") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("inferSchema", "true") \
    .load("Files/file.csv")

1.3.2 JSON and JSONL Files

JSON (JavaScript Object Notation) is popular for semi-structured data. There are two common formats: single-object JSON and JSON Lines (JSONL), where each line contains a valid JSON object. You can see more options and samples in the doc.

# Loading regular JSON (a single document, possibly spanning multiple lines)
json_df = spark.read.option("multiline", "true").json("Files/order.json")

# Loading JSONL (JSON Lines): one object per line is Spark's default, so no option is needed
jsonl_df = spark.read.json("Files/orders.jsonl")

Note

Why does only the first read use .option("multiline", "true")? By default, Spark expects one JSON object per line, which is exactly the JSON Lines layout. Setting multiline=true lets you read a file that is a single JSON array or that has objects spanning multiple lines, which is typical of “pretty-printed” JSON.
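To make the difference between the two layouts concrete, here is a small standard-library sketch with no Spark involved. The sample records and the helper name parse_line_by_line are made up for illustration. It shows that line-by-line parsing works for JSON Lines but breaks on pretty-printed JSON, which is the situation the multiline option addresses.

```python
import json

# Illustrative sample records (not from any real dataset)
records = [{"order_id": 1, "item": "pen"}, {"order_id": 2, "item": "ink"}]

# JSON Lines: one complete JSON object per line
jsonl_text = "\n".join(json.dumps(r) for r in records)

# Pretty-printed JSON: a single array whose objects span several lines
pretty_text = json.dumps(records, indent=2)


def parse_line_by_line(text):
    """Parse each non-empty line as its own JSON document."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Line-by-line parsing succeeds for JSON Lines...
jsonl_rows = parse_line_by_line(jsonl_text)

# ...but fails for pretty-printed JSON, because no single line is a full document
try:
    parse_line_by_line(pretty_text)
    pretty_parses_by_line = True
except json.JSONDecodeError:
    pretty_parses_by_line = False

# The pretty-printed text only parses when read as one whole document
pretty_rows = json.loads(pretty_text)
```

The same reasoning applies when choosing reader options: a file shaped like jsonl_text suits the default reader, while a file shaped like pretty_text needs multiline.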

By default, Spark will try to infer the type of each JSON property. While it’s useful in some scenarios, for complex types - or if you want to validate it later in the process - it’s better to read every property as strings, without inferring the type. You can achieve this with the primitivesAsString option.

# JSON with options
# This asks Spark to read all values as strings instead of inferring types
complex_json_df = spark.read.format("json") \
    .option("multiline", "true") \
    .option("primitivesAsString", "true") \
    .load("Files/complex.json")

1.3.3 Parquet Files

Parquet is a columnar storage format that offers excellent compression and performance. It’s the recommended format for data processing in Microsoft Fabric:

# Loading Parquet
parquet_df = spark.read.parquet("Files/data.parquet")

# With specific options
parquet_df = spark.read.format("parquet") \
    .option("mergeSchema", "true") \
    .load("Files/data.parquet")

1.3.4 Delta Tables

Delta Lake, which powers Delta tables in Microsoft Fabric, extends Parquet with transaction logs for ACID compliance and time travel capabilities:

# Loading from a managed Delta table
delta_df = spark.read.format("delta") \
    .load("Tables/delta_table")

# Reading a specific version (time travel)
delta_df_v2 = spark.read.format("delta") \
    .option("versionAsOf", "2") \
    .load("Tables/delta_table")
Note

In these examples, we load Delta tables from the “Tables” folder of a lakehouse. They can be either managed tables, where Fabric manages the data lifecycle, or external tables, where another system manages the lifecycle of the table and its data, but it’s added to

1.3.5 XML Files

XML files can be loaded with the appropriate package, which is pre-installed in Microsoft Fabric:

# Loading XML
xml_df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "book") \
    .load("Files/books.xml")

1.3.6 Text Files

For plain text files, you can use read.text. By default, each line will be treated as a single row. You could use the wholetext option to treat each document as a row.

# Loading text files
text_df = spark.read.text("Files/feedbacks.txt")

# With the wholetext option, each file is read as a single row
text_df = spark.read.format("text") \
    .option("wholetext", "true") \
    .load("Files/feedbacks.txt")

The wholetext option adds little when you load a single document. It becomes useful once you load multiple documents, with each file becoming one row, as covered in the next section.

1.3.7 Load Multiple Documents

When working with multiple documents, PySpark provides flexible options to load files using glob patterns and filters based on file properties like modified time. This is particularly useful when dealing with large datasets spread across multiple files.

1.3.7.1 Using Glob Patterns

Glob patterns allow you to specify file paths with wildcards, making it easy to load multiple files that match a specific pattern:

# Load all CSV files in a directory
csv_df = spark.read.csv("Files/feedbacks/*.csv", header=True, inferSchema=True)

# Load specific files based on a pattern
json_df = spark.read.option("multiline", "true").json("Files/feedbacks/feedback_*.json")

In the above examples:

  - *.csv matches all files with a .csv extension in the Files/feedbacks directory.
  - feedback_*.json matches all JSON files starting with feedback_ in the Files/feedbacks directory.
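Before pointing Spark at a directory, it can help to preview which file names a pattern would select. The sketch below uses Python's fnmatch module as an approximation: Spark resolves globs with Hadoop's own matcher, which agrees with fnmatch for simple * wildcards on file names but differs for more advanced syntax. The file names and the preview_glob helper are hypothetical.

```python
from fnmatch import fnmatchcase


def preview_glob(file_names, pattern):
    """Return the names that a simple wildcard pattern would select."""
    return [name for name in file_names if fnmatchcase(name, pattern)]


# Hypothetical listing of the Files/feedbacks directory
names = ["feedback_01.json", "feedback_02.json", "summary.json", "feedback_01.csv"]

json_matches = preview_glob(names, "feedback_*.json")
csv_matches = preview_glob(names, "*.csv")
```

Here summary.json is left out of json_matches because it lacks the feedback_ prefix, which mirrors what the second read in the example above would do.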

1.3.7.2 Filtering by Modified Time

You can filter files based on their last modified time using the modifiedBefore and modifiedAfter options. This is useful for incremental data processing:

# Load files modified after a specific date
recent_files_df = spark.read \
    .option("multiline", "true") \
    .load("Files/feedbacks/", format="json", modifiedAfter="2025-04-29T09:00:00Z")

# Load files modified before a specific date
old_files_df = spark.read \
    .option("multiline", "true") \
    .load("Files/feedbacks/", format="json", modifiedBefore="2025-04-29T09:00:00Z")
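For incremental loads, the cutoff is usually computed rather than hard-coded. The following sketch builds a timestamp string in the YYYY-MM-DDTHH:mm:ss form that Spark documents for these options. The helper name cutoff_timestamp and the 24-hour window are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta, timezone


def cutoff_timestamp(now, hours_back):
    """Format the instant `hours_back` hours before `now` as YYYY-MM-DDTHH:mm:ss."""
    cutoff = now - timedelta(hours=hours_back)
    return cutoff.strftime("%Y-%m-%dT%H:%M:%S")


# Fixed reference time so the result is reproducible
reference = datetime(2025, 4, 30, 9, 0, 0, tzinfo=timezone.utc)
modified_after = cutoff_timestamp(reference, 24)
```

In a notebook you would likely pass datetime.now(timezone.utc) as the reference and then supply the result as modifiedAfter=modified_after. A timestamp without an offset is interpreted in the Spark session time zone unless a timeZone option is set, so it is worth checking that setting.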

1.3.7.3 Recursive File Loading

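To load files from subdirectories, set the recursiveFileLookup option. Glob patterns match one directory level per wildcard, so a path wildcard alone does not descend to arbitrary depth. You can combine recursiveFileLookup with pathGlobFilter to keep only the files whose names match a pattern:

# Load all JSON files in a directory and its subdirectories
json_df = spark.read.format("json") \
    .option("recursiveFileLookup", "true") \
    .option("pathGlobFilter", "*.json") \
    .load("Files/feedbacks/")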

1.4 Loading Data from OneLake Tables

Microsoft Fabric introduces OneLake Tables, which are managed tables in the Fabric environment. These can be easily accessed with PySpark.

# Reading from a OneLake table
table_df = spark.read.table("DatabaseName.TableName")

# Alternative syntax
table_df = spark.table("DatabaseName.TableName")

1.5 Loading Data from External SQL Databases

Microsoft Fabric allows connecting to external SQL databases using JDBC:

# Load data from SQL Server
jdbc_df = spark.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://servername.database.windows.net:1433") \
    .option("databaseName", "database_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("query", "SELECT * FROM Sales.Customers") \
    .load()
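The example above hard-codes the user name and password for readability. A minimal sketch of one alternative is to assemble the same options from environment variables, so credentials stay out of the notebook. The helper build_jdbc_options and the variable names SQL_USER and SQL_PASSWORD are hypothetical; a secret store would serve the same purpose.

```python
import os


def build_jdbc_options(server, database, query,
                       user_var="SQL_USER", password_var="SQL_PASSWORD"):
    """Assemble JDBC reader options, taking credentials from environment variables."""
    return {
        "url": f"jdbc:sqlserver://{server}:1433",
        "databaseName": database,
        "user": os.environ[user_var],          # raises KeyError if the variable is missing
        "password": os.environ[password_var],
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "query": query,
    }
```

The resulting dictionary can be handed to the reader in one call, for example spark.read.format("jdbc").options(**opts).load(), where opts is the value returned by the helper.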

1.6 Loading Data from Warehouse and Lakehouse

Microsoft Fabric provides a Spark connector to seamlessly access data from warehouses and the SQL analytics endpoint of lakehouses. This connector simplifies the process of reading and writing data, while maintaining security models like object-level, row-level, and column-level security.

Before using it you need to import the following.

import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants

1.6.1 Reading Data from a Warehouse or Lakehouse

To read data from a warehouse or lakehouse, you can use the synapsesql method. The method requires a three-part table name in the format <warehouse/lakehouse name>.<schema name>.<table or view name>.

# Reading data from a table or view
warehouse_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

# Reading data with a row count limit
limited_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

# Reading data with a filter
filtered_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column_name == 'value'")

# Selecting specific columns
selected_columns_df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("columnA", "columnB")

1.6.2 Reading Data Across Workspaces

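To access data from a warehouse or lakehouse in a different workspace, specify the ID of that workspace with the Constants.WorkspaceId option. The examples below pass only the workspace ID; the lakehouse or warehouse item ID can be supplied as an additional option if needed: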

# Reading data from a lakehouse in another workspace
lakehouse_df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")

# Reading data from a warehouse in another workspace
warehouse_df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")

1.7 Loading Data from APIs

To load data from REST APIs, you’ll usually need to make HTTP requests and then parse the results in PySpark. Here’s a pattern using the requests library (available in Fabric notebooks) to query Star Wars movies:

import requests
import json

# Make API request and fail early on HTTP errors
response = requests.get("https://swapi.info/api/films")
response.raise_for_status()
data = response.json()

# Convert to PySpark DataFrame
if isinstance(data, list):
    # Array of objects: one JSON string per element, so each becomes a row
    records = [json.dumps(item) for item in data]
else:
    # Single object: one JSON string, so the result has one row
    records = [json.dumps(data)]

df_api = spark.read.json(spark.sparkContext.parallelize(records))

When working with APIs, data is often paginated to limit the size of each response. Pagination can be implemented in three common ways:

  1. Simple Index Pagination: The API uses a query parameter like page or offset to indicate the page number or starting index for the next set of results.
  2. Next URL in JSON Response Body: The API includes a next field in the JSON response body, which provides the URL for the next page of results.
  3. Next URL in HTTP Headers: The API includes a Link header in the HTTP response, which contains the URL for the next page of results.

In this example, we will query the Deezer API to fetch the list of tracks in a playlist. The API uses a simple index query parameter for pagination.

import requests
import json
from pyspark.sql import SparkSession
import time

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Base URL for the Deezer API
base_url = "https://api.deezer.com/playlist/1083721131/tracks"

# Function to fetch paginated data
def fetch_paginated_data(url, limit=5):
    all_data = []
    index = 0
    while True:
        response = requests.get(f"{url}?index={index}&limit={limit}")
        if response.status_code != 200:
            raise Exception(f"Failed to fetch data: {response.status_code}, {response.text}")

        # Parse JSON response
        data = response.json()
        all_data.extend(data['data'])  # Deezer API returns data in 'data' key

        # Check if there are more results
        if not data.get('next') or len(data['data']) < limit:
            break

        # Increment index for the next page
        index += limit

        # Optional: Add a delay to avoid hitting rate limits
        time.sleep(1)

    return all_data

# Fetch data
tracks_data = fetch_paginated_data(base_url)

# Convert to PySpark DataFrame
tracks_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(track) for track in tracks_data]))

# Show the data
tracks_df.show()
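The second pagination style listed earlier, a next URL in the JSON response body, can be sketched in a similar way. This is a minimal illustration, not tied to any particular API: the function collect_pages, the key names data and next, and the example.test URLs are all assumptions. The HTTP call is passed in as fetch_json so the loop can be tried without network access.

```python
def collect_pages(first_url, fetch_json, items_key="data", next_key="next", max_pages=100):
    """Follow `next` links until none remains, gathering every item."""
    items = []
    url = first_url
    for _ in range(max_pages):  # upper bound guards against an endless chain of links
        payload = fetch_json(url)
        items.extend(payload.get(items_key, []))
        url = payload.get(next_key)
        if not url:
            break
    return items


# Stand-in for an API: two pages, the first pointing at the second
fake_api = {
    "https://example.test/items?page=1": {
        "data": [1, 2],
        "next": "https://example.test/items?page=2",
    },
    "https://example.test/items?page=2": {"data": [3]},
}

all_items = collect_pages("https://example.test/items?page=1", fake_api.__getitem__)
```

Against a real API you could pass something like lambda url: requests.get(url, timeout=30).json() as fetch_json, then convert the collected items to a DataFrame as in the Deezer example.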

1.8 Working with Schema

As we’ve seen with CSV files, there are two approaches to handling schema in PySpark:

  1. Inferred Schema: PySpark attempts to determine data types automatically
  2. Explicit Schema: You define the data structure

Let’s look at inferred schema more closely:

# With inferred schema
inferred_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/data.csv")

# Check inferred types
inferred_df.printSchema()

Schema inference is convenient but has limitations:

  - Adds processing overhead during loading
  - May incorrectly infer types with mixed data
  - Can’t handle complex nested structures well

For production workloads in Microsoft Fabric, defining explicit schemas is recommended:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, ArrayType, MapType

# Define a complex schema with nested structures
customer_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("registration_date", TimestampType(), True),
    StructField("purchase_history", ArrayType(
        StructType([
            StructField("product_id", StringType(), True),
            StructField("purchase_date", TimestampType(), True),
            StructField("amount", DoubleType(), True)
        ])
    ), True),
    StructField("preferences", MapType(StringType(), StringType()), True)
])

# Load with explicit schema
customer_df = spark.read.format("json") \
    .schema(customer_schema) \
    .load("Files/customers.json")
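For flat schemas, the reader's schema method also accepts a DDL-formatted string, which can be shorter to write than a StructType. As a small sketch, the helper below (to_ddl, a hypothetical name) builds such a string for the sales columns used in the CSV section.

```python
def to_ddl(columns):
    """Join (name, type) pairs into a DDL schema string."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in columns)


# Column names taken from the sales schema defined earlier in the chapter
sales_ddl = to_ddl([
    ("OrderID", "INT"),
    ("Product", "STRING"),
    ("Quantity", "INT"),
    ("Price", "DOUBLE"),
    ("SaleDate", "STRING"),
])
```

The string can then be passed as .schema(sales_ddl) in place of the StructType. Nested structures such as the customer schema above are usually easier to read when written as explicit StructType definitions.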

1.9 Common Reader Options

PySpark data readers support various options to handle special cases.

Note

These options can be useful for quick analysis. In a medallion architecture, we tend to load data as is, including missing values and unformatted data, and clean it with the techniques we’ll see in Chapter 2.

1.9.1 Null Value Handling

# Define custom null values
# nullValue: string that should be treated as null
# nanValue: string that should be treated as NaN
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("nullValue", "NULL") \
    .option("nanValue", "NaN") \
    .load("Files/data.csv")

1.9.2 Date Format Handling

# Define date format
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .load("Files/data.csv")

1.9.3 Handling Bad Records

# Set mode for handling corrupted records
# Options: PERMISSIVE (default), DROPMALFORMED, FAILFAST
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .load("Files/data.csv")
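The three modes differ in what happens to a record that cannot be parsed: PERMISSIVE keeps the row and sets the malformed fields to null, DROPMALFORMED discards the row, and FAILFAST raises an error at the first bad record. The toy parser below imitates that behavior in plain Python for a two-column input. It is an illustration of the semantics only, not how Spark implements them, and parse_rows and the sample lines are made up.

```python
def parse_rows(lines, mode="PERMISSIVE"):
    """Parse 'id,quantity' lines, treating a non-integer quantity as malformed."""
    rows = []
    for line in lines:
        record_id, _, raw_qty = line.partition(",")
        try:
            rows.append((record_id, int(raw_qty)))
        except ValueError:
            if mode == "FAILFAST":
                raise  # stop at the first malformed record
            if mode == "PERMISSIVE":
                rows.append((record_id, None))  # keep the row, null the bad field
            # DROPMALFORMED: skip the row entirely
    return rows


sample = ["a,1", "b,oops", "c,3"]
permissive = parse_rows(sample, "PERMISSIVE")
dropped = parse_rows(sample, "DROPMALFORMED")
```

With this sample, PERMISSIVE keeps three rows with a missing quantity for b, while DROPMALFORMED keeps only a and c. Which one fits depends on whether silently losing rows is acceptable for the workload.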

1.10 Creating Temporary Views

After loading data into DataFrames, you can create temporary views to query them using SQL:

# Register DataFrame as a temporary view
sales_df.createOrReplaceTempView("sales")

# Query using SQL
result_df = spark.sql("""
    SELECT 
        Product, 
        SUM(Quantity) as TotalQuantity, 
        AVG(Price) as AveragePrice
    FROM sales
    GROUP BY Product
    ORDER BY TotalQuantity DESC
""")

result_df.show()

Temporary views exist only within the current SparkSession and are ideal for interactive analysis in Microsoft Fabric notebooks.

To share a view across sessions within the same Spark application, you can create global temporary views:

# Create global temporary view (available across sessions in same Spark application)
sales_df.createOrReplaceGlobalTempView("global_sales")

# Query global temp view (note the global_temp prefix)
global_sales_df = spark.sql("SELECT * FROM global_temp.global_sales")

1.11 Practical Example: Combining Multiple Data Sources

Let’s bring everything together with a practical example that combines data from different sources in Microsoft Fabric:

# Load customer data from CSV in OneLake
customers_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/customers.csv")

# Load orders from Parquet
orders_df = spark.read.parquet("Files/orders.parquet")

# Load product information from Delta table
products_df = spark.read.format("delta") \
    .load("Tables/products_delta")

# Register as temporary views
customers_df.createOrReplaceTempView("customers")
orders_df.createOrReplaceTempView("orders")
products_df.createOrReplaceTempView("products")

# Use SQL to join data
report_df = spark.sql("""
    SELECT 
        c.customer_id,
        c.name,
        p.product_name,
        p.category,
        o.quantity,
        o.order_date,
        o.quantity * p.price as total_amount
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    JOIN products p ON o.product_id = p.product_id
    WHERE o.order_date >= '2024-01-01'
    ORDER BY total_amount DESC
""")

# Show the combined data
report_df.show(10)

1.12 Summary

In this chapter, we explored the various ways to ingest data into PySpark within Microsoft Fabric:

  1. File Formats: We learned to load CSV, JSON, JSONL, Parquet, Delta, XML, and text files from OneLake.

  2. Data Sources: We covered OneLake files, OneLake Tables, external SQL databases, warehouses and lakehouses through the Fabric Spark connector, and APIs.

  3. Schema Handling: We explored both inferred schemas for quick exploration and explicit schemas for production workloads.

  4. Reader Options: We looked at common options for handling nulls, dates, and bad records.

  5. Temporary Views: We learned how to create temp views for SQL access to our DataFrames.

Microsoft Fabric’s integration with OneLake provides a seamless data ingestion experience, reducing the complexity typically associated with data loading in traditional Spark environments.

In the next chapter, we’ll explore how to clean and prepare this data for analysis using PySpark’s powerful transformation capabilities.
