In this exercise you'll learn about running queries in Flink SQL using both batch and streaming execution modes, and observe some of the differences and similarities between them.
This exercise uses open source Flink running in Docker. If you haven't already done so, begin by following the instructions for getting started with Docker.
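If you have the course's Docker environment already set up, starting it typically looks something like the following. This is a sketch: it assumes a docker-compose.yml in the current directory and a service named sql-client, both of which depend on how your particular setup names things.

```shell
# Start the Flink cluster in the background (assumes a docker-compose.yml
# in the current directory, as provided by the setup instructions)
docker compose up -d

# Open an interactive Flink SQL Client session; the service name
# "sql-client" is an assumption and may differ in your compose file
docker compose run sql-client
```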
Let's begin by creating a table so there is something to query against. We're going to create a fixed-length (bounded) table with 500 rows of data generated by the faker table source. flink-faker is a convenient and powerful mock data generator designed to be used with Flink SQL.
CREATE TABLE `bounded_pageviews` (
`url` STRING,
`ts` TIMESTAMP(3)
)
WITH (
'connector' = 'faker',
'number-of-rows' = '500',
'rows-per-second' = '100',
'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);
If you are curious what this data looks like, you can use this query to see a sample:
SELECT * FROM bounded_pageviews LIMIT 10;
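You can also inspect the table itself using Flink SQL's catalog commands, which is a handy sanity check after any CREATE TABLE:

```sql
-- List the tables registered in the current catalog and database
SHOW TABLES;

-- Show the schema of the table we just created
DESCRIBE bounded_pageviews;
```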
By default, the Flink SQL Client is running in streaming mode. Let's switch to batch mode so we can see how that behaves:
SET 'execution.runtime-mode' = 'batch';
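If you want to confirm the change took effect, running SET with no arguments lists the SQL Client's current configuration, including the runtime mode:

```sql
SET;
```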
It will be easier to appreciate the differences between batch and streaming if we work with a query that produces an updating table as its result, like this one:
SELECT COUNT(*) AS `count` FROM bounded_pageviews;
When executing this query in batch mode, the sink receives only a single, final value, which you'll see displayed:
count
-----
500
This takes 5 seconds to complete because the source is configured to produce 500 rows at a rate of 100 rows per second.
You can now try the same thing, but in streaming mode.
SET 'execution.runtime-mode' = 'streaming';
SELECT COUNT(*) AS `count` FROM bounded_pageviews;
If you look closely, you will see the count increment in steps of 100, from 100 to 200 and so on, up to 500. Again, this takes about 5 seconds.
To make it clearer what's going on, you should also change how the results are being displayed. This won't affect the internal behavior of the runtime, but it will change how the SQL Client displays the results. In changelog mode, the SQL Client doesn't just update the count in place, but instead displays each message in the stream of updates it's receiving from the Flink SQL runtime.
SET 'sql-client.execution.result-mode' = 'changelog';
Now when you execute the same query as before
SELECT COUNT(*) AS `count` FROM bounded_pageviews;
the results should appear like this:
op count
-----------------------
... ...
-U 497
+U 498
-U 498
+U 499
-U 499
+U 500
When operating in streaming mode, the Flink runtime can't rely on the stream to ever end, so it is instead continuously updating the result as it processes the input stream. It ultimately arrives at the same result as when running in batch mode, but the sink for this streaming counting job is seeing all of the incremental work done along the way by the SQL runtime.
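The update stream is even easier to appreciate with a query that maintains more than one row of state. As a variation you can try, counting pageviews per URL should emit an insert (+I) the first time each URL is seen, and a retraction/update pair (-U/+U) each time an existing URL's count changes:

```sql
-- With the changelog result mode still active, each per-URL count
-- emits +I on first sight and a -U/+U pair on every later update
SELECT url, COUNT(*) AS `count`
FROM bounded_pageviews
GROUP BY url;
```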
To complete the picture, now try the streaming version running against an unbounded input stream.
Begin by creating a new, unbounded table. If the faker source isn't configured with a number-of-rows setting, it will continue producing data indefinitely, so this time we'll omit that option:
CREATE TABLE `streaming_pageviews` (
`url` STRING,
`ts` TIMESTAMP(3)
)
WITH (
'connector' = 'faker',
'rows-per-second' = '100',
'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);
Here's that same query, updated to use the new table:
SELECT COUNT(*) AS `count` FROM streaming_pageviews;
You might also want to reset the SQL Client to use its default display mode, rather than the changelog mode we were using earlier to expose the inner workings of the update stream:
SET 'sql-client.execution.result-mode' = 'table';
If you want to see this example running faster, or slower, you can modify the table using something like this:
ALTER TABLE `streaming_pageviews` SET ('rows-per-second' = '10');
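If you'd rather return to the table's original configuration, recent Flink versions also let ALTER TABLE reset an option rather than set it, which removes the override entirely:

```sql
-- Remove the overridden option; the table reverts to the
-- connector's default rate for this setting
ALTER TABLE `streaming_pageviews` RESET ('rows-per-second');
```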
ALTER TABLE is part of the SQL standard and makes it easy to modify a table's definition. Alternatively, you could always DROP the table and then CREATE it again with the updated configuration, but ALTER TABLE is easier.
You may want to leave everything running, since you can use the same setup in other exercises for this course.
But when you're done, you can exit Flink SQL with quit;. And if you want to shut down the Flink cluster, you can do that with
docker compose down -v
in your shell, which will stop all of the containers and remove the volume used for checkpointing.