How to run CPU-based Workloads for Deep Learning Using Thousands Of Spot Instances on AWS and GCP…
By Assaf Pinhasi
Photo by Andrey Sharpilo on Unsplash
Deep learning is notorious for consuming large amounts of GPU resources during training. However, there are multiple parts within the Deep Learning workflow that require large amounts of CPU resources:
- Running large scale inference jobs
- Pre-processing input data — and materializing it on disk as preparation for training
These workloads typically have the following properties:
- The workload (job) is triggered periodically (vs. always-on processing)
- A job consists of many items. Each item can be processed independently from other items.
- Items are read, processed and written back to some storage (typically object storage or a centralized file system)
- Processing a single item may take anywhere from several seconds to minutes on a single CPU core
- Users care about the throughput of the job vs. the latency of processing a single item
- Single item size ranges from tens of KBs to many MBs or even GBs of data
- The processing is stateless — i.e. if the processing of a single item fails, it can be safely retried without side effects.
When you have enough items to process, and when the processing of a single item is heavy enough, these jobs can consume a lot of CPU resources.
How much is a lot?
Example workload — 3D rendering of training data
Let’s say we want to train a model with large amounts of images of real world objects taken in different environments. A great way to produce large amounts of examples is to use synthetic data — take 3D models of objects, and render views of these models from multiple angles, lighting conditions etc.
3D rendering is a notoriously CPU-intensive workload; rendering a single image can take minutes on a single CPU core. At 7–8 minutes per image, rendering 250K images adds up to 30K+ CPU core-hours.
How can we deal with such workloads?
First, let’s formalize our requirements.
System requirements
General
- Scale out to several thousand cores
- Minimize cloud costs by running on spot instances
- Minimize infrastructure setup and maintenance costs
API for running jobs
- Configure the processing logic per job
- Configure the CPU + memory needed per item in the job
- Configure where to read/write data from
- Submit all the items of the job to be processed
- Cancel a job (i.e. do not process its items)
- Observe # of successful/remaining/failed items per job
- Observe CPU/memory consumption (to help tune resources)
- View logs per item (nice to have — search logs)
Logical System Design
The requirements are very suitable for a queue + workers design pattern.
Logical batch processing system design. Image by the author
Logical flow
Let’s walk through the diagram and understand what happens at each stage:
1 — The user pushes a container into a container registry.
The container includes the processing logic and dependencies needed.
2 — The user enqueues a message containing an item to process.
Each message contains:
- The URI of an item in the storage
- Optionally — configuration/metadata to control the processing logic
3 — If needed, the compute auto-scales.
When ready, a message is dequeued and handed to an available container.
4 — The container reads an item from the storage, processes it and writes the output back, based on the instructions in the message.
Lastly, the system automatically collects metrics and logs into a centralized location.
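To make the flow concrete, here is a minimal sketch of the worker side of this pattern. The queue client, the message fields (input_uri, output_uri, params) and the process_item logic are illustrative assumptions; any message bus and storage layer can fill these roles.

```python
import json
import logging

def process_item(message: dict) -> None:
    """Stateless processing of a single item: read it, process it, write the result back.

    Placeholder for the job-specific logic (e.g. rendering one image).
    """
    raise NotImplementedError

def worker_loop(queue) -> None:
    """Pull messages until the queue is empty; a failed item can safely be retried."""
    while True:
        raw = queue.dequeue(timeout_seconds=60)   # `queue` is a hypothetical client wrapping the message bus
        if raw is None:
            break                                 # no more work, let the node scale down
        message = json.loads(raw.body)            # e.g. {"input_uri": ..., "output_uri": ..., "params": {...}}
        try:
            process_item(message)
            raw.ack()                             # success: remove the item from the queue
        except Exception:
            logging.exception("Failed to process %s", message.get("input_uri"))
            raw.nack()                            # failure: make the item visible again for a retry
```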
The hard way — build it yourself on Kubernetes
Here are a few of the steps you’ll need to take to achieve this with K8s:
- First — you’ll need to set up K8s to be able to manage thousands of nodes — from redundancy to right-sizing the control plane etc.
- Create nodegroups that run on spots with a mixture of policies
- Deploy a message bus — either on the cluster or outside of it
- Figure out how to scale out: should you let the user specify the scale in advance, deploy a replica set, and find some way to scale down automatically by updating the size and letting the cluster autoscaler kill idle nodes? Or use a dynamic scaler like KEDA?
- If the storage is a shared file system, you’ll need to create persistent volumes and mount them into the containers yourself; if it’s object storage, you may need to pass in the credentials as a secret
- Add a full observability stack with centralized logging and monitoring
- Turn this into infra-as-code with Terraform
- etc.
This is a non-starter for most teams, as the effort is very significant — especially the need to spin up and manage hundreds or thousands of nodes in the cluster, even temporarily.
Edit — Other cluster-based solutions
This section was added following a lively discussion on LinkedIn.
There are other solutions apart from K8s that help manage scale-out workloads — such as Ray, Dask, and even Spark.
What all of these tools have in common is that they have the notion of a cluster, built for workloads where machines need to exchange data between them — and at the very least require some form of communication between some of the machines (e.g. master to workers).
When you scale to 1000s of nodes, this management and communication can introduce various edge cases that are not trivial to handle. If you deploy the cluster, you own it. Even in “managed” clusters like EKS, you still need to ensure it’s running properly.
In inference/pre-processing, there is no added value in running on a cluster where nodes communicate. These are share-nothing workloads, and the cluster component is just a liability. Instead, what you want is to provide your code and a list of items to process, and get the compute to take care of it for you.
This leads us to the easier option…
Note: If your workload is written in something other than Python, or requires unusual OS-level libraries, not all of the cluster managers mentioned above can support it.
The easy way — leveraging managed solutions
AWS
AWS has built the perfect tool for this type of workload, aptly named AWS Batch.
AWS Batch solution for large scale batch processing system. Image by the author
Let’s walk through the entities in the diagrams and explain how this works:
Compute environment
A collection of compute resources; can contain a mixture of node types, including spots or on-demand.
The compute environment specifies its min/max size — where setting the minimum to 0 enables it to scale to zero when there is no work to perform.
When scaling out, the compute environment creates instances based on instance templates.
AWS Batch compute environments. Image by the author
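For illustration, here is a hedged boto3 sketch of creating such a compute environment. All names, ARNs, subnets and sizing values are placeholders, and the exact networking/IAM setup will depend on your account.

```python
import boto3

batch = boto3.client("batch")

# Names, ARNs, subnets and security groups below are placeholders.
batch.create_compute_environment(
    computeEnvironmentName="cpu-batch-spot",
    type="MANAGED",
    state="ENABLED",
    computeResources={
        "type": "SPOT",                          # run on spot instances to minimize cost
        "allocationStrategy": "SPOT_CAPACITY_OPTIMIZED",
        "minvCpus": 0,                           # scale to zero when there is no work
        "maxvCpus": 4000,                        # several thousand cores at peak
        "instanceTypes": ["c5", "c6i"],          # CPU-optimized families
        "launchTemplate": {"launchTemplateName": "cpu-batch-template"},
        "subnets": ["subnet-aaaa1111", "subnet-bbbb2222"],
        "securityGroupIds": ["sg-cccc3333"],
        "instanceRole": "arn:aws:iam::123456789012:instance-profile/ecsInstanceRole",
    },
    serviceRole="arn:aws:iam::123456789012:role/AWSBatchServiceRole",
)
```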
Instance template
The standard EC2 construct (a launch template) for how to initialize an instance.
Can contain instructions such as adding mounts to the instance etc.
Queue
AWS Batch manages queues for you. Each queue is attached to a specific compute environment. This enables us to create separate queues for separate jobs easily, and to isolate jobs from each other so that they don’t compete for resources.
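A corresponding queue can be created and attached to that compute environment with a single call; the names below match the placeholders used above.

```python
import boto3

batch = boto3.client("batch")

# Attach a dedicated queue to the compute environment created above,
# so this job's items don't compete with other jobs for resources.
batch.create_job_queue(
    jobQueueName="render-training-data",
    state="ENABLED",
    priority=1,
    computeEnvironmentOrder=[
        {"order": 1, "computeEnvironment": "cpu-batch-spot"},
    ],
)
```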
Job definition (note: a Job in AWS Batch terms is a single item)
A template for what the processing of a single item looks like; see the sketch after the list below.
Specifies:
- The docker image, env. variables and other details
- Mount points from the host
- Compute resources needed for jobs of this type (CPU/mem and even GPU)
- How to pass arguments from the message to the container’s entry point.
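Here is a hedged sketch of registering such a job definition with boto3. The image URI, mount paths and resource values are placeholders; the Ref:: placeholders in the command are filled in from each job's parameters at submission time.

```python
import boto3

batch = boto3.client("batch")

# Image URI, mount paths and resource values are placeholders; tune
# CPU/memory to what processing a single item actually needs.
batch.register_job_definition(
    jobDefinitionName="render-one-image",
    type="container",
    containerProperties={
        "image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/renderer:latest",
        "resourceRequirements": [
            {"type": "VCPU", "value": "1"},
            {"type": "MEMORY", "value": "4096"},   # MiB
        ],
        # Ref::input_uri / Ref::output_uri are substituted from each job's parameters
        "command": ["python", "render.py", "--input", "Ref::input_uri", "--output", "Ref::output_uri"],
        "mountPoints": [{"sourceVolume": "scratch", "containerPath": "/scratch"}],
        "volumes": [{"name": "scratch", "host": {"sourcePath": "/mnt/scratch"}}],
    },
    retryStrategy={"attempts": 3},                 # items interrupted by spot reclaims are retried
)
```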
User Docker Image
In AWS Batch, the user’s Docker image needs to contain a command that can process a single item from the queue. Its return code is used to determine whether the processing succeeded or failed.
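As a sketch, that command could be a small script like the one below, which processes exactly one item and signals success or failure through its exit code. The script name, its arguments and the render_single_image helper are assumptions for illustration.

```python
# render.py -- the command baked into the user's Docker image.
# It processes exactly one item; its exit code tells AWS Batch whether the job succeeded.
import argparse
import sys

def render_single_image(input_uri: str, output_uri: str) -> None:
    """Placeholder for the real logic: read the item, render it, write the result back."""
    raise NotImplementedError

def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)    # e.g. an object-store URI passed via the job's parameters
    parser.add_argument("--output", required=True)
    args = parser.parse_args()
    try:
        render_single_image(args.input, args.output)
    except Exception as exc:
        print(f"processing failed: {exc}", file=sys.stderr)
        return 1                                     # non-zero exit code: AWS Batch marks the job FAILED
    return 0                                         # zero exit code: SUCCEEDED

if __name__ == "__main__":
    sys.exit(main())
```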
Job
A single item to process. It has an “instance-of” relationship to a job definition, and contains the specific parameter values for processing a specific item.
Job objects are what we insert into the queues for execution.
Putting the flow together
- A user places a job in the queue. The job references a job definition
- The compute environment scales by creating a new node from the instance template if needed
- The system starts a container per the job definition on the new instance
- The system pops an item from the queue
- The system invokes the processing command on the container passing it the parameters from the job’s body
- The command’s return value is used to determine whether the processing succeeded or failed.
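Putting it into code, a hedged boto3 sketch of submitting the items might look as follows; the queue, job definition and bucket names match the placeholders used earlier, and the parameters dict feeds the Ref:: placeholders in the job definition's command.

```python
import boto3

batch = boto3.client("batch")

# One Batch "job" per item; the item list would typically be generated
# from a listing of the input bucket.
items = [
    ("models/chair-0001.obj", "renders/chair-0001.png"),
    ("models/chair-0002.obj", "renders/chair-0002.png"),
]

for i, (input_key, output_key) in enumerate(items):
    batch.submit_job(
        jobName=f"render-{i:06d}",
        jobQueue="render-training-data",
        jobDefinition="render-one-image",
        parameters={
            "input_uri": f"s3://my-training-data/{input_key}",
            "output_uri": f"s3://my-training-data/{output_key}",
        },
    )
```

For very large item counts, AWS Batch also supports array jobs, which let a single submit_job call fan out into many near-identical child jobs.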
Observability
AWS Batch provides a dashboard with all the queues, how many jobs (items) are pending/running/succeeded/failed, and access to the logs.
Logs are provided at an item level, and searching the logs across items is not easy. To gain infrastructure-level monitoring, you need to enable Container Insights on the underlying ECS cluster.
AWS Batch jobs dashboard. Image by the author
In summary, AWS Batch gets a 9/10 as a solution for large-scale CPU jobs.
GCP
GCP has no built-in solution for such workloads. However, you can achieve something quite similar to AWS Batch using lower-level building blocks and a small amount of glue code, without incurring any more ongoing operational overhead than in AWS.
The essence of the solution relies on a feature that is somewhat unique to GCP: the ability to spin up a compute instance that automatically starts a docker container with parameters upon boot.
GCP Compute Engine-based solution for a batch processing system. Image by the author
Managed Instance Group
This is a group of instances that can scale up and down based on conditions. Instances of the group are created from an InstanceTemplate. See the autoscaling section.
Instance Template
Defines what a single instance in the system would look like:
- Resources for the instance (i.e. for the container) — CPU/Mem/GPU.
- Mount points
- Configuration for starting a container upon boot:
  - Docker image
  - Command + parameters. These parameters are static — i.e. all the containers started from this template will start with the exact same command and parameters.
  - Mounts for the container itself
- Whether the instances of this template are preemptible
Queue
Here, you need to use your own queue. A good choice is to leverage a Pub/Sub topic for your environment, along with a default subscription.
Job
Jobs are written as custom JSON payloads published to the Pub/Sub topic; they should contain all the information needed to process a single item, including the item’s URI and any processing configuration.
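For example, publishing the items with the google-cloud-pubsub client could look like the sketch below; the project, topic and bucket names are placeholders.

```python
import json
from google.cloud import pubsub_v1

# Project, topic and bucket names are placeholders.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "render-jobs")

items = [
    {
        "input_uri": "gs://my-training-data/models/chair-0001.obj",
        "output_uri": "gs://my-training-data/renders/chair-0001.png",
        "params": {"resolution": [1024, 1024]},
    },
    # ... one payload per item
]

for item in items:
    # Each message is a self-contained JSON payload describing one item.
    future = publisher.publish(topic_path, json.dumps(item).encode("utf-8"))
    future.result()  # block until Pub/Sub accepts the message
```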
User Docker Image
Since GCP offers no built-in framework for invoking the containers based on messages, it is the container’s responsibility to pull work items from the queue.
Recall that the parameters passed to the container on startup are specified once for all containers which run on instances from the same template. A good way to leverage these parameters is to configure them to hold the name of the subscription from which the container needs to read.
Lastly, since a container is started once and only once when its instance gets started, the container’s entry command needs to pull and process items in a loop until the queue is empty.
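Here is a minimal sketch of such an entry command using the google-cloud-pubsub synchronous pull API. The project ID and subscription name are assumed to arrive as the static startup parameters from the instance template, and process_item stands in for the real per-item logic.

```python
# worker.py -- the container's entry command on GCP.
import argparse
import json
import logging

from google.api_core.exceptions import DeadlineExceeded
from google.cloud import pubsub_v1

def process_item(payload: dict) -> None:
    """Placeholder for the real read/process/write logic."""
    raise NotImplementedError

def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--project", required=True)
    parser.add_argument("--subscription", required=True)   # passed once, via the instance template
    args = parser.parse_args()

    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path(args.project, args.subscription)

    while True:
        try:
            # Synchronously pull a small batch of items.
            response = subscriber.pull(
                request={"subscription": sub_path, "max_messages": 10},
                timeout=60,
            )
        except DeadlineExceeded:
            break                                           # nothing arrived in time, treat the queue as drained
        if not response.received_messages:
            break

        ack_ids = []
        for received in response.received_messages:
            payload = json.loads(received.message.data.decode("utf-8"))
            try:
                process_item(payload)
                ack_ids.append(received.ack_id)             # ack only on success
            except Exception:
                logging.exception("Failed to process %s", payload.get("input_uri"))
                # unacked messages become visible again after the ack deadline and get retried

        if ack_ids:
            subscriber.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})

if __name__ == "__main__":
    main()
```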
Autoscaling
The Managed Instance Group is able to scale based on Stackdriver metrics.
Specifically, you will want to scale based on “undelivered messages in the PubSub subscription”. See this post for more details.
Putting the flow together
- The user enqueues messages containing items to process in the PubSub queue.
- The Managed Instance Group uses the “undelivered messages in subscription” scaling rule to create new instances based on the instance template
- When the instance starts, it runs the user’s container with the command and static parameters provided in the Instance Template
- The user’s command runs in a loop: it pops an item from the queue, processes it and so on.
- When the queue is empty, the Managed Instance Group will scale all the instances down
Observability
GCP offers a more convenient logging and monitoring solution than that of AWS; you can search the log streams out of the box, look at instance group-level metrics as well as individual machines, etc.
GCP Managed Instance Group monitoring dashboard. Image by the author
Wrapping the functionality in an SDK
To ease the adoption of such a system, it’s wise to provide users with a CLI/SDK that handles the nitty-gritty details.
The SDK’s main API is for submitting a new job for processing, by specifying:
- Job name
- Queue name (or automatically create a new queue with the same name as the job)
- Name representing the compute template (be it a job definition or the name of the ManagedInstanceGroup with the appropriate InstanceTemplate)
- List of URIs of items to be processed
The SDK would then use the underlying Cloud provider’s API to:
- Create the queue if needed (in GCP this means creating a new instance template and managed instance group for the job programmatically)
- Construct and enqueue messages into the queue, doing things like translating paths or IDs so that the remote containers can access them, etc. (a minimal sketch follows below)
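A hypothetical SDK surface could look like the sketch below; all names (BatchJob, submit_job, the backend adapter and its methods) are illustrative, not an existing library.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class BatchJob:
    name: str
    queue_name: str
    compute_template: str   # job definition name (AWS) or instance template / MIG name (GCP)

def submit_job(job: BatchJob, item_uris: List[str], backend) -> None:
    """Submit a batch job: ensure the queue exists, then enqueue one message per item.

    `backend` is a thin adapter over the cloud provider's API (AWS Batch, or
    Pub/Sub + Managed Instance Groups on GCP), keeping user code provider-agnostic.
    """
    backend.ensure_queue(job.queue_name, job.compute_template)   # create the queue / MIG if needed
    for uri in item_uris:
        message = {
            "input_uri": backend.translate_uri(uri),   # e.g. rewrite a local path to an object-store URI
            "job_name": job.name,
        }
        backend.enqueue(job.queue_name, message)
```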
Note: it’s possible to add more APIs, such as getting progress reports on jobs, etc.
Summary
As teams continue to make the most of their data during the development of Deep Learning models, they often require the ability to run large-scale CPU-bound workloads.
These batch jobs carry out tasks such as performing inference on large datasets, or pre-processing big files into a more useful representation.
These large scale jobs can require thousands of CPU cores and present a significant scaling challenge to self-managed infrastructure.
In this post, we’ve covered how to build a framework for running such jobs both in GCP and in AWS, in a cost-effective way.