Apache Flink basics: stateful stream processing with event time
Problem
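Stream processing based on processing time (the wall clock of the machine running the job) produces wrong aggregations when events arrive late or out of order. With a 1-minute processing-time window, an event that occurred at 12:00:50 but reaches the job 30 seconds late, at 12:01:20, is counted in the 12:01 window instead of the 12:00 window it belongs to.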
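A plain-Python illustration of that mis-assignment (not Flink code). The event and arrival times below are made up and expressed in seconds after 12:00:00, and the hypothetical `window_start` helper floors a timestamp to its 1-minute window, which is how tumbling windows are aligned when no offset is configured.

```python
from collections import Counter

WINDOW_S = 60  # 1-minute tumbling windows, as in the Problem statement


def window_start(ts_s: int, size_s: int = WINDOW_S) -> int:
    """Start (in seconds) of the tumbling window that contains ts_s."""
    return ts_s - (ts_s % size_s)


# Hypothetical (event_time_s, arrival_time_s) pairs, seconds after 12:00:00.
# The third event happened at 12:00:50 but arrived 30 s late, at 12:01:20.
events = [(10, 11), (40, 41), (50, 80), (70, 71)]

# Processing time: bucket by when the event arrived.
by_processing_time = Counter(window_start(arrival) for _, arrival in events)
# Event time: bucket by the timestamp carried in the event.
by_event_time = Counter(window_start(event) for event, _ in events)

print("processing time:", dict(by_processing_time))  # {0: 2, 60: 2}
print("event time:     ", dict(by_event_time))       # {0: 3, 60: 1}
```

Only the event-time grouping puts the 12:00:50 click in the 12:00 window.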
Solution
Use event time with watermarks in Flink to handle late arrivals:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# Checkpoint every 60 s so window state and Kafka offsets can be restored after a failure
env.enable_checkpointing(60000)
t_env = StreamTableEnvironment.create(env)
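# Define the Kafka source with an event-time column and a watermark that
# tolerates 10 seconds of out-of-orderness. Requires the Flink Kafka SQL
# connector JAR on the classpath (e.g. via the pipeline.jars option);
# the consumer group id below is a placeholder.
t_env.execute_sql("""
CREATE TABLE clicks (
user_id BIGINT,
event_type STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'broker:9092',
'properties.group.id' = 'clicks-agg',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)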
""")
# Tumbling window aggregation on event time; print() submits the job and
# blocks while streaming the results to stdout
t_env.execute_sql("""
SELECT
user_id,
TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
COUNT(*) AS click_count
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '1' MINUTE)
""").print()
Why
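Event-time processing assigns each event to a window using the timestamp carried in the event (here `ts`), not the time at which Flink processes it. A watermark with timestamp W asserts that no more events with a timestamp at or before W are expected; with `WATERMARK FOR ts AS ts - INTERVAL '10' SECOND`, W trails the largest `ts` seen so far by 10 seconds. Once W reaches the end of a window, Flink emits that window's result and clears its state. Aggregations are therefore correct for events that are at most 10 seconds out of order, regardless of network delay; an event that arrives after its window has already fired is late and, by default, dropped.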
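The mechanics can be sketched in plain Python. This is a toy, single-partition model, not Flink code: `run` and the sample clicks are hypothetical, the watermark advances after every record rather than on Flink's periodic emit interval (200 ms by default), and a window fires once the watermark reaches its last millisecond (`window_start + 60_000 - 1`).

```python
from collections import defaultdict

WINDOW_MS = 60_000  # TUMBLE(ts, INTERVAL '1' MINUTE)
LAG_MS = 10_000     # WATERMARK FOR ts AS ts - INTERVAL '10' SECOND


def run(events, window_ms=WINDOW_MS, lag_ms=LAG_MS):
    """Toy event-time tumbling-window COUNT(*) per user with a lagging watermark.

    events: (user_id, ts_ms) pairs in arrival order.
    Returns (emitted, dropped): emitted holds (user_id, window_start, count)
    in firing order; dropped holds late (user_id, ts_ms) events.
    """
    open_windows = defaultdict(int)  # (user_id, window_start) -> count
    watermark = float("-inf")
    emitted, dropped = [], []
    for user_id, ts in events:
        start = ts - ts % window_ms
        if start + window_ms - 1 <= watermark:
            # The event's window has already fired: it is late and dropped.
            dropped.append((user_id, ts))
            continue
        open_windows[(user_id, start)] += 1
        # Watermark = largest timestamp seen so far minus the allowed lag.
        watermark = max(watermark, ts - lag_ms)
        # Fire (emit and clear) every window the watermark has fully passed.
        for key in sorted(k for k in open_windows if k[1] + window_ms - 1 <= watermark):
            emitted.append((key[0], key[1], open_windows.pop(key)))
    return emitted, dropped


clicks = [(1, 5_000), (1, 30_000), (2, 45_000), (1, 72_000),
          (1, 55_000), (1, 65_000), (2, 131_000)]
print(run(clicks))                 # ([(1, 0, 2), (2, 0, 1), (1, 60000, 2)], [(1, 55000)])
print(run(clicks, lag_ms=20_000))  # ([(1, 0, 3), (2, 0, 1)], [])
```

With the 10-second lag, the click at 55 s arrives after the watermark has reached 62 s, so the [0 s, 60 s) window has already fired and the click is dropped. With a 20-second lag the same click is counted, but the [60 s, 120 s) window has still not fired by the end of the input: fewer drops in exchange for later results.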
Gotchas
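- Watermark lag = maximum tolerated out-of-orderness: a higher value delays every window result by that amount, while a lower value lets more events arrive after their window has fired, so more are dropped as late
- Not all state is bounded: the tumbling-window state above is cleared when each window fires, but non-windowed aggregations and regular joins keep per-key state indefinitely unless an idle-state TTL is set (`table.exec.state.ttl` in the Table API/SQL, `StateTtlConfig` for DataStream state)
- PyFlink does not expose everything the Java/Scala APIs offer, and Python UDFs by default run in a separate Python process, which adds serialization overhead; complex operators may require Java UDFs
- Checkpointing must be enabled for fault tolerance (e.g. `env.enable_checkpointing(60000)`); without it there is nothing to restore, so after a failure the job starts over with empty state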
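To illustrate what an idle-state TTL does, here is a toy per-key state store in plain Python, not Flink code; the `TtlKeyedCounter` class and its methods are hypothetical. It loosely mimics `StateTtlConfig` with `UpdateType.OnCreateAndWrite` (the TTL clock resets only on writes) and `StateVisibility.NeverReturnExpired` (expired values are never read back).

```python
class TtlKeyedCounter:
    """Hypothetical toy model of per-key state with an idle-state TTL.

    Without a TTL, every key ever seen would stay in `self._state` forever,
    which is how a non-windowed COUNT(*) per key grows without bound.
    """

    def __init__(self, ttl_ms: int):
        self.ttl_ms = ttl_ms
        self._state = {}  # key -> (count, last_write_ms)

    def _expired(self, key, now_ms: int) -> bool:
        _, last_write = self._state[key]
        return now_ms - last_write >= self.ttl_ms

    def add(self, key, now_ms: int) -> int:
        """Increment and return the running count for `key`."""
        count = 0
        if key in self._state and not self._expired(key, now_ms):
            count = self._state[key][0]  # expired values are treated as absent
        count += 1
        self._state[key] = (count, now_ms)  # write resets the TTL clock
        return count

    def cleanup(self, now_ms: int) -> None:
        """Physically remove expired entries (Flink also cleans up lazily and in the background)."""
        for key in [k for k in self._state if self._expired(k, now_ms)]:
            del self._state[key]

    def __len__(self) -> int:
        return len(self._state)


state = TtlKeyedCounter(ttl_ms=60_000)
state.add("user-1", now_ms=0)        # 1
state.add("user-1", now_ms=30_000)   # 2
state.add("user-2", now_ms=40_000)   # 1
state.add("user-1", now_ms=100_000)  # 1: the old count expired (idle for 70 s)
state.cleanup(now_ms=100_000)        # removes user-2 (idle for 60 s)
print(len(state))                    # 1
```

The usage also shows the cost: once a key's state expires, its running count restarts at 1, so the TTL should exceed the longest idle gap after which a key's result still needs to be correct.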
Context
Building real-time streaming aggregations with Apache Flink that must handle late-arriving events