Incremental load patterns: append-only, merge, and delete-detect
Tags: incremental load pattern, upsert etl, watermark incremental, detect changes etl, merge pattern data
Problem
Loading full table snapshots nightly scales poorly as source tables grow. Teams need to process only new or changed rows, but struggle with two things: detecting deletes and managing watermark state.
Solution
Three patterns for incremental loads:
import pandas as pd
from datetime import datetime

# Pattern 1: Append-only (immutable events)
def append_new_events(src_conn, dest_conn, last_loaded_ts: datetime):
    df = pd.read_sql(
        'SELECT * FROM events WHERE created_at > %s',
        src_conn, params=[last_loaded_ts],
    )
    if df.empty:
        return last_loaded_ts  # nothing new; keep the old watermark
    df.to_sql('events', dest_conn, if_exists='append', index=False)
    return df['created_at'].max()  # new watermark

# Pattern 2: Merge/upsert (mutable records)
def upsert_customers(src_conn, dest_conn, last_updated_ts: datetime):
    changed = pd.read_sql(
        'SELECT * FROM customers WHERE updated_at > %s',
        src_conn, params=[last_updated_ts],
    )
    if changed.empty:
        return last_updated_ts
    # Write to staging, then MERGE into the target table
    changed.to_sql('customers_stage', dest_conn, if_exists='replace', index=False)
    dest_conn.execute('MERGE INTO customers USING customers_stage ...')
    return changed['updated_at'].max()  # new watermark

# Pattern 3: Full hash comparison (no updated_at available)
# Compare row hashes between source and destination to find changes
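The MERGE statement in Pattern 2 is elided above. On engines that support ANSI MERGE (e.g. SQL Server, Snowflake, BigQuery) it might look like the following sketch; `customer_id` and the other column names are illustrative assumptions, not part of the original:

```sql
MERGE INTO customers AS t
USING customers_stage AS s
  ON t.customer_id = s.customer_id
WHEN MATCHED THEN
  UPDATE SET t.name = s.name, t.email = s.email, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (customer_id, name, email, updated_at)
  VALUES (s.customer_id, s.name, s.email, s.updated_at)
```

Engines without MERGE usually offer an upsert equivalent (e.g. `INSERT ... ON CONFLICT DO UPDATE` in PostgreSQL and SQLite).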
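Pattern 3 is only sketched as a comment above. A minimal version, assuming both tables share a primary key column (here `id`) and are small enough to hash in memory, diffs per-row hashes and key sets; comparing key sets is also what surfaces deletes, which the watermark-based patterns miss:

```python
import hashlib

import pandas as pd

def row_hash(df: pd.DataFrame) -> pd.Series:
    # Stable per-row hash: fix the column order, stringify, join, then SHA-256
    as_text = df.sort_index(axis=1).astype(str).agg('|'.join, axis=1)
    return as_text.map(lambda s: hashlib.sha256(s.encode()).hexdigest())

def diff_tables(src: pd.DataFrame, dst: pd.DataFrame, key: str = 'id'):
    src = src.set_index(key)
    dst = dst.set_index(key)
    src_h, dst_h = row_hash(src), row_hash(dst)
    inserted = src.index.difference(dst.index)   # new in source
    deleted = dst.index.difference(src.index)    # gone from source
    common = src.index.intersection(dst.index)
    updated = common[src_h.loc[common] != dst_h.loc[common]]
    return inserted, updated, deleted
```

For tables too large to pull wholesale, the same idea is usually pushed down to the database (hash per partition, then drill into mismatched partitions).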
Why
Incremental loads reduce processing time from O(total rows) to O(changed rows). Append-only is simplest and fastest. Merge handles updates and prevents duplicates. Hash comparison is a fallback when sources lack updated_at timestamps.
Gotchas
- updated_at columns must be indexed on the source or incremental queries scan the full table
- Deletes are invisible to updated_at-based detection — compare row counts or use a full hash scan periodically
- Watermarks stored in local variables are lost on failure — persist the last successful watermark to a state store
- Timezone handling in watermarks is a common bug — always store and compare in UTC
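The last two gotchas can be handled with a tiny state store. A sketch using SQLite, where the table name `etl_watermarks` and the function names are illustrative; watermarks are stored as ISO-8601 UTC strings, and `set_watermark` is called only after the load commits, so a failed run retries from the previous mark:

```python
import sqlite3

DDL = 'CREATE TABLE IF NOT EXISTS etl_watermarks (job TEXT PRIMARY KEY, ts TEXT)'

def get_watermark(state_db: str, job: str, default: str) -> str:
    # Read the last successfully committed watermark (ISO-8601 UTC string)
    con = sqlite3.connect(state_db)
    con.execute(DDL)
    row = con.execute('SELECT ts FROM etl_watermarks WHERE job = ?', (job,)).fetchone()
    con.close()
    return row[0] if row else default

def set_watermark(state_db: str, job: str, ts: str) -> None:
    # Call only after the destination write commits
    con = sqlite3.connect(state_db)
    con.execute(DDL)
    con.execute(
        'INSERT INTO etl_watermarks (job, ts) VALUES (?, ?) '
        'ON CONFLICT(job) DO UPDATE SET ts = excluded.ts', (job, ts))
    con.commit()
    con.close()
```

In production the state store is typically the warehouse itself or the orchestrator's metadata (e.g. Airflow Variables), but the contract is the same: read before extract, write after commit.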
Context
Designing efficient incremental data load strategies for ETL pipelines