31M Taxi Rides in EMR Serverless

July 16, 2022

What is EMR Serverless?

This post walks you through creating an EMR Serverless application and running some basic analysis in Hive. At the end we’ll compare it with AWS Athena (Presto). I’ll be using the eu-west-1 region throughout.

EMR Serverless is a managed Hadoop service from AWS that has recently become generally available. Unlike standard EMR, you’re only billed for the resources you use, and the service manages the underlying resources in the background (much like Lambda), so all you need to worry about is writing your job. You can specify how many vCPUs and GBs of memory each worker uses, and set global limits so you don’t receive an unexpected bill.

First you ‘create an application’, and then you submit either Spark (Spark/PySpark) or HiveQL jobs to it.

The current pricing for eu-west-1 (Ireland) is:

per vCPU per hour: $0.052624

per GB per hour: $0.0057785
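
To get a feel for what these rates mean in practice, here is a minimal sketch of a cost estimate. It assumes the bill is simply vCPU-hours plus GB-hours at the rates above; the `estimate_cost` helper and the 4 vCPU / 16 GB worker are my own illustrative choices, and S3 and any other charges are ignored.

```python
# Rates quoted above for eu-west-1 (Ireland), in USD.
VCPU_HOUR_USD = 0.052624
GB_HOUR_USD = 0.0057785


def estimate_cost(vcpus: float, memory_gb: float, hours: float) -> float:
    """Return an estimated compute cost in USD for one worker.

    Assumption: cost = hours * (vCPUs * vCPU rate + GB * GB rate).
    Storage, S3 and any minimum-duration rules are not modelled.
    """
    return hours * (vcpus * VCPU_HOUR_USD + memory_gb * GB_HOUR_USD)


if __name__ == "__main__":
    # Hypothetical worker: 4 vCPUs and 16 GB of memory running for one hour.
    print(f"${estimate_cost(4, 16, 1):.4f}")
```

Multiply the result by the number of workers to approximate a whole job.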

Getting Started

I recommend you install the latest version of the AWS CLI and have correctly configured your AWS keys.

Create a service-role for EMR Serverless (obviously this can be done in the AWS console if you’re more comfortable with that):

aws iam create-role \
    --role-name EMRServerlessRole \
    --assume-role-policy-document file://emr-serverless-trust-policy.json

with the trust policy being:

{
    "Version": "2012-10-17",
    "Statement": [
    {
        "Sid": "EMRServerlessTrustPolicy",
        "Action": "sts:AssumeRole",
        "Effect": "Allow",
        "Principal": {
            "Service": "emr-serverless.amazonaws.com"
        }
    }]
}

Next, create an IAM policy, replacing DOC-EXAMPLE-BUCKET with the bucket you’re going to use for your scripts and data:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadAccessForEMRSamples",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*.elasticmapreduce",
                "arn:aws:s3:::*.elasticmapreduce/*"
            ]
        },
        {
            "Sid": "FullAccessToOutputBucket",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
            ]
        },
        {
            "Sid": "GlueCreateAndReadDataCatalog",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:CreateDatabase",
                "glue:GetDataBases",
                "glue:CreateTable",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:CreatePartition",
                "glue:BatchCreatePartition",
                "glue:GetUserDefinedFunctions"
            ],
            "Resource": ["*"]
        }
    ]
}

aws iam create-policy \
    --policy-name EMRServerlessPolicy \
    --policy-document file://emr-sample-access-policy.

Make a note of the policy ARN in the output; you’ll need it for the next step:

aws iam attach-role-policy \
    --role-name EMRServerlessRole \
    --policy-arn <policy-arn>
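
If you lose the output of the previous command, the ARN of a customer managed policy can usually be reconstructed from your account ID and the policy name. The sketch below assumes the standard `aws` partition and a policy created without a path; `managed_policy_arn` is a hypothetical helper, and 123456789012 is a placeholder account ID.

```python
def managed_policy_arn(account_id: str, policy_name: str, partition: str = "aws") -> str:
    """Build the ARN of a customer managed IAM policy.

    Assumptions: the policy has no path prefix, and the account ID is the
    usual 12-digit string.
    """
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("account_id must be a 12-digit string")
    return f"arn:{partition}:iam::{account_id}:policy/{policy_name}"


if __name__ == "__main__":
    # Placeholder account ID; substitute your own.
    print(managed_policy_arn("123456789012", "EMRServerlessPolicy"))
```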

Finally, create an application using the CLI:

aws emr-serverless create-application \
    --release-label emr-6.6.0 \
    --type "HIVE" \
    --name nyc-30m-records

Configure NYC Taxi Records

The NYC taxi dataset is a famous dataset that includes every NYC taxi journey. For the purposes of this blog we’re focusing only on 2021, but the whole dataset is something like 2 billion records. See Mark Litwintschik’s blog for a quick look at what can be achieved by analysing this dataset.

I’ll create a simple bash script to download all 12 Parquet files:

for i in {01..12}
do
        wget "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-$i.parquet"
done
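
The loop above relies on zero-padded brace expansion, which needs Bash 4 or later. If that isn’t available, the same twelve URLs can be generated like this. It is only a sketch: `trip_data_urls` is a name I made up, and it assumes the file naming stays as shown above.

```python
from typing import List

BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"


def trip_data_urls(year: int = 2021) -> List[str]:
    """Return the twelve monthly yellow-taxi Parquet URLs for a given year."""
    return [
        f"{BASE_URL}/yellow_tripdata_{year}-{month:02d}.parquet"
        for month in range(1, 13)
    ]


if __name__ == "__main__":
    # Print one URL per line, e.g. to pipe into `wget -i -`.
    for url in trip_data_urls():
        print(url)
```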

And I’ll upload these to a new S3 bucket:

aws s3 mb s3://nyc-taxi-yellow
aws s3 cp . s3://nyc-taxi-yellow/parquet/ --recursive --exclude "*" --include "*.parquet"

Let’s just check we have all the files:

aws s3 ls s3://nyc-taxi-yellow/parquet/
2022-07-16 14:04:00   21686067 yellow_tripdata_2021-01.parquet
2022-07-16 19:21:48   21777258 yellow_tripdata_2021-02.parquet
2022-07-16 19:21:48   30007852 yellow_tripdata_2021-03.parquet
2022-07-16 19:21:48   34018560 yellow_tripdata_2021-04.parquet
2022-07-16 19:21:48   38743682 yellow_tripdata_2021-05.parquet
2022-07-16 19:21:48   44071592 yellow_tripdata_2021-06.parquet
2022-07-16 19:21:48   43697690 yellow_tripdata_2021-07.parquet
2022-07-16 19:21:48   43425907 yellow_tripdata_2021-08.parquet
2022-07-16 19:21:48   46125883 yellow_tripdata_2021-09.parquet
2022-07-16 19:21:48   53286464 yellow_tripdata_2021-10.parquet
2022-07-16 19:21:48   53100722 yellow_tripdata_2021-11.parquet
2022-07-16 19:21:48   49639052 yellow_tripdata_2021-12.parquet
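
Eyeballing twelve lines is fine, but the check can also be automated. As a rough sketch, the snippet below parses text in the format printed by `aws s3 ls` and reports which months are present and the total size. The helper names are hypothetical, and the sample only contains the first two lines of the listing above.

```python
import re
from typing import List, Set, Tuple

# Matches lines such as:
# 2022-07-16 14:04:00   21686067 yellow_tripdata_2021-01.parquet
LINE_RE = re.compile(
    r"^\S+\s+\S+\s+(\d+)\s+yellow_tripdata_(\d{4})-(\d{2})\.parquet$"
)


def summarise_listing(listing: str) -> Tuple[Set[str], int]:
    """Return (months found, total size in bytes) for an `aws s3 ls` listing."""
    months: Set[str] = set()
    total_bytes = 0
    for line in listing.splitlines():
        match = LINE_RE.match(line.strip())
        if match:
            total_bytes += int(match.group(1))
            months.add(match.group(3))
    return months, total_bytes


def missing_months(months: Set[str]) -> List[str]:
    """Return the zero-padded months (01..12) that are absent from `months`."""
    return sorted({f"{m:02d}" for m in range(1, 13)} - months)


if __name__ == "__main__":
    sample = """\
2022-07-16 14:04:00   21686067 yellow_tripdata_2021-01.parquet
2022-07-16 19:21:48   21777258 yellow_tripdata_2021-02.parquet
"""
    found, size = summarise_listing(sample)
    print(sorted(found), size, missing_months(found))
```

Paste the full listing into `sample` and `missing_months` should come back empty.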

Great, now let’s write the HiveQL script.

Hive

I’m going to use Hive rather than Spark because I’m familiar with it, and it lets me fix errors quickly without recompiling Spark code.

Let’s convert to Apache ORC first, just to test EMR Serverless:

CREATE EXTERNAL TABLE IF NOT EXISTS trips_orc (
   VendorID INT,
   tpep_pickup_datetime STRING,
   tpep_dropoff_datetime STRING,
   Passenger_count DOUBLE,
   Trip_distance DOUBLE,
   PULocationID INT,
   DOLocationID INT,
   RateCodeID INT,
   Store_and_fwd_flag STRING,
   Payment_type INT,
   Fare_amount DOUBLE,
   Extra DOUBLE,
   MTA_tax DOUBLE,
   Improvement_surcharge DOUBLE,
   Tip_amount DOUBLE,
   Tolls_amount DOUBLE,
   Total_amount DOUBLE,
   Congestion_Surcharge DOUBLE,
   Airport_fee DOUBLE
) STORED AS orc
  LOCATION 's3://nyc-taxi-yellow/orc/';

CREATE EXTERNAL TABLE IF NOT EXISTS trips_parquet (
   VendorID INT,
   tpep_pickup_datetime TIMESTAMP,
   tpep_dropoff_datetime TIMESTAMP,
   Passenger_count DOUBLE,
   Trip_distance DOUBLE,
   PULocationID INT,
   DOLocationID INT,
   RateCodeID INT,
   Store_and_fwd_flag STRING,
   Payment_type INT,
   Fare_amount DOUBLE,
   Extra DOUBLE,
   MTA_tax DOUBLE,
   Improvement_surcharge DOUBLE,
   Tip_amount DOUBLE,
   Tolls_amount DOUBLE,
   Total_amount DOUBLE,
   Congestion_Surcharge DOUBLE,
   Airport_fee DOUBLE
) STORED AS parquet
   LOCATION 's3://nyc-taxi-yellow/parquet/';

INSERT INTO TABLE trips_orc
      SELECT * FROM trips_parquet;

-- Benchmarks

SELECT COUNT(*) FROM
   trips_orc;

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

SELECT payment_type,
       avg(total_amount)
FROM trips_orc
GROUP BY payment_type;

Save this as hive.q and upload it to S3 so we can run it:

aws s3 cp hive.q s3://nyc-taxi-yellow/scripts/hive.q

Now let’s finally run the job:

aws emr-serverless start-job-run \
  --application-id <application-id> \
  --execution-role-arn <job-role-arn> \
  --execution-timeout-minutes 15 \
  --job-driver '{
    "hive": {
        "query": "s3://nyc-taxi-yellow/scripts/hive.q",
        "parameters": "--hiveconf hive.log.explain.output=false"
    }
   }' \
  --configuration-overrides '{
  "applicationConfiguration": [
    {
      "classification": "hive-site",
      "configurations": [],
      "properties": {
        "hive.tez.cpu.vcores": "1",
        "hive.driver.cores": "4",
        "hive.tez.container.size": "4096",
        "hive.driver.memory": "4g",
        "hive.exec.scratchdir": "s3://nyc-taxi-yellow/emr-serverless-hive/hive/scratch",
        "hive.metastore.warehouse.dir": "s3://nyc-taxi-yellow/emr-serverless-hive/hive/warehouse"
      }
    }
  ]
}'
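
Nested JSON inside single quotes is easy to get wrong in a shell. One way around that, sketched below, is to build the two JSON arguments in Python and print them shell-quoted. `build_job_args` is a hypothetical helper of mine; every value in it is copied from the command above.

```python
import json
import shlex
from typing import Any, Dict, Tuple


def build_job_args(bucket: str, script_key: str = "scripts/hive.q") -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (job_driver, configuration_overrides) as plain dictionaries."""
    job_driver = {
        "hive": {
            "query": f"s3://{bucket}/{script_key}",
            "parameters": "--hiveconf hive.log.explain.output=false",
        }
    }
    overrides = {
        "applicationConfiguration": [
            {
                "classification": "hive-site",
                "configurations": [],
                "properties": {
                    "hive.tez.cpu.vcores": "1",
                    "hive.driver.cores": "4",
                    "hive.tez.container.size": "4096",
                    "hive.driver.memory": "4g",
                    "hive.exec.scratchdir": f"s3://{bucket}/emr-serverless-hive/hive/scratch",
                    "hive.metastore.warehouse.dir": f"s3://{bucket}/emr-serverless-hive/hive/warehouse",
                },
            }
        ]
    }
    return job_driver, overrides


if __name__ == "__main__":
    driver, overrides = build_job_args("nyc-taxi-yellow")
    # shlex.quote makes each JSON string safe to paste after the CLI flag.
    print("--job-driver", shlex.quote(json.dumps(driver)))
    print("--configuration-overrides", shlex.quote(json.dumps(overrides)))
```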

You can launch the Hive UI and view the logs as you would on a normal cluster.

Hive UI on EMR Serverless

Benchmarking Queries

Hive

INSERT INTO TABLE trips_orc
      SELECT * FROM trips_parquet;

1m40s

SELECT COUNT(*) FROM
   trips_orc;
col1
30904308

3s 274ms

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

3s 668ms

SELECT payment_type,
       avg(total_amount)
FROM trips_orc
GROUP BY payment_type;

9s 338ms

Athena

SELECT COUNT(*) FROM
   trips_orc;

666ms

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

1.336 sec

SELECT payment_type,
       avg(total_amount)
FROM trips_orc
GROUP BY payment_type;

1.402 sec

Conclusion

So we have around 31M records in this dataset, and EMR Serverless took 1m40s to convert them from Parquet to ORC. Obviously this is converting one columnar file format to another, so the amount of processing is reduced, but it’s still a valid benchmark of EMR Serverless’s capabilities.

As you can see, EMR Serverless is slower than Athena on every query. This is to be expected, as Athena is a mature service that probably has more compute power behind it, but for dev workloads or time-sensitive analysis EMR Serverless is a good fit.
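
To put numbers on that, here is a small sketch that turns the timings above into ratios. The dictionary keys and function names are labels I made up; the timings and the record count are the ones reported in the benchmarks.

```python
from typing import Dict

# Wall-clock times in seconds, copied from the benchmark section.
HIVE_SECONDS = {
    "count": 3.274,
    "avg_by_passenger_count": 3.668,
    "avg_by_payment_type": 9.338,
}
ATHENA_SECONDS = {
    "count": 0.666,
    "avg_by_passenger_count": 1.336,
    "avg_by_payment_type": 1.402,
}

RECORDS = 30904308        # result of SELECT COUNT(*)
CONVERT_SECONDS = 100     # 1m40s for the Parquet to ORC insert


def slowdown(hive: Dict[str, float], athena: Dict[str, float]) -> Dict[str, float]:
    """Return how many times slower Hive was than Athena for each query."""
    return {query: hive[query] / athena[query] for query in hive}


def rows_per_second(records: int, seconds: float) -> float:
    """Return the average conversion throughput in rows per second."""
    return records / seconds


if __name__ == "__main__":
    for query, ratio in slowdown(HIVE_SECONDS, ATHENA_SECONDS).items():
        print(f"{query}: {ratio:.1f}x slower on EMR Serverless")
    print(f"conversion: {rows_per_second(RECORDS, CONVERT_SECONDS):,.0f} rows/s")
```

On these figures Hive on EMR Serverless was roughly 2.7x to 6.7x slower than Athena, and the conversion ran at around 309,000 rows per second.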

If you’re wondering what the total cost was then here you are (excluding S3):

EMR Serverless total cost