Importing data into DynamoDB

July 14th, 2020 Written by Adrian Hesketh

This blog post was originally given as a talk by Adrian Hesketh at AWS ComSum, May 2020, sponsored by Infinity Works.

At Infinity Works we have a very active engineering Slack channel. One day, this question popped up:

“Anyone found a way to load DynamoDB with 4M records quickly and cheaply? It looks like it may take close to 3 hours for each restore…”

That didn’t sound right to me, so I decided to try it out.

Finding some test data

First I needed to find a large quantity of data in CSV format to test. I used Google’s 1-gram data set, available for download at http://storage.googleapis.com/books/ngrams/books/datasetsv2.html. The data is actually tab delimited rather than comma delimited, but that’s a common data format too, so it’s just as useful.

It’s a fairly typical set of tab separated data. Each row contains a word and data about how often that word was found in a given year’s publications.
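As a rough illustration of what handling one of these rows looks like, here is a minimal Go sketch. The four-column layout (ngram, year, match count, volume count) is my assumption about the file format, and the Row type and parseRow function are hypothetical names, not code from the import tool.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Row is one line of the data set.
// ASSUMPTION: the columns are ngram, year, match count and volume count, in that order.
type Row struct {
	Ngram       string
	Year        int
	MatchCount  int64
	VolumeCount int64
}

// parseRow splits a tab-separated line into a Row and converts the numeric columns.
func parseRow(line string) (Row, error) {
	fields := strings.Split(strings.TrimRight(line, "\r\n"), "\t")
	if len(fields) != 4 {
		return Row{}, fmt.Errorf("expected 4 tab-separated fields, got %d", len(fields))
	}
	year, err := strconv.Atoi(fields[1])
	if err != nil {
		return Row{}, fmt.Errorf("invalid year %q: %w", fields[1], err)
	}
	matches, err := strconv.ParseInt(fields[2], 10, 64)
	if err != nil {
		return Row{}, fmt.Errorf("invalid match count %q: %w", fields[2], err)
	}
	volumes, err := strconv.ParseInt(fields[3], 10, 64)
	if err != nil {
		return Row{}, fmt.Errorf("invalid volume count %q: %w", fields[3], err)
	}
	return Row{Ngram: fields[0], Year: year, MatchCount: matches, VolumeCount: volumes}, nil
}

func main() {
	// Made-up sample line, not real data from the set.
	row, err := parseRow("hello\t1999\t1234\t56")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", row)
}
```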

To count the number of lines, the wc program can be used in line count mode.

This shows us that the dataset has 26.2 million rows. That’s a good size for this test, because it’s bigger than the 4 million I need. Importing all of it would cost around $39, because 1M write units cost around $1.50.
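The arithmetic behind this kind of estimate can be sketched in a few lines of Go. It assumes every row is small enough to consume a single write unit, and estimateWriteCost is a hypothetical helper, not part of any AWS tooling.

```go
package main

import "fmt"

// estimateWriteCost returns the approximate cost in dollars of writing rows items,
// given the price of one million write units.
// ASSUMPTION: each row fits in one write unit.
func estimateWriteCost(rows int64, pricePerMillion float64) float64 {
	return float64(rows) / 1_000_000 * pricePerMillion
}

func main() {
	fmt.Printf("whole data set: $%.2f\n", estimateWriteCost(26_200_000, 1.50))
	fmt.Printf("4M rows:        $%.2f\n", estimateWriteCost(4_000_000, 1.50))
}
```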

Creating a table

DynamoDB only requires the key attributes to be defined when the table is created: a partition key that groups related data and, optionally, a sort key that allows for organising and filtering that related data.

In this case, the ngram column in the data specifies a grouping. In this dataset, an ngram is a single word, so the dataset consists of a word related to how frequently the word was used each year.

With this in mind, the partition key should be the ngram and the sort key should be the year to allow the partition to be queried to see how usage of a word changed over time.

I used DynamoDB on-demand capacity to get data in quickly without needing to configure high amounts of expensive provisioned capacity. On-demand capacity is adaptive: AWS maintains capacity based on past usage.

I haven’t seen a service where it’s cheaper to run DynamoDB with provisioned capacity, but I’m sure some exist. If you’re interested in finding out whether you could save money by switching to pay-per-request pricing, you can use this tool I wrote to compare pricing: https://github.com/a-h/dynamocapacity

To create a table in DynamoDB, you can use the SDK, the command line, the console, CloudFormation and a million other ways. I used the command line.

Importing from my machine

The next step was to work out how to import the data. As always with AWS, there are lots of options.

One option is AWS Data Pipeline. It’s mentioned in the docs, but it’s not available in London and it doesn’t actually have built-in CSV import. Its built-in features support restoring from DynamoDB backups, which consist of lines of DynamoDB-structured JSON.

I ended up on Stack Overflow, and I tried out the top answer. Getting it to work requires some general knowledge of the Node.js ecosystem, because the script must be modified to set the filename, set the table name, and add the missing Node packages. Once I’d done this, I could run it, but for my data file it crashed.

The script was trying to load the whole file into memory, so the process ran out of RAM, even though my machine has 16GB, which should be plenty for the task.

So, I wrote my own Node.js script that streams the data off disk instead. This also allowed me to use the latest JavaScript features like async/await and arrow functions. It reads the file line by line instead of all at once, and uses this function to batch up the inserts into groups of 25. I gave it a nice command line interface to make it easy for people to use.

Aside from the warning that I’ve used all of the latest node features, it worked!

So, the first method resulted in 290 inserts per second.

| Method | Performance | Progress |
|---|---|---|
| Custom Node.js script | 290 inserts per second | +∞% |

I’ve gone from something that only worked for small files, to something that works for big files too. But it would still take nearly 4 hours to insert 4M rows of data, which is far too long if you’re sat there waiting for it.

Improving Node.js DynamoDB performance with HTTP keep-alives

I saw on Yan Cui’s blog that it’s possible to improve Node.js DynamoDB performance by enabling HTTP keep-alive, so I tried that.

It works by reusing HTTP connections between database operations, reducing the amount of TLS negotiation and TCP setup that’s required if multiple database operations are used.

That took the import from around 290 records per second to nearly 850 per second.

| Method | Performance | Progress |
|---|---|---|
| Custom Node.js script | 290 inserts per second | +∞% |
| Applying HTTP Keep Alive | 847 inserts per second | +192% |

What if I don’t have Node.js installed?

A downside of this solution is having to have Node.js installed. If I switch to Go I can make a binary for Windows, Linux and macOS that doesn’t need any dependencies. This will make it easier for people to use. The program has roughly the same structure, just written in a different language.

Go is a little faster, and there’s no need for the Keep Alive config because it’s enabled by default, but it would still be 75 minutes of waiting.

| Method | Performance | Progress |
|---|---|---|
| Custom Node.js script | 290 inserts per second | +∞% |
| Applying HTTP Keep Alive | 847 inserts per second | +192% |
| Switching to Go | 880 inserts per second | +3.9% |

Goroutines to the rescue

I can use some features in Go to make things a lot faster. Go has concurrency features called channels and goroutines. Channels are like an in-memory queue. This code pushes batches of 25 records into the queue, which can be read by multiple workers.
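A minimal sketch of that pattern, with a channel acting as the queue and a fixed number of goroutines reading from it, might look like this. The runWorkers function and the write callback are hypothetical; a real importer would call DynamoDB inside write.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts n goroutines that read batches from the channel and pass each
// one to write. It returns the number of records written and the first error seen.
func runWorkers(n int, batches <-chan []string, write func(batch []string) error) (int64, error) {
	var wg sync.WaitGroup
	var written int64
	var firstErr error
	var once sync.Once
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range batches {
				if err := write(batch); err != nil {
					once.Do(func() { firstErr = err })
					continue // keep draining so the producer never blocks forever
				}
				atomic.AddInt64(&written, int64(len(batch)))
			}
		}()
	}
	wg.Wait()
	return written, firstErr
}

func main() {
	batches := make(chan []string, 8)
	// Producer: push 100 batches of 25 records into the queue, then close it.
	go func() {
		defer close(batches)
		for i := 0; i < 100; i++ {
			batches <- make([]string, 25)
		}
	}()
	total, err := runWorkers(8, batches, func(batch []string) error {
		return nil // a real importer would write the batch to DynamoDB here
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("records written:", total)
}
```

Closing the channel is what lets the workers finish: each `for range` loop ends once the queue is empty and closed.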

With this in place, the performance is greatly improved.

This is 6x faster and the time is down to 12 minutes. Now it feels like we’re getting somewhere.

 

| Method | Performance | Progress |
|---|---|---|
| Custom Node.js script | 290 inserts per second | +∞% |
| Applying HTTP Keep Alive | 847 inserts per second | +192% |
| Switching to Go | 880 inserts per second | +3.9% |
| Using parallelism | 5490 inserts per second | +524% |

Network latency, destroyer of performance

Much of the time is wasted due to network latency. My laptop is a long way from Amazon’s network. If I run the import from inside Amazon’s network instead, it should be much faster.

I’ll run an EC2 instance and see how quick it is. I’ve got to create a keypair so I can SSH in.

Import it to AWS.

Check I have a default VPC to launch it into.

Create a security group so that I can SSH to it to run the program.

Now I need to update the security group to allow SSH. I can allow SSH from anywhere since I’m just in my sandbox account.

Next, I need to create a role to allow the EC2 instance access to DynamoDB. This starts by creating a role policy document.

Then create a role with that policy document.

Attach DynamoDB full access to the role.

Create an instance profile that can be attached to an EC2 instance to give it permissions.

Add the role to the profile.

Find the right Linux image to use.

Start a big instance.

Give it the instance profile so it can access DynamoDB.

Copy the data to the box using the SCP program.

Copy the Linux build of the import binary to the box.

Then, I can SSH in.

By this point, I wished I’d used CloudFormation, Terraform or the v2 of the ECS command line tools, but I hadn’t.

OK, now that is fast. Since I’ve got more CPU cores here than on my laptop, I upped the concurrency to 16. It settled at around 29k rows per second.

| Method | Performance | Progress |
|---|---|---|
| Custom Node.js script | 290 inserts per second | +∞% |
| Applying HTTP Keep Alive | 847 inserts per second | +192% |
| Switching to Go | 880 inserts per second | +3.9% |
| Using parallelism | 5490 inserts per second | +524% |
| Using an m5.4xlarge instance | 29k inserts per second | +428% |

This takes two and a bit minutes to complete the work.

Is Lambda faster than EC2?

Well, it’s fast, but it was a bit of a pain to set up. To make this suitable for reuse, I’d need to build and push a Docker container to Docker Hub, then create all the CloudFormation to configure a VPC, ECS cluster, role for the ECS task and everything else. Then I’d need to build a simple CLI to start up the container and collect the logs.

All-in-all, a bit of a pain. I wondered if I could achieve similar results with Lambda.

Lambda is somewhat easier to deploy thanks to frameworks like Serverless Framework and the AWS SAM CLI. They both require developer tooling. Serverless Framework requires Node and SAM requires Python. 

I can’t copy the data to the Lambda function, so I’ll have to put my data into an S3 bucket and allow Lambda access to retrieve it.

I like Serverless Framework, so I used that to create the deployment package. Serverless Framework creates the skeleton of a YAML configuration document.

Here, I’ve set the memory size close to maximum. Lambda functions are assigned network bandwidth and CPU proportionally to their RAM allocation. Cost won’t be a problem here.

Serverless Framework also has easy-to-use IAM role configuration. I don’t need to specify basic permissions of being able to write to CloudWatch logs or anything, I just need to add my custom permissions. I’ve given it access to read any S3 bucket and write to any DynamoDB table.

That’s unlikely to impress your security team if you install it in your production account, but it’s easy to limit access if required, by entering the ARNs of the tables and buckets you need access to.

My Lambda code isn’t very different to the command line version. The main difference is that instead of taking command line parameters, the Lambda function takes a custom Request object.

The Request object contains the configuration parameters such as the name of S3 bucket containing the data to import, and the name of the DynamoDB table to import into.
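A request type along these lines could look like the sketch below. The field names and JSON keys are hypothetical; only the idea of passing the bucket and table names in the payload comes from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Request is a hypothetical import configuration.
// ASSUMPTION: the field names and JSON keys are invented for illustration.
type Request struct {
	Region string `json:"region"`
	Table  string `json:"table"`
	Bucket string `json:"bucket"`
	Key    string `json:"key"`
}

// parseRequest decodes a JSON payload and checks that the required fields are set.
func parseRequest(payload []byte) (Request, error) {
	var req Request
	if err := json.Unmarshal(payload, &req); err != nil {
		return Request{}, fmt.Errorf("invalid payload: %w", err)
	}
	if req.Table == "" || req.Bucket == "" || req.Key == "" {
		return Request{}, fmt.Errorf("table, bucket and key are required")
	}
	return req, nil
}

func main() {
	payload := []byte(`{"region":"eu-west-2","table":"ngrams","bucket":"example-bucket","key":"data.tsv"}`)
	req, err := parseRequest(payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", req)
}
```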

The serverless deploy command creates a CloudFormation template and executes it.

With that deployed, I can run the import function.

It’s possible to start a Lambda with a JSON payload, so I created one that contains the configuration.

Then I ran the Lambda within AWS directly from my command line.

There’s no need to have an external event such as an API request trigger a Lambda, you can just manually trigger them yourself.

Serverless Framework also has a feature to easily tail execution logs so I can see what’s going on.

What’s going on is that it’s importing more slowly than the massive EC2 instance.

There’s also a limit on the time. After 900 seconds, it terminated abruptly, after only inserting 14 million of the 26 million records into the database. This is a reason that Lambda doesn’t often get used for long-running tasks.

Checkpoints for continuation with AWS Step Functions

Fortunately, there’s a way around this.

  1. Store location in CSV data
  2. Stop processing before timeout
  3. Start again from where we left off

I can run a Lambda that takes in a start offset position in a file. It would then start reading from that offset by using the S3 range header, and would work until the timeout had nearly expired. The Lambda would then return the end offset. I could then run the Lambda again, making it start from where it got up to within the file. This would form a loop that only completed when the entire file had been read. 

While I could manage this process myself in code, AWS Step Functions can do it for me.

AWS Step Functions allows the design and execution of State Machines. A State Machine is a workflow made up of individual Tasks. There are lots of different types of tasks, including a Lambda Function execution task. Tasks can also receive or emit up to 32KB of data, allowing them to persist state.

State Machines can be started with a JSON payload, just like AWS Lambda. Here I’ve used the same configuration that the Lambda had, but with shorter key names to save space.

Since State Machines can run Lambda functions as a Task, this state machine starts by running a Lambda function called “preflight”. The preflight Lambda function expects an S3 location and a start index. It then reads through the file. Every 100,000 lines, it adds an entry to the output state’s “batches” array that records the start and end offsets of that batch of 100,000 lines within the file.

I used the Step Functions plugin for the Serverless Framework. This allows the state machine to be defined in YAML rather than JSON.

The YAML configuration in the serverless.yaml maps to the diagram on the right produced by the state machine display.

The preflight function keeps track of where it got up to in the file by outputting state at the end of each run. The state includes the line number, the offset within the file, and whether the preflight needs to do any more work.

The next step in the workflow is a Choice task. This is not a Lambda function – it’s a Task type that is built into Step Functions.

This Choice task looks at the “cnt” (continue) variable output by the preflight Lambda function to decide whether it should continue reading through the file to find more chunks, or should start importing data. If the continue variable is true, then the preflight task is executed again with an updated start offset. If the continue variable is false, then the “process” task is started.

This process adds entries to the “batches” array. Each item is a start and end index within the file.

The import process iterates over the batches of byte ranges using a “Map” step. The map step then executes multiple import Lambda functions in parallel. I’ve limited the concurrency to 50 concurrent imports, but it could go higher.

Each import Lambda function gets passed the configuration, and a byte range to process. Since only the first row contains column names, the import task also gets passed the names of the columns in the data, to map them to DynamoDB attribute names.
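Mapping a data line to named attributes using column names passed in separately could look like the following sketch. The toItem function and the example column names are hypothetical, and a real importer would go on to convert the values into DynamoDB attribute types.

```go
package main

import (
	"fmt"
	"strings"
)

// toItem pairs each column name with the matching field of one tab-separated line.
// The column names are supplied by the caller because a chunk taken from the middle
// of the file does not contain the header row.
func toItem(columns []string, line string) (map[string]string, error) {
	fields := strings.Split(strings.TrimRight(line, "\r\n"), "\t")
	if len(fields) != len(columns) {
		return nil, fmt.Errorf("expected %d fields, got %d", len(columns), len(fields))
	}
	item := make(map[string]string, len(columns))
	for i, name := range columns {
		item[name] = fields[i]
	}
	return item, nil
}

func main() {
	// ASSUMPTION: these column names are examples only.
	columns := []string{"ngram", "year", "match_count", "volume_count"}
	item, err := toItem(columns, "hello\t1999\t1234\t56")
	if err != nil {
		panic(err)
	}
	fmt.Println(item["ngram"], item["year"])
}
```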

Breaking up the single 4 million row import into 100,000 line chunks and running them in parallel allows even faster import.

 

In this case, around 40 Lambdas executed concurrently, taking less than a minute to import the 4 million rows. 4 million records in 50 seconds is around 80 thousand inserts per second.

| Method | Performance | Progress |
|---|---|---|
| Custom Node.js script | 290 inserts per second | +∞% |
| Applying HTTP Keep Alive | 847 inserts per second | +192% |
| Switching to Go | 880 inserts per second | +3.9% |
| Using parallelism | 5490 inserts per second | +524% |
| Using an m5.4xlarge instance | 29k inserts per second | +428% |
| Using Step Functions | 80k inserts per second | +176% |

Why don’t you just use the backup and restore feature?

It’s actually slower than this import.

What’s the fastest then?

Overall, the ddbimport Step Function provided the fastest import by parallelising the work over 50 concurrent Lambda functions, and it might be possible to tweak things to go even faster.

For example, with the Step Functions solution, it would be possible to start sending chunk locations to an SQS queue during the preflight step instead of waiting until the entire file had been read once.

It may also be possible to increase the concurrency of the EC2 solution even further without negative effects.

By working through the approaches, we’ve ended up with a solution that’s around 275 times faster than the first working script (80k inserts per second versus 290).

What about cost?

Backup and restore is by far the cheapest at around 30 cents per GB, but not the fastest. Everything else will cost about $1.50 per 1M records, mostly due to the cost of on-demand DynamoDB writes.

 

| Item | Cost |
|---|---|
| 1M DynamoDB Writes | $1.4846 |
| Backup Cost per GB | $0.11886 |
| Backup Restore per GB | $0.17829 |
| 1hr of EC2 | $0.222 |
| Approx cost of Step Functions / Lambda | $0.05 |

TL;DR

If you want to run it yourself, head over to the git repo.

  • If you’re interested in seeing all of these experiments and the Node version, look at the “working” branch.
  • Just remember, this is designed to use all the available capacity. If you run an import against your production system, you might just bring it down.
  • If you manage to rack up a huge bill importing data, it’s also not my fault.

Happy importing!