Hands-On with Flink — Part 6: Calling LLMs from Flink | by Katya Gorshkova | Jun, 2026 | Medium
Previous parts:
Part 1: Filtering data from Kafka
Part 2: Running on Kubernetes with the Operator
Part 3: SQL instead of Java
Part 4: SQL and Avro
Part 5: Managing State

Integrating ML Models into Data Pipelines
Let’s leave aside boring SELECTs and JOINs and do something more exciting.
In the latest versions of Apache Flink, machine learning models have become first-class citizens and can be invoked directly from SQL pipelines. No custom Java code, no user-defined functions, and no external inference services. Everything can be done using pure Flink SQL.
In this part of the Hands-On with Flink series, we will build a simple system that summarizes product reviews using a Large Language Model (LLM).
The idea is illustrated in the diagram below.
Product reviews arrive as events in the product_reviews Kafka topic. Our first Flink job continuously aggregates reviews for each product and writes a compact context event to the product_review_context topic. These context events contain information such as the number of reviews, the average rating, and the collected review texts.
A second Flink job consumes the aggregated context events and uses an LLM to generate a concise summary for each product. The generated summaries are then written to the product_review_summary topic.
This architecture demonstrates an important pattern for integrating AI with streaming systems: Flink is responsible for collecting, filtering, aggregating, and preparing the context, while the LLM focuses on interpreting that context and generating insights.
As usual, everything will run locally on our machine. And yes, that includes the LLM.
The source code for this part is here.

New Flink AI Constructs
Before diving into the installation steps, let’s first look at the Flink SQL features that we will use in this example.
The first part of the pipeline is relatively straightforward. We will read product reviews from Kafka and aggregate them into a compact product context, which will later be sent to the LLM.

Reading raw reviews
First, we define a source table over the product_reviews Kafka topic:

CREATE TABLE product_reviews (
  review_id STRING,
  product_id STRING,
  product_name STRING,
  rating INT,
  review_text STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'product_reviews',
  'properties.bootstrap.servers' = 'host.docker.internal:9092',
  'properties.group.id' = 'product-reviews-source',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

This table represents raw review events arriving in Kafka.
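Because the table uses 'format' = 'json', each Kafka message must be a JSON object whose keys match the physical columns. The sketch below is a minimal, hypothetical generator for such test records. The function name make_review, the sample products, and the 1-5 star scale are assumptions for illustration only. proc_time is intentionally left out because it is a computed column that Flink derives when it reads the record.

```python
import json
import uuid


def make_review(product_id: str, product_name: str, rating: int, review_text: str) -> str:
    """Build one JSON record for the product_reviews topic (hypothetical helper).

    Keys mirror the physical columns of the product_reviews table. proc_time is
    deliberately absent: it is a computed column that Flink fills in on read.
    """
    if not 1 <= rating <= 5:  # assumption: ratings use a 1-5 star scale
        raise ValueError("rating must be between 1 and 5")
    return json.dumps({
        "review_id": str(uuid.uuid4()),  # random unique ID per review
        "product_id": product_id,
        "product_name": product_name,
        "rating": rating,
        "review_text": review_text,
    })


if __name__ == "__main__":
    # Hypothetical sample data: print one JSON record per line.
    samples = [
        ("p-100", "Travel Mug", 5, "Keeps coffee hot for hours."),
        ("p-100", "Travel Mug", 2, "The lid started leaking after a week."),
    ]
    for sample in samples:
        print(make_review(*sample))
```

One way to publish these records is to pipe the output into Kafka's console producer. For example, if the script were saved under the hypothetical name make_reviews.py and the broker were reachable from the host at localhost:9092 (also an assumption; the Flink job itself connects through host.docker.internal:9092), you could run: python make_reviews.py | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic product_reviews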
Most fields are self-explanatory: review ID, product ID, product name, rating, and review text.
The only new field is:

proc_time AS PROCTIME()

This is a computed column. It is not part of the Kafka message. Instead, it tells Flink to attach the current processing time to each incoming record. We will use this timestamp to group reviews into short time intervals.

Creating product context
Next, we define the output table for aggregated product context:

CREATE TABLE product_review_context (
  product_id STRING,
  product_name STRING,
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  review_count BIGINT,
  average_rating DOUBLE,
  negative_reviews BIGINT,
  positive_reviews BIGINT,
  reviews STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'product_review_context',
  'properties.bootstrap.servers' = 'host.docker.internal:9092',
  'format' = 'json'
);

This topic will not contain individual reviews. Instead, it will contain aggregated context for each product.
For each product and time interval, Flink will produce information such as:
how many reviews were received
the average rating
the number of negative reviews
the number of positive reviews
the review texts concatenated into one field
The window_start and window_end fields represent the time boundaries of the aggregation interval. We have not covered Flink windows in detail yet, so for now it is enough to understand them as the beginning and end of the time bucket in which reviews are collected.

Aggregating reviews
Finally, we define the streaming aggregation:

INSERT INTO product_review_context
SELECT
  product_id,
  MAX(product_name) AS product_name,
  window_start,
  window_end,
  COUNT(*) AS review_count,
  -- Cast first: AVG over an INT column returns an INT in Flink
  AVG(CAST(rating AS DOUBLE)) AS average_rating,
  -- Assumed split: ratings <= 2 are negative, ratings >= 4 are positive
  SUM(CASE WHEN rating <= 2 THEN 1 ELSE 0 END) AS negative_reviews,
  SUM(CASE WHEN rating >= 4 THEN 1 ELSE 0 END) AS positive_reviews,
  LISTAGG(
    CONCAT(
      'Rating: ',
      CAST(rating AS STRING),
      '. Review: ',
      review_text
    ),
    ' | '
  ) AS reviews
FROM TABLE(
  TUMBLE(
    TABLE product_reviews,
    DESCRIPTOR(proc_time),
    INTERVAL '1' MINUTE
  )
)
GROUP BY
  product_id,
  window_start,
  window_end;

This query continuously reads incoming reviews and groups them by product and by one-minute processing-time windows.
For each product in each one-minute interval, it...
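As a rough mental model of what this windowed aggregation computes, here is a plain-Python sketch over a finite batch of reviews. It is not Flink code: the Review class and the aggregate function are hypothetical names. It assumes ratings of 2 or less count as negative and 4 or more as positive. It aligns one-minute tumbling windows to the epoch, which matches Flink's default for tumbling windows without an offset. Flink emits each window's row once the window closes, and it does not guarantee LISTAGG ordering. This sketch processes everything at once and simply keeps arrival order.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)  # mirrors INTERVAL '1' MINUTE
EPOCH = datetime(1970, 1, 1)


@dataclass
class Review:
    product_id: str
    product_name: str
    rating: int
    review_text: str
    proc_time: datetime  # stands in for the PROCTIME() computed column


def window_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Floor a timestamp to the start of its epoch-aligned tumbling window."""
    return EPOCH + ((ts - EPOCH) // size) * size


def aggregate(reviews: list[Review]) -> list[dict]:
    """Group reviews by (product_id, window) and compute the context columns."""
    groups: dict[tuple[str, datetime], list[Review]] = defaultdict(list)
    for r in reviews:
        groups[(r.product_id, window_start(r.proc_time))].append(r)

    rows = []
    for (product_id, ws), rs in groups.items():
        ratings = [r.rating for r in rs]
        rows.append({
            "product_id": product_id,
            "product_name": max(r.product_name for r in rs),  # like MAX(product_name)
            "window_start": ws,
            "window_end": ws + WINDOW,
            "review_count": len(rs),
            "average_rating": sum(ratings) / len(ratings),  # floating-point mean
            "negative_reviews": sum(1 for x in ratings if x <= 2),  # assumed threshold
            "positive_reviews": sum(1 for x in ratings if x >= 4),  # assumed threshold
            # like LISTAGG(CONCAT('Rating: ', ..., '. Review: ', ...), ' | ')
            "reviews": " | ".join(
                f"Rating: {r.rating}. Review: {r.review_text}" for r in rs
            ),
        })
    return rows
```

With this model, a review processed at 12:00:10 and another at 12:00:40 for the same product land in the 12:00 to 12:01 window and produce one context row. A third review at 12:01:05 starts a new window and produces a separate row.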