How We Solved Our Airflow I/O Problem By Using A Custom Docker Operator

Airflow is a useful tool for scheduling ETL (Extract, Transform, Load) jobs. Airflow runs DAGs (directed acyclic graphs) composed of tasks. These tasks are built from Python classes called operators, which let users run work across different technologies. Airflow ships with a comprehensive suite of standard operators for running Python scripts, executing SQL queries against common database technologies, starting up Docker containers, and more. The standard operators can be found here. At Enigma, we use Airflow to run the data pipelines that supply Enigma Public. More about running Docker containers on Airflow can be found in this blog post.

On my team at Enigma, we build and maintain several data pipelines using Airflow DAGs, some of which use DockerOperator to spin up Parsekit (an internal parsing library) containers. In several of these pipelines, we tweaked DockerOperator to make up for some shortcomings. As a reminder, DockerOperator takes in the image name, volumes, environment variables, and Docker URL, among other arguments, and spins up the specified container. You can think of it as Airflow's API for running Docker containers, as opposed to the CLI. And like the CLI command, it has no standard method for passing in inputs and extracting outputs. This article will show you how to build a custom Airflow operator that can:

  1. Supply JSON input into the Docker Container
  2. Extract file outputs (XLSX, CSV, etc) from within the Docker Container
  3. Operate on multi-worker Airflow deployments

Starting out

We don't want to reinvent the wheel here, so we're going to start our class by inheriting from Airflow's DockerOperator, which takes care of supplying the arguments necessary to run the container and starting it up.

from airflow.operators.docker_operator import DockerOperator

class JsonIoOperator(DockerOperator):
    pass

Setup

We need a way to pass input into the container. Ideally, the input comes from upstream tasks. In our case, almost all upstream tasks are PythonOperators. The return value of a PythonOperator is stored in Airflow's XCom, allowing downstream tasks to access it using the upstream `task_id` and the task instance's `xcom_pull` function.
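As a quick sketch of the upstream side, here is what such a task's callable might look like (the function name and payload keys are hypothetical); PythonOperator stores the callable's return value in XCom automatically, so any JSON-serializable dict works:

```python
import json

# Hypothetical upstream callable: PythonOperator stores its return
# value in XCom (under the key "return_value") automatically.
def build_container_input():
    # Any JSON-serializable dict works as an XCom payload.
    return {"source": "daily_report", "delimiter": ","}

# The payload must survive a JSON round trip to be usable as input.
payload = build_container_input()
assert payload == json.loads(json.dumps(payload))
```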

To get the input, the invoker must pass in the upstream task's `task_id` when instantiating the JsonIoOperator. To do this, we override DockerOperator's `__init__` function and accept an additional argument, `input_task_id`.

from airflow.operators.docker_operator import DockerOperator

class JsonIoOperator(DockerOperator):

    def __init__(self, input_task_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id

Overriding execute

The execute function is where most of our code lives. We will override the default execute function so we can add I/O logic before and after running DockerOperator’s default execute function.

Our input is a small JSON string. If the input were large (> 1 MB), we would want to supply a file path instead. In standard deployments of Airflow with multiple worker hosts, the file path must exist on shared storage such as NFS or S3, which we assume we have. We will use shared storage later to pass outputs from this task to downstream tasks.

from airflow.operators.docker_operator import DockerOperator

class JsonIoOperator(DockerOperator):

    def __init__(self, input_task_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id

    def execute(self, context):
        
        # pass input logic goes here
        
        # setup output logic goes here
 
        # run the container
        super().execute(context)

        # load output into Airflow logic goes here

Grabbing input from other tasks

Reading in upstream data is easily done using the task instance's `xcom_pull` method, which DockerOperator inherits from BaseOperator.

To pass the JSON, we have two options: environment variables and volumes. In my use case, because I have low-complexity JSON without special characters, I'm going to serialize the JSON into a string and then set it as an environment variable, `CONTAINER_INPUT`. The container process is responsible for reading the environment variable and using it. For more complex inputs, we would want to mount the input file (via shared storage) into the container and point the container at it with an environment variable.
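On the container side, the entrypoint just needs to deserialize that environment variable. A minimal sketch, where the helper name and the injected `env` parameter (handy for testing) are assumptions:

```python
import json
import os

# Hypothetical container-side helper: parse the JSON string that the
# operator placed in the CONTAINER_INPUT environment variable.
def read_container_input(env=None):
    env = os.environ if env is None else env
    return json.loads(env.get("CONTAINER_INPUT", "{}"))

# Simulate what the operator sets before starting the container.
fake_env = {"CONTAINER_INPUT": json.dumps({"rows": 3})}
config = read_container_input(fake_env)
assert config == {"rows": 3}
```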

Side note:
The task instance context dictionary contains several useful functions and attributes. Here’s a gist listing those out.

import json
import tempfile
from airflow.operators.docker_operator import DockerOperator

class JsonIoOperator(DockerOperator):

    def __init__(self, input_task_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id

    def execute(self, context):
        
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here
 
        # run the container
        super().execute(context)

        # load output into Airflow logic goes here

Setting up output

To make container output accessible to downstream tasks, we will mount a shared NFS directory from the host into the container. NFS allows all workers to access the same storage. The base path of this directory should be passed in as the `shared_dir_path` argument.

We will create a temporary directory within `shared_dir_path` and mount that folder at the container's `output_dir_path`, which can be specified by the user. We also expose `output_dir_path` to the container as the `OUTPUT_DIR` environment variable, which the container's main process should read and write its outputs to.
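The container-side counterpart might look like the following sketch; the helper name, the `result.csv` filename, and the line-based record format are all assumptions for illustration:

```python
import os
import tempfile

# Hypothetical container-side helper: write result files into the
# directory named by OUTPUT_DIR, i.e. the mounted shared volume.
def write_outputs(lines, env=None):
    env = os.environ if env is None else env
    out_dir = env.get("OUTPUT_DIR", "/tmp/output/")
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, "result.csv")
    with open(out_path, "w") as f:
        f.write("\n".join(lines))
    return out_path

# Simulate the mounted volume with a local temporary directory.
with tempfile.TemporaryDirectory() as d:
    path = write_outputs(["a,1", "b,2"], env={"OUTPUT_DIR": d})
    with open(path) as f:
        assert f.read() == "a,1\nb,2"
```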

import os
import json
import tempfile
from airflow.operators.docker_operator import DockerOperator

class JsonIoOperator(DockerOperator):

    def __init__(
        self, 
        input_task_id,
        shared_dir_path,
        output_dir_path='/tmp/output/',
        *args,
        **kwargs):

        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id
        self.shared_dir_path = shared_dir_path
        self.output_dir_path = output_dir_path

    def execute(self, context):
        
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here
        self.environment['OUTPUT_DIR'] = self.output_dir_path
        # mkdtemp (unlike TemporaryDirectory) does not delete the
        # directory automatically, so it survives for downstream tasks
        tmp_dir_path = tempfile.mkdtemp(dir=self.shared_dir_path)
        
        # appending volume
        volume = "{}:{}:rw".format(tmp_dir_path, self.output_dir_path)
        self.volumes.append(volume)
 
        # run the container
        super().execute(context)

        # load output into Airflow logic goes here

Loading output into Airflow

After output is written to NFS by the container process, we simply return the directory path; downstream tasks access the files by reading from that path.

Temporary directory cleanups:
At the end of the DAG, there should be a cleanup task which deletes all temporary output directories created inside the NFS.

import json
import tempfile
from airflow.operators.docker_operator import DockerOperator

class JsonIoOperator(DockerOperator):

    def __init__(
        self, 
        input_task_id,
        shared_dir_path,
        output_dir_path='/tmp/output/',
        *args,
        **kwargs):

        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id
        self.shared_dir_path = shared_dir_path
        self.output_dir_path = output_dir_path

    def execute(self, context):
        
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here
        self.environment['OUTPUT_DIR'] = self.output_dir_path
        # mkdtemp (unlike TemporaryDirectory) does not delete the
        # directory automatically, so it survives for downstream
        # tasks until the cleanup task removes it
        tmp_dir_path = tempfile.mkdtemp(dir=self.shared_dir_path)
        
        # appending volume
        volume = "{}:{}:rw".format(tmp_dir_path, self.output_dir_path)
        self.volumes.append(volume)
 
        # run the container
        super().execute(context)

        # load output into Airflow logic goes here
        # returns path where output files are written to
        return tmp_dir_path

Alternative: Use XCOM to load output into Airflow

When the output is small and simple, the following method provides an alternative: load the output directly into Airflow's XCom. This approach is brittle and not generally recommended, but useful in scenarios where the output is small. Keep in mind XCom is a table within Airflow's database, so all output is stored there. As more and more DAG runs occur, the database will grow in size, necessitating regular cleanup DAGs to remove Airflow metadata, depending on how fast the database fills up.

import os
import json
import tempfile
from airflow.operators.docker_operator import DockerOperator

class JsonIoOperator(DockerOperator):

    def __init__(
        self, 
        input_task_id,
        shared_dir_path,
        output_dir_path='/tmp/output/',
        *args,
        **kwargs):

        super().__init__(*args, **kwargs)
        self.input_task_id = input_task_id
        self.shared_dir_path = shared_dir_path
        self.output_dir_path = output_dir_path

    def execute(self, context):
        
        # pass input logic goes here
        input_data = self.xcom_pull(task_ids=self.input_task_id, context=context)
        self.environment['CONTAINER_INPUT'] = json.dumps(input_data)

        # setup output logic goes here
        self.environment['OUTPUT_DIR'] = self.output_dir_path
        files = {}

        # TemporaryDirectory as a context manager yields the directory
        # path and deletes the directory on exit, so no explicit
        # cleanup code is needed
        with tempfile.TemporaryDirectory(dir=self.shared_dir_path) as tmp_dir_path:
        
            # appending volume
            volume = "{}:{}:rw".format(tmp_dir_path, self.output_dir_path)
            self.volumes.append(volume)
 
            # run the container
            super().execute(context)

            # load output into Airflow logic goes here

            for filename in os.listdir(tmp_dir_path):
                filepath = os.path.join(tmp_dir_path, filename)
                with open(filepath, 'rb') as f:
                    files[filename] = f.read()
        
        return files

Usage

Where to place the operator?

  • airflow dags
    • operators
      • json_io_operator.py
    • mydag.py

Usage is similar to DockerOperator with the addition of three more arguments: `input_task_id`, `shared_dir_path`, and `output_dir_path`. You can check out the DockerOperator docs here.

mydag.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from operators.json_io_operator import JsonIoOperator

dag = DAG(...)

input_task_id = 'python_task'

input_task = PythonOperator(
    task_id=input_task_id,
    ...,
)

docker_task = JsonIoOperator(
    docker_url='unix:///var/run/docker.sock',
    image=...,
    volumes=...,
    environment=...,
    task_id='docker_task',
    dag=dag,
    output_dir_path=...,  # location within the container that output is written to
    shared_dir_path=...,  # your NFS dir or S3 location
    input_task_id=input_task_id,
)

Recommended reads