Use real-time mode in Lakeflow Spark Declarative Pipelines

Important

Real-time mode in Lakeflow Spark Declarative Pipelines is in Public Preview on Databricks Runtime 18.1.2 on the preview channel.

Real-time mode enables ultra-low-latency data processing, with end-to-end latency as low as five milliseconds. Use real-time mode for operational workloads that require immediate response to streaming data, such as fraud detection and real-time personalization.

Real-time mode is also available directly in Structured Streaming outside of pipelines. See Real-time mode in Structured Streaming.

How real-time mode achieves low latency

Real-time mode differs from standard continuous processing in three key ways:

  • Long-running batches: The system processes data as it becomes available in the source within long-running batches (default is five minutes).
  • Simultaneous stage scheduling: All query stages are scheduled at the same time. The compute resource must have enough available task slots to cover all stages concurrently. See Compute sizing.
  • Streaming shuffle: Data is passed between stages as soon as it is produced, rather than waiting for an upstream stage to complete before starting the downstream stage.

The checkpoint interval (configured via pipelines.trigger.interval) controls how frequently state and source offsets are persisted to durable storage. Longer intervals reduce checkpointing overhead but increase recovery time after a failure and delay metrics reporting. Shorter intervals improve durability but add overhead.
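The trade-off described above can be made concrete with a little arithmetic. The following is a minimal sketch, not part of any Databricks API: `interval_seconds` and `checkpoint_tradeoff` are hypothetical helper names, and the replay-window figure assumes that recovery resumes from the most recently persisted offsets.

```python
import re

# Hypothetical helper: converts an interval string such as "5 minutes" to seconds.
_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600}


def interval_seconds(interval: str) -> int:
    match = re.fullmatch(r"\s*(\d+)\s+(second|minute|hour)s?\s*", interval.lower())
    if match is None:
        raise ValueError(f"unrecognized interval: {interval!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def checkpoint_tradeoff(interval: str) -> dict:
    seconds = interval_seconds(interval)
    return {
        # How often state and offsets are persisted (the overhead side of the trade-off).
        "checkpoints_per_hour": 3600 / seconds,
        # Assumption: after a failure, at most one interval of input is reprocessed.
        "max_replay_window_seconds": seconds,
    }


default_interval = checkpoint_tradeoff("5 minutes")
short_interval = checkpoint_tradeoff("30 seconds")
```

With the default of five minutes this gives 12 checkpoints per hour and up to 300 seconds of input to replay. A 30-second interval gives 120 checkpoints per hour and a 30-second replay window.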

Real-time mode and continuous pipelines

Real-time mode is a specialized type of continuous trigger, so the pipeline must first run in continuous mode. Real-time mode then applies additional optimizations at the flow level to achieve sub-second latency beyond what standard continuous processing provides.

Enabling real-time mode requires three configuration steps:

  1. Set the pipeline to continuous mode.
  2. Enable real-time mode at the pipeline level.
  3. Define a real-time update flow.

Requirements

| Requirement | Value |
| --- | --- |
| Databricks Runtime | 18.1.2 on the Spark Declarative Pipelines (SDP) preview channel |
| Compute type | Classic compute or serverless |

Configure real-time mode

Step 1: Set the pipeline to continuous mode

In your pipeline settings, set Pipeline mode to Continuous, or set it in the pipeline JSON:

{
  "continuous": true
}

Step 2: Enable real-time mode at the pipeline level

In your pipeline settings, add the following key to the Spark configuration under Advanced > Spark config:

spark.databricks.streaming.realTimeMode.enabled = true

You can also set this in the pipeline JSON:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}
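Before deploying, it can help to confirm that a settings document contains both pipeline-level prerequisites from Steps 1 and 2. The following is a minimal sketch of such a pre-flight check. `missing_real_time_settings` is a hypothetical helper, not a Databricks API, and it inspects only the two keys shown above.

```python
import json

REAL_TIME_CONF_KEY = "spark.databricks.streaming.realTimeMode.enabled"


def missing_real_time_settings(settings: dict) -> list:
    """Return a list of problems; an empty list means both prerequisites are set."""
    problems = []
    if settings.get("continuous") is not True:
        problems.append('"continuous" must be true')
    spark_conf = settings.get("spark_conf") or {}
    if str(spark_conf.get(REAL_TIME_CONF_KEY, "")).lower() != "true":
        problems.append(f'"{REAL_TIME_CONF_KEY}" must be "true" in spark_conf')
    return problems


pipeline_json = """
{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}
"""
problems = missing_real_time_settings(json.loads(pipeline_json))
print(problems)  # []
```

The check does not cover Step 3, because the real-time update flow is defined in pipeline source code rather than in the settings JSON.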

Step 3: Define a real-time update flow

Real-time mode requires an update flow. Use dp.create_sink() to define the output target, then use the @dp.update_flow decorator with pipelines.trigger set to "RealTime" and target pointing to the sink.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Flow-level configuration parameters:

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| pipelines.trigger | Yes | | Set to "RealTime" to enable real-time mode for this flow. |
| pipelines.trigger.interval | No | "5 minutes" | Checkpoint interval. Controls how often state and offsets are committed. Shorter values improve recoverability; longer values reduce overhead. |

Code examples

Kafka to Kafka

Read from a Kafka topic and write to a Kafka output target:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Enrich with a broadcast join

Join a Kafka stream against a static lookup table. Only broadcast (stream-to-static) joins are supported. Stream-to-stream joins are not supported in real-time mode.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
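            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            # The Kafka sink requires a `value` column, so serialize the enriched row as JSON
            .selectExpr(
                "event_key AS key",
                "to_json(struct(event_key, lookup_value, timestamp)) AS value",
            )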
    )

Aggregation

Count events by key using a stateful groupBy. Set spark.sql.shuffle.partitions to match the input partition count for stateful operations:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
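            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
            # The Kafka sink requires a `value` column, so map the aggregate onto key/value
            .selectExpr("event_type AS key", "CAST(`count` AS STRING) AS value")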
    )

Supported sources and sinks

| Connector | As source | As sink | Notes |
| --- | --- | --- | --- |
| Apache Kafka | ✓ | ✓ | |
| AWS MSK | ✓ | ✓ | Uses the Kafka-compatible interface. |
| Azure Event Hubs (Kafka connector) | ✓ | ✓ | Uses the Kafka-compatible interface. |
| Amazon Kinesis | ✓ | Not supported | Supported in EFO (Enhanced Fan-Out) mode only. |
| Delta | Not supported | Not supported | |

Compute sizing

You can run one real-time pipeline per compute resource if the compute has enough task slots. Available task slots must cover all tasks across all query stages.

| Pipeline type | Configuration | Required task slots |
| --- | --- | --- |
| Single-stage stateless (Kafka source + sink) | maxPartitions = 8 | 8 |
| Two-stage stateful (Kafka source + shuffle) | maxPartitions = 8, shuffle partitions = 20 | 28 (8 + 20) |
| Three-stage (Kafka source + two shuffles) | maxPartitions = 8, two shuffle stages of 20 each | 48 (8 + 20 + 20) |

If you don't set maxPartitions, use the number of partitions in the Kafka topic as the source-stage task count.
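The arithmetic in the sizing table can be sketched as a small helper. `required_task_slots` and `has_enough_slots` are hypothetical names, not part of any Databricks API. The sketch assumes one task per source partition plus one task per shuffle partition in each shuffle stage, which is what the table rows imply.

```python
from typing import Sequence


def required_task_slots(source_partitions: int, shuffle_stages: Sequence[int] = ()) -> int:
    """Sum the tasks of every stage, because all stages are scheduled at the same time."""
    if source_partitions < 1 or any(partitions < 1 for partitions in shuffle_stages):
        raise ValueError("partition counts must be positive")
    return source_partitions + sum(shuffle_stages)


def has_enough_slots(available_slots: int, source_partitions: int,
                     shuffle_stages: Sequence[int] = ()) -> bool:
    return available_slots >= required_task_slots(source_partitions, shuffle_stages)


stateless = required_task_slots(8)            # Kafka source + sink
two_stage = required_task_slots(8, [20])      # Kafka source + one shuffle
three_stage = required_task_slots(8, [20, 20])  # Kafka source + two shuffles
```

These three calls reproduce the 8, 28, and 48 task slots listed in the table. Under the same assumption, a compute resource with 32 available slots would cover the two-stage pipeline but not the three-stage one.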

Operator support

| Category | Operator | Supported |
| --- | --- | --- |
| Stateless | Selection, Projection | ✓ |
| UDFs | Scala UDF | ✓ (with limitations) |
| UDFs | Python UDF | ✓ (with limitations) |
| Aggregation | sum, count, max, min, avg | ✓ |
| Windowing | Tumbling, Sliding | ✓ |
| Windowing | Session | Not supported |
| Deduplication | dropDuplicates | ✓ (unbounded state) |
| Deduplication | dropDuplicatesWithinWatermark | Not supported |
| Joins | Broadcast table join | ✓ |
| Joins | Stream-to-stream join | Not supported |
| Custom | transformWithState | ✓ (with behavioral differences) |
| Custom | union | ✓ (with limitations) |
| Custom | forEach | Not supported |
| Custom | flatMapGroupsWithState | Not supported |
| Custom | mapPartitions | Not supported |
| Custom | forEachBatch | Not supported |

transformWithState in real-time mode

transformWithState is supported in real-time mode with the following differences from micro-batch processing:

  • handleInputRows is invoked once per row rather than once per key per batch. The inputRows iterator yields a single value per invocation.
  • Event-time timers are not supported. Processing-time timers fire when a long-running batch terminates if no data has arrived.
  • transformWithStateInPandas is not supported.
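The first difference affects how a handler should be written. The following pure-Python simulation illustrates the two invocation patterns. It does not use the Spark API: `handle_input_rows`, `run_micro_batch`, and `run_real_time` are hypothetical stand-ins. The point is that a handler which folds each row into state gives the same result under both patterns, while the number of invocations differs.

```python
from collections import defaultdict


def new_state() -> dict:
    return {"calls": 0, "totals": defaultdict(int)}


def handle_input_rows(key, rows, state) -> None:
    """Stand-in for a handleInputRows-style callback: folds each row into per-key state."""
    state["calls"] += 1
    for value in rows:
        state["totals"][key] += value


def run_micro_batch(batch, state) -> None:
    # Micro-batch pattern: one call per key, with an iterator over all of that key's rows.
    grouped = defaultdict(list)
    for key, value in batch:
        grouped[key].append(value)
    for key, values in grouped.items():
        handle_input_rows(key, iter(values), state)


def run_real_time(rows, state) -> None:
    # Real-time pattern: one call per row, with an iterator that yields a single value.
    for key, value in rows:
        handle_input_rows(key, iter([value]), state)


events = [("a", 1), ("b", 2), ("a", 3)]  # made-up sample data

micro_batch_state = new_state()
run_micro_batch(events, micro_batch_state)

real_time_state = new_state()
run_real_time(events, real_time_state)
```

In this simulation the micro-batch pattern invokes the handler twice (once per key) and the real-time pattern invokes it three times (once per row), yet both end with the same per-key totals. Logic that relies on seeing all of a key's rows in one call, such as computing a per-batch maximum before writing state, would behave differently and needs to be restructured around state.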

Pandas UDFs in real-time mode

To minimize latency with pandas UDFs, set spark.sql.execution.arrow.maxRecordsPerBatch to 1. This optimizes for latency at the expense of throughput. If throughput is also important, set this value to 100 or higher.

Monitor real-time mode performance

Real-time mode exposes latency metrics in StreamingQueryProgress under the latencies field. Access these metrics via a StreamingQueryListener or by inspecting the lastProgress property on the streaming query.

| Metric | Description |
| --- | --- |
| processingLatencyMs | Time between when a record is read by the flow and when it is fully processed by the flow |
| sourceQueuingLatencyMs | Time between when a record is successfully written to the message bus (for example, log append time in Kafka) and when it is first read by the flow |
| e2eLatencyMs | Total end-to-end latency from when the record is produced at the source to when it is fully processed by the flow |

Each metric is reported as p50, p90, p95, and p99 percentiles.
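One way to act on these metrics is to compare a chosen percentile against a latency budget. The following is a minimal sketch that operates on a plain dictionary. The nesting of the latencies field and the percentile key names (`p50`, `p99`) are assumptions made for illustration, so check the actual progress payload before relying on them. `latency_breaches` is a hypothetical helper, and the sample numbers are made up.

```python
LATENCY_METRICS = ("processingLatencyMs", "sourceQueuingLatencyMs", "e2eLatencyMs")


def latency_breaches(progress: dict, percentile: str, budget_ms: float) -> dict:
    """Return the metrics whose value at the given percentile exceeds the budget."""
    latencies = progress.get("latencies") or {}
    breaches = {}
    for metric in LATENCY_METRICS:
        value = (latencies.get(metric) or {}).get(percentile)
        if value is not None and value > budget_ms:
            breaches[metric] = value
    return breaches


# Made-up progress payload; the shape is an assumption, not the documented format.
sample_progress = {
    "latencies": {
        "processingLatencyMs": {"p50": 4, "p90": 9, "p95": 12, "p99": 30},
        "sourceQueuingLatencyMs": {"p50": 2, "p90": 5, "p95": 7, "p99": 15},
        "e2eLatencyMs": {"p50": 8, "p90": 18, "p95": 25, "p99": 60},
    }
}

over_budget = latency_breaches(sample_progress, "p99", budget_ms=50)
```

With this sample data, only e2eLatencyMs exceeds the 50 ms budget at p99. In a pipeline, the dictionary would come from the progress events delivered to a StreamingQueryListener or from lastProgress.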

Limitations

One real-time flow per pipeline is recommended. Multiple flows are allowed, but task slot contention across flows increases latency.

For a complete list of operator and source limitations, see Real-time mode limitations.

Additional resources