course: Apache Flink® 101

Batch and Stream Processing (Docker)

15 min
David Anderson

Principal Software Practice Lead

Batch and Stream Processing with Flink SQL (Docker)

In this exercise you'll learn about running queries in Flink SQL using both batch and streaming execution modes, and observe some of the differences and similarities between them.

This exercise uses open source Flink running in Docker. If you haven't already done so, begin by following the instructions for getting started with Docker.

Experiment with batch and streaming

Let's begin by creating a table so there is something to query against. We're going to create a fixed-length (bounded) table with 500 rows of data generated by the faker table source. flink-faker is a convenient and powerful mock data generator designed to be used with Flink SQL.

CREATE TABLE `bounded_pageviews` (
  `url` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'number-of-rows' = '500',
  'rows-per-second' = '100',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);

If you are curious what this data looks like, you can use this query to see a sample:

SELECT * FROM bounded_pageviews LIMIT 10;

Batch mode, bounded input

By default, the Flink SQL Client runs in streaming mode. Let's switch to batch mode to see how that behaves:

SET 'execution.runtime-mode' = 'batch';
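
If you want to confirm that the setting took effect, one option is to run SET with no arguments, which prints the properties of the current session. This is only a sketch: the exact list and layout of the output depend on your Flink version, so look for the execution.runtime-mode entry rather than expecting a particular format.

```sql
-- Print the current session properties; execution.runtime-mode
-- should now be reported as batch.
SET;
```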

It will be easier to appreciate the differences between batch and streaming if we work with a query that produces an updating table as its result, like this one:

SELECT COUNT(*) AS `count` FROM bounded_pageviews;

When executing this query in batch mode, the sink receives only a single, final value, which you'll see displayed:

count
-----
  500

This takes 5 seconds to complete because the source is configured to produce 500 rows at a rate of 100 rows per second.
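
The duration follows from dividing the row count by the rate: 500 rows / 100 rows per second = 5 seconds. One way to check this relationship is a second table that keeps the 500 rows but raises the rate to 250 rows per second. The sketch below does this with a table hypothetically named bounded_pageviews_fast, which is an assumption for illustration and not part of the original exercise. By the same reasoning, counting it in batch mode should finish in roughly 2 seconds.

```sql
-- Hypothetical variant of bounded_pageviews, for illustration only:
-- the same 500 rows, emitted at 250 rows per second instead of 100.
CREATE TABLE `bounded_pageviews_fast` (
  `url` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'number-of-rows' = '500',
  'rows-per-second' = '250',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);

-- Expected to take roughly 500 / 250 = 2 seconds.
SELECT COUNT(*) AS `count` FROM bounded_pageviews_fast;
```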

Streaming mode, bounded input

You can now try the same thing, but in streaming mode.

SET 'execution.runtime-mode' = 'streaming';
SELECT COUNT(*) AS `count` FROM bounded_pageviews;

If you look closely, you will see the count increment from 100 to 200 and so on, up to 500. Again, this will take 5 seconds.

To make it clearer what's going on, change how the results are displayed. This won't affect the internal behavior of the runtime, only how the SQL Client presents the results. In changelog mode, the SQL Client doesn't update the count in place; instead, it displays each message in the stream of updates it receives from the Flink SQL runtime.

SET 'sql-client.execution.result-mode' = 'changelog';

Now when you execute the same query as before

SELECT COUNT(*) AS `count` FROM bounded_pageviews;

the results should appear like this:

 op                count
 -----------------------
...                  ...
 -U                  497
 +U                  498
 -U                  498
 +U                  499
 -U                  499
 +U                  500

When operating in streaming mode, the Flink runtime can't assume the stream will ever end, so it continuously updates the result as it processes the input stream. Each -U row retracts the previous count, and the +U row that follows supplies its replacement. The job ultimately arrives at the same result as in batch mode, but the sink for this streaming counting job sees all of the incremental work done along the way by the SQL runtime.
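
The same update stream can be observed per key. As a sketch that goes slightly beyond the original exercise, the query below groups the bounded table by url. It assumes the session is still in streaming mode with the result mode set to changelog. Under those assumptions, each url should first appear as an insert (+I) and then be revised through -U/+U pairs as more rows with that url arrive. The column alias cnt is an arbitrary name chosen for this example.

```sql
-- Count page views per URL. In streaming mode every incoming row
-- updates the running count for its url, so the changelog shows one
-- stream of revisions per key instead of a single global counter.
SELECT `url`, COUNT(*) AS `cnt`
FROM bounded_pageviews
GROUP BY `url`;
```

Running the same query after switching back to batch mode should instead produce one final row per url, with the counts adding up to 500.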

Streaming mode, unbounded input

To complete the picture, now try the streaming version running against an unbounded input stream.

Begin by creating a new, unbounded table. If the faker source isn't configured with a number-of-rows setting, it continues producing data indefinitely, so we'll omit that option:

CREATE TABLE `streaming_pageviews` (
  `url` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '100',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);

Here's that same query, updated to use the new table:

SELECT COUNT(*) AS `count` FROM streaming_pageviews;

You might also want to reset the SQL Client to use its default display mode, rather than the changelog mode we were using earlier to expose the inner workings of the update stream:

SET 'sql-client.execution.result-mode' = 'table';
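
Since table is the default display mode, an alternative is to clear the override rather than set the value explicitly. The following is a sketch that assumes a Flink version whose SQL Client supports RESET with a quoted key (recent releases do, but check the documentation for your version):

```sql
-- Drop the session override so the SQL Client falls back to its
-- default result mode.
RESET 'sql-client.execution.result-mode';
```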

If you want to see this example run faster or slower, you can modify the table with something like this:

ALTER TABLE `streaming_pageviews` SET ('rows-per-second' = '10');

ALTER TABLE is part of the SQL standard and makes it easy to modify a table's definition. Alternatively, you could DROP the table and then CREATE it again with the updated configuration, but ALTER TABLE is less work.
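
For comparison, the drop-and-recreate route would look roughly like the sketch below. It reuses the streaming_pageviews definition from earlier in this exercise, with only the rows-per-second value changed to 10.

```sql
-- Remove the existing definition. The table is only a catalog entry
-- for a generated source, so no stored data is lost.
DROP TABLE IF EXISTS `streaming_pageviews`;

-- Recreate the table with the new rate.
CREATE TABLE `streaming_pageviews` (
  `url` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '10',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);
```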

Finish

You may want to leave everything running, since you can use the same setup in other exercises for this course.

But when you're done, you can exit the Flink SQL Client with quit;. To shut down the Flink cluster, run

docker compose down -v

in your shell, which will stop all of the containers and remove the volume used for checkpointing.

Resources

Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.