What is Apache Flink?
Apache Flink is a distributed stream processing framework designed for stateful computations over unbounded and bounded data streams. Unlike batch-first systems, Flink treats batch as a special case of streaming.
Flink provides exactly-once state consistency, event-time processing, and millisecond latency - making it ideal for real-time analytics and event-driven applications.
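Because batch is treated as a bounded stream, the same DataStream program can run in either execution mode; a minimal sketch:
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()
# STREAMING (the default) processes records as they arrive;
# BATCH runs the identical pipeline over bounded input with batch-style scheduling.
env.set_runtime_execution_mode(RuntimeExecutionMode.BATCH)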
Flink vs Spark Streaming
┌─────────────────────────────────────────────────────────────────┐
│ Flink vs Spark Streaming │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Apache Flink │
│ ├── True streaming (event-by-event) │
│ ├── Native event-time processing │
│ ├── Exactly-once with Chandy-Lamport checkpoints │
│ ├── Millisecond latency │
│ ├── Built-in state management with RocksDB │
│ └── SQL/Table API unified with DataStream │
│ │
│ Spark Structured Streaming │
│ ├── Micro-batch processing │
│ ├── Event-time with watermarks │
│ ├── Exactly-once with checkpoints │
│ ├── Seconds to minutes latency (continuous mode: sub-ms) │
│ ├── State via StateStore │
│ └── Unified with batch DataFrame API │
│ │
│ When to Use Flink: │
│ ├── Ultra-low latency requirements │
│ ├── Complex event processing (CEP) │
│ ├── Large stateful applications │
│ └── Event-driven architectures │
│ │
│ When to Use Spark: │
│ ├── Unified batch + streaming pipelines │
│ ├── Existing Spark ecosystem │
│ ├── ML integration (MLlib) │
│ └── Simpler learning curve │
│ │
└─────────────────────────────────────────────────────────────────┘
Flink Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Flink Cluster Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Client (Job Submission) │ │
│ │ - Builds JobGraph from user code │ │
│ │ - Submits to JobManager │ │
│ └────────────────────────┬─────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────▼─────────────────────────────────┐ │
│ │ JobManager │ │
│ │ - Coordinates distributed execution │ │
│ │ - Schedules tasks │ │
│ │ - Triggers checkpoints │ │
│ │ - Handles failover recovery │ │
│ └───────────┬─────────────┬──────────────┬─────────────────┘ │
│ │ │ │ │
│ ┌───────────▼───┐ ┌──────▼──────┐ ┌───▼───────────┐ │
│ │ TaskManager 1 │ │TaskManager 2│ │ TaskManager 3 │ │
│ │ ┌───────────┐ │ │┌───────────┐│ │ ┌───────────┐ │ │
│ │ │ Task 1 │ │ ││ Task 3 ││ │ │ Task 5 │ │ │
│ │ │ Slot 1 │ │ ││ Slot 1 ││ │ │ Slot 1 │ │ │
│ │ └───────────┘ │ │└───────────┘│ │ └───────────┘ │ │
│ │ ┌───────────┐ │ │┌───────────┐│ │ ┌───────────┐ │ │
│ │ │ Task 2 │ │ ││ Task 4 ││ │ │ Task 6 │ │ │
│ │ │ Slot 2 │ │ ││ Slot 2 ││ │ │ Slot 2 │ │ │
│ │ └───────────┘ │ │└───────────┘│ │ └───────────┘ │ │
│ └───────────────┘ └─────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
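Each TaskManager offers a fixed number of task slots, and a job's parallel subtasks are spread across the available slots (in the diagram: 3 TaskManagers x 2 slots = up to 6 parallel subtasks). A configuration sketch matching that layout (values are illustrative):
"""
# flink-conf.yaml
taskmanager.numberOfTaskSlots: 2   # slots offered by each TaskManager
parallelism.default: 6             # 3 TaskManagers x 2 slots
"""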
DataStream API Basics
import json

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

# Create execution environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Configure Kafka source
kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("events") \
    .set_group_id("flink-consumer") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

# Create DataStream from Kafka
stream = env.from_source(
    kafka_source,
    WatermarkStrategy.no_watermarks(),
    "Kafka Source"
)

# Basic transformations
result = stream \
    .map(lambda x: json.loads(x)) \
    .filter(lambda x: x['event_type'] == 'purchase') \
    .map(lambda x: (x['user_id'], x['amount']))

# Print to stdout for local testing (a job needs at least one sink)
result.print()

# Execute the job
env.execute("Event Processing Job")
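Building on the (user_id, amount) tuples above, a keyed aggregation sketch (a running total per user, added before env.execute(); the reduce logic is illustrative):
# Running total of purchase amounts per user via an incremental reduce
totals = result \
    .key_by(lambda t: t[0]) \
    .reduce(lambda a, b: (a[0], a[1] + b[1]))
totals.print()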
Flink SQL & Table API
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
# Create Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
# Create Kafka source table using SQL DDL
t_env.execute_sql("""
    CREATE TABLE events (
        event_id STRING,
        user_id STRING,
        event_type STRING,
        amount DECIMAL(10, 2),
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'flink-sql',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")
# Create sink table
t_env.execute_sql("""
    CREATE TABLE purchase_metrics (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        user_id STRING,
        total_purchases BIGINT,
        total_amount DECIMAL(10, 2),
        PRIMARY KEY (window_start, user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/analytics',
        'table-name' = 'purchase_metrics',
        'username' = 'postgres',
        'password' = 'password'
    )
""")
# Windowed aggregation with SQL
t_env.execute_sql("""
    INSERT INTO purchase_metrics
    SELECT
        TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
        TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
        user_id,
        COUNT(*) AS total_purchases,
        SUM(amount) AS total_amount
    FROM events
    WHERE event_type = 'purchase'
    GROUP BY
        TUMBLE(event_time, INTERVAL '5' MINUTE),
        user_id
""")
Event Time & Watermarks
from pyflink.common import WatermarkStrategy, Duration
from pyflink.common.watermark_strategy import TimestampAssigner
# Custom timestamp assigner
class EventTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp):
        # Extract timestamp (epoch milliseconds) from the event
        return value['event_time_ms']

# Watermark strategy with bounded out-of-orderness
watermark_strategy = WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(5)) \
    .with_timestamp_assigner(EventTimestampAssigner())

# Apply watermark strategy
stream_with_watermarks = env.from_source(
    kafka_source,
    watermark_strategy,
    "Events with Watermarks"
)
# Using SQL with watermarks
"""
CREATE TABLE events (
event_id STRING,
user_id STRING,
event_time TIMESTAMP(3),
-- Declare watermark: events can be up to 5 seconds late
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...)
-- Watermark visualization:
-- Event Time: 12:00:05 12:00:10 12:00:15 12:00:20
-- Watermark: 12:00:00 12:00:05 12:00:10 12:00:15
-- (5 sec behind event time)
"""
Window Operations
# Python DataStream API Windows
from pyflink.datastream.window import (
    TumblingEventTimeWindows, SlidingEventTimeWindows, EventTimeSessionWindows
)
from pyflink.common import Time

# Tumbling Window (non-overlapping)
tumbling = stream \
    .key_by(lambda x: x['user_id']) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .reduce(lambda a, b: {
        'user_id': a['user_id'],
        'count': a['count'] + b['count'],
        'total': a['total'] + b['total']
    })

# Sliding Window (overlapping)
sliding = stream \
    .key_by(lambda x: x['product_id']) \
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),  # window size
        Time.minutes(5)    # slide interval
    )) \
    .aggregate(AverageAggregate())

# Session Window (gap-based)
session = stream \
    .key_by(lambda x: x['session_id']) \
    .window(EventTimeSessionWindows.with_gap(Time.minutes(30))) \
    .process(SessionAnalyzer())
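AverageAggregate and SessionAnalyzer above are user-defined window functions (an AggregateFunction and a ProcessWindowFunction, respectively); a minimal sketch of the averaging aggregate, assuming the parsed events carry an 'amount' field:
from pyflink.datastream.functions import AggregateFunction

class AverageAggregate(AggregateFunction):
    """Incrementally computes the average 'amount' per window."""

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, count)

    def add(self, value, accumulator):
        return (accumulator[0] + value['amount'], accumulator[1] + 1)

    def get_result(self, accumulator):
        return accumulator[0] / accumulator[1] if accumulator[1] else 0.0

    def merge(self, acc_a, acc_b):
        return (acc_a[0] + acc_b[0], acc_a[1] + acc_b[1])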
# SQL Window Functions
"""
-- Tumbling Window
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
user_id,
COUNT(*) as event_count
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE), user_id;
-- Hop (Sliding) Window
SELECT
HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id,
AVG(amount) as avg_amount
FROM events
GROUP BY HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), product_id;
-- Session Window
SELECT
SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start,
user_id,
COUNT(*) as events_in_session
FROM events
GROUP BY SESSION(event_time, INTERVAL '30' MINUTE), user_id;
"""
State Management
from pyflink.common.typeinfo import Types
from pyflink.datastream import RuntimeContext
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor, MapStateDescriptor

class FraudDetector(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        # Value state: amount of the previous transaction for the current key
        self.last_transaction = runtime_context.get_state(
            ValueStateDescriptor("last_transaction", Types.FLOAT())
        )
        # Map state: timestamp -> amount history for the current key
        self.transaction_history = runtime_context.get_map_state(
            MapStateDescriptor("history", Types.LONG(), Types.FLOAT())
        )

    def process_element(self, value, ctx):
        last_amount = self.last_transaction.value()
        current_amount = value['amount']

        # Fraud detection logic: flag a sudden 10x jump in transaction size
        if last_amount is not None and current_amount > last_amount * 10:
            yield {
                'user_id': value['user_id'],
                'alert': 'POTENTIAL_FRAUD',
                'current': current_amount,
                'previous': last_amount
            }

        # Update state
        self.last_transaction.update(current_amount)
        self.transaction_history.put(value['timestamp'], current_amount)

# Apply the stateful function per user key
alerts = stream \
    .key_by(lambda x: x['user_id']) \
    .process(FraudDetector())
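Keyed state like the transaction history above grows indefinitely unless it is cleared; state TTL can expire entries automatically. A sketch that could be applied to the descriptors in open() (the 7-day TTL is illustrative):
from pyflink.common import Time
from pyflink.datastream.state import StateTtlConfig

# Expire state entries 7 days after they were last written
ttl_config = StateTtlConfig \
    .new_builder(Time.days(7)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

descriptor = ValueStateDescriptor("last_transaction", Types.FLOAT())
descriptor.enable_time_to_live(ttl_config)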
# State Backends
"""
# RocksDB state backend (production)
env.set_state_backend(RocksDBStateBackend("hdfs://checkpoints"))
# HashMapStateBackend (in-memory, for small state)
env.set_state_backend(HashMapStateBackend())
State Backend Options:
├── HashMapStateBackend
│ ├── State stored in JVM heap
│ ├── Fast but limited by memory
│ └── Good for small state
│
└── RocksDBStateBackend
├── State stored on local disk with RocksDB
├── Can handle TB of state
├── Incremental checkpoints
└── Recommended for production
"""
Checkpointing & Fault Tolerance
from pyflink.datastream import CheckpointingMode, ExternalizedCheckpointCleanup
from pyflink.common import Duration
# Enable checkpointing
env.enable_checkpointing(60000) # checkpoint every 60 seconds
# Checkpoint configuration
config = env.get_checkpoint_config()
# Exactly-once semantics
config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
# Minimum pause between checkpoints
config.set_min_pause_between_checkpoints(30000) # 30 seconds
# Checkpoint timeout
config.set_checkpoint_timeout(600000) # 10 minutes
# Number of concurrent checkpoints
config.set_max_concurrent_checkpoints(1)
# Enable externalized checkpoints for recovery
config.enable_externalized_checkpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
)
# Checkpoint storage
config.set_checkpoint_storage("hdfs://namenode:8020/flink/checkpoints")
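Under heavy backpressure, aligned checkpoint barriers can take a long time to travel through the job; unaligned checkpoints (Flink 1.11+) trade a larger checkpoint size for faster completion:
# Optional: let barriers overtake buffered records when the job is backpressured
config.enable_unaligned_checkpoints()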
# Savepoints for planned migrations
"""
# Create savepoint
$ flink savepoint hdfs://savepoints/
# Stop with savepoint
$ flink stop --savepointPath hdfs://savepoints/
# Resume from savepoint
$ flink run -s hdfs://savepoints/savepoint-123 app.jar
"""
# Exactly-once with Kafka
"""
Flink achieves exactly-once with Kafka using:
1. Kafka transactions for sinks
2. Checkpointing with Chandy-Lamport algorithm
3. Two-phase commit protocol
Configuration:
env.get_config().set_auto_watermark_interval(200)
kafka_sink.set_semantic(DeliverySemantic.EXACTLY_ONCE)
kafka_sink.set_transaction_timeout(Duration.of_minutes(15))
"""
Complex Event Processing (CEP)
# Flink CEP - Pattern Detection
# Note: FlinkCEP is a Java/Scala library; there is no official PyFlink CEP module,
# so this snippet is a Python-style sketch of the Java Pattern API. From Python,
# the SQL MATCH_RECOGNIZE query further below is the practical route.

# Define a pattern: detect suspicious login attempts
# 3+ consecutive failed logins followed by a success within 5 minutes
pattern = Pattern.begin("failed") \
    .where(lambda event: event['status'] == 'FAILED') \
    .times_or_more(3) \
    .consecutive() \
    .followed_by("success") \
    .where(lambda event: event['status'] == 'SUCCESS') \
    .within(Duration.of_minutes(5))

# Apply the pattern to a keyed stream
pattern_stream = CEP.pattern(
    login_stream.key_by(lambda x: x['user_id']),
    pattern
)
# Process matched patterns
def detect_suspicious(match):
    failed_events = match['failed']
    success_event = match['success'][0]
    return {
        'user_id': success_event['user_id'],
        'alert_type': 'SUSPICIOUS_LOGIN',
        'failed_attempts': len(failed_events),
        'success_ip': success_event['ip_address']
    }

alerts = pattern_stream.select(detect_suspicious)
# SQL Pattern Recognition (MATCH_RECOGNIZE)
"""
SELECT *
FROM events
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY event_time
MEASURES
FIRST(FAIL.event_time) AS start_time,
LAST(FAIL.event_time) AS end_time,
COUNT(FAIL.event_id) AS failed_count,
SUCCESS.ip_address AS success_ip
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (FAIL{3,} SUCCESS) WITHIN INTERVAL '5' MINUTE
DEFINE
FAIL AS status = 'FAILED',
SUCCESS AS status = 'SUCCESS'
) AS suspicious_logins;
"""
Flink with Kafka
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaSource, KafkaSink, KafkaRecordSerializationSchema,
    KafkaOffsetsInitializer, KafkaOffsetResetStrategy
)

# Kafka Source: resume from committed offsets, falling back to earliest
kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_topics("input-events") \
    .set_group_id("flink-processor") \
    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(
        KafkaOffsetResetStrategy.EARLIEST
    )) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
# Kafka Sink with exactly-once delivery
kafka_sink = KafkaSink.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("output-events")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
    .set_transactional_id_prefix("flink-sink") \
    .build()

# Complete pipeline
stream = env.from_source(
    kafka_source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
    "Kafka Source"
)

processed = stream \
    .map(parse_json) \
    .filter(lambda x: x['event_type'] == 'purchase') \
    .map(enrich_event) \
    .map(to_json_string)

processed.sink_to(kafka_sink)
env.execute("Kafka Pipeline")
Deployment & Operations
# Flink on Kubernetes (recommended for production)
# flink-deployment.yaml
"""
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: streaming-job
spec:
image: flink:1.18
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
state.backend: rocksdb
state.checkpoints.dir: s3://bucket/checkpoints
execution.checkpointing.interval: 60s
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "4096m"
cpu: 2
replicas: 3
job:
jarURI: s3://bucket/jobs/streaming-job.jar
parallelism: 8
upgradeMode: savepoint
"""
# Monitoring with Prometheus
"""
flink-conf.yaml:
metrics.reporters: prom
metrics.reporter.prom.factory.class:
org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
Key Metrics to Monitor:
├── flink_jobmanager_job_uptime (job health)
├── flink_taskmanager_job_task_operator_numRecordsIn
├── flink_taskmanager_job_task_operator_numRecordsOut
├── flink_taskmanager_job_task_checkpointDuration
├── flink_taskmanager_Status_JVM_Memory_Heap_Used
└── flink_taskmanager_job_latency_source_histogram
"""
# Common CLI commands
"""
# Submit a job
flink run -d -p 4 streaming-job.jar
# List running jobs
flink list
# Cancel a job
flink cancel
# Create savepoint
flink savepoint hdfs://savepoints/
# Resume from savepoint
flink run -s hdfs://savepoints/savepoint-xxx streaming-job.jar
"""
Best Practices
- Use RocksDB state backend: For production workloads with large state
- Configure proper watermarks: Balance latency vs completeness
- Enable incremental checkpoints: Reduces checkpoint overhead for large RocksDB state (see the config sketch below)
- Monitor checkpoint duration: Should stay well below the checkpoint interval
- Test with Flink's exactly-once: Verify end-to-end semantics
- Use savepoints for upgrades: Allows state-preserving job updates
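A minimal flink-conf.yaml sketch covering the RocksDB and incremental-checkpoint recommendations above (paths and intervals are illustrative):
"""
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://bucket/flink/checkpoints
execution.checkpointing.interval: 60s
execution.checkpointing.min-pause: 30s
"""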
Master Apache Flink
Our Data Engineering program covers stream processing with Flink and other modern frameworks.
Explore Data Engineering Program