By Tom Yedwab
Introduction
When we last discussed Khan Academy’s data pipeline management system, we shared our homegrown orchestration tool called Khanalytics and the progress we had made toward centralizing data management tasks in one tool. In the three years since, the Data Infrastructure team has grown in size and responsibility, and we have made a lot of changes. The biggest change was to stop development on Khanalytics and deprecate it in favor of the open-source project Apache Airflow. We’ve integrated this technology as a core component of a new system that we have dubbed “Khanflow”.
In this post I will talk a bit about why and how we made the transition.
Deciding to switch to Airflow
Before talking about the implementation, I want to describe the decision-making process, because in my mind it is just as important. Way back in 2016 — before embarking on the multi-year journey of development on Khanalytics — we did a survey of the existing tools and identified Airflow as the most likely option for integrating a third-party solution. However, it was a very new project at the time, and we decided it was not yet robust and capable enough for our needs. So we built our own, knowing that the data tooling landscape was moving quickly and that our own understanding of our needs would change. The question then became: when should we re-evaluate?
When you have developed your own software tooling, it is easy to fall into one of two traps. On one hand, there is the sunk-cost fallacy of continuing to invest in a tool because of the time already spent and its inherent customization for the team’s specific needs. On the other hand, there is a temptation to jump to the latest up-and-coming technology, especially when there is excitement and momentum around a particular project, only to spend considerable effort getting to feature parity or discovering hidden deficiencies. Adopting Airflow would be a significant investment, so why not continue to invest in making our own tool better?
The short answer is that as a small Data Infrastructure team, our mission is not to develop the perfect tooling but rather to ensure the rest of the company has timely, reliable access to the data and metrics they need. Indeed, this emphasis on efficiency in serving the needs of our product users is consistent with Khan Academy’s engineering philosophy. The more time we spend on the “simple” stuff — running scheduled data pipelines — the less time we can spend on more complex and technical needs. And what we discovered is that developing and maintaining a pipeline orchestration platform in the cloud takes considerable ongoing investment, while community-supported projects such as Airflow are rapidly improving and becoming easier to manage every year.
So, does Airflow meet our needs as well as or better than our own custom solution? When we investigated, we found:
- Solid and growing community support (a key feature when evaluating open-source projects),
- A broad set of supported clouds and tools, so our favorite Google Cloud products such as BigQuery (for running analytics queries) and Dataflow (for running data pipelines) are supported out of the box,
- Fully managed options such as Astronomer and Google Cloud Composer,
- Solid support for both hosting in Kubernetes and for running tasks using KubernetesPodOperator in similar ways to Khanalytics pipelines.
Implementation
The first stage in any project like this is a proof of concept, which Cloud Composer made easy: with one click we had a working Airflow deployment and could start running pipelines (“DAGs”, or Directed Acyclic Graphs, in Airflow parlance). We experimentally ported a few common pipeline types and started building out a proposed plan for what we would need to build around Airflow to make it a fully supported alternative to Khanalytics.
Some of the earliest decisions we had to make were about best practices; as a highly configurable and customizable tool, Airflow can be used in many different ways. For example, when running a BigQuery SQL query we could either use the built-in BigQueryOperator (which runs directly in the worker container) or use KubernetesPodOperator to launch our custom Docker image as we would do in Khanalytics. We went with the latter for this use case and felt validated by examples of others advocating strict adherence to this approach. For simple tasks such as sending an HTTP request we opted to use the built-in operators instead.
When it became clear that porting existing pipelines was relatively easy and maintaining the environment was much less work than our own custom infrastructure, we focused on the needs not directly provided by Airflow, including:
1. Wrappers for DAG() and commonly-used operators.
Since pipelines are just Python objects in Airflow, we can create base classes that define reasonable defaults, enforce constraints, and add custom functionality. The Data Infrastructure team maintains these and updates them in response to common issues and needs we see. Rather than invoking KubernetesPodOperator directly, we create a wrapper for each of our Docker images that takes explicit parameters which are then passed to the image as environment variables.
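To illustrate the idea, here is a minimal sketch of how a wrapper might translate explicit, typed parameters into the environment variables a pre-built image expects. The function and variable names are hypothetical; in the real system the resulting dict would be passed along to KubernetesPodOperator rather than used directly.

```python
# Hypothetical sketch: map a wrapper's explicit, typed parameters to the
# environment variables a pre-built Docker image expects. In practice the
# resulting dict would be handed to KubernetesPodOperator (e.g. via its
# env_vars argument); all names here are illustrative.

def build_env_vars(script: str, timestamp: str, dry_run: bool = False) -> dict:
    """Translate explicit wrapper parameters into image environment variables."""
    return {
        "KHANFLOW_SCRIPT": script,        # which script the image should run
        "KHANFLOW_TIMESTAMP": timestamp,  # e.g. Airflow's templated {{ ts }}
        "KHANFLOW_DRY_RUN": "1" if dry_run else "0",
    }
```

Keeping the parameters explicit like this means pipeline authors get normal Python argument checking instead of a bag of untyped environment variables.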
2. A CI/CD pipeline to lint, test, and deploy pipelines to production.
We use a GitHub repository to version all our pipelines, including the associated queries and scripts. These files all need to be synced to Airflow, but only after passing some simple checks – for instance, the Python code needs to parse correctly, otherwise the pipeline will disappear from the Airflow UI and stop running. We run these checks in a GitHub Action, which integrates with our pull-request-based review and deployment process. Once merged to master, pipeline code is continuously synced from GitHub, so we do not need to rebuild the Airflow image to deploy a pipeline.
We also have a separate GitHub repository for the Airflow configuration itself and the commonly-used Docker images. This has a separate CI/CD process which builds the images and runs some test pipelines in the production Airflow cluster against the new images. Once checks pass, we update a version file in Google Cloud Storage, causing all future pipeline runs to use the new image version.
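The parse check mentioned above can be sketched in a few lines. This is a simplified stand-in for our actual linting, using only the standard library:

```python
# Simplified sketch of the CI parse check: a pipeline file with a syntax
# error would silently vanish from the Airflow UI, so we fail the build
# instead. The real checks do more; this shows only the parse step.
import ast
from pathlib import Path

def find_unparseable(pipeline_dir: str) -> list:
    """Return the paths of Python files under pipeline_dir that fail to parse."""
    bad = []
    for path in sorted(Path(pipeline_dir).rglob("*.py")):
        try:
            ast.parse(path.read_text(), filename=str(path))
        except SyntaxError:
            bad.append(str(path))
    return bad
```

A GitHub Action can simply fail the build whenever this returns a non-empty list.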
3. A development environment for running Airflow locally.
While we initially set up a test Airflow deployment for iterating on pipelines, copying pipeline files into the cloud on every change quickly became too slow and cumbersome, and coordinating with others to share a single environment was limiting. There isn’t currently a single supported way to run Airflow on a local machine, so we developed a script which automatically sets up a Python virtual environment, installs dependencies, sets up a local Postgres database, and then runs the Airflow webserver and scheduler. While it isn’t perfect, it is good enough for non-engineers to build and iterate on pipelines quickly.
With this same environment we can run individual pipeline tasks for even quicker iteration and also run our custom linter.
4. A set of Google Cloud service accounts to limit pipeline access to data.
Some of our pipelines access more sensitive data than others. While Airflow has no built-in security model for pipelines, we have limited the default service account that pipelines run with to a reasonable minimum of access, and we require that pipelines accessing more sensitive data explicitly use a specifically privileged service account.
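One way to picture the scheme (all account and tier names below are made up for illustration): pipelines get a minimally privileged default identity, and anything touching sensitive data must name a registered, more privileged account.

```python
# Hypothetical sketch of per-pipeline service accounts: the default account
# has minimal data access; sensitive pipelines must explicitly opt into a
# registered, more privileged account. All names are illustrative.
from typing import Optional

DEFAULT_SA = "pipelines-default@example-project.iam.gserviceaccount.com"
PRIVILEGED_SAS = {
    "pii": "pipelines-pii@example-project.iam.gserviceaccount.com",
}

def service_account_for(sensitivity: Optional[str] = None) -> str:
    """Return the service account a pipeline's pods should run as."""
    if sensitivity is None:
        return DEFAULT_SA
    if sensitivity not in PRIVILEGED_SAS:
        raise ValueError("no service account registered for %r" % sensitivity)
    return PRIVILEGED_SAS[sensitivity]
```

Making the privileged path explicit and enumerable keeps accidental access to sensitive data from being the default.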
Example pipeline
To illustrate how we’ve customized Airflow into our own Khanflow system, here is a trivial example running a custom Python script:
example_python.py

    # An example pipeline using Airflow
    pipeline = KhanflowPipeline(
        'example',
        'Example pipeline',
        'Infrastructure',
        default_args,
        datetime(2020, 9, 18),
        slack_channel_to_notify='infrastructure-data')

    # A task which runs a Python script
    a = KhanflowPythonOperator(
        pipeline,
        task_id="script",
        app=pipeline.path("script.py"),
    )

    # A task which runs a SQL query and writes to a table
    b = KhanflowBigQueryOperator(
        pipeline,
        task_id="query",
        query=[pipeline.path("bigquery.sql")],
        variables={"timestamp": "{{ ts }}"},
        dataset="khanflow_stats",
        table="example_bigquery",
    )

    # Add a dependency so "b" only runs after "a"
    a >> b
Naturally, script.py and bigquery.sql are a Python script and a BigQuery query, both checked into the pipeline repository in the same directory as the pipeline itself.
In this example we use three helper classes: KhanflowPipeline, KhanflowPythonOperator, and KhanflowBigQueryOperator.
KhanflowPipeline is a wrapper for Airflow’s DAG which provides some default values and functionality, but also adds a new required parameter – the team which owns the pipeline – as well as some optional parameters such as the Slack channel to notify on failure.
KhanflowPythonOperator is a bit more interesting. Internally, it uses KubernetesPodOperator to start a pod in the same cluster running our pre-built Python image. In production, an init container runs first, using git-sync to fetch the pipeline repository into a shared volume, so that when the main container runs it can look up the script filename passed in as an environment variable. Running in a separate container like this allows us to install whatever Python dependencies we want, even ones incompatible with the libraries Airflow needs for its worker images.
This is all well and good for production, but how does this pipeline work in our development environment? We do not currently expect pipeline authors to run a Kubernetes environment locally, so when the pipeline is run locally we still run the pods in the cloud, in our test GKE cluster. Since we want the pod to use the latest local version of script.py, we package up any local changes in an archive and upload it to GCS, where the running pod can pull it instead of pulling from GitHub. This approach is a bit clunky, but it more closely resembles the production environment and does not require us to store cluster secrets on the author’s machine.
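The packaging step might be sketched like this; it is a simplified stand-in in which the GCS upload is elided and all paths and names are illustrative.

```python
# Rough sketch of bundling local pipeline changes into an archive that a
# cloud-side pod can fetch instead of pulling from GitHub. The upload to
# GCS is elided; paths and names are illustrative.
import tarfile
from pathlib import Path

def package_local_changes(pipeline_dir: str, out_path: str) -> str:
    """Bundle the local pipeline directory into a tar.gz archive."""
    with tarfile.open(out_path, "w:gz") as archive:
        archive.add(pipeline_dir, arcname=Path(pipeline_dir).name)
    # In the real flow the archive would now be uploaded to GCS, and the
    # pod's init step pointed at it rather than at the git repository.
    return out_path
```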
KhanflowBigQueryOperator is very much like its Python counterpart. It can handle both inline queries and files, concatenate multiple queries, and interpolate variables into a query, such as timestamp in this instance. It also automatically adds labels to the query job so we can later extract per-pipeline and per-task metadata, such as query costs, for reporting purposes.
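The labeling idea can be sketched as a small helper. The function name is hypothetical; note that BigQuery label values must be lowercase and limited to letters, digits, underscores, and dashes, with a 63-character cap.

```python
# Minimal sketch of deriving BigQuery job labels from pipeline and task
# names so query costs can later be attributed per pipeline and per task.
# BigQuery label values must be lowercase, contain only letters, digits,
# underscores, and dashes, and are capped at 63 characters.
import re

def job_labels(pipeline_name: str, task_id: str) -> dict:
    """Build a BigQuery-safe label dict for a query job."""
    def clean(value: str) -> str:
        return re.sub(r"[^a-z0-9_-]", "_", value.lower())[:63]
    return {"pipeline": clean(pipeline_name), "task": clean(task_id)}
```

Attaching a dict like this to each query job makes per-pipeline cost reporting a simple group-by over the billing export.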
Though we’ve added a fair amount of customization, this pipeline is straightforward to understand and should look very familiar to anyone who has used Airflow.
Where we are now
At this point we have a fully functional Airflow deployment and are actively porting all existing pipelines, whether still running on Khanalytics or on a cron job somewhere, to the new system. While Composer made it very easy to get started, we are currently hosting Airflow in our own GKE cluster using Terraform and Helm for added flexibility, and this has also been straightforward to maintain. We recently upgraded to Airflow 2.0, which brings a lot of new features and performance improvements. Minor bugs do happen, but the developers are very responsive on GitHub and Slack, and there are usually straightforward workarounds. Most importantly, the system is very stable, and we don’t see frequent failures that need to be diagnosed.
So what is in store for the future of our pipeline architecture? We will continue to make the tool easier and more approachable. For our Data Insights team’s needs, we need to make scheduling a query as easy as using other tools such as BigQuery’s Scheduled Queries, so we can get the benefits of visibility and revision history for these queries as well. We are also planning to build a data lineage system to track data as it moves from upstream sources, through Airflow and onward to data dashboards and reports.
Looking back we are very happy with our decision to adopt Airflow; it is solving a problem that is common across data infrastructure teams and we are glad to see many of these teams standardizing on a shared tool.
Now we get to focus on the more interesting needs of our Data Insights team around building better learning metrics, conducting efficacy research, and improving and optimizing the experience of learners who depend on our product to succeed in their learning journey. If you’re excited to help us build our Data Infrastructure, we’re hiring!