Advanced incremental refresh framework for Microsoft Fabric dataflows with intelligent bucketing, automatic retry mechanisms, and CI/CD support.
Microsoft Fabric’s standard incremental refresh capabilities have limitations that prevent effective data loading in certain scenarios:
Common Problems:
How This Framework Solves These Issues:
This solution is ideal for data engineers working with Microsoft Fabric who need robust, production-ready incremental refresh capabilities beyond what’s available out of the box.
This framework excels in the following scenarios:
Before using this notebook, ensure you have the following:
- `pandas` - Data manipulation
- `sempy.fabric` - Microsoft Fabric API client
- `notebookutils.data` - Fabric data connection utilities

This notebook implements an advanced framework for incremental data refresh in Microsoft Fabric dataflows. It is designed to handle scenarios where standard incremental refresh does not work due to limitations in the data source, such as query folding issues or bucket size constraints.
The notebook supports both regular Dataflow Gen2 and CI/CD Dataflow Gen2 objects, automatically detecting the dataflow type and using the appropriate Microsoft Fabric REST API endpoints.
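For orientation, here is a minimal sketch of how the two dataflow types could be triggered and polled through these endpoints using sempy's REST clients. The function names are illustrative, the `jobType=Refresh` value for CI/CD dataflows is an assumption, and the notebook's actual implementation may differ:

```python
# Hedged sketch, not the notebook's code: trigger a refresh for either dataflow type
# and poll a CI/CD job instance until it reaches a terminal state.
import time
import sempy.fabric as fabric

fabric_client = fabric.FabricRestClient()   # api.fabric.microsoft.com (CI/CD Dataflow Gen2)
pbi_client = fabric.PowerBIRestClient()     # api.powerbi.com (regular Dataflow Gen2)

def trigger_refresh(workspace_id: str, dataflow_id: str, is_cicd: bool):
    """Start a dataflow refresh and return the raw HTTP response."""
    if is_cicd:
        # jobType value is an assumption; adjust to your environment
        return fabric_client.post(
            f"/v1/workspaces/{workspace_id}/items/{dataflow_id}/jobs/instances?jobType=Refresh"
        )
    return pbi_client.post(
        f"/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/refreshes",
        json={"notifyOption": "NoNotification"},
    )

def wait_for_cicd_job(workspace_id: str, dataflow_id: str, job_instance_id: str,
                      poll_seconds: int = 30) -> str:
    """Poll the job instance until it is Completed, Failed, or Cancelled."""
    while True:
        resp = fabric_client.get(
            f"/v1/workspaces/{workspace_id}/items/{dataflow_id}/jobs/instances/{job_instance_id}"
        )
        status = resp.json().get("status")
        if status in ("Completed", "Failed", "Cancelled"):
            return status
        time.sleep(poll_seconds)
```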
The notebook orchestrates a coordinated refresh process across multiple Fabric components:
┌─────────────┐
│ Pipeline │ (Triggers notebook with parameters)
└──────┬──────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Notebook (DataflowRefresher) │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 1. Read tracking table from Warehouse │ │
│ │ 2. Calculate date ranges & buckets │ │
│ │ 3. Update tracking table (Running status) │ │
│ │ 4. Call Fabric REST API to trigger dataflow │ │
│ │ 5. Poll for completion status │ │
│ │ 6. Update tracking table (Success/Failed status) │ │
│ └────────────────────────────────────────────────────┘ │
└─────────┬───────────────────────────────────┬───────────┘
│ │
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ Warehouse │ │ Fabric REST API │
│ [Incremental │ │ - Regular DF: │
│ Update] Table │ │ /dataflows/... │
│ - range_start │ │ - CI/CD DF: │
│ - range_end │ │ /items/.../jobs │
│ - status │ └──────────┬──────────┘
└─────────┬───────────┘ │
│ ▼
│ ┌───────────────────┐
└───────────────────────>│ Dataflow Gen2 │
(Dataflow reads range) │ (Power Query M) │
└─────────┬─────────┘
│
▼
┌───────────────────┐
│ Data Source │
│ (Filtered by │
│ date range) │
└─────────┬─────────┘
│
▼
┌───────────────────┐
│ Warehouse │
│ Destination │
│ Table │
└───────────────────┘
Key Flow:
Configure the following parameters when setting up the notebook activity in your Fabric pipeline:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `workspace_id` | String | Yes | ID of the Fabric workspace containing the dataflow |
| `dataflow_id` | String | Yes | ID of the dataflow to refresh |
| `dataflow_name` | String | Yes | Name or description of the dataflow |
| `initial_load_from_date` | String | Yes* | Start date for initial historical load (format: 'YYYY-MM-DD'). *Required only for first load |
| `bucket_size_in_days` | Integer | No | Size of each refresh bucket in days (default: 1) |
| `bucket_retry_attempts` | Integer | No | Number of retry attempts for failed buckets (default: 3) |
| `incrementally_update_last_n_days` | Integer | No | Number of days to overlap/refresh in incremental updates (default: 1) |
| `reinitialize_dataflow` | Boolean | No | Set to True to delete tracking data and restart from scratch (default: False) |
| `destination_table` | String | Yes | Name of the destination table in the warehouse where data is written. Can be just the table name (uses the dbo schema) or schema.table format for tables in other schemas |
| `incremental_update_column` | String | Yes | DateTime column used for incremental filtering |
| `is_cicd_dataflow` | Boolean | No | Explicitly specify whether this is a CI/CD dataflow (auto-detected if not provided) |
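To illustrate the `destination_table` convention, here is a small sketch, using a hypothetical helper rather than the notebook's actual parsing, of how both forms resolve to a schema and a table:

```python
# Hypothetical helper illustrating the destination_table convention described above.
def split_destination_table(destination_table: str, default_schema: str = "dbo"):
    """Return (schema, table); bare names fall back to the dbo schema."""
    if "." in destination_table:
        schema, table = destination_table.split(".", 1)
    else:
        schema, table = default_schema, destination_table
    return schema, table

assert split_destination_table("FactSales") == ("dbo", "FactSales")
assert split_destination_table("staging.SalesData") == ("staging", "SalesData")
```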
The following constants must be configured inside the notebook:
| Constant | Example | Description |
|---|---|---|
| `SCHEMA` | `"[Warehouse DB].[dbo]"` | Database schema where the tracking table resides |
| `INCREMENTAL_TABLE` | `"[Incremental Update]"` | Name of the metadata tracking table |
| `CONNECTION_ARTIFACT` | `"Warehouse name or id"` | Name of the warehouse artifact |
| `CONNECTION_ARTIFACT_ID` | `"Warehouse id (GUID)"` | Technical ID of the warehouse |
| `CONNECTION_ARTIFACT_TYPE` | `"Warehouse"` | Type of artifact (typically "Warehouse") |
Follow these steps to set up and configure the notebook:
- Import the `Incrementally Refresh Dataflow.ipynb` file into your Fabric workspace
- Open the notebook and update the following constants in the third code cell:
# Update these constants with your warehouse details
SCHEMA = "[YourWarehouseName].[dbo]"
INCREMENTAL_TABLE = "[Incremental Update]"
CONNECTION_ARTIFACT = "YourWarehouseName"
CONNECTION_ARTIFACT_ID = "your-warehouse-id-guid"
CONNECTION_ARTIFACT_TYPE = "Warehouse"
How to find your Warehouse ID:
- Open your warehouse in Fabric; the URL has the form `https://app.fabric.microsoft.com/groups/{workspace_id}/warehouses/{warehouse_id}`
- Copy the `warehouse_id` GUID from the URL

Ensure your dataflow Power Query includes logic to read `range_start` and `range_end` from the tracking table (see Integration section).
After the first run, check the `[Incremental Update]` table in your warehouse to verify tracking records are created.

Here's a complete example of how to configure the pipeline parameters for your first run:
Pipeline Parameters:
{
"workspace_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"dataflow_id": "x9y8z7w6-v5u4-3210-zyxw-vu9876543210",
"dataflow_name": "Sales Data Incremental Refresh",
"initial_load_from_date": "2024-01-01",
"bucket_size_in_days": 7,
"bucket_retry_attempts": 3,
"incrementally_update_last_n_days": 2,
"reinitialize_dataflow": false,
"destination_table": "FactSales",
"incremental_update_column": "OrderDate",
"is_cicd_dataflow": null
}
Parameter Explanation:
- `workspace_id`: Your Fabric workspace GUID (found in the workspace URL)
- `dataflow_id`: Your dataflow GUID (found in the dataflow URL)
- `dataflow_name`: Descriptive name for logging/tracking
- `initial_load_from_date`: Start loading data from January 1, 2024 (first run only)
- `bucket_size_in_days`: Process 7 days at a time
- `incrementally_update_last_n_days`: Overlap the last 2 days on each refresh
- `destination_table`: Table name in the warehouse (uses the dbo schema by default)
- `incremental_update_column`: Date column used for filtering
- `is_cicd_dataflow`: Auto-detect the dataflow type (set to true or false to override)

On the first run with these parameters:

- Data is loaded from `2024-01-01` to yesterday 23:59:59
- The dataflow reads `range_start` and `range_end` from the tracking table
- Results are written to the `dbo.FactSales` table

The notebook automatically creates and manages an `[Incremental Update]` tracking table in the warehouse with the following schema:
- `dataflow_id`, `workspace_id`, `dataflow_name`: Dataflow identifiers
- `initial_load_from_date`: Historical start date
- `bucket_size_in_days`, `incrementally_update_last_n_days`: Configuration parameters
- `destination_table`, `incremental_update_column`: Target table information
- `update_time`, `status`: Current refresh status and timestamp
- `range_start`, `range_end`: Date range for the current/last refresh bucket
- `is_cicd_dataflow`: Flag indicating dataflow type (for API routing)
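For illustration, here is a sketch of how a refresh bucket's state might be recorded in this table. `run_warehouse_query` is a hypothetical stand-in for the notebook's warehouse connection (via `notebookutils.data`), not its actual API:

```python
# Hedged sketch: recording a bucket's state in the [Incremental Update] tracking table.
# run_warehouse_query() is a hypothetical placeholder for the notebook's warehouse connection.
from datetime import datetime

SCHEMA = "[YourWarehouseName].[dbo]"
INCREMENTAL_TABLE = "[Incremental Update]"

def run_warehouse_query(sql: str) -> None:
    """Placeholder: execute T-SQL against the warehouse (e.g., via notebookutils.data)."""
    raise NotImplementedError

def set_bucket_status(dataflow_id: str, status: str,
                      range_start: datetime, range_end: datetime) -> None:
    # status is one of 'Running', 'Success', 'Failed'
    run_warehouse_query(f"""
        UPDATE {SCHEMA}.{INCREMENTAL_TABLE}
        SET status = '{status}',
            range_start = '{range_start:%Y-%m-%d %H:%M:%S}',
            range_end   = '{range_end:%Y-%m-%d %H:%M:%S}',
            update_time = GETDATE()
        WHERE dataflow_id = '{dataflow_id}'
    """)
```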
The notebook automatically detects whether the dataflow is a CI/CD or regular dataflow by probing the official Microsoft Fabric API endpoints:

- CI/CD Dataflow Gen2: the `/v1/workspaces/{workspace_id}/items/{dataflow_id}/jobs/instances` endpoint
- Regular Dataflow Gen2: the `/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/refreshes` endpoint

Initial load (first run, when `initial_load_from_date` is provided):

- Covers the full range from `initial_load_from_date` to yesterday at 23:59:59
- Splits the range into buckets of `bucket_size_in_days`
- Retries each failed bucket up to `bucket_retry_attempts` times with exponential backoff (30s, 60s, 120s, etc.)

Incremental load (subsequent runs):

- If `incrementally_update_last_n_days` is set: uses `min(last_end_date + 1 second, yesterday - N days)` as the new start date to ensure overlap without gaps
- Otherwise: starts from `last_end_date` + 1 second
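A minimal sketch of this date-range and bucketing logic (illustrative helper names, not the notebook's actual functions):

```python
# Hedged sketch of the bucketing and overlap rules described above.
from datetime import datetime, timedelta

def yesterday_end_of_day() -> datetime:
    """Yesterday at 23:59:59."""
    return datetime.combine(datetime.now().date() - timedelta(days=1),
                            datetime.max.time()).replace(microsecond=0)

def build_buckets(range_start: datetime, range_end: datetime, bucket_size_in_days: int = 1):
    """Split [range_start, range_end] into consecutive buckets of bucket_size_in_days."""
    buckets, current = [], range_start
    while current <= range_end:
        bucket_end = min(current + timedelta(days=bucket_size_in_days) - timedelta(seconds=1),
                         range_end)
        buckets.append((current, bucket_end))
        current = bucket_end + timedelta(seconds=1)
    return buckets

def incremental_start(last_end_date: datetime, incrementally_update_last_n_days: int = 0) -> datetime:
    """min(last_end_date + 1s, yesterday - N days) when overlap is requested, else last_end_date + 1s."""
    next_start = last_end_date + timedelta(seconds=1)
    if incrementally_update_last_n_days:
        overlap_start = yesterday_end_of_day() - timedelta(days=incrementally_update_last_n_days)
        return min(next_start, overlap_start)
    return next_start
```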
When a bucket refresh fails, the notebook:

- Retries the bucket according to `bucket_retry_attempts`
- Raises a `RuntimeError` with failure details once retries are exhausted
- Calls `sys.exit(1)` to mark the notebook as failed in the pipeline

This ensures transient issues (network glitches, temporary service unavailability) are handled gracefully, while persistent failures properly fail the pipeline.
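A compact sketch of this retry-and-fail behaviour; the `refresh_bucket` callable stands in for the notebook's actual refresh-and-poll step:

```python
# Hedged sketch: per-bucket retries with exponential backoff (30s, 60s, 120s, ...).
import time

def refresh_bucket_with_retries(refresh_bucket, bucket, bucket_retry_attempts: int = 3) -> None:
    for attempt in range(1, bucket_retry_attempts + 1):
        if refresh_bucket(bucket):                 # stand-in: returns True on success
            return
        if attempt < bucket_retry_attempts:
            time.sleep(30 * 2 ** (attempt - 1))    # 30s, 60s, 120s, ...
    # Exhausted retries: surface the failure so the notebook can print its banner
    # and terminate with sys.exit(1), failing the pipeline activity.
    raise RuntimeError(f"Bucket failed after {bucket_retry_attempts} attempts: {bucket}")
```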
Note that when `incrementally_update_last_n_days` is set, the start date ensures overlap while avoiding gaps. The notebook also proactively manages database connections to prevent timeout issues.
Upon completion or failure, the notebook prints:
Dataflow refresh execution completed:
Status: Completed / Completed with N failures / Failed: [error message]
Total buckets processed: N
Successful refreshes: N
Failed refreshes: N
Total retry attempts: N
Duration: X.XX seconds
Dataflow type: CI/CD / Regular
If a bucket fails after all retries:
================================================================================
DATAFLOW REFRESH FAILED
================================================================================
Error: Bucket N/M failed after 3 attempts with status Failed. Range: YYYY-MM-DD to YYYY-MM-DD
The dataflow refresh has been terminated due to bucket failure after all retry attempts.
Please check the logs above for detailed error information.
================================================================================
The notebook then exits with code 1, causing the Fabric pipeline to mark the notebook activity as Failed.
Your dataflow Power Query should read the range_start and range_end parameters from the [Incremental Update] table:
let
    Source = Sql.Database("[server]", "[database]"),
    TrackingTable = Source{[Schema="dbo", Item="Incremental Update"]}[Data],
    FilteredRows = Table.SelectRows(TrackingTable, each [dataflow_id] = "your-dataflow-id"),
    RangeStart = FilteredRows{0}[range_start],
    RangeEnd = FilteredRows{0}[range_end],
    // Use RangeStart and RangeEnd to filter your data source
    FilteredData = Table.SelectRows(YourDataSource,
        each [YourDateColumn] >= RangeStart and [YourDateColumn] <= RangeEnd)
in
    FilteredData
- Set `incrementally_update_last_n_days` to 1 or more if your source data can be updated retroactively.
- For tables in the default `dbo` schema: use just the table name (e.g., `"SalesData"`)
- For tables in other schemas: use the `schema.table` format (e.g., `"staging.SalesData"` or `"analytics.SalesData"`)
- Use the `[Incremental Update]` table in your warehouse to track refresh history and troubleshoot issues.

Symptoms:
Solutions:
- Verify the identification parameters: `workspace_id`, `dataflow_id`, `dataflow_name`
- Verify the target parameters: `destination_table`, `incremental_update_column`
- Provide `initial_load_from_date` (required for the first run only)

-- Test warehouse connection by running this in your warehouse
SELECT TOP 1 * FROM INFORMATION_SCHEMA.TABLES

- Double-check the notebook constants (`SCHEMA`, `CONNECTION_ARTIFACT_ID`, etc.)

Common Errors:
"initial_load_from_date is required for the first load" → Add this parameter for first execution"Connection timeout" → Verify warehouse is running and accessible"Table does not exist" → Notebook will auto-create tracking table on first runSymptoms:
Solutions:
- Verify your dataflow Power Query reads `range_start` and `range_end` correctly

-- Check current tracking state
SELECT TOP 5
dataflow_id,
dataflow_name,
status,
range_start,
range_end,
update_time
FROM [dbo].[Incremental Update]
WHERE dataflow_id = 'your-dataflow-id'
ORDER BY update_time DESC
- Try a smaller `bucket_size_in_days` (e.g., 1 day instead of 7)

Symptoms:
Solutions:
{
"is_cicd_dataflow": true // or false for regular dataflows
}
Symptoms:
Solutions:
Problem: Tracking table shows “Running” but notebook completed
Solution:
-- Manually check for stuck records
SELECT * FROM [dbo].[Incremental Update]
WHERE status = 'Running'
AND update_time < DATEADD(HOUR, -2, GETDATE())
-- If needed, manually update stuck records
UPDATE [dbo].[Incremental Update]
SET status = 'Failed'
WHERE dataflow_id = 'your-dataflow-id'
AND status = 'Running'
AND update_time < DATEADD(HOUR, -2, GETDATE())
Problem: Need to restart from scratch
Solution:
-- Option 1: Delete tracking record (will trigger initial load on next run)
DELETE FROM [dbo].[Incremental Update]
WHERE dataflow_id = 'your-dataflow-id'
-- Option 2: Use reinitialize_dataflow parameter
-- Set reinitialize_dataflow = true in pipeline parameters
Check tracking table history:
-- View refresh history
SELECT
dataflow_name,
status,
range_start,
range_end,
update_time,
DATEDIFF(SECOND,
LAG(update_time) OVER (PARTITION BY dataflow_id ORDER BY update_time),
update_time
) as seconds_since_last_refresh
FROM [dbo].[Incremental Update]
WHERE dataflow_id = 'your-dataflow-id'
ORDER BY update_time DESC
Check for data gaps:
-- Identify gaps in refreshed date ranges
WITH RangedData AS (
SELECT
range_start,
range_end,
LEAD(range_start) OVER (ORDER BY range_start) as next_start
FROM [dbo].[Incremental Update]
WHERE dataflow_id = 'your-dataflow-id'
AND status = 'Success'
)
SELECT
range_end as gap_start,
next_start as gap_end,
DATEDIFF(DAY, range_end, next_start) as gap_days
FROM RangedData
WHERE DATEADD(SECOND, 1, range_end) < next_start
If you continue experiencing issues: