To read 16 million records from MSSQL and load them into Elasticsearch
In our case study, we have a frontend application that needs to do fast fuzzy searches on items held in a relational database. The application uses Elasticsearch for this purpose, so the items need a fast and reliable way to get from the RDS (MSSQL) database to the ES cluster. The data needs to be in an “analysed” state; that is, we want ES to tokenize the data fields so we can search by keywords, compound terms, and so on.
AWS Glue + Lambda
We started investigating AWS Glue when it was still in preview, before its August 2017 launch. The service abstracts away lots of tedious tasks so we can focus on the ETL, but it does not abstract everything; there are still many aspects you have to either fully understand or implement yourself. As with many other AWS services, the standard documentation is either nonexistent or very sparse, so the developer API and the GitHub project pages are indispensable. I would highly recommend reading about the GlueContext class before starting any new project.
AWS Glue is mainly based on Apache Spark; you need to know how Spark works and what it does under the hood if you want to get anything working in Glue. One of the key concepts of Glue is that it loads the so-called “tables” into a Python object (we are using PySpark, but Scala is also available) called a DynamicFrame. This is a distributed collection of self-describing records (think of it as a complex dictionary) that extends the Spark DataFrame concept and can be converted to a DataFrame. We recommend familiarising yourself with these objects before getting hands on.
Loading the data in Glue (into a DynamicFrame) differs dramatically from doing it manually on an EMR cluster. Glue provides a tool called a crawler that saves a huge amount of time. It has many options and can read from many sources, but it essentially connects to a relational database and reads the schema definition of one or more tables; it can also read and parse structured files, like JSON and XML. It then creates an abstract “table” containing the schema definition present in the source file or database. It does not crawl any data, only schema definitions.

Once the crawler has defined the abstract tables (they can also be defined by hand), we can use them as input or output templates. As an input, a table tells Glue which fields to query from the relational database or file, producing a DynamicFrame with those same fields and the data ready to be processed by our Glue (Spark) job. As an output, we declare a “sink” in our job that corresponds to such a table; the table contains information about the format (SQL, JSON, XML, etc.), the output path (an S3 bucket for files, a JDBC database, or Redshift) and the schema definition. Glue will then create the output files for us in the desired format and place, or will do SQL inserts into a particular relational database, etc.
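Put together, a minimal Glue (PySpark) job following that flow might look like the sketch below. The catalog database, table and bucket names are hypothetical, and the script only runs inside a Glue job environment, where the awsglue libraries are provided.

```
# Minimal Glue (PySpark) job sketch: read a crawled catalog table into a
# DynamicFrame, repartition it, and write it out as JSON files to an S3 sink.
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read using the schema the crawler stored in the Data Catalog
# (database and table names below are hypothetical)
items = glue_context.create_dynamic_frame.from_catalog(
    database="our_catalog_db",
    table_name="items",
)

# Repartition so the output is split into many small files
items = items.repartition(800)

# Write JSON files to the output bucket that triggers the Lambda function
glue_context.write_dynamic_frame.from_options(
    frame=items,
    connection_type="s3",
    connection_options={"path": "s3://our-output-bucket/items/"},
    format="json",
)
job.commit()
```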
Once we had our data processed and placed in S3, we used an AWS Lambda function to ship it to AWS Elasticsearch. We used JSON, as it is a format that ES can ingest through the Bulk API, which speeds things up a little. The Lambda function does many things, including creating dashboards and handling retries, exponential backoffs and custom metrics, but at its core it simply scans the files for newlines and sends the JSON objects they contain to ES using the Bulk API. We also put an ES template with a schema definition and some tokenizing options into ES the first time we create the index we are loading the data into. The Lambda function is triggered by Put operations on the Glue job's output S3 bucket, so the ES ingestion process happens automatically.
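The core of that shipping step can be sketched as a small helper that turns one newline-delimited JSON file into a Bulk API payload. `build_bulk_body` is our illustrative name and the `_type` value is an assumption; the real function also handles retries, backoffs and custom metrics.

```python
import json

def build_bulk_body(ndjson_text, index, doc_type="item"):
    """Turn a newline-delimited JSON S3 object into an ES Bulk API payload.

    Each source line becomes an action/metadata line plus the document
    itself, as the Bulk API expects. Empty lines are skipped.
    """
    lines = []
    for raw in ndjson_text.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        doc = json.loads(raw)  # fail fast on malformed input
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # Bulk payloads must end with a newline
```

The resulting body is then POSTed to the cluster's `_bulk` endpoint; one round trip indexes the whole file instead of one request per document.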
The configuration tools
Terraform + AWS console :(
Unfortunately, at the time of writing there was no Terraform support for most of the Glue resources, so we built all the Lambda functions, IAM roles, buckets and networking with Terraform, but set up the Glue jobs, crawlers and tables manually through the console. We are currently working on adding Terraform support for AWS Glue so we can publish it to the Terraform forge (as we've previously done for other unsupported features).
The solution works in a reasonable amount of time and is 100% serverless
I guess we should show you some numbers and graphs now. As with any other AWS serverless service, we are playing a game where we try to provision the right number of resources using the right sizes and limits. If you’ve tried this too, you’ll know that this is where serverless becomes tricky and things stop being “just drop your data and AWS will do the rest for you”.
In this case, the Glue tasks are relatively simple, so the whole Glue process took around 23 minutes (including 10 minutes to launch the underlying EMR cluster you don't see). After reading the 16.5 million rows from MSSQL, we repartitioned the DynamicFrame into 800 partitions; Glue had created only 20 partitions by default in our case. We used 30 DPUs for the job (you set that in the Glue job configuration) and replaced a legacy .NET process on EC2 that took around 90 minutes to do the same job. Yes, from 90 minutes to 23 minutes.
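Why 800 partitions? We wanted each output file small enough for a single Lambda invocation to ingest well within its timeout. A rough sizing helper makes the arithmetic explicit; the ~25 MB target here is ours, back-derived from 20 GB across 800 files.

```python
import math

def partitions_for(total_bytes, target_file_bytes):
    """Number of Spark partitions so each output file stays near a target size."""
    return math.ceil(total_bytes / target_file_bytes)

# Roughly our numbers: 20 GB of JSON, aiming at ~25 MB per output file
total = 20 * 1024**3
target = 25 * 1024**2
print(partitions_for(total, target))  # → 820, close to the 800 we used
```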
Once the data is in S3, the Lambda function gets triggered and is almost 100% throttled - or should I say queued. This process is interesting enough that it's worth looking at in more detail.
Lambda concurrency and throttling
We set a concurrency limit on our ingesting Lambda function so it did not overload the ES cluster. We also controlled the size of each output JSON file by repartitioning the DynamicFrame (see above), so we had relatively small JSON files that the Lambda function could ingest before timing out (5 minutes). The problem was that we had 16.5 million rows, which resulted in 20 GB of data split into 800 files. Glue is a parallel process, so when it finished, it dropped 800 files into the output bucket. That results in 800 Lambda invocations, but our concurrency was set to only 40. What now?
I think this is one of the best hidden features of AWS Lambda, and it is not well known enough yet. When Lambda is triggered asynchronously (as with an S3 trigger, in our case), throttled invocations are queued and retried with exponential backoff for up to 6 hours. This means there is a well-managed queue that ensures your Lambdas are executed within 6 hours (work that cannot be scheduled within that window is dropped) at the rate we set. We have to calculate that rate ourselves, as it depends on the ingestion time per Lambda and the concurrency of Lambda invocations.
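A back-of-the-envelope for that rate, using our numbers; the ~2 minutes of ingestion per file is an assumed average for illustration.

```python
import math

def drain_minutes(n_files, concurrency, minutes_per_file):
    """Time for Lambda's async queue to work through all the files,
    assuming a steady `concurrency` invocations in flight at once."""
    waves = math.ceil(n_files / concurrency)
    return waves * minutes_per_file

# 800 files at concurrency 40 is 20 "waves"; at ~2 minutes per file that
# is about 40 minutes in total - comfortably inside the 6-hour window.
print(drain_minutes(800, 40, 2.0))  # → 40.0
```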
In this case, we used an ES cluster formed of 3 dedicated masters (c4.large.elasticsearch) and 3 data nodes (c4.2xlarge.elasticsearch). This cluster was sized to satisfy the frontend application's requirements, and we load the data at off-peak times so we don't impact customers.
This is an ES CPU graph during the tests:
As you can see, Lambda ingests faster if we provision more memory. In theory it is a linear relationship, but the baseline (at 128 MB) is efficient enough to deliver good network and CPU speed. At 3 GB of RAM and a concurrency of 10, we spend 5.6 times more money than ingesting at 128 MB of RAM and a concurrency of 40, and the cheaper configuration is actually faster (412.5K rows per minute vs 323K rows per minute). Keep in mind that each row is a fairly big JSON object, so this speed cannot be compared with other case studies.
The next graph illustrates the ingestion operations. This is a custom metric of our Lambda and helps us to see ingestion peaks:
The fastest ingestion rate was obtained at the highest concurrency. Pushing concurrency higher than that overloaded the ES CPU, which cascaded into Lambda ingestion timeouts. That caused lots of retries, increasing the Lambda time billed, and left ES unusable during the load. To go faster, we would need to scale the Elasticsearch cluster horizontally or vertically.
A note on memory: in my opinion (this may vary depending on the nature of the data), ES has fairly static memory consumption, and it is CPU that we have to worry about. This is why we tend to use compute-optimised instances rather than general-purpose or memory-optimised ones. They work well if you're tokenizing (as we do) or using ES pipelines to process the data at ingestion time (a common practice for us in other projects). Here we can see that there were no massive peaks of allocated memory in ES during the whole testing period:
The discarded options
Lambda (for the ETL), EMR, Firehose
AWS solutions are really cool and powerful, but they tend to oversell some things (as I would too). One of the questions I always ask myself is: “Can I do this as if I were an AWS presenter doing a demo at re:Invent?”. Most of the time the answer is no; there are a lot of limitations and constraints in the serverless world. Within AWS, those include Lambda timeouts, maximum memory allocation, a very small temporary filesystem, Kinesis shard throughput, etc.
If your data, data sources and data targets are static, known and match all the serverless services' specifications, you're good to go. But in the majority of cases, like this one, you have to choose plan B, or at least a different plan A.
We had to use something way bigger than Lambda to do our ETL, and fortunately, Glue was mature enough to perform brilliantly. Some people suggested Lambda for the ETL, but that would have required a nightmare of fine-grained control over timeouts, Step Functions, S3 file uploads, streaming and re-uploads. We think Glue is the way to go for ETL on large amounts of data.
EMR would have done the same as Glue, but why get into the hassle of provisioning it, configuring it, manually setting Spark DataFrame schema definitions and sinking the data into the target ourselves? That was an easy one.
Finally, some people asked why we did not use Firehose to deliver the JSON data to Elasticsearch. Well, Firehose cannot be triggered by S3; you need a Lambda function to stream the files into Firehose anyway. Then, in theory, you don't need to worry about throttling and overloading ES, but you also cannot specify mappings or issue any kind of ES command on each indexing operation. And given that the Lambda concurrency and throttling combo works wonders, why not keep the rest of its flexibility? Maybe Firehose will soon be able to do exactly what I need, but for the time being it is not a feasible option for most of our use cases. Once again, the AWS line that we can “simply stream the data to it” is shown to be a little optimistic. In reality, it's a little more complicated than that.
Dario Ferrer and Ray Butcher at Claranet UK