Part 1: Event-driven serverless architecture for supporting EMR Spot Instance Fleet in AWS DataPipeline

Siddharth Sharma
Nov 1, 2018


This post describes the problem statement, the implementation, and what we learned while solving an AWS limitation using AWS services.

I’ll write a follow-up post describing how we implemented CI/CD, monitoring, and alerting for this system.

We heavily use AWS DataPipeline to orchestrate our data-processing. All our batch jobs use Spark or MapReduce and run on AWS EMR clusters with EMRFS as the S3 connector for HDFS.

These clusters are transient in nature, i.e. they are created on demand and terminated as soon as processing completes, and they use EC2 Spot Instances with a mix of instance families (m3, m5d, c3, c5d, etc.) and types (m3.xlarge, c3.2xlarge, m5d.xlarge, etc.).

On average, we spin up ~3,000 clusters daily using ~100,000 Spot Instances. At this scale, Spot Instances give us effectively unlimited capacity and massive savings (up to 90% off the on-demand price).

Daily EMR & EC2 Scale

Current approach

To provision Spot Instances at the lowest cost, we have a custom in-house algorithm that queries the AWS Spot market pricing history to determine the Availability Zone (AZ) with the lowest bid price in a given region. This worked great until we started observing EMR bootstrap failures due to the unavailability of Spot Instances in the chosen AZ.
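
For context, the heart of such an approach is a Spot price history lookup. Here is a minimal sketch of that idea using boto3 (not our production code; the region and candidate instance types are placeholder assumptions):

```python
import boto3

# Region and candidate instance types are placeholder assumptions
ec2 = boto3.client("ec2", region_name="us-east-1")

history = ec2.describe_spot_price_history(
    InstanceTypes=["m5d.xlarge", "c5d.2xlarge"],
    ProductDescriptions=["Linux/UNIX"],
    MaxResults=100,
)

# Pick the AZ offering the lowest price among the returned data points
cheapest = min(history["SpotPriceHistory"], key=lambda p: float(p["SpotPrice"]))
print(cheapest["AvailabilityZone"], cheapest["SpotPrice"])
```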

To mitigate this, we built fault tolerance into our system by retrying once in the next-cheapest AZ and then falling back to on-demand instances on the third attempt. This fallback gave us some SLA guarantee but incurred extra cost.

Idea

Back in March 2017, AWS announced Instance Fleets support for EMR. With an Instance Fleet, given a choice of instance types and AZs, AWS automatically provisions the desired capacity in the AZ with the lowest cost and best available capacity (a minimal launch sketch follows the list below). We decided to give Instance Fleets a try and were quite pleased with the results:

  • Less interruption
  • Low cost
  • Less code — no need to maintain and update pricing algorithm
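
To give a feel for what an Instance Fleet request looks like, here is a minimal, hypothetical sketch of launching an EMR cluster with Instance Fleets via boto3. The release label, roles, subnets, instance types, and capacities are placeholders, not our actual configuration:

```python
import boto3

emr = boto3.client("emr", region_name="us-east-1")  # region is a placeholder

response = emr.run_job_flow(
    Name="transient-spark-cluster",   # placeholder name
    ReleaseLabel="emr-5.17.0",        # placeholder release
    Applications=[{"Name": "Spark"}],
    ServiceRole="EMR_DefaultRole",
    JobFlowRole="EMR_EC2_DefaultRole",
    Instances={
        # Give EMR several subnets (AZs) and instance types to choose from;
        # it provisions capacity where cost is lowest and Spot capacity is best.
        "Ec2SubnetIds": ["subnet-aaa111", "subnet-bbb222"],  # placeholder subnets
        "KeepJobFlowAliveWhenNoSteps": True,
        "InstanceFleets": [
            {
                "Name": "master",
                "InstanceFleetType": "MASTER",
                "TargetSpotCapacity": 1,
                "InstanceTypeConfigs": [{"InstanceType": "m5d.xlarge"}],
                "LaunchSpecifications": {
                    "SpotSpecification": {
                        "TimeoutDurationMinutes": 20,
                        "TimeoutAction": "SWITCH_TO_ON_DEMAND",
                    }
                },
            },
            {
                "Name": "core",
                "InstanceFleetType": "CORE",
                "TargetSpotCapacity": 8,
                "InstanceTypeConfigs": [
                    {"InstanceType": "m5d.xlarge", "WeightedCapacity": 1},
                    {"InstanceType": "c5d.2xlarge", "WeightedCapacity": 2},
                ],
                "LaunchSpecifications": {
                    "SpotSpecification": {
                        "TimeoutDurationMinutes": 20,
                        "TimeoutAction": "SWITCH_TO_ON_DEMAND",
                    }
                },
            },
        ],
    },
)
print(response["JobFlowId"])
```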

However, we hit a roadblock when we realized that AWS DataPipeline doesn’t support most of the newer EMR features, including:

  • Instance Fleet
  • Security Configurations
  • Custom AMI

Since we had a good understanding of the internals of DataPipeline, we decided to add support for Instance Fleets ourselves.

Implementation

This post assumes a basic understanding of the following AWS services:

  • DataPipeline — Orchestration service to build data pipelines
  • EMR — Managed Hadoop service
  • SQS — Managed queuing service
  • SNS — Managed topic service
  • Lambda — Serverless computing
  • DynamoDB — Managed NoSQL store
  • CloudWatch — Monitoring and alerting service

Below is the serverless architecture that we finalized and implemented in less than a month.

Instance Fleet for EMR in DataPipeline

Walkthrough

  1. To use EMR with an Instance Fleet inside a data pipeline, create a ShellCommandActivity that invokes a runnable jar, passing the JSON-serialized pipeline definition as an argument
  2. Within the runnable method, deserialize the pipeline definition and launch an EMR cluster with an Instance Fleet, similar to the launch sketch shown earlier (this step encapsulates the Spot market pricing and Instance Fleet logic)
  3. Install the DataPipeline Task Runner, with a unique WorkerGroup name, as the first step on the EMR master node
  4. Store a mapping of PipelineId → List<ClusterIds> in DynamoDB (see the DynamoDB sketch after this list)
  5. Run one or more EMRActivity on the above WorkerGroup
  6. Upon successful completion of all activities, invoke a ShellCommandActivity that sends a message with the PipelineId to an SNS topic
  7. The SNS topic is subscribed to by an SQS queue
  8. The SQS queue is configured as an event source for an AWS Lambda function
  9. The Lambda function reads List<ClusterIds> for the given PipelineId from DynamoDB (see the Lambda sketch after this list)
  10. The Lambda function terminates the EMR cluster(s)
  11. Upon failure of any activity, DataPipeline sends a message with the WorkerGroup name to the SNS topic, and steps 6–10 are repeated
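
For step 4, the mapping can be as simple as one DynamoDB item per pipeline. A minimal sketch, assuming a hypothetical table named pipeline-clusters with PipelineId as the partition key:

```python
import boto3

table = boto3.resource("dynamodb").Table("pipeline-clusters")  # hypothetical table name

def register_cluster(pipeline_id, cluster_id):
    """Append a cluster id to the list stored for this pipeline."""
    table.update_item(
        Key={"PipelineId": pipeline_id},
        UpdateExpression="SET ClusterIds = list_append(if_not_exists(ClusterIds, :empty), :new)",
        ExpressionAttributeValues={":empty": [], ":new": [cluster_id]},
    )
```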
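
And for steps 8–10, the Lambda handler boils down to: unwrap the SNS notification from the SQS record, look up the cluster ids for the PipelineId, and terminate them. A sketch under the same assumptions (the table name and message shape are hypothetical):

```python
import json
import boto3

emr = boto3.client("emr")
table = boto3.resource("dynamodb").Table("pipeline-clusters")  # hypothetical table name

def handler(event, context):
    for record in event["Records"]:                  # one record per SQS message
        notification = json.loads(record["body"])    # SNS envelope delivered via SQS
        pipeline_id = notification["Message"]        # assumes the message body is the PipelineId
        item = table.get_item(Key={"PipelineId": pipeline_id}).get("Item")
        if item and item.get("ClusterIds"):
            emr.terminate_job_flows(JobFlowIds=item["ClusterIds"])
```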

Goodies

This met our minimum requirement of supporting Instance Fleets, but we were losing out on a key DataPipeline feature: Terminate After.

To mitigate this, we added a few more components to mimic the Terminate After feature:

  1. If a cluster defines the Terminate After property, an SQS message carrying the Terminate After value as an epoch timestamp is sent to a dedicated queue, delayed by the maximum delivery delay SQS allows (15 minutes as of today)
  2. The queue is configured as an event source for an AWS Lambda function
  3. The Lambda function checks whether current epoch ≥ message epoch and either terminates the cluster or re-sends the message to the queue with another 15-minute delay (sketched below)
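
A minimal sketch of that Lambda, assuming a hypothetical queue URL and message shape ({"ClusterId": ..., "TerminateAfterEpoch": ...}); SQS caps DelaySeconds at 900, hence the 15-minute hops:

```python
import json
import time
import boto3

emr = boto3.client("emr")
sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/terminate-after"  # placeholder

def handler(event, context):
    for record in event["Records"]:
        msg = json.loads(record["body"])  # assumed shape: {"ClusterId": ..., "TerminateAfterEpoch": ...}
        if time.time() >= msg["TerminateAfterEpoch"]:
            emr.terminate_job_flows(JobFlowIds=[msg["ClusterId"]])
        else:
            # Not due yet: re-enqueue with the maximum delay SQS allows (15 minutes)
            sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=record["body"], DelaySeconds=900)
```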

There was one more case to handle: developers were manually deleting data pipelines without terminating the linked EMR clusters.

To mitigate this, we

  1. Created a CloudWatch Events rule that sends an SQS message with epoch = now to the above queue for every data pipeline that is deleted explicitly (one possible wiring is sketched below)
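
One way to express that rule (a sketch, not necessarily how ours is defined) is an event pattern matching the DeletePipeline API call recorded by CloudTrail, targeting the queue above; the consuming Lambda then treats the delete event as “terminate now”. The rule name and queue ARN are placeholders:

```python
import json
import boto3

events = boto3.client("events")

# Match DataPipeline DeletePipeline calls recorded by CloudTrail
events.put_rule(
    Name="datapipeline-deleted",  # placeholder rule name
    EventPattern=json.dumps({
        "source": ["aws.datapipeline"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {"eventName": ["DeletePipeline"]},
    }),
    State="ENABLED",
)

# Deliver matching events to the terminate-after queue (placeholder ARN);
# the consuming Lambda would need to recognize this event shape as epoch = now
events.put_targets(
    Rule="datapipeline-deleted",
    Targets=[{"Id": "terminate-after-queue", "Arn": "arn:aws:sqs:us-east-1:123456789012:terminate-after"}],
)
```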

Learnings

  1. To build an event-driven serverless architecture, break the system down into small, loosely coupled, stateless components. All communication between these components should happen via message passing only.
  2. Use SQS for message queuing and retries
  3. Use SNS for broadcasting to multiple subscribers
  4. Use CloudWatch Events rules to trigger events on a schedule (a serverless cron)
  5. Monitor Lambda error rates and execution times using CloudWatch Metrics and CloudWatch Logs. We’re also in the process of shipping Lambda CloudWatch logs to Sumo Logic

P.S. This is my debut on Medium :-)
