Distributed Batch Processing with Apache Flink on an AWS EMR YARN Cluster
If you have been following big-data trends recently, you might have heard of the new kid on the block called Apache Flink. In this article, I’ll show you how to use Apache Flink to implement a simple batch-processing algorithm on a YARN cluster using AWS EMR.
What is Flink?
Flink is another open-source, distributed big-data processing framework (somewhat similar to Apache Spark) that takes a different approach to processing both bounded (batch) and potentially unbounded (stream) datasets. Where Spark is at heart a batch-processing engine that supports real-time streams by processing micro-batches, Flink was built from the ground up as a native streaming engine, where data is pushed through the operators as soon as it arrives. Batch processing is also supported and is built on top of the streaming framework. Both frameworks are a great choice for writing simpler algorithms that reduce the latency introduced by Hadoop. In terms of processing, Flink simplifies the algorithms because it has a unified processing engine under the hood for both batch (DataSet) and stream (DataStream) datasets.
It is futile to compare and contrast Flink with Spark, as the two technologies can co-exist rather than compete against each other. Coming from a Hadoop background, I found the learning curve with Flink to be much faster and easier than with Spark.
Why Batch Processing?
Flink really shines at distributed stream processing. However, if you have prior experience writing MapReduce jobs in Hadoop and are just getting started with Flink, it is better to start with batch processing and understand how the Flink API works for the familiar MapReduce algorithms. Once you’ve covered batch processing, you can move on to stream processing. Flink executes batch programs as a special case of streaming programs, where the streams are bounded (a finite number of elements).
Flink Architecture
Understanding Flink’s architecture and programming model helps in choosing the right operators and parallelism for an optimal execution path. The Flink project documentation is quite verbose, so I’ve summarized some of the key high-level components from the official documentation.
Engine
- Flink can be deployed either locally on a single JVM or on a cluster using platforms such as YARN, Mesos, Docker, Kubernetes, AWS, GCE, etc.
- The lowest level abstraction simply offers stateful streaming.
- In practice, most applications would not need the low-level abstraction described above, but would instead program against the Core APIs: the DataStream API (bounded/unbounded streams) and the DataSet API (bounded data sets). These fluent APIs offer the common building blocks for data processing, like various forms of user-specified transformations, joins, aggregations, windows, state, etc. Data types processed in these APIs are represented as classes in the respective programming languages.
- The Table API and SQL are a declarative DSL and SQL expressions, respectively, centred around tables, which may be dynamically changing (when representing streams). A small sketch of this layer follows this list.
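As a rough, hypothetical sketch of the Table API and SQL layer (assuming the flink-table dependency for your Scala version is on the classpath; the table and field names below are made up), a bounded data set can be registered as a table and queried declaratively:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class TableApiSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // A made-up bounded data set of (movie, rating) pairs
        DataSet<Tuple2<String, Integer>> ratings = env.fromElements(
            Tuple2.of("MovieA", 4), Tuple2.of("MovieA", 5), Tuple2.of("MovieB", 3));

        // Register the data set as a table and query it with SQL
        tableEnv.registerDataSet("ratings", ratings, "movie, rating");
        Table avgPerMovie = tableEnv.sqlQuery(
            "SELECT movie, AVG(rating) AS avg_rating FROM ratings GROUP BY movie");

        // Convert the result back to a DataSet and print it (print() triggers execution)
        tableEnv.toDataSet(avgPerMovie, Row.class).print();
    }
}

Over a stream, the same query would produce a continuously updating table instead of a one-shot result.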
Programming Model
The basic building blocks of Flink programs are sources (input), transformations (map/filter/reduce) and sinks (output). When executed, Flink programs are mapped to streaming dataflows, consisting of streams and transformation operators. Each dataflow starts with one or more sources and ends in one or more sinks. The dataflows resemble arbitrary directed acyclic graphs (DAGs).
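As a tiny, hypothetical illustration (the values below are made up), a program with one source, two transformations and one sink forms a simple dataflow:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataflowSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("error: disk full", "info: all good", "error: timeout") // source
            .filter(line -> line.startsWith("error"))                            // transformation
            .map(new MapFunction<String, String>() {                             // transformation
                @Override
                public String map(String line) {
                    return line.toUpperCase();
                }
            })
            .print(); // sink (print() also triggers execution for DataSet programs)
    }
}

Flink turns this chain into a small DAG: one source, two transformation operators and one sink.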
Runtime Environment
The Flink runtime consists of two types of processes:
- The JobManager (master) is the orchestrator and coordinates the distributed execution.
- The TaskManagers (workers) execute the tasks (or more specifically, the subtasks) of a dataflow, and buffer and exchange the data streams.
The JobManagers and TaskManagers can be started in various ways: directly on the machines as a standalone cluster, in containers, or managed by resource frameworks like YARN or Mesos. TaskManagers connect to JobManagers, announcing themselves as available, and are assigned work. Distributed inter-process communication in the cluster happens asynchronously using Akka and actors.
Each TaskManager is a JVM process and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, a worker has so-called task slots (at least one). As a rule-of-thumb, a good default number of task slots would be the number of CPU cores.
Programs in Flink are inherently parallel and distributed. During execution, each operator has one or more operator subtasks. The operator subtasks are independent of one another, and execute in different threads and possibly on different machines or containers.
The number of operator subtasks is the parallelism of that particular operator.
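As a small, hypothetical sketch (the parallelism values are arbitrary), the parallelism can be set for the whole job or overridden per operator; the number of slots per TaskManager itself is configured via taskmanager.numberOfTaskSlots in flink-conf.yaml:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default parallelism for every operator in this job

        env.fromElements(1, 2, 3, 4, 5)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2; // a trivial transformation
                }
            })
            .setParallelism(2) // override the parallelism for this one operator
            .print();
    }
}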
Simple Program
Now that we have a basic understanding of how Flink works, let’s look at a very simple batch-processing job which reads records from a CSV file, groups them on composite fields and writes the most recent record from each group to a CSV file.
Data Format — Each record represents a movie rating given by a user. Since a user can rate the same movie multiple times, we need to de-duplicate and keep only the most recent rating (by timestamp) from the dataset. S3 is used as the object store for the input data source and the output sink.
Movie^User^Rating^Timestamp
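For example, a single (made-up) record in this format would look like:
MovieA^user42^4^1543428000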
Java/Maven — In this article I use Java to write the Flink application, but you can also use Scala or Python.
Assuming you have Maven installed, execute the following command to create a Flink Java project:
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.7.0
Once you enter a group id, an artifact id and a project version, this command will create the required project structure with a pom.xml that declares all the required dependencies. Once the project is ready, create the LogProcessor class below with a main method.
package com.sid.flink.jobs;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class LogProcessor {

    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Movie^User^Rating^Timestamp
        env.readCsvFile("s3://some-bucket/input/")
            .fieldDelimiter("^")
            .ignoreInvalidLines()
            .tupleType(LogLine.class)
            .groupBy(0, 1)                                             // group by movie and user
            .reduceGroup(LogProcessor::reduce)                         // keep the most recent rating per group
            .writeAsCsv("s3://some-bucket/output/", "\n", "^", OVERWRITE); // overwrite any previous output

        env.execute("LogProcessor");
    }

    // Emits the record with the highest timestamp from each group
    private static void reduce(Iterable<LogLine> values, Collector<LogLine> collector) {
        Long maxTs = 0L;
        LogLine latestVal = null;
        for (LogLine val : values) {
            if (val.getTimestamp() >= maxTs) {
                maxTs = val.getTimestamp();
                latestVal = val;
            }
        }
        collector.collect(latestVal);
    }

    public static class LogLine extends Tuple4<String, String, String, Long> {

        public LogLine() {
        }

        LogLine(String movie, String user, String rating, long timestamp) {
            super(movie, user, rating, timestamp);
        }

        public String getMovie() {
            return getField(0);
        }

        public String getUser() {
            return getField(1);
        }

        public String getRating() {
            return getField(2);
        }

        public Long getTimestamp() {
            return getField(3);
        }
    }
}
Here is the execution flow of the above program.
- Create a Flink execution environment
- Read all CSV files from an S3 folder
- Convert each line into a Tuple
- Group records using the movie and user combination
- Compare timestamps and keep the most recent rating within each group
- Write records back to an S3 folder
Local Execution
For testing with a small dataset, you can execute the program within your IDE. Flink will create a single JVM and execute the application. For the S3 connector, you will have to add the dependency below to the pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-s3-fs-hadoop</artifactId>
  <version>${flink.version}</version>
  <scope>compile</scope>
</dependency>
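As a minimal local smoke-test sketch (the file:// paths are hypothetical, and the de-duplication step is replaced with first(1) just to exercise the pipeline), the same CsvReader settings can be pointed at local files instead of S3:

package com.sid.flink.jobs;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;

import org.apache.flink.api.java.ExecutionEnvironment;

public class LogProcessorLocalSmokeTest {
    public static void main(String[] args) throws Exception {
        // When run from the IDE, getExecutionEnvironment() returns a local, single-JVM environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.readCsvFile("file:///tmp/ratings/")        // hypothetical local copy of the input
            .fieldDelimiter("^")
            .ignoreInvalidLines()
            .tupleType(LogProcessor.LogLine.class)     // reuse the tuple type from the job
            .groupBy(0, 1)
            .first(1)                                  // keep one record per group for the smoke test
            .writeAsCsv("file:///tmp/ratings-out/", "\n", "^", OVERWRITE);

        env.execute("LogProcessor-local-smoke-test");
    }
}

Once the output looks right locally, the S3 paths from the LogProcessor job can be used as-is on the cluster.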
AWS EMR Execution
- Execute mvn clean package and upload the uber jar file to an S3 key. Let’s assume the jar is uploaded to s3://xxxxx/flink-job.jar
- Create a shell script with the following commands and upload it to another S3 key. This shell script will run as the first step on the cluster to copy the application jar from S3 onto the cluster’s local filesystem. Let’s assume the script is uploaded to s3://xxxxxx/copy_jar.sh
#!/bin/sh
mkdir -p /home/hadoop/jars;
hadoop fs -copyToLocal s3://xxxxx/flink-job.jar /home/hadoop/jars/flink-job.jar;
Following are the steps required to execute the above Flink program on an AWS EMR YARN cluster with 4 m3.xlarge nodes, each with 4 vCPUs. Each node will run a dedicated task manager with 4 task slots, i.e. one slot per core. The parallelism of the job can therefore be 16, as we have 4 * 4 task slots.
Using AWS EMR Java SDK
- Create an EMR cluster with 1 master node and 4 core nodes.
- Configure IAM roles, the SSH security key and the S3 log URI.
- Add flink as an application.
- Add a step to copy the jar from S3 to a local folder.
- Add a step to run the Flink job on YARN, specifying the number of task managers, task slots, the parallelism and the job class name.
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

// Copy Jar Step
HadoopJarStepConfig copyJarStepConf = new HadoopJarStepConfig()
    .withJar("s3://elasticmapreduce/libs/script-runner/script-runner.jar")
    .withArgs("s3://xxxxxx/copy_jar.sh");

StepConfig copyJarStep = new StepConfig()
    .withName("Copy Jar")
    .withHadoopJarStep(copyJarStepConf);

// Flink Job Step
HadoopJarStepConfig logProcessorStepConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("bash", "-c",
        "flink run -m yarn-cluster -yn 4 -ys 4 -p 16 -c com.sid.flink.jobs.LogProcessor /home/hadoop/jars/flink-job.jar");

StepConfig logProcessorStep = new StepConfig()
    .withName("LogProcessor")
    .withHadoopJarStep(logProcessorStepConf);

stepConfigs.add(copyJarStep);
stepConfigs.add(logProcessorStep);

// EMR Cluster
RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("Test Flink Job")
    .withReleaseLabel("emr-5.19.0")
    .withApplications(new Application().withName("Flink")) // the SDK expects Application objects rather than plain strings
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri("s3://XXXXXX")
    .withInstances(
        new JobFlowInstancesConfig()
            .withEc2KeyName("XXXXX")
            .withInstanceCount(5) // 1 master + 4 core nodes
            .withKeepJobFlowAliveWhenNoSteps(false)
            .withMasterInstanceType("m3.xlarge")
            .withSlaveInstanceType("m3.xlarge"))
    .withSteps(stepConfigs);

// Trigger the cluster ("emr" is assumed to be an AmazonElasticMapReduce client created elsewhere)
RunJobFlowResult result = emr.runJobFlow(request);
Thoughts
The above program is simpler and faster than a traditional MapReduce application. Now that you know how to code and deploy a simple Flink program, you can look at writing complex stream-processing algorithms and see the real horsepower of Flink. Flink provides native integration with Kafka and Kinesis for both data sources and sinks. Using Amazon S3 as the data source and sink for datasets was painless, and the option to deploy on Amazon EMR reduces the time normally spent configuring a Hadoop cluster.
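As a hedged sketch of what the Kafka integration looks like (assuming the flink-connector-kafka dependency is added; the broker address and topic name are made up), a Kafka topic can be consumed as a DataStream:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "ratings-consumer");        // hypothetical consumer group

        // Consume raw rating lines from a (made-up) Kafka topic as an unbounded stream
        DataStream<String> ratings = env.addSource(
            new FlinkKafkaConsumer<>("ratings", new SimpleStringSchema(), props));

        ratings.print(); // sink: write each record to stdout

        env.execute("KafkaSourceSketch");
    }
}

The Kinesis connector follows the same addSource pattern with its own consumer class.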
The trickiest part of configuring a Flink cluster is estimating the appropriate level of job or operator parallelism. I’m bullish that Flink will soon provide useful tools for this, and in some cases use YARN scheduling and resource allocation to auto-configure the right level of parallelism.