Advanced incremental refresh framework for Microsoft Fabric dataflows with intelligent bucketing, automatic retry mechanisms, and CI/CD support.
Microsoft Fabric’s standard incremental refresh capabilities have limitations that prevent effective data loading in certain scenarios:
Common Problems:
How This Framework Solves These Issues:
This solution is ideal for data engineers working with Microsoft Fabric who need robust, production-ready incremental refresh capabilities beyond what’s available out of the box.
This framework excels in the following scenarios:
Before using this notebook, ensure you have the following:
- `pandas` - Data manipulation
- `sempy.fabric` - Microsoft Fabric API client
- `notebookutils.data` - Fabric data connection utilities

This notebook implements an advanced framework for incremental data refresh in Microsoft Fabric dataflows. It is designed to handle scenarios where standard incremental refresh does not work due to limitations in the data source, such as query folding issues or bucket size constraints.
The notebook supports both regular Dataflow Gen2 and CI/CD Dataflow Gen2 objects, automatically detecting the dataflow type and using the appropriate Microsoft Fabric REST API endpoints.
The notebook orchestrates a coordinated refresh process across multiple Fabric components:
```
┌─────────────┐
│  Pipeline   │  (Triggers notebook with parameters)
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────────────────────────────┐
│              Notebook (DataflowRefresher)               │
│ ┌────────────────────────────────────────────────────┐  │
│ │ 1. Read tracking table from Warehouse              │  │
│ │ 2. Recover from interrupted runs (if needed)       │  │
│ │ 3. Calculate date ranges & buckets                 │  │
│ │ 4. Backup data in affected range                   │  │
│ │ 5. Delete overlapping data from destination table  │  │
│ │ 6. Update tracking table (Running status)          │  │
│ │ 7. Call Fabric REST API to trigger dataflow        │  │
│ │    (passes RangeStart/RangeEnd if supported)       │  │
│ │ 8. Poll for completion status                      │  │
│ │ 9. On success: drop backup, update tracking table  │  │
│ │    On failure: restore from backup, retry or fail  │  │
│ └────────────────────────────────────────────────────┘  │
└─────────┬──────────────────────────────────┬────────────┘
          │                                  │
          ▼                                  ▼
┌─────────────────────┐           ┌─────────────────────┐
│      Warehouse      │           │   Fabric REST API   │
│                     │           │  - Regular DF:      │
│  [Incremental       │           │    /dataflows/...   │
│   Update] Table     │           │  - CI/CD DF:        │
│   - range_start     │           │    /items/.../jobs  │
│   - range_end       │           └──────────┬──────────┘
│   - status          │                      │
│                     │                      ▼
│  [_backup] Schema   │            ┌───────────────────┐
│  (temporary backup  │◄──────────►│   Dataflow Gen2   │
│   during refresh)   │            │  (Power Query M)  │
└─────────┬───────────┘            └─────────┬─────────┘
          │                                  │
          │                                  ▼
          │                        ┌───────────────────┐
          └───────────────────────►│    Data Source    │
           (Dataflow reads range)  │   (Filtered by    │
                                   │    date range)    │
                                   └─────────┬─────────┘
                                             │
                                             ▼
                                   ┌───────────────────┐
                                   │     Warehouse     │
                                   │    Destination    │
                                   │       Table       │
                                   └───────────────────┘
```
Key Flow:
- Passes `RangeStart`/`RangeEnd` as public parameters to CI/CD dataflows that support them, eliminating the need for the dataflow to query the tracking table

Configure the following parameters when setting up the notebook activity in your Fabric pipeline:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `workspace_id` | String | Yes | ID of the Fabric workspace containing the dataflow |
| `dataflow_id` | String | Yes | ID of the dataflow to refresh |
| `dataflow_name` | String | Yes | Name or description of the dataflow |
| `initial_load_from_date` | String | Yes* | Start date for initial historical load (format: `YYYY-MM-DD`). *Required only for first load |
| `bucket_size` | Integer or String | No | Size of each refresh bucket. Accepts an integer for days (e.g., `7`), or a string with suffix: `"1M"` for months, `"1Y"` for years (default: `1`) |
| `bucket_retry_attempts` | Integer | No | Number of retry attempts for failed buckets (default: `3`) |
| `incrementally_update_last` | Integer or String | No | Period to overlap/refresh in incremental updates. Accepts an integer for days (e.g., `1`), or a string with suffix: `"3M"` for months, `"1Y"` for years (default: `1`) |
| `reinitialize_dataflow` | Boolean | No | Set to `True` to delete tracking data and restart from scratch (default: `False`) |
| `destination_table` | String | Yes | Name of the destination table in the warehouse where data is written. Can be just the table name (uses the `dbo` schema) or `schema.table` format for tables in other schemas |
| `incremental_update_column` | String | Yes | DateTime column used for incremental filtering |
| `is_cicd_dataflow` | Boolean | No | Explicitly specify whether this is a CI/CD dataflow (auto-detected if not provided) |
| `load_from_date` | String | No | Ad-hoc extract start date (format: `YYYY-MM-DD`). Both `load_from_date` and `load_to_date` must be set |
| `load_to_date` | String | No | Ad-hoc extract end date (format: `YYYY-MM-DD`). Both `load_from_date` and `load_to_date` must be set |
| `backup_schema` | String | No | Schema name for backup tables, created automatically if it doesn't exist (default: `_backup`) |
| `skip_backup` | Boolean | No | Skip backup/restore before delete operations for faster runs. Warning: data is not recoverable if a refresh fails (default: `False`) |
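As a rough sketch of how `bucket_size` and `incrementally_update_last` values could be interpreted, the following Python parses a period spec (`7`, `"1M"`, `"1Y"`) and splits a date range into buckets. The helper names (`parse_period`, `add_period`, `split_into_buckets`) are illustrative, not the notebook's actual functions, and month-end days (e.g., Jan 31 + 1 month) are not handled in this simplified version:

```python
from datetime import datetime, timedelta

def parse_period(value):
    """Return (count, unit) where unit is 'D' (days), 'M' (months), or 'Y' (years)."""
    if isinstance(value, int):
        return value, "D"
    value = str(value).strip().upper()
    if value.endswith(("M", "Y")):
        return int(value[:-1]), value[-1]
    return int(value), "D"

def add_period(start, count, unit):
    """Advance start by the given period (simplified month arithmetic)."""
    if unit == "D":
        return start + timedelta(days=count)
    months = count * (12 if unit == "Y" else 1)
    total = start.month - 1 + months
    return start.replace(year=start.year + total // 12, month=total % 12 + 1)

def split_into_buckets(range_start, range_end, bucket_size):
    """Yield (bucket_start, bucket_end) pairs covering the full range."""
    count, unit = parse_period(bucket_size)
    current = range_start
    while current < range_end:
        nxt = min(add_period(current, count, unit), range_end)
        yield current, nxt
        current = nxt

buckets = list(split_into_buckets(
    datetime(2024, 1, 1), datetime(2024, 1, 20), 7))
# Three buckets: Jan 1-8, Jan 8-15, Jan 15-20
```

Note the last bucket is clamped to the range end, so a 19-day range with `bucket_size = 7` produces two full buckets and one short one.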
The following constants must be configured inside the notebook:
| Constant | Example | Description |
|---|---|---|
| `SCHEMA` | `"[Warehouse DB].[dbo]"` | Database schema where the tracking table resides |
| `INCREMENTAL_TABLE` | `"[Incremental Update]"` | Name of the metadata tracking table |
| `CONNECTION_ARTIFACT` | `"Your Warehouse Name"` | Name of the warehouse artifact |
| `CONNECTION_ARTIFACT_ID` | `"your-warehouse-id-guid"` | Technical ID (GUID) of the warehouse |
| `CONNECTION_ARTIFACT_TYPE` | `"Warehouse"` | Type of artifact (typically `"Warehouse"`) |
Follow these steps to set up and configure the notebook:
1. Add the `Incrementally Refresh Dataflow.ipynb` file to your Fabric workspace
2. Open the notebook and update the following constants in the third code cell:
```python
# Update these constants with your warehouse details
SCHEMA = "[YourWarehouseName].[dbo]"
INCREMENTAL_TABLE = "[Incremental Update]"
CONNECTION_ARTIFACT = "YourWarehouseName"
CONNECTION_ARTIFACT_ID = "your-warehouse-id-guid"
CONNECTION_ARTIFACT_TYPE = "Warehouse"
```
How to find your Warehouse ID:
1. Open your warehouse in the Fabric portal; the URL has the form `https://app.fabric.microsoft.com/groups/{workspace_id}/warehouses/{warehouse_id}`
2. Copy the `warehouse_id` GUID from the URL

Ensure your dataflow Power Query includes logic to read `range_start` and `range_end` from the tracking table (see Integration section).
After the first run, check the `[Incremental Update]` table in your warehouse to verify tracking records are created.

Here's a complete example of how to configure the pipeline parameters for your first run:
Pipeline Parameters:
```json
{
    "workspace_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "dataflow_id": "x9y8z7w6-v5u4-3210-zyxw-vu9876543210",
    "dataflow_name": "Sales Data Incremental Refresh",
    "initial_load_from_date": "2024-01-01",
    "bucket_size": 7,
    "bucket_retry_attempts": 3,
    "incrementally_update_last": 2,
    "reinitialize_dataflow": false,
    "destination_table": "FactSales",
    "incremental_update_column": "OrderDate",
    "is_cicd_dataflow": null
}
```
Ad-hoc mode example – reload June 2024 data without affecting the incremental tracking state:
```json
{
    "load_from_date": "2024-06-01",
    "load_to_date": "2024-06-30"
}
```
Parameter Explanation:
- `workspace_id`: Your Fabric workspace GUID (found in workspace URL)
- `dataflow_id`: Your dataflow GUID (found in dataflow URL)
- `dataflow_name`: Descriptive name for logging/tracking
- `initial_load_from_date`: Start loading data from January 1, 2024 (first run only)
- `bucket_size`: Process 7 days at a time
- `incrementally_update_last`: Overlap last 2 days on each refresh
- `destination_table`: Table name in warehouse (uses `dbo` schema by default)
- `incremental_update_column`: Date column used for filtering
- `is_cicd_dataflow`: Auto-detect dataflow type (set to `true` or `false` to override)

With these parameters, the first run loads data from `2024-01-01` through yesterday 23:59:59, the dataflow reads `range_start` and `range_end` from the tracking table, and the data lands in the `dbo.FactSales` table.

The notebook automatically creates and manages an `[Incremental Update]` tracking table in the warehouse with the following schema:
- `dataflow_id`, `workspace_id`, `dataflow_name`: Dataflow identifiers
- `initial_load_from_date`: Historical start date
- `bucket_size`, `incrementally_update_last`: Configuration parameters
- `destination_table`, `incremental_update_column`: Target table information
- `update_time`, `status`: Current refresh status and timestamp. Recognized success statuses: `Success`, `Succeeded`, `Successful`, `Completed`. Failure/terminal statuses: `Failed`, `Cancelled`, `Error`, `Timeout`
- `range_start`, `range_end`: Date range for the current/last refresh bucket
- `is_cicd_dataflow`: Flag indicating dataflow type (for API routing)
- `is_adhoc`: Flag indicating this is an ad-hoc extract row (not used for incremental state)

The notebook automatically detects whether the dataflow is a CI/CD or regular dataflow by probing the official Microsoft Fabric API endpoints:
- Item probe (detection): `/v1/workspaces/{workspace_id}/items/{dataflow_id}`
- Parameter discovery (CI/CD): `/v1/workspaces/{workspace_id}/dataflows/{dataflow_id}/parameters`
- CI/CD trigger with parameters: `/v1/workspaces/{workspace_id}/items/{dataflow_id}/jobs/instances?jobType=Execute`
- CI/CD trigger fallback: `/v1/workspaces/{workspace_id}/items/{dataflow_id}/jobs/instances?jobType=Refresh`
- CI/CD status polling: `/v1/workspaces/{workspace_id}/items/{dataflow_id}/jobs/instances`
- Regular dataflow trigger: `/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/refreshes`
- Regular dataflow status polling: `/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/transactions`

Initial load (no tracking record exists and `initial_load_from_date` is provided):

1. Calculates the full range from `initial_load_from_date` to yesterday at 23:59:59
2. Splits the range into buckets according to `bucket_size`
3. For each bucket: backs up the affected rows (unless `skip_backup` is True), deletes overlapping data, and triggers the dataflow refresh
4. Retries failed buckets up to `bucket_retry_attempts` times with exponential backoff (30s, 60s, 120s, etc.)
5. On failure, restores the deleted rows from the backup (unless `skip_backup` is True)

Incremental runs (a tracking record exists):

- If `incrementally_update_last` is set: uses `min(last_end_date + 1 second, yesterday - N days)` as the start date to ensure overlap without gaps
- Otherwise: starts from `last_end_date + 1 second`

When a bucket refresh fails, the notebook retries with exponential backoff. The `bucket_retry_attempts` parameter controls the total number of attempts (including the initial attempt). With the default of 3:

1. Attempt 1 runs immediately
2. Attempt 2 runs after a 30-second wait
3. Attempt 3 runs after a 60-second wait
Each subsequent retry doubles the wait time (30s × 2^(attempt-1)). Setting bucket_retry_attempts to 4 would add a fourth attempt after a 120-second wait.
If all attempts fail, the notebook raises a `RuntimeError` with failure details and calls `sys.exit(1)` to mark the notebook as failed in the pipeline.

This ensures transient issues (network glitches, temporary service unavailability) are handled gracefully, while persistent failures properly fail the pipeline.
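A minimal sketch of this retry pattern follows. The names are illustrative (`refresh_bucket` stands in for the notebook's actual trigger-and-poll logic), and `sleep` is injected so the schedule can be inspected without waiting:

```python
import time

BASE_WAIT_SECONDS = 30

def backoff_schedule(attempts):
    """Wait times before attempts 2..N; the first attempt runs immediately."""
    return [BASE_WAIT_SECONDS * 2 ** i for i in range(attempts - 1)]

def run_with_retries(refresh_bucket, attempts=3, sleep=time.sleep):
    """Run refresh_bucket up to `attempts` times with 30s * 2^(n-1) backoff."""
    last_status = None
    for attempt in range(1, attempts + 1):
        last_status = refresh_bucket()
        if last_status in {"Success", "Succeeded", "Successful", "Completed"}:
            return last_status
        if attempt < attempts:
            sleep(BASE_WAIT_SECONDS * 2 ** (attempt - 1))
    # After the final failure the caller is expected to restore the backup
    # and terminate the notebook with sys.exit(1).
    raise RuntimeError(
        f"Bucket failed after {attempts} attempts with status {last_status}")

# With the default of 3 attempts the waits are 30s then 60s:
assert backoff_schedule(3) == [30, 60]
```

Raising `RuntimeError` rather than exiting inside the helper keeps the backup-restore and `sys.exit(1)` decisions at the orchestration level, matching the flow described above.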
When `incrementally_update_last` is set, the start date ensures overlap while avoiding gaps.

The notebook proactively manages database connections to prevent timeout issues.
Before deleting data from the destination table, the notebook backs up the affected rows into a separate schema:
- Backup table name: `[Database].[_backup].[_backup_<dataflow_id_no_hyphens>]` (schema configurable via `backup_schema`; hyphens are stripped from the dataflow ID)

Set `skip_backup = True` to disable backups for faster execution. Warning: with backups disabled, data deleted before a failed refresh cannot be recovered.
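How the backup table name and a backup statement could be derived is sketched below; the helper names and the exact SQL string are illustrative, not the notebook's actual implementation:

```python
def backup_table_name(dataflow_id, backup_schema="_backup"):
    # Hyphens are stripped from the dataflow ID to form a valid table name.
    return f"[{backup_schema}].[_backup_{dataflow_id.replace('-', '')}]"

def backup_sql(destination_table, dataflow_id, column, start, end,
               backup_schema="_backup"):
    """Build a SELECT INTO statement copying the affected date range."""
    backup = backup_table_name(dataflow_id, backup_schema)
    return (
        f"SELECT * INTO {backup} FROM {destination_table} "
        f"WHERE [{column}] >= '{start}' AND [{column}] <= '{end}'"
    )

name = backup_table_name("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
# -> "[_backup].[_backup_a1b2c3d4e5f67890abcdef1234567890]"
```

Keeping one backup table per dataflow ID means an interrupted run leaves an unambiguous artifact behind, which is what the recovery step looks for on the next run.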
Set both load_from_date and load_to_date to load data for an arbitrary date range without modifying the incremental tracking state:
- Writes a separate tracking row (marked with the `is_adhoc` flag) so the main incremental row is untouched
- Splits the ad-hoc range into buckets according to the `bucket_size` setting

Upon completion or failure, the notebook prints:
```
Dataflow refresh execution completed:
Status: Completed / Completed with N failures / Failed: [error message]
Total buckets processed: N
Successful refreshes: N
Failed refreshes: N
Total retry attempts: N
Duration: X.XX seconds
Dataflow type: CI/CD / Regular
```
If a bucket fails after all retries:
```
================================================================================
DATAFLOW REFRESH FAILED
================================================================================
Error: Bucket N/M failed after 3 attempts with status Failed. Range: YYYY-MM-DD to YYYY-MM-DD

The dataflow refresh has been terminated due to bucket failure after all retry attempts.
Please check the logs above for detailed error information.
================================================================================
```
For unexpected (non-retry) errors, the output uses a distinct header:
```
================================================================================
DATAFLOW REFRESH ERROR
================================================================================
Error: [error message]

An unexpected error occurred during the dataflow refresh process.
Please check the logs above for detailed error information.
================================================================================
```
In both cases, the notebook exits with code 1, causing the Fabric pipeline to mark the notebook activity as Failed.
If your dataflow is a CI/CD Dataflow Gen2, you can define RangeStart and RangeEnd as public parameters (type: DateTime). The notebook will automatically discover these parameters and pass the date range values directly when triggering the refresh — no need for the dataflow to query the tracking table.
RangeStart, Type: DateTimeRangeEnd, Type: DateTimelet
FilteredData = Table.SelectRows(YourDataSource,
each [YourDateColumn] >= RangeStart and [YourDateColumn] <= RangeEnd)
in
FilteredData
The notebook uses the Execute endpoint to pass these values at refresh time. If parameter discovery or the Execute call fails, it falls back to the standard Refresh endpoint automatically.
Note: The tracking table is still updated with `range_start`/`range_end` regardless of whether parameters are passed directly. This maintains backward compatibility, status tracking, and retry logic.
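The Execute-then-Refresh fallback can be sketched as follows. This is illustrative only: `post` is an injected callable (not sempy's client), and the `executionData` payload shape shown is an assumption — verify the exact format against the current Fabric job scheduler API documentation:

```python
def trigger_refresh(post, workspace_id, dataflow_id, range_start, range_end,
                    supports_parameters):
    """Try the Execute endpoint with parameters; fall back to a plain Refresh."""
    base = f"/v1/workspaces/{workspace_id}/items/{dataflow_id}/jobs/instances"
    if supports_parameters:
        try:
            # Assumed payload shape for passing public parameters.
            return post(f"{base}?jobType=Execute", {
                "executionData": {"parameters": [
                    {"parameterName": "RangeStart", "type": "DateTime",
                     "value": range_start},
                    {"parameterName": "RangeEnd", "type": "DateTime",
                     "value": range_end},
                ]}})
        except Exception:
            pass  # Execute unavailable or rejected: fall back to Refresh below
    return post(f"{base}?jobType=Refresh", {})
```

Because the fallback is silent, the tracking table remains the source of truth for the date range either way, which is what keeps the two integration styles compatible.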
Your dataflow Power Query reads the range_start and range_end values from the [Incremental Update] table:
```m
let
    Source = Sql.Database("[server]", "[database]"),
    TrackingTable = Source{[Schema="dbo", Item="Incremental Update"]}[Data],
    FilteredRows = Table.SelectRows(TrackingTable, each
        [dataflow_id] = "your-dataflow-id"
        and ([is_adhoc] = false or [is_adhoc] = null)),
    RangeStart = FilteredRows{0}[range_start],
    RangeEnd = FilteredRows{0}[range_end],
    // Use RangeStart and RangeEnd to filter your data source
    FilteredData = Table.SelectRows(YourDataSource,
        each [YourDateColumn] >= RangeStart and [YourDateColumn] <= RangeEnd)
in
    FilteredData
```
Note: The filter on `is_adhoc` ensures the dataflow reads the incremental tracking row, not a temporary ad-hoc row. If you are using ad-hoc extract mode, the ad-hoc row's `is_adhoc` flag is set to `true` so both rows can coexist without conflict.
- Set `incrementally_update_last` to 1 or more if your source data can be updated retroactively
- For tables in the `dbo` schema: use just the table name (e.g., `"SalesData"`)
- For tables in other schemas: use the `schema.table` format (e.g., `"staging.SalesData"` or `"analytics.SalesData"`)
- Review the `[Incremental Update]` table in your warehouse to track refresh history and troubleshoot issues
- Keep `skip_backup` as `False` (default) for production pipelines. Only use `skip_backup = True` for development/testing or when the source data can be easily reloaded

Symptoms:
Solutions:
- Verify the required parameters are set: `workspace_id`, `dataflow_id`, `dataflow_name`, `destination_table`, `incremental_update_column`
- Provide `initial_load_from_date` (required for first run only)
- Test the warehouse connection:

```sql
-- Test warehouse connection by running this in your warehouse
SELECT TOP 1 * FROM INFORMATION_SCHEMA.TABLES
```
- Double-check the notebook constants: `SCHEMA`, `CONNECTION_ARTIFACT_ID`, etc.

Common Errors:
- `"initial_load_from_date is required for the first load"` → Add this parameter for the first execution
- `"Connection timeout"` → Verify the warehouse is running and accessible
- `"Table does not exist"` → The notebook will auto-create the tracking table on first run

Symptoms:
Solutions:
- Verify your dataflow reads `range_start` and `range_end` correctly
- Check the current tracking state:

```sql
-- Check current tracking state
SELECT TOP 5
    dataflow_id,
    dataflow_name,
    status,
    range_start,
    range_end,
    update_time
FROM [dbo].[Incremental Update]
WHERE dataflow_id = 'your-dataflow-id'
ORDER BY update_time DESC
```
- Try a smaller `bucket_size` (e.g., 1 day instead of 7)

Symptoms:
Solutions:
```json
{
    "is_cicd_dataflow": true
}
```

Set the value to `false` for regular dataflows.
Symptoms:
Solutions:
Problem: Tracking table shows “Running” but notebook completed
Solution:
```sql
-- Manually check for stuck records
SELECT * FROM [dbo].[Incremental Update]
WHERE status = 'Running'
  AND update_time < DATEADD(HOUR, -2, GETDATE())

-- If needed, manually update stuck records
UPDATE [dbo].[Incremental Update]
SET status = 'Failed'
WHERE dataflow_id = 'your-dataflow-id'
  AND status = 'Running'
  AND update_time < DATEADD(HOUR, -2, GETDATE())
```
Problem: Need to restart from scratch
Solution:
```sql
-- Option 1: Delete tracking record (will trigger initial load on next run)
DELETE FROM [dbo].[Incremental Update]
WHERE dataflow_id = 'your-dataflow-id'

-- Option 2: Use the reinitialize_dataflow parameter
-- Set reinitialize_dataflow = true in pipeline parameters
```
Problem: Leftover backup table exists
Solution: The notebook automatically detects and recovers from leftover backup tables on the next run. If you need to manually intervene:
```sql
-- Check for backup tables
SELECT name FROM sys.tables WHERE schema_id = SCHEMA_ID('_backup')

-- Manually restore from a backup if needed (note: hyphens are stripped from dataflow ID)
INSERT INTO [dbo].[YourTable] SELECT * FROM [_backup].[_backup_<dataflow_id_no_hyphens>]

-- Drop the backup table
DROP TABLE IF EXISTS [_backup].[_backup_<dataflow_id_no_hyphens>]
```
Check tracking table history:
```sql
-- View refresh history
SELECT
    dataflow_name,
    status,
    range_start,
    range_end,
    update_time,
    DATEDIFF(SECOND,
        LAG(update_time) OVER (PARTITION BY dataflow_id ORDER BY update_time),
        update_time
    ) AS seconds_since_last_refresh
FROM [dbo].[Incremental Update]
WHERE dataflow_id = 'your-dataflow-id'
ORDER BY update_time DESC
```
Check for data gaps:
```sql
-- Identify gaps in refreshed date ranges
WITH RangedData AS (
    SELECT
        range_start,
        range_end,
        LEAD(range_start) OVER (ORDER BY range_start) AS next_start
    FROM [dbo].[Incremental Update]
    WHERE dataflow_id = 'your-dataflow-id'
      AND status IN ('Success', 'Succeeded', 'Successful', 'Completed')
)
SELECT
    range_end AS gap_start,
    next_start AS gap_end,
    DATEDIFF(DAY, range_end, next_start) AS gap_days
FROM RangedData
WHERE DATEADD(SECOND, 1, range_end) < next_start
```
If you continue experiencing issues:
- Passes `RangeStart`/`RangeEnd` as public parameters to CI/CD dataflows via the Execute endpoint, with graceful fallback to the Refresh endpoint