Apache Flink 2.2.0: Empowering Real-Time Data and AI for the Stream Processing Era

Apache Flink 2.2.0 advances real-time data processing and AI, introducing `ML_PREDICT` for LLM inference and `VECTOR_SEARCH` for vector similarity. It enhances Materialized Tables, Delta Joins, batch processing, and PyFlink, boosting performance and scalability for streaming AI applications.

The Apache Flink PMC is proud to announce the release of Apache Flink 2.2.0. This version significantly enriches AI capabilities, enhances Materialized Tables and the connector framework, and improves batch processing and PyFlink support. The release brings together 73 contributors from around the world, implements 9 Flink Improvement Proposals (FLIPs), and resolves over 220 issues.

Apache Flink 2.2 ushers in the AI era by seamlessly integrating real-time data processing with artificial intelligence. It introduces ML_PREDICT for large language model inference and VECTOR_SEARCH for real-time vector similarity search, empowering streaming AI applications. Enhanced features like Materialized Tables, Delta Joins, balanced task scheduling, and smarter connectors (including rate limiting and skew-aware split assignment) significantly boost performance, scalability, and reliability—laying a robust foundation for intelligent, low-latency data pipelines. We extend our gratitude to all contributors for their invaluable support!

Key highlights of this release include:

Real-time AI Function

Apache Flink has supported leveraging LLM capabilities through the ML_PREDICT function in Flink SQL since version 2.1, enabling users to perform semantic analysis efficiently. In Flink 2.2, the Table API also supports model inference operations, allowing direct integration of machine learning models into data processing pipelines. Users can create models with specific providers (like OpenAI) and use them to make predictions on their data.

Example: Creating and Using Models

// 1. Set up the local environment
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 2. Create a source table from in-memory data
Table myTable = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
);

// 3. Create model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate to chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
);
Model model = tEnv.fromModel("my_model");

// 4. Use the model to make predictions
Table predictResult = model.predict(myTable, ColumnList.of("text"));

// 5. Async prediction example
Table asyncPredictResult = model.predict(myTable, ColumnList.of("text"), Map.of("async", "true"));

Building on the ML_PREDICT function introduced in Flink SQL 2.1, which enables converting unstructured data into high-dimensional vector features, Flink 2.2 addresses the need for real-time online querying and similarity analysis of these vector spaces. The VECTOR_SEARCH function is now available, allowing users to perform streaming vector similarity searches and real-time context retrieval directly within Flink.

Take the following SQL statements as an example:

-- Basic usage
SELECT *
FROM input_table,
LATERAL VECTOR_SEARCH(TABLE vector_table, input_table.vector_column, DESCRIPTOR(index_column), 10);

-- With configuration options
SELECT *
FROM input_table,
LATERAL VECTOR_SEARCH(TABLE vector_table, input_table.vector_column, DESCRIPTOR(index_column), 10, MAP['async', 'true', 'timeout', '100s']);

-- Using named parameters
SELECT *
FROM input_table,
LATERAL VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_QUERY => input_table.vector_column,
    COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
    TOP_K => 10,
    CONFIG => MAP['async', 'true', 'timeout', '100s']
);

-- Searching with constant value
SELECT *
FROM VECTOR_SEARCH(TABLE vector_table, ARRAY[10, 20], DESCRIPTOR(index_column), 10);
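To make the idea of a top-k similarity search concrete, here is a minimal Python sketch of what such a lookup computes. It assumes cosine similarity as the metric and an in-memory list standing in for vector_table; the metric and index actually used depend on the vector store behind the table, and the function names here are illustrative only.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # treat zero vectors as dissimilar to everything
    return dot / (norm_a * norm_b)


def top_k(
    query: Sequence[float],
    rows: List[Tuple[str, Sequence[float]]],
    k: int,
) -> List[Tuple[str, float]]:
    """Return the k (row_id, score) pairs most similar to query, best first."""
    scored = [(row_id, cosine_similarity(query, vec)) for row_id, vec in rows]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Hypothetical stand-in for vector_table: (id, index_column) pairs.
vector_table = [
    ("a", [10.0, 20.0]),
    ("b", [20.0, 10.0]),
    ("c", [-10.0, -20.0]),
]

if __name__ == "__main__":
    for row_id, score in top_k([10.0, 20.0], vector_table, k=2):
        print(row_id, round(score, 3))
```

A production vector store would normally answer this with an approximate index rather than a full scan; the sketch only shows the ranking that the query asks for.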

Materialized Table

Materialized Table is a new table type in Flink SQL designed to simplify both batch and stream data pipelines, offering a consistent development experience. By specifying data freshness and a query during creation, the engine automatically derives the schema and creates a data refresh pipeline to achieve the desired freshness.

From Flink 2.2, the FRESHNESS clause is no longer mandatory for CREATE MATERIALIZED TABLE and CREATE OR ALTER MATERIALIZED TABLE DDL statements. This release introduces a new MaterializedTableEnricher interface, providing a formal extension point for customizable default logic. This allows advanced users and vendors to implement "smart" default behaviors, such as inferring freshness from upstream tables. Additionally, users can leverage DISTRIBUTED BY or DISTRIBUTED INTO to support bucketing concepts for Materialized tables and use SHOW MATERIALIZED TABLES to display all materialized tables.

Take the following SQL statements as an example:

CREATE MATERIALIZED TABLE my_materialized_table
PARTITIONED BY (ds)
DISTRIBUTED INTO 5 BUCKETS
FRESHNESS = INTERVAL '1' HOUR
AS SELECT ds FROM ...

SinkUpsertMaterializer V2

The SinkUpsertMaterializer operator in Flink reconciles out-of-order changelog events before sending them to an upsert sink. Previous implementations could suffer from exponentially degrading performance in certain scenarios. Flink 2.2 introduces a new, optimized implementation to address these cases.
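As a rough illustration of what reconciling changelog events per key involves, the following Python sketch keeps a list of live rows for every upsert key and emits only the changes an upsert sink needs to see. It is a simplified model with hypothetical names, not the V2 implementation. In this model each retraction scans the list, so a key with many live rows becomes slower to maintain; whether that is exactly the bottleneck V2 addresses is not stated here.

```python
from typing import Dict, List, Tuple

# (kind, upsert_key, row); kinds follow changelog notation: "+I", "+U", "-U", "-D"
Event = Tuple[str, str, str]


class UpsertMaterializerSketch:
    """Tracks every live row per upsert key and emits sink-facing changes."""

    def __init__(self) -> None:
        self._rows: Dict[str, List[str]] = {}

    def process(self, kind: str, key: str, row: str) -> List[Event]:
        if kind in ("+I", "+U"):
            rows = self._rows.setdefault(key, [])
            # The newest addition always becomes the current value for the key.
            out_kind = "+I" if not rows else "+U"
            rows.append(row)
            return [(out_kind, key, row)]

        # Retraction ("-U" or "-D"): linear scan for the first matching row.
        rows = self._rows.get(key, [])
        if row not in rows:
            return []  # nothing to retract
        last_index = len(rows) - 1
        index = rows.index(row)
        del rows[index]
        if not rows:
            del self._rows[key]
            return [("-D", key, row)]
        if index == last_index:
            # The current value was retracted: fall back to the previous one.
            return [("+U", key, rows[-1])]
        return []  # an older row was retracted; the sink already holds the latest
```

For example, if two writers insert rows "a" and "b" for the same key and the retraction of "a" arrives late, the sketch emits nothing for that retraction because the sink already holds "b".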

Delta Join

Apache Flink 2.1 introduced a new delta join operator to mitigate big state challenges in regular joins. It replaces large states with a bidirectional lookup-based join that reuses data directly from source tables.

Flink 2.2 enhances support for converting more SQL patterns into delta joins. Delta joins now support consuming CDC sources without DELETE operations and allow projection and filter operations after the source. Furthermore, delta joins include support for caching, which helps reduce requests to external storage.

Currently, tables with Apache Fluss (Incubating) can be used as source tables for delta joins.
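The following Python sketch illustrates the general shape of a bidirectional lookup join with a cache, under heavy simplification. Plain dictionaries stand in for the two source tables, the LRU cache and its invalidation rule are assumptions made for the example, and all class and method names are hypothetical rather than Flink APIs.

```python
from collections import OrderedDict
from typing import Dict, List, Tuple


class CachedLookup:
    """Keyed lookup against a table with a small LRU cache in front of it."""

    def __init__(self, table: Dict[str, List[str]], capacity: int = 100) -> None:
        self._table = table  # stands in for external storage
        self._cache: "OrderedDict[str, List[str]]" = OrderedDict()
        self._capacity = capacity
        self.remote_requests = 0  # lookups that had to go to the table

    def get(self, key: str) -> List[str]:
        if key in self._cache:
            self._cache.move_to_end(key)
            return self._cache[key]
        self.remote_requests += 1
        rows = list(self._table.get(key, []))
        self._cache[key] = rows
        if len(self._cache) > self._capacity:
            self._cache.popitem(last=False)  # evict least recently used
        return rows

    def invalidate(self, key: str) -> None:
        self._cache.pop(key, None)


class DeltaJoinSketch:
    """Joins each incoming change against the other side's table by lookup."""

    def __init__(self) -> None:
        self.left_table: Dict[str, List[str]] = {}
        self.right_table: Dict[str, List[str]] = {}
        self.left_lookup = CachedLookup(self.left_table)
        self.right_lookup = CachedLookup(self.right_table)

    def on_left(self, key: str, value: str) -> List[Tuple[str, str, str]]:
        self.left_table.setdefault(key, []).append(value)
        self.left_lookup.invalidate(key)  # cached left rows are now stale
        return [(key, value, r) for r in self.right_lookup.get(key)]

    def on_right(self, key: str, value: str) -> List[Tuple[str, str, str]]:
        self.right_table.setdefault(key, []).append(value)
        self.right_lookup.invalidate(key)  # cached right rows are now stale
        return [(key, l, value) for l in self.left_lookup.get(key)]
```

The operator itself holds no join state beyond the cache: every match is produced by looking up the opposite table, which is why the source tables must support efficient keyed lookups.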

SQL Types

Before Flink 2.2, row types defined in SQL (e.g., SELECT CAST(f AS ROW<i NOT NULL>)) ignored the NOT NULL constraint. While aligned with the SQL standard, this led to type inconsistencies and cryptic error messages when working with nested data, preventing the use of rows in computed columns or join keys. The new behavior now correctly considers nullability. The config option table.legacy-nested-row-nullability can restore the old behavior if needed, but updating existing queries that ignored constraints is recommended.

Casting to TIME type now correctly considers precision (0-3). Casting incorrect strings to time (e.g., hour component > 24) now results in a runtime exception. Casting between BINARY and VARBINARY will also correctly consider the target length.

Use UniqueKeys instead of UpsertKeys for state management

The StreamingMultiJoinOperator now uses unique keys instead of upsert keys for state management. This is a significant optimization and a breaking change. As noted in the release notes, the operator was launched in an experimental state in Flink 2.1 because ongoing optimizations could involve breaking changes.

Runtime

Balanced Tasks Scheduling

Flink 2.2 introduces a balanced tasks scheduling strategy to achieve task load balancing for Task Managers (TMs) and reduce job bottlenecks, optimizing resource utilization and overall job efficiency.

Enhanced Job History Retention Policies for HistoryServer

Prior to Flink 2.2, HistoryServer supported only a quantity-based job archive retention policy, which was insufficient for scenarios requiring time-based retention or combined rules. Users can now utilize the new configuration historyserver.archive.retained-ttl in conjunction with historyserver.archive.retained-jobs to meet more diverse retention requirements.
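One way to picture combining a time-based rule with a count-based rule is the small Python sketch below. It assumes that an archive must satisfy both rules to be kept, which is an assumption for illustration; the exact semantics of the two options should be checked in the configuration documentation. The function and parameter names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


def select_retained(
    archives: List[Tuple[str, datetime]],
    now: datetime,
    retained_ttl: Optional[timedelta] = None,
    retained_jobs: Optional[int] = None,
) -> List[str]:
    """Return job ids to keep, newest first.

    archives holds (job_id, finished_at) pairs. A rule set to None is disabled.
    """
    kept = sorted(archives, key=lambda archive: archive[1], reverse=True)
    if retained_ttl is not None:
        # Time-based rule: drop archives older than the TTL.
        kept = [a for a in kept if now - a[1] <= retained_ttl]
    if retained_jobs is not None:
        # Quantity-based rule: keep at most the N most recent archives.
        kept = kept[:retained_jobs]
    return [job_id for job_id, _ in kept]


if __name__ == "__main__":
    now = datetime(2025, 1, 10)
    archives = [
        ("job-a", now - timedelta(days=1)),
        ("job-b", now - timedelta(days=3)),
        ("job-c", now - timedelta(days=10)),
    ]
    print(select_retained(archives, now, timedelta(days=7), 1))
```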

Metrics

Since Flink 2.2.0, users can assign custom metric variables to each operator/transformation within a job. Metric reporters then convert these variables to tags/labels, allowing users to tag/label the metrics of specific operators. For example, this can be used to name and differentiate sources.

Users can also control the level of detail for checkpoint spans via traces.checkpoint.span-detail-level. Higher levels report a tree of spans for each task and subtask. Reported custom spans can now contain child spans.

Connectors

Introduce RateLimiter for Scan Source

Flink jobs frequently exchange data with external systems, which consumes network bandwidth and CPU on those systems. Pulling data too aggressively, for example through the MySQL CDC connector, can disrupt other workloads running there. Flink 2.2 introduces a RateLimiter interface that provides request rate limiting for Scan Sources. Connector developers can integrate with rate limiting frameworks to implement custom read restriction strategies. This feature is currently only available in the DataStream API.
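A token bucket is one common strategy a rate limiter can implement. The Python sketch below shows the idea in isolation; the class and method names are illustrative and do not mirror Flink's RateLimiter interface, and the injectable clock exists only to make the example deterministic.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts up to capacity and a sustained rate of rate_per_second."""

    def __init__(
        self,
        rate_per_second: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._rate = rate_per_second
        self._capacity = capacity
        self._tokens = capacity  # start with a full bucket
        self._clock = clock
        self._last = clock()

    def try_acquire(self, permits: float = 1.0) -> bool:
        """Take permits if available; return False instead of blocking."""
        now = self._clock()
        # Refill in proportion to elapsed time, never beyond capacity.
        refill = (now - self._last) * self._rate
        self._tokens = min(self._capacity, self._tokens + refill)
        self._last = now
        if self._tokens >= permits:
            self._tokens -= permits
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(rate_per_second=2.0, capacity=2.0)
    print([bucket.try_acquire() for _ in range(3)])
```

A source reader using such a limiter would call try_acquire before each fetch and back off when it returns False, which caps the request rate seen by the external system.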

Balanced splits assignment

The SplitEnumerator is responsible for assigning splits but previously lacked visibility into the actual runtime status or distribution of these splits, making it difficult to guarantee even sharding and often leading to data skew. From Flink 2.2, SplitEnumerator now has access to splits distribution information and provides the ability to evenly assign splits at runtime. This feature can, for example, be used to address data skew issues in the Kafka connector.
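To show why knowing the current distribution helps, here is a minimal Python sketch that assigns each new split to the reader that currently holds the fewest. It is a generic greedy strategy under the assumption that all splits cost the same, not the algorithm used by any particular connector, and the names are hypothetical.

```python
import heapq
from typing import Dict, List


def assign_splits(
    new_splits: List[str],
    current_counts: Dict[int, int],
) -> Dict[int, List[str]]:
    """Assign each new split to the reader with the fewest splits so far.

    current_counts maps reader id to the number of splits it already holds.
    """
    heap = [(count, reader) for reader, count in current_counts.items()]
    heapq.heapify(heap)
    assignment: Dict[int, List[str]] = {reader: [] for reader in current_counts}
    for split in new_splits:
        count, reader = heapq.heappop(heap)  # least-loaded reader
        assignment[reader].append(split)
        heapq.heappush(heap, (count + 1, reader))
    return assignment


if __name__ == "__main__":
    # Reader 0 is already overloaded, so it receives none of the new splits.
    print(assign_splits(["p5", "p6", "p7", "p8"], {0: 4, 1: 1, 2: 2}))
```

Without the runtime counts, an enumerator can only spread new splits evenly, which preserves whatever skew already exists.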

Other Improvements

Flink 2.2 adds support for async functions in the Python DataStream API. This enables Python users to efficiently query external services from their Flink jobs, such as large LLMs that are typically deployed in standalone GPU clusters.

Furthermore, comprehensive support for external service access stability has been provided. This includes limiting the number of concurrent requests to external services to prevent overwhelming them and adding retry support to tolerate temporary unavailability caused by network jitter or other transient issues.

Here is a simple example showing how to use it:

from typing import List
from ollama import AsyncClient
from pyflink.common import Types, Time, Row
from pyflink.datastream import (
    StreamExecutionEnvironment,
    AsyncDataStream,
    AsyncFunction,
    RuntimeContext,
    CheckpointingMode,
)

class AsyncLLMRequest(AsyncFunction[Row, str]):
    def __init__(self, host, port):
        self._host = host
        self._port = port

    def open(self, runtime_context: RuntimeContext):
        self._client = AsyncClient(host='{}:{}'.format(self._host, self._port))

    async def async_invoke(self, value: Row) -> List[str]:
        message = {"role": "user", "content": value.question}
        question_id = value.id
        ollama_response = await self._client.chat(model="qwen3:4b", messages=[message])
        return [f"Question ID {question_id}: response: {ollama_response['message']['content']}"]

    def timeout(self, value: Row) -> List[str]:
        # return a default value in case of a timeout
        return [f"Timeout for this question: {value.question}"]

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(30000, CheckpointingMode.EXACTLY_ONCE)

    ds = env.from_collection(
        [
            ("Who are you?", 0),
            ("Tell me a joke", 1),
            ("Tell me the result of comparing 0.8 and 0.11", 2),
        ],
        type_info=Types.ROW_NAMED(["question", "id"], [Types.STRING(), Types.INT()]),
    )

    result_stream = AsyncDataStream.unordered_wait(
        data_stream=ds,
        # host and port of the Ollama server; this assumes a local server
        # on Ollama's default port, adjust to your deployment
        async_function=AsyncLLMRequest("localhost", 11434),
        timeout=Time.seconds(100),
        capacity=1000,
        output_type=Types.STRING(),
    )

    # define the sink (prints results to stdout)
    result_stream.print()

    # submit for execution
    env.execute()

if __name__ == "__main__":
    main()
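The concurrency limit and retry behavior mentioned above can also be pictured outside Flink. The following plain asyncio sketch caps in-flight requests with a semaphore and retries transient failures with exponential backoff. It does not use the PyFlink API; FlakyService and call_with_retry are hypothetical names, and the error type and delays are assumptions chosen for the example.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def call_with_retry(
    request: Callable[[], Awaitable[str]],
    semaphore: asyncio.Semaphore,
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> str:
    """Run request under a concurrency cap, retrying transient errors."""
    async with semaphore:  # the slot is held across retries of this request
        for attempt in range(1, max_attempts + 1):
            try:
                return await request()
            except ConnectionError:
                if attempt == max_attempts:
                    raise
                # Exponential backoff: base_delay, 2x, 4x, ...
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("max_attempts must be at least 1")


class FlakyService:
    """Fails a fixed number of calls, then answers; tracks peak concurrency."""

    def __init__(self, failures_before_success: int) -> None:
        self._remaining_failures = failures_before_success
        self._in_flight = 0
        self.max_in_flight = 0

    async def query(self, question: str) -> str:
        self._in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self._in_flight)
        try:
            await asyncio.sleep(0)  # yield so other requests can interleave
            if self._remaining_failures > 0:
                self._remaining_failures -= 1
                raise ConnectionError("transient failure")
            return f"answer to {question}"
        finally:
            self._in_flight -= 1


async def run_demo() -> Tuple[List[str], int]:
    service = FlakyService(failures_before_success=2)
    semaphore = asyncio.Semaphore(2)  # at most two concurrent requests
    questions = ["q1", "q2", "q3", "q4"]
    answers = await asyncio.gather(
        *(call_with_retry(lambda q=q: service.query(q), semaphore) for q in questions)
    )
    return list(answers), service.max_in_flight


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Holding the semaphore slot during backoff keeps the cap strict at the cost of throughput; releasing it between attempts is an equally valid design when the external service recovers slowly.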

Upgrade commons-lang3 to version 3.18.0

The commons-lang3 library has been upgraded from 3.12.0 to 3.18.0 to mitigate CVE-2025-48924.

Upgrade protobuf-java from 3.x to 4.32.1 with compatibility patch for parquet-protobuf

Flink now utilizes protobuf-java 4.32.1 (corresponding to Protocol Buffers version 32), a significant upgrade from protobuf-java 3.21.7 (Protocol Buffers version 21). This major upgrade delivers:

  • Protobuf Editions Support: Full support for the new edition = "2023" and edition = "2024" syntax introduced in Protocol Buffers v27+. Editions offer a unified approach combining proto2 and proto3 functionality with fine-grained feature control.
  • Improved Proto3 Field Presence: Better handling of optional fields in proto3 without the limitations of older protobuf versions, eliminating the need to set protobuf.read-default-values to true for field presence checking.
  • Enhanced Performance: Leverages performance improvements and bug fixes from 11 Protocol Buffers releases (versions 22-32).
  • Modern Protobuf Features: Access to newer protobuf capabilities including Edition 2024 features and improved runtime behavior.

Existing proto2 and proto3 .proto files will continue to function without requiring changes.

Upgrade Notes

The Flink community strives to ensure seamless upgrades. However, certain changes may require users to make adjustments to their programs when upgrading to version 2.2. Please refer to the official release notes for a comprehensive list of necessary adjustments and issues to check during the upgrading process.