Airflow is a useful tool for scheduling ETL (Extract, Transform, Load) jobs. It runs DAGs (directed acyclic graphs) composed of tasks. Tasks are built from Python classes called operators, which let users run work across different technologies. Airflow ships with a comprehensive suite of standard operators for running Python scripts, executing SQL queries against common databases, and starting Docker containers, among other things. The standard operators can be found here. At Enigma, we use Airflow to run the data pipelines that supply data to Enigma Public.
On my team at Enigma, we build and maintain several data pipelines using Airflow DAGs, some of which use DockerOperator to spin up Parsekit (an internal parsing library) containers. In several of these pipelines, we tweaked DockerOperator to make up for some shortcomings. As a reminder, DockerOperator takes the image name, volumes, environment variables, and Docker URL, among other arguments, and spins up the specified container. You can think of it as Airflow’s API for running Docker containers, as opposed to the CLI. And as with the CLI command, there’s no standard way to pass in inputs and extract outputs. This article will show you how to build a custom Airflow operator that takes its input from an upstream task, passes that input into the container, and makes the container’s output available to downstream tasks.
We don’t want to reinvent the wheel here, so we’re going to start our class by inheriting from Airflow’s DockerOperator. DockerOperator takes care of supplying arguments necessary to run the container and starts up the container.
<div class="code-wrap"><code>from airflow.operators.docker_operator import DockerOperator


class JsonIoOperator(DockerOperator):
    def __init__(self, input_task_id, *args, **kwargs):
        # DockerOperator handles image, volumes, environment, and so on
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id</code></div>
We need a way to pass input into the container. Ideally, the input comes from upstream tasks. In our case, almost all tasks are PythonOperators. The return value of a PythonOperator’s callable is stored in Airflow XCom, so downstream tasks can retrieve it by calling `xcom_pull` with the upstream `task_id`. To get the input, the caller must pass in the upstream task’s `task_id` when instantiating JsonIoOperator. To support this, we extend DockerOperator’s `__init__` function with an additional argument, `input_task_id`.
<div class="code-wrap"><code>from airflow.operators.docker_operator import DockerOperator


class JsonIoOperator(DockerOperator):
    def __init__(self, input_task_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id

    def execute(self, context):
        # pass input logic goes here

        # setup output logic goes here

        # run the container
        super().execute(context)

        # load output into Airflow logic goes here</code></div>
The execute function is where most of our code lives. We will override the default execute function so we can add I/O logic before and after running DockerOperator’s default execute function.
Our input is a small JSON string. If the input is large (> 1 MB), we want to supply a file path instead. In standard deployments of Airflow with multiple worker hosts, the file path must exist on a shared storage location such as NFS or S3, which we assume is available. We will use shared storage later to pass outputs from this task to downstream tasks.
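To make the size rule concrete, here is a minimal sketch of a helper that passes small inputs inline and writes large ones to shared storage. The helper name `build_input_env`, the `CONTAINER_INPUT_PATH` variable, and the threshold constant are assumptions made for illustration; none of them are part of Airflow or of the operator built in this article.

```python
import json
import os
import tempfile

MAX_INLINE_BYTES = 1_000_000  # assumed threshold, roughly 1 MB


def build_input_env(payload, shared_dir_path, max_inline_bytes=MAX_INLINE_BYTES):
    """Return environment variables describing the input for the container.

    Small payloads are passed inline as a JSON string; larger ones are
    written to shared storage and referenced by path.
    """
    serialized = json.dumps(payload)
    if len(serialized.encode("utf-8")) <= max_inline_bytes:
        return {"CONTAINER_INPUT": serialized}

    # mkstemp creates the file and leaves it in place for the container to read.
    fd, input_path = tempfile.mkstemp(suffix=".json", dir=shared_dir_path)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(serialized)

    # CONTAINER_INPUT_PATH is a hypothetical variable name.
    return {"CONTAINER_INPUT_PATH": input_path}
```

In the file-path case, the file would still have to be mounted into the container as a volume so that the path resolves inside it.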
<div class="code-wrap"><code>import json
import tempfile

from airflow.operators.docker_operator import DockerOperator


class JsonIoOperator(DockerOperator):
    def __init__(self, input_task_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id

    def execute(self, context):
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here

        # run the container
        super().execute(context)

        # load output into Airflow logic goes here</code></div>
Reading upstream data is easily done with `xcom_pull`, a method of BaseOperator (from which DockerOperator inherits) that delegates to the task instance’s `xcom_pull` method.
To pass the JSON, we have two options: environment variables and volumes. In my use case, because I have low-complexity JSON without special characters, I’m going to serialize the JSON into a string and set it as the environment variable `CONTAINER_INPUT`. The container process is responsible for reading the environment variable and using it. For more complex inputs, we would want to mount the input file (via shared storage) into the container and point the container to it via an environment variable.
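On the container side, reading the input could look something like the sketch below. The function name `read_container_input` is hypothetical, and the `environ` parameter exists only so the function can be exercised without touching the real process environment.

```python
import json
import os


def read_container_input(environ=os.environ):
    """Parse the JSON document passed in through CONTAINER_INPUT."""
    raw = environ.get("CONTAINER_INPUT")
    if raw is None:
        # Fail loudly so a misconfigured task is easy to spot in the logs.
        raise RuntimeError("CONTAINER_INPUT is not set")
    return json.loads(raw)
```

The container’s main process would call `read_container_input()` once at startup and hand the resulting dictionary to whatever does the actual work.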
Side note: The task instance context dictionary contains several useful functions and attributes. Here’s a gist listing those out.
<div class="code-wrap"><code>import os
import json
import tempfile

from airflow.operators.docker_operator import DockerOperator


class JsonIoOperator(DockerOperator):
    def __init__(
            self,
            input_task_id,
            shared_dir_path,
            output_dir_path='/tmp/output/',
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id
        self.shared_dir_path = shared_dir_path
        self.output_dir_path = output_dir_path

    def execute(self, context):
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here
        self.environment['OUTPUT_DIR'] = self.output_dir_path
        tmp_dir = tempfile.TemporaryDirectory(dir=self.shared_dir_path)
        tmp_dir_path = tmp_dir.name

        # appending volume
        volume = "{}:{}:rw".format(tmp_dir_path, self.output_dir_path)
        self.volumes.append(volume)

        # run the container
        super().execute(context)

        # load output into Airflow logic goes here</code></div>
To make the container’s output accessible to downstream tasks, we will mount a shared NFS directory from the host into the container. NFS allows all workers to access the same storage. The base path of this directory should be passed in as the `shared_dir_path` argument.
We will create a temporary directory within `shared_dir_path` and mount that folder into the container at `output_dir_path`, which can be specified by the user. The container’s main process should read `OUTPUT_DIR` and write its outputs to that location.
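For illustration, the container’s side of this contract might look like the following sketch. The helper name `write_output` and the choice of JSON as the output format are assumptions; the only thing taken from the operator is the `OUTPUT_DIR` variable.

```python
import json
import os


def write_output(filename, data, environ=os.environ):
    """Write `data` as JSON into the directory named by OUTPUT_DIR."""
    output_dir = environ["OUTPUT_DIR"]
    # The mounted directory normally exists already; this is a safety net.
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, filename)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    return output_path
```

Because `OUTPUT_DIR` points at the mounted volume, anything written this way lands in the temporary directory on shared storage.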
<div class="code-wrap"><code>import os
import json
import tempfile

from airflow.operators.docker_operator import DockerOperator


class JsonIoOperator(DockerOperator):
    def __init__(
            self,
            input_task_id,
            shared_dir_path,
            output_dir_path='/tmp/output/',
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id
        self.shared_dir_path = shared_dir_path
        self.output_dir_path = output_dir_path

    def execute(self, context):
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here
        self.environment['OUTPUT_DIR'] = self.output_dir_path
        # mkdtemp (rather than TemporaryDirectory) so the directory survives
        # after execute() returns; a TemporaryDirectory is removed once its
        # object is garbage-collected
        tmp_dir_path = tempfile.mkdtemp(dir=self.shared_dir_path)

        # appending volume
        volume = "{}:{}:rw".format(tmp_dir_path, self.output_dir_path)
        self.volumes.append(volume)

        # run the container
        super().execute(context)

        # load output into Airflow logic goes here
        # returns path where output files are written to
        return tmp_dir_path</code></div>
After output is written to NFS by the container process, we just return the directory path. Downstream tasks will access the files by reading in the directory path.
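A downstream PythonOperator could pick the path up along these lines. This is only a sketch: the callable name `summarize_docker_output` and the `docker_task` task ID are assumptions, and in Airflow 1.x the PythonOperator needs `provide_context=True` for the context to be passed in as keyword arguments.

```python
import os


def summarize_docker_output(**context):
    """Hypothetical python_callable: list the output files and their sizes."""
    # "docker_task" is assumed to be the task_id of the JsonIoOperator task.
    output_dir = context["ti"].xcom_pull(task_ids="docker_task")
    sizes = {}
    for filename in sorted(os.listdir(output_dir)):
        filepath = os.path.join(output_dir, filename)
        if os.path.isfile(filepath):
            sizes[filename] = os.path.getsize(filepath)
    return sizes
```

Only the directory path travels through XCom here; the files themselves stay on shared storage.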
Temporary directory cleanups: At the end of the DAG, there should be a cleanup task which deletes all temporary output directories created inside the NFS.
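One way such a cleanup step could work is sketched below. The function name `cleanup_output_dirs` is hypothetical, and how the list of directory paths is collected (for example, by pulling them from XCom) is left to the DAG author.

```python
import os
import shutil


def cleanup_output_dirs(dir_paths, shared_dir_path):
    """Delete temporary output directories located under shared_dir_path."""
    shared_root = os.path.realpath(shared_dir_path)
    removed = []
    for dir_path in dir_paths:
        real_path = os.path.realpath(dir_path)
        # Refuse to delete the shared root itself or anything outside it.
        inside_root = os.path.commonpath([shared_root, real_path]) == shared_root
        if not inside_root or real_path == shared_root:
            continue
        if os.path.isdir(real_path):
            shutil.rmtree(real_path)
            removed.append(real_path)
    return removed
```

The path check is a guard against a bad XCom value pointing the cleanup task at something it should not delete.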
<div class="code-wrap"><code>import os
import json
import tempfile

from airflow.operators.docker_operator import DockerOperator


class JsonIoOperator(DockerOperator):
    def __init__(
            self,
            input_task_id,
            shared_dir_path,
            output_dir_path='/tmp/output/',
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id
        self.shared_dir_path = shared_dir_path
        self.output_dir_path = output_dir_path

    def execute(self, context):
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here
        self.environment['OUTPUT_DIR'] = self.output_dir_path
        files = {}

        # the context manager deletes the temporary directory on exit,
        # so no explicit cleanup code is needed
        with tempfile.TemporaryDirectory(dir=self.shared_dir_path) as tmp_dir_path:
            # appending volume
            volume = "{}:{}:rw".format(tmp_dir_path, self.output_dir_path)
            self.volumes.append(volume)

            # run the container
            super().execute(context)

            # load output into Airflow logic goes here
            for filename in os.listdir(tmp_dir_path):
                filepath = os.path.join(tmp_dir_path, filename)
                with open(filepath, 'rb') as f:
                    files[filename] = f.read()

        return files</code></div>
When the output is small and simple, the following method provides an alternative: it loads the output directly into Airflow’s XCom. This approach is brittle and not recommended, but it is useful in certain scenarios where the output is small. Keep in mind that XCom is a table in Airflow’s metadata database, so all output is stored there. As more DAG runs occur, the database grows, so you will need regular cleanup DAGs to remove old Airflow metadata, scheduled according to how fast the database fills up.
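If you do go the XCom route, one option is to cap how much data gets loaded, as in the sketch below. The helper name `collect_small_outputs` and the 64 KB limit are assumptions chosen for illustration, not values recommended by Airflow.

```python
import os

MAX_XCOM_BYTES = 64 * 1024  # assumed limit; pick one that suits your metadata database


def collect_small_outputs(dir_path, max_total_bytes=MAX_XCOM_BYTES):
    """Read every file in dir_path, refusing to exceed max_total_bytes in total."""
    files = {}
    total = 0
    for filename in sorted(os.listdir(dir_path)):
        filepath = os.path.join(dir_path, filename)
        if not os.path.isfile(filepath):
            continue
        total += os.path.getsize(filepath)
        if total > max_total_bytes:
            raise ValueError(
                "output exceeds {} bytes; return a path instead".format(max_total_bytes)
            )
        with open(filepath, "rb") as f:
            files[filename] = f.read()
    return files
```

Failing the task when the cap is exceeded keeps an unexpectedly large output from quietly bloating the XCom table.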
<div class="code-wrap"><code>from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from operators.json_io_operator import JsonIoOperator

dag = DAG(...)

input_task_id = 'python_task'
input_task = PythonOperator(
    task_id=input_task_id,
    # ...
)

docker_task = JsonIoOperator(
    docker_url='unix:///var/run/docker.sock',
    image=...,
    volumes=...,
    environment=...,
    task_id='docker_task',
    dag=dag,
    output_dir_path=...,  # location within container output will be written to
    shared_dir_path=...,  # your NFS dir or S3 location
    input_task_id=input_task_id,
)</code></div>
Where to place the operator?
Usage is similar to DockerOperator, with three additional arguments: `output_dir_path`, `shared_dir_path`, and `input_task_id`. You can check out the DockerOperator docs here.
Mydag.py