Stream Processing on Flink using Kafka Source and S3 Sink

Siddharth Sharma
Jan 8, 2019



As Apache Flink continues to pique my interest due to its rapidly growing popularity in the Big Data/Hadoop ecosystem, I toyed with its stream processing capability over the holidays. In this article, I will highlight how Flink can be used for distributed real-time processing of unbounded data streams, using Kafka as the event source and AWS S3 as the data sink. In my previous article, I highlighted how Flink can be used for distributed batch processing of a bounded data set. Flink stream processing jobs range from simple transformations for data import/export to more complex applications that aggregate data in windows or implement complex event processing (CEP) functionality. Flink simplifies the programming model of batch and stream processing by providing a unified API (source → operators → sink) on top of its execution engine.

data Artisans and the Flink community have put a lot of work into integrating Flink with Kafka in a way that (1) guarantees exactly-once delivery of events, (2) does not create problems due to backpressure, (3) has high throughput, and (4) is easy to use for application developers.

Use case


Let’s assume a simple use case where you have a data collector that captures the event of a user viewing a particular movie and emits the following JSON log to a Kafka topic.


The ask is to aggregate, in 15-minute windows, the total number of views per movie and user along with the most recent impression time, and to store the aggregated results in an S3 bucket partitioned by yyyy-MM-dd--HH/{movie-id}/
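The aggregation in this use case boils down to a small per-key accumulator: a view count plus the latest impression time. Below is a minimal plain-Java sketch of that logic, without any Flink dependency. The class and method names (`ImpressionAggregation`, `Acc`, `aggregate`) and the row layout are assumptions made for illustration; they are not taken from the article's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: aggregates impressions per movie/user key. */
final class ImpressionAggregation {

    /** Accumulator holding the view count and the most recent impression time. */
    static final class Acc {
        long views = 0;
        long lastImpressionMillis = Long.MIN_VALUE;

        /** Folds one impression (event time in epoch millis) into this accumulator. */
        void add(long impressionMillis) {
            views++;
            lastImpressionMillis = Math.max(lastImpressionMillis, impressionMillis);
        }
    }

    /** Each row is {movieId, userId, epochMillis}; the result is keyed by "movieId/userId". */
    static Map<String, Acc> aggregate(String[][] impressions) {
        Map<String, Acc> byKey = new HashMap<>();
        for (String[] row : impressions) {
            String key = row[0] + "/" + row[1];
            byKey.computeIfAbsent(key, k -> new Acc()).add(Long.parseLong(row[2]));
        }
        return byKey;
    }
}
```

In a streaming job, one such accumulator would exist per key and per window rather than per batch of rows.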



  • Kafka: We will deploy Kafka locally using a Docker image that bootstraps both Kafka and Zookeeper containers. Once you install Docker Compose and deploy the image using

docker-compose -f docker-compose-single-broker.yml up

you will by default get a Kafka topic called test. To change the name of the topic, modify the value of the KAFKA_CREATE_TOPICS property in that Compose file.


  • Flink 1.7.1: We will run Flink within the IntelliJ IDE, which adds Flink’s core classes to the runtime classpath and makes it easier to run Flink using the main method. Assuming you have Maven installed, execute the following command to create a Flink Java project:
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.1

Once you enter a group id, an artifact id, and a project version, this command will create the required project structure with a pom.xml that declares all the dependencies required to execute a Flink program within IntelliJ.

  • AWS S3: Configure AWS credentials on your machine using any of the preferred credential providers to allow access to S3.

Code Walkthrough

Let’s walk through the two code snippets below (producer and processor). I have used a few explicit and verbose constructs (TimestampAssigner, AggregationFunction) to highlight the power of the Flink programming model; however, Flink also comes with many built-in functions (count, sum, etc.) that make the overall code more succinct.

Kafka Producer: We will use a dummy data generator that emits a movie impression to a Kafka topic every 10 ms. For simplicity, we will randomize a few movie ids and user ids and use ascending timestamps.

Mock Kafka Movie Impression Producer
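The data-generation side of such a producer can be sketched in plain Java. The field names (`userId`, `movieId`, `impressionTime`), the id ranges, and the class name are assumptions for illustration only, and the Kafka send is deliberately left out; a real producer would hand each generated line to a Kafka client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical mock generator: random movie/user ids with ascending timestamps. */
final class MockImpressionGenerator {
    private final Random random;
    private long nextTimestampMillis;

    MockImpressionGenerator(long seed, long startMillis) {
        this.random = new Random(seed);
        this.nextTimestampMillis = startMillis;
    }

    /** Returns one JSON line; each call advances the event time by 10 ms. */
    String next() {
        int userId = 1 + random.nextInt(5);   // assumed range: 5 users
        int movieId = 1 + random.nextInt(3);  // assumed range: 3 movies
        long ts = nextTimestampMillis;
        nextTimestampMillis += 10;
        return "{\"userId\":" + userId
                + ",\"movieId\":" + movieId
                + ",\"impressionTime\":" + ts + "}";
    }

    /** Generates `count` JSON lines starting at `startMillis`. */
    static List<String> generate(int count, long seed, long startMillis) {
        MockImpressionGenerator generator = new MockImpressionGenerator(seed, startMillis);
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lines.add(generator.next());
        }
        return lines;
    }
}
```

Advancing a simulated clock instead of sleeping 10 ms between events keeps the sketch deterministic while still producing ascending event times.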

Flink Processor: Self-explanatory code that creates a stream execution environment, configures a Kafka consumer as the source, aggregates movie impressions for each movie/user combination every 15 minutes, and writes the results to S3.

Movie Impression Aggregator

Below are some of the useful Flink concepts from the above code snippet:

Flink supports three notions of time in streaming programs: Processing Time, Event Time and Ingestion Time. The configured time characteristic determines which notion of time is used for windowing computations.

We have used event time, which is the time at which each individual event occurred on its producing device. This time is embedded within the records before they enter Flink, and the event timestamp can be extracted from each record. Refer to the MovieImpressionTimestampAssigner class in the above code snippet.

Watermarks are Apache Flink’s mechanism for measuring progress in event time. Watermarks are part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t.
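One common way to produce watermarks is to let them trail the highest timestamp seen so far by a fixed out-of-orderness bound. The toy tracker below illustrates that idea in plain Java; it is a sketch under that assumption, with hypothetical names, and is not necessarily how the article's timestamp assigner generates watermarks.

```java
/** Toy watermark tracker: watermark = highest event time seen minus an allowed out-of-orderness bound. */
final class BoundedLatenessWatermark {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessWatermark(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /**
     * Observes one element and returns true if it is late, i.e. its timestamp
     * is <= the watermark that was in effect before the element arrived.
     */
    boolean observe(long timestampMillis) {
        boolean late = timestampMillis <= currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
        return late;
    }

    /** Returns Long.MIN_VALUE until the first element has been observed. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }
}
```

With a bound of 100 ms, an element stamped 950 that arrives after one stamped 1000 is still on time, while an element stamped 1000 that arrives after one stamped 1200 is late.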

In order to make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams to give the application the same semantics as a failure-free execution.
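The recovery guarantee can be illustrated with a toy model: a snapshot pairs the operator state with the source position it corresponds to, and recovery restores the state and replays the source from that position. This is a hypothetical plain-Java illustration of the idea, not Flink's actual checkpointing mechanism, and all names in it are made up.

```java
/** Toy model of checkpoint-and-replay; a hypothetical illustration, not Flink's mechanism. */
final class CheckpointReplayDemo {

    /** A snapshot pairs operator state (a running sum) with the source position it reflects. */
    static final class Checkpoint {
        final int position;
        final long sum;

        Checkpoint(int position, long sum) {
            this.position = position;
            this.sum = sum;
        }
    }

    /** Sums events[from..to) on top of the given starting state. */
    static long process(long[] events, int from, int to, long startSum) {
        long sum = startSum;
        for (int i = from; i < to; i++) {
            sum += events[i];
        }
        return sum;
    }

    /** Takes a checkpoint at checkpointAt, crashes at crashAt, then recovers and finishes. */
    static long runWithFailure(long[] events, int checkpointAt, int crashAt) {
        long sum = process(events, 0, checkpointAt, 0L);
        Checkpoint checkpoint = new Checkpoint(checkpointAt, sum);
        // Work done after the checkpoint is lost when the job crashes.
        process(events, checkpointAt, crashAt, sum);
        // Recovery: restore the state and rewind the source to the checkpointed position.
        return process(events, checkpoint.position, events.length, checkpoint.sum);
    }
}
```

Because the state and the stream position are restored together, the replayed events are counted exactly once and the final result matches a failure-free run.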

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. A window is created as soon as the first element that should belong to it arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus any allowed lateness.
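Assigning an element to a tumbling window is simple arithmetic on its timestamp. The sketch below assumes 15-minute windows aligned to the epoch with no offset; the class and method names are hypothetical.

```java
import java.time.Duration;

/** Hypothetical helper: maps a timestamp to its 15-minute tumbling window. */
final class TumblingWindows {
    static final long SIZE_MILLIS = Duration.ofMinutes(15).toMillis();

    /** Inclusive start of the window containing the timestamp. */
    static long windowStart(long timestampMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, SIZE_MILLIS);
    }

    /** Exclusive end of the window containing the timestamp. */
    static long windowEnd(long timestampMillis) {
        return windowStart(timestampMillis) + SIZE_MILLIS;
    }
}
```

Every element whose timestamp falls in [windowStart, windowEnd) lands in the same bucket, so each element belongs to exactly one window.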

Flink provides different state backends that specify how and where the state is stored.

The StreamingFileSink connector provides a sink that writes partitioned files to filesystems supported by the Flink FileSystem abstraction. In the above code snippet, it creates a streaming sink with hourly buckets and a default rolling policy. It uses the MovieBucketAssigner class to determine the folder each output event goes into.
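The bucketing rule from the use case (an hourly date bucket followed by the movie id) can be sketched with `java.time` alone. The helper below is a hypothetical stand-in for the path logic a bucket assigner would contain; the class name, the method name, and the choice of UTC are assumptions.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that derives a bucket id of the form yyyy-MM-dd--HH/{movie-id}. */
final class MovieBucketPath {
    // Assumption: buckets are computed in UTC.
    private static final DateTimeFormatter HOUR_BUCKET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    /** Builds the bucket id from the event time (epoch millis) and the movie id. */
    static String bucketId(long epochMillis, String movieId) {
        return HOUR_BUCKET.format(Instant.ofEpochMilli(epochMillis)) + "/" + movieId;
    }
}
```

For example, an event at 2019-01-08T13:45:00Z for movie 42 maps to the bucket `2019-01-08--13/42`.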


Since Flink itself is written in Java, I was able to quickly understand most of its internal workings by looking at the source code. With Java 8 support and its very expressive interfaces, Flink streaming programs can be succinct yet complex in what they do.

There are many great production uses of Flink for stream processing. You can find in-depth discussions from Alibaba, Netflix, Lyft, Uber, DriveTribe and others on their adoption of Flink to address the stream processing needs of their business. I’m excited to further explore and evaluate it for some of the CEP problems.