A proper optimization framework for data infrastructure streamlines engineering efforts, allowing platforms to scale. Although an engineering organization may plan for growth and extensibility, rapidly growing datasets are an issue almost every data team will eventually encounter.
In 2020 DoorDash saw accelerated growth in each section of our three-sided marketplace: more customers ordering on our app, more merchants on the platform, and more Dashers (our term for delivery drivers) willing to make deliveries. This growth led to more data processing, but we still had to meet the same SLAs with our business partners, which includes dashboard consumers, advanced SQL writers, and data scientists. Continuing to meet our SLAs required optimizing our ETL jobs to better handle the increased load and data usage.
At DoorDash our data warehouse is in Snowflake. Although we could have just increased our compute resources to handle the additional load, we instead decided to implement an optimization framework for our ETL jobs to meet our SLAs without increasing costs.
Order of optimization
We devised this list of optimizations to prioritize our efforts by focusing on the items with the least effort and the most cost savings:
- Can this ETL be decommissioned?
- Can we break dependencies in the directed acyclic graph (DAG)?
- Can this ETL be done incrementally?
- Can we reduce the number of columns?
- Can we stop data spillage by splitting queries into a smaller dataset?
- Can we implement clustering?
- Can we utilize Snowflake functions?
Decommissioning an ETL job saves all of that job’s compute usage, while adding clustering to a column may just speed up GROUP BY/ORDER BY SQL functions, which is less effective. Many of the optimizations can be used together, such as using a Snowflake function and reducing the number of columns. For each optimization, we will discuss the implementation steps and associated tradeoffs.
Can this ETL be decommissioned?
With new data sources and processes appearing, it's important to remove any legacy ETL jobs that are no longer in use. By tracing the DAG downwards we can do an impact analysis to see if a job can be removed. An impact analysis consists of:
- Finding all tables, reports, and ETL jobs that rely on the ETL job we intend to remove
- Looking at how the job is utilized. For example, if the table is used in a LEFT JOIN it may be a candidate for removal, because LEFT JOINs may have fields that can be removed without impacting other fields in something like an INNER JOIN.
- Looking upstream to check if the source tables are still providing accurate data. More often than not it's better to provide no data than inaccurate data.
(For further explanation of DAGs in terms of ETL, checkout this Apache Airflow documentation page.)
By decommissioning ETL jobs teams can free up compute resources for other jobs to run. This reduces the overall compute cost and helps meet SLAs at the same time since there are fewer jobs being processed.
Can we break dependencies in the DAG?
If the ETL can not be decommissioned, the next step is to try to break up its dependencies. For example, let’s say we have two jobs that feed into a job downstream. Both of these source tables are needed in the target table and we can not rewrite the SQL query. One job, dataset_a, runs at 3 am and the other job, dataset_b, runs every hour. Our downstream job, daily_aggregate, has an 8 am SLA. We can see what the dependencies look like in Figure 1:
Let’s assume that daily_aggregate is sourced from dataset_a with a LEFT JOIN to dataset_b. The data from dataset_b is run hourly so we just need the data that was most recently processed up to when daily_aggregate is built. Let’s also assume that dataset_b’s ETL usually takes 15 minutes to run, but an increase in the data volume can make processes take up to 45 minutes to complete. Initially, daily_aggregate depended on both dataset_a and dataset_b to complete before starting. Without changing any SQL we can just remove dataset_b from the DAG, as shown in Figure 2, below:
Since we know that dataset_b is LEFT JOIN’ed to dataset_a, and we know how LEFT JOINs work, we may be able to remove daily_aggregate’s dependency on dataset_b. This helps us achieve our SLA now that we do not have to wait for dataset_b to complete at a specific time.
Reducing extraneous dependencies in DAG configurations breaks up excessively large DAGs and reduces complexities. This optimization is also a cheap and effective way to hit SLAs. No additional computation power, like scaling up the Snowflake warehouse, is required.
Can this ETL be done incrementally?
Over the last year, we had to revamp many of our ETLs in order to ensure that all our data was processed daily. Before our optimizations, jobs that were designed to handle full-table refreshes started to spill to the disk. Some of our datasets grew five times over a short period of time but our SLAs remained the same.
One way we met our SLAs was to switch many of our ETLs to incremental jobs. Running an incremental ETL means only inserting or updating changed records in a table instead of replacing the whole table. By doing incremental jobs it is possible to avoid reprocessing data that has not changed.
At DoorDash, attributes related to Dasher logistics, such as a Dasher ID, order level information, and fees associated with the orders usually do not change over time. In this case, deleting all the records and loading the same information, again and again, is unnecessary. Architecting the ETL jobs so that Snowflake processes only the required data helps boost performance because less data processed usually means faster processing times.
Can we reduce the number of columns in the SELECT clause of the query?
Routinely reviewing what columns are used in an ETL is a good way to reduce the amount of data processed. As applications change, columns upstream may be deprecated. These columns can be pruned from the ETL. Given how Snowflake stores data in columnar format, we avoid using SELECT * FROM Tablename in any ETL scripts. It is always advisable to use SELECT on only the required columns from the table.
Confining SELECT to the required columns serves multiple purposes in improving the performance of queries:
- By avoiding selecting all the columns from the table the volume of the dataset at the time of processing is reduced, increasing the performance
- This approach reduces caching of unnecessary data in memory
Can we stop data spillage by splitting queries into a smaller dataset?
Most of the time, query performance is impacted by the data volume and the cluster it is running on. When increasing the cluster size is not an option, we need to think about handling the data volume at the time of execution.
In that case, identifying the bottleneck is the first step towards query performance improvement. We used Snowflake’s Query Profile to identify the issue causing the slowness.
When operating certain queries with multiple joins or heavy aggregate operations there is a chance that the data volume exceeds the compute memory and starts spilling into remote and local storages. Working on the data spilled in remote/local storage takes a much longer time compared to working with data in memory. In this case, we can process the data by splitting the queries into the smaller datasets and then combining the resulting set.
Reducing data spillage decreases the compute time of queries. Splitting queries also reduces the compute cost on Snowflake. One thing to watch out for is trying to over-optimize a query into too many steps. A good way to check for this is to again use Query Profile to see how changes differ in the execution plan.
Can we implement clustering?
If queries are running slow or the query profile suggests it is scanning all the partitions of the table, it's an indication that this table requires a cluster key on the appropriate dimension. Snowflake supports clustering keys for large tables.
Cluster keys help partition the table based on the defined dimension and helps reduce the partition scan on large tables. We need to analyze the frequent queries and joins running the large table and decide the dimension to be clustered based on the attribute most used in the filter and join conditions.
When taking this approach, it’s important to note that:
- Clustering keys should not be defined on low cardinality, one or two distinct values, or high cardinality, too many distinct values, such as timestamps.
- Clustering comes with an associated cost in compute resources and storage.
Can we utilize functions within Snowflake?
Another way to optimize queries is to use Snowflake functions within the code. This improves code readability and reduces the chance of error with hard-coded formulas.
A common question in our business is “How far is X from Y?” Distance can be tricky to calculate in SQL given the curvature of the Earth. In these cases, we use Snowflake’s built-in HAVERSINE function. Haversine calculates the great-circle distance between two points on a sphere given their longitudes and latitudes, formatted as HAVERSINE( lat1, lon1, lat2, lon2 ).
QUALIFY is another useful function for removing duplicate values from a query. Qualify is to Window Functions in Snowflake as HAVING is to GROUP BY in SQL: It allows one to filter out rows based upon the results of a Window function. This is especially helpful in avoiding a second pass-through of the data. Here’s an example from Snowflake:
For DoorDash’s Data team, getting 1% better every day means deploying new ETLs or optimizing old ones constantly. Having our Order of Optimizations checklist allows us to approach optimization from many different angles. As our team grows, we hope to add more levels of detail to deliver results faster and more predictably.
Our team not only focuses on delivering fast and reliable data, but also on optimizing the data pipelines. This reduces our compute resource usage and makes our processes more efficient. Having the opportunity to learn how data flows through our applications and what that means for the business is one of the more exciting parts of our role.
If you’re interested in solving interesting data problems, come check out a role on our team.
Thanks goes out to DoorDash’s Business Intelligence team, and a special shout out goes to Josh Li for sharing Snowflake functions to the larger team.