How to Run Airflow in a non-Python Stack
tl;dr: Please do not re-write everything in Python.
At MadKudu, we decided to go all-in on Airflow to replace our homegrown orchestration engine. Its structure and flow are ideal for managing data pipelines, and it is optimized for the real-life, long-running jobs that are often encountered in Data Engineering.
But what happens when most of your existing scripts and jobs are not written in Python, the core language of Airflow? In our case, most of our code base is in TypeScript and we clearly did not want to rewrite it all in Python.
Fortunately, Airflow comes with a series of operators that can perform well in this kind of context.
Using the BashOperator
Let's start with a script that's not written in Python. For this example, let's assume it is maintained on GitHub.
These would be the steps to perform in order to get the process completed:
1. git clone (or download) code from a repo or location (if private, you will need to authenticate using a token)
2. build (if needed)
3. run
4. get logs / results, anything you'd need for the rest of the job
Hopefully you have some caching or versioning in place to avoid doing steps 1 and 2 again and again. They add no value to the job and, worse, they raise the number of steps in the flow.
Eventually you would want steps 3 and 4 merged, so that everything your program writes to stdout and stderr is captured, or dispatched directly to log managers.
Since steps 1 and 2 have to be performed on Airflow's instance (or at least the worker's), they add a level of maintenance and configuration. There's a high chance you'll need dedicated volumes for each iteration/build. Also, the best way to use Airflow is to assume that a task can run twice at the same time depending on cycles, retries, and schedules. That is hard to guarantee here, since you may not be able to run 2 (or more) different builds at the same time. It turns out to be a nightmare to maintain.
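To make the four steps concrete, here is a minimal sketch of the single shell command a BashOperator could receive through its bash_command parameter. It is not the setup we ran: the function name, the /tmp/builds cache directory and the marker file are hypothetical. Keying the checkout directory on the branch or tag gives a crude cache, so the clone and the build are skipped when that ref was already built.

```python
import shlex


def build_bash_command(repo_url, ref, build_cmd, run_cmd, cache_root="/tmp/builds"):
    """Compose the four steps as one shell command string (illustrative values only)."""
    workdir = shlex.quote(f"{cache_root}/{ref}")  # one directory per branch or tag
    marker = f"{workdir}/.built"                  # written only after a successful build
    clone_and_build = " && ".join([
        f"rm -rf {workdir}",
        f"git clone --depth 1 --branch {shlex.quote(ref)} {shlex.quote(repo_url)} {workdir}",
        f"(cd {workdir} && {build_cmd})",
        f"touch {marker}",
    ])
    return " && ".join([
        "set -euo pipefail",
        # Clone and build are skipped when this ref has already been built.
        f"if [ ! -f {marker} ]; then {clone_and_build}; fi",
        # Run the program; its stdout and stderr go to the task log.
        f"cd {workdir}",
        run_cmd,
    ])
```

Two runs that build the same ref at the same moment would still race on that directory, which is exactly the kind of maintenance burden described above.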
In conclusion, I would definitely recommend using BashOperator for shell scripts or native commands. But running your own applications / builds / scripts this way can be dangerous, in particular for these reasons:
- level of complexity: fair to complex
- level of maintainability: dangerous
- level of isolation: close to 0
Using the DockerOperator
Isolation problems should ring a bell! There are actually 2 crucial problems that Docker can crush here: isolation and the build.
If you follow the standard CI process, the build is done only on the program's side (the CI process dedicated to the program, with a docker build step), and Airflow only needs to pull the previously built image from the registry to get a built program. More good news: Docker caches image layers. So, compared to a full download, you can cut the download time by a lot. Additionally, logs are easily captured by the DockerOperator in Airflow.
Here is what you get for each of the steps listed earlier if you use a Docker image with a DockerOperator within Airflow, compared to the use of BashOperator:
- 1 -> cut by a lot
- 2 -> removed
- 3 -> unchanged
- 4 -> standard
Additionally, you don't have to worry about volumes or utilities for the build and the run. Everything is contained in the Docker image.
In conclusion, that would result in:
- level of complexity: fair
- level of maintenance: low
- level of isolation: as good as what Docker provides
Wait... What does the DockerOperator actually do?
You provide a remote Docker API, and the DockerOperator spawns a container on it and runs it. You can either run the default entry-point or a command of your own, as you would with a regular docker run... or in a docker-compose YAML file.
It also runs wherever the remote Docker API is set up, on the same instance or on another one. This matters because eventually you will want to control the resources allocated to extra containers like these, typically by running them on separate instances.
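To make the comparison with a regular docker run concrete, here is a rough sketch that maps the kind of arguments a DockerOperator takes to the docker CLI call they roughly correspond to. The function name docker_run_argv is hypothetical, and this is only an analogy: the operator talks to the API through a Python client rather than shelling out to the CLI.

```python
import shlex


def docker_run_argv(image, command=None, environment=None, volumes=None,
                    network_mode=None, docker_url=None, auto_remove=False):
    """Return the `docker run` argv roughly equivalent to the given options."""
    argv = ["docker"]
    if docker_url:
        argv += ["-H", docker_url]       # which Docker API endpoint to talk to
    argv.append("run")
    if auto_remove:
        argv.append("--rm")              # drop the container once it exits
    for key, value in (environment or {}).items():
        argv += ["-e", f"{key}={value}"]
    for volume in volumes or []:
        argv += ["-v", volume]           # "host_path:container_path"
    if network_mode:
        argv += ["--network", network_mode]
    argv.append(image)
    if command:
        argv += shlex.split(command)     # overrides the image's default command
    return argv


print(" ".join(docker_run_argv(
    "madkudu/node-worker",
    command="node service run",
    docker_url="tcp://0.0.0.0:2376",
)))
```

Leaving command empty runs the image's default entry-point, and pointing docker_url at another machine is all it takes to move the container off the Airflow instance.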
At this point, you can run an Airflow step wherever you want it to run, built with whatever language and/or framework you desire, and safely control the resources you allocate to it. That's a good first step towards something fully containerized... such as docker-compose, Swarm, Kubernetes, or the corresponding AWS services, ECS and EKS.
What does the Docker Remote API do?
Let's step back a little bit and remember what docker provides already (among other things):
- An API to run containers (whatever the base language is)
- Networking support, safe and isolated
- Resources management
That's precisely what we need for these kinds of tasks:
- Isolated tasks
- Heavy tasks
- Tasks that are not written in Python
Since the planets are now aligned, the only thing we need is to be able to do all of that safely and programmatically. And that's where the Docker Remote API steps in.
In other words, we're about to allow our Docker host to receive commands programmatically, from Airflow, and for the sake of the example, also from containers.
How do you enable it?
For security reasons, the Docker remote API is not activated by default: anyone who can reach it can start containers on the host. Let's see how to activate it.
We will limit the example to Ubuntu and macOS. Amazon Linux 2 (on AWS) has also been covered and we can provide documentation for it.
Whether you're running docker, docker-compose or ECS, Docker runs as a service. So you need to go to the service configuration file and edit the last line:
# /etc/systemd/system/docker.service.d/startup_options.conf
[Service]
ExecStart=
ExecStart=/usr/bin/dockerd -H fd:// -H tcp://0.0.0.0:2376
Then, just restart the docker service and do a quick check that the API is online:
$ nc -vz localhost 2376
Connection to localhost 2376 port [tcp/*] succeeded!
Or you could also check in a more functional way:
$ curl localhost:2376/images/json
[
  {
    "Containers": -1,
    "Created": 1576163865,
    "Id": "sha256:0dc060c5beb828c8a73e2428fc643baabe7e625f83a86fde51134583216a71c2",
    "Labels": {
      "maintainer": "MadKudu"
    },
    "ParentId": "",
    "RepoDigests": [
      "****/****/airflow@sha256:d3154069335fc4235074a250822597b59abd2f32520e7125a6b2df94f419fbc0"
    ],
    "RepoTags": [
      "****/****/airflow:latest"
    ],
    "SharedSize": -1,
    "Size": 1127558483,
    "VirtualSize": 1127558483
  }
]
Note that the port can be changed.
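The same two checks can be done from Python, which is handy if you want a script or a task to verify the API before spawning containers. This is a minimal sketch using only the standard library; the function names are hypothetical, and the host and port are the ones from the configuration above.

```python
import json
import socket
import urllib.request


def docker_api_reachable(host="localhost", port=2376, timeout=2.0):
    """Rough equivalent of `nc -vz host port`: True if the TCP port accepts connections."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def repo_tags(images_payload):
    """Flatten the RepoTags of an /images/json response into one list."""
    tags = []
    for image in images_payload:
        # Guard against a missing or null RepoTags entry (e.g. untagged images).
        tags.extend(image.get("RepoTags") or [])
    return tags


def list_image_tags(base_url="http://localhost:2376", timeout=5.0):
    """GET /images/json on the Docker API and return every image tag found."""
    with urllib.request.urlopen(f"{base_url}/images/json", timeout=timeout) as resp:
        return repo_tags(json.load(resp))
```

Calling list_image_tags() against the host configured above should return the same tags as the curl output, such as the airflow:latest image shown there.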
Using Docker in a DAG
Set up the DAG
Airflow provides a pretty good operator for that: DockerOperator.
The easiest case is when the images you want to use are already pulled, which can be done through another DAG, e.g. one pulling images on a different schedule.
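One way to sketch that separate "pull" DAG is to generate one docker pull command per image and hand each one to a BashOperator as its bash_command. The helper below only builds the commands; its name and the image list are hypothetical, and it assumes the docker CLI is installed on the Airflow worker.

```python
import shlex


def pull_commands(images, docker_url="tcp://0.0.0.0:2376"):
    """Return one `docker pull` shell command per image, targeting the remote API."""
    return [
        f"docker -H {shlex.quote(docker_url)} pull {shlex.quote(image)}"
        for image in images
    ]


for command in pull_commands(["madkudu/node-worker:latest"]):
    print(command)
```

If you would rather pull at run time, the DockerOperator also accepts a force_pull argument, at the cost of a registry round trip on every task run.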
Back to the main DAG: say we want to run programs using Node. The task using Docker would look like this:
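# Airflow 1.10 import path; newer releases ship the operator in the Docker provider package
from airflow.operators.docker_operator import DockerOperator

run_task_with_docker = DockerOperator(
    task_id='run_task_with_docker',
    # Assuming this image is already pulled
    image='madkudu/node-worker',
    api_version='auto',
    environment={
        'NODE_VAR1': "value1",
        'NODE_VAR2': "value2"
    },
    volumes=[
        "/home/ubuntu/res:/root/res"
    ],
    # Run a specific command if needed
    command='node service run',
    # The local API URL we set up earlier
    docker_url='tcp://0.0.0.0:2376',
    # If needed
    network_mode='bridge',
    # `dag` is assumed to be the DAG object defined earlier in the same file
    dag=dag
)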
Gist link: https://gist.github.com/anteverse/cf05e677da591690bfeed448eb9bb767
Execution
Although your worker doesn't actually run the task itself, it's hooked on the output of the container where things are happening. And everything gets logged into Airflow.
What you get would look like this:
[2019-12-18 11:51:23,281] {base_task_runner.py:98} INFO - Subtask: [2019-12-18 11:51:23,281] {docker_operator.py:204} INFO - Log from within the container
This is very convenient: the container can be dropped and cleaned up after it completes, and Airflow will ingest the logs as if it were a regular task.
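The idea of being "hooked on the output" can be illustrated with a plain child process. This is an analogy under stated assumptions, not the DockerOperator's actual code: the function name is hypothetical, and a Python one-liner stands in for the container.

```python
import logging
import subprocess
import sys

log = logging.getLogger("docker_task_sketch")


def run_and_stream(argv):
    """Run a child process and forward each output line to the logger as it arrives."""
    lines = []
    with subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                          text=True) as proc:
        for raw in proc.stdout:
            line = raw.rstrip("\n")
            log.info(line)       # the parent process owns the log, not the child
            lines.append(line)
    # Leaving the `with` block waits for the child, so returncode is set here.
    if proc.returncode != 0:
        raise RuntimeError(f"process exited with code {proc.returncode}")
    return lines


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run_and_stream([sys.executable, "-c", "print('Log from within the container')"])
```

The practical consequence for a non-Python program is simple: write everything worth keeping to stdout or stderr and exit with a non-zero code on failure.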
Reporting and monitoring
An important part of introducing a new tool into a stack is observability (reporting, logging, monitoring). Don't make your team go and find the information in a tool they haven't set up: it's slow, dangerous and frustrating, and this kind of tool is meant to ease that part.
Emails
The default setup allows you to send emails. You can send whatever information you want to share upon completion or failure of a DAG run, or both.
There are a bunch of examples you can find online. We haven't chosen that solution though.
Slack notifications
Airflow also comes with Slack operators, such as SlackAPIPostOperator.
An interesting thing about Slack: you can easily send data (with a bit of formatting), and to channels that people can choose to join or not. It's cleaner and less polluting than email. Also, with the new blocks API, you can add a lot more interactions. Here is the kind of message we send:
People involved in customer processes, who are not always familiar with Airflow, still get the minimum viable information: customer identity, process and reason. They can act accordingly and contact the right team or right person to deal with the issue.
Everything in that message above can actually be formatted to meet your needs and fit your security policy.
Slack provides a very good tool for that: Block Kit Builder.
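As an illustration of such a message, here is a minimal sketch of a Block Kit payload carrying the three pieces of information mentioned above. The function name and the layout are hypothetical, not the exact message we send.

```python
import json


def failure_blocks(customer, process, reason):
    """Build a list of Block Kit blocks: a header, two side-by-side fields, a reason."""
    return [
        {"type": "header",
         "text": {"type": "plain_text", "text": f"{process} failed"}},
        {"type": "section",
         "fields": [
             {"type": "mrkdwn", "text": f"*Customer*\n{customer}"},
             {"type": "mrkdwn", "text": f"*Process*\n{process}"},
         ]},
        {"type": "section",
         "text": {"type": "mrkdwn", "text": f"*Reason*\n{reason}"}},
    ]


# Wrapped as {"blocks": [...]}, the JSON can be pasted into the builder to preview it.
print(json.dumps({"blocks": failure_blocks("acme", "scoring", "timeout")}, indent=2))
```

The same list can be sent as the blocks field of a Slack message, for instance from a function registered as a task's on_failure_callback.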
Recap
How about a simple illustration, in the case where the worker is an actual docker container too:
If you like a good separation of concerns, here is what this means:
- the host gets to manage resources such as env variables, volumes, network config... static elements
- Docker is the runner
- the Airflow worker either runs simple things itself or spawns a container for non-Python code
- the spawned container sends logs and any relevant status back to the worker. Files can be written to shared volumes and used by other tasks
Conclusion
This illustrates how quickly and smoothly Airflow can be integrated into a non-Python stack. It relies on Docker, and if you're familiar with it, you'll be able not only to fit your pipeline into Airflow but also to scale it faster.
At MadKudu, we've been able to safely run a lot of Node data applications and scripts through Airflow. We've been able to quickly generate new pipelines with different cycles, different volumes, and different kinds of integrations as well. All of that without the need to change the team and the stack!