Scalable Airflow with Kubernetes + Git Sync
Using Airflow to provide a solution for multiple teams
This article summarizes a way to use Airflow on Kubernetes, with DAGs synced through Git.
This architecture provides:
- Airflow workers and executors running as scalable Kubernetes pods;
- The Airflow UI and scheduler also running inside Kubernetes;
- DAGs added through git-sync, allowing users to create and update pipelines without restarting Airflow.
Airflow docker image
You can use any Airflow image you have created. The important thing is that Airflow must be installed with the kubernetes extra:
apache-airflow[kubernetes]==1.10.6
The entrypoint of my image initializes the Airflow metadata DB and starts the webserver and the scheduler. I believe this can be improved by separating the webserver from the scheduler, but for now the Airflow container runs multiple processes:
#!/bin/bash
airflow initdb
airflow webserver -p 8080 &
airflow scheduler
Dags Repository
You need to create a git repository to keep your DAGs. It can be the same one that holds your Dockerfile and Kubernetes deployment files, but in my case I preferred a separate one, to keep the DAGs isolated from the Airflow code.
For this example, I’ll call it dags-airflow.
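Assuming the DAG files live under a dags/ subfolder (matching the git_subpath configured later), the repository layout could look like this — the file names are just illustrative:

```
dags-airflow/
└── dags/
    ├── pipeline_team_a.py
    └── pipeline_team_b.py
```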
Airflow Deployment
Now that we have an Airflow image with Kubernetes support, we can deploy it:
Important things here:
- This pod will have two containers: one for Airflow and one for k8s.gcr.io/git-sync:v3.1.2. The git-sync container shares a volume with the Airflow container and fetches the DAGs from the dags-airflow repository. This keeps the scheduler and the UI always up to date with new/updated DAGs;
- The parameters for git-sync are sent through a ConfigMap (the password is actually passed through a Secret):
GIT_SYNC_REPO: "https://github.com/xyz/dags-airflow"
GIT_SYNC_BRANCH: "master"
GIT_SYNC_ROOT: "/git"
GIT_SYNC_DEST: "sync"
GIT_SYNC_DEPTH: "1"
GIT_SYNC_ONE_TIME: "false"
GIT_SYNC_WAIT: "60"
GIT_SYNC_USERNAME: "git_username"
GIT_KNOWN_HOSTS: "false"
GIT_PASSWORD: "242452"
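To give an idea of how the two containers fit together, here is a sketch of the deployment, with both containers sharing an emptyDir volume. The resource names, labels and mount paths are assumptions for illustration, not the exact manifest used here:

```yaml
# Sketch only: names, labels and mount paths are illustrative
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: dataanalytics
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow
  template:
    metadata:
      labels:
        app: airflow
    spec:
      volumes:
        - name: dags
          emptyDir: {}          # shared between the two containers
      containers:
        - name: airflow
          image: airflow:1.2
          ports:
            - containerPort: 8080   # webserver
          volumeMounts:
            - name: dags
              mountPath: /git       # dags_folder should point into /git/sync/dags
        - name: git-sync
          image: k8s.gcr.io/git-sync:v3.1.2
          envFrom:
            - configMapRef:
                name: airflow-git-sync   # the ConfigMap shown above
          volumeMounts:
            - name: dags
              mountPath: /git          # matches GIT_SYNC_ROOT
```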
Airflow Config File
The Airflow config file airflow.cfg determines how the whole process works. Here I’ll just mention the main properties I’ve changed:
- Kubernetes: you have to change the executor, define the Docker image that the workers are going to use, choose whether the worker pods are deleted after completion, and set the service account name and namespace they will be created in. in_cluster is also necessary if you want everything running in the same cluster.
executor = KubernetesExecutor
worker_container_repository = airflow
worker_container_tag = 1.2
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = True
namespace = dataanalytics
worker_service_account_name = k8s-cronjob-autoscaler
in_cluster = True
- Git: here we have to set the git parameters again. Why? Because the git-sync container in the deployment works only for the main Airflow pod, and the workers also need to clone the DAG code. These settings apply to the workers created when the scheduler triggers a new execution.
git_repo = https://github.com/xyz/dags-airflow
git_branch = master
git_subpath = dags
git_user = git_username
git_password = 242452
git_sync_root = /git
git_sync_dest = sync
git_sync_depth = 1
git_dags_folder_mount_point = /opt/airflow/dags
git_sync_container_repository = k8s.gcr.io/git-sync
git_sync_container_tag = v3.1.2
git_sync_init_container_name = git-sync-clone
- Remote logging: since each worker is destroyed after execution, we save the logs to remote storage (S3 in this case). If you activate this option, the log will only be available after the task is done.
remote_logging = True
remote_log_conn_id = s3_connection
remote_base_log_folder = s3://${ENVIRONMENT}-dataplatform-logs/airflow
- Remote logging for workers: the problem with setting airflow.cfg through a ConfigMap is that Airflow does not apply it to the workers… so I had to add these variables in the kubernetes_environment_variables section, which allows you to send environment variables to the workers:
AIRFLOW__CORE__REMOTE_LOGGING = True
AIRFLOW__CORE__REMOTE_LOG_CONN_ID = s3_connection
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER = s3://${ENVIRONMENT}-dataplatform-logs/airflow
AIRFLOW__CORE__ENCRYPT_S3_LOGS = False
Conclusion
With this configuration, we’ll have:
- Only one pod always active (UI + scheduler);
- This pod has a timer to fetch and copy new DAGs;
- When you trigger a DAG (or its scheduled time arrives), Airflow will deploy one pod/worker per task;
- Each worker will fetch the DAG, run Airflow with the LocalExecutor, save the logs to S3 and be deleted afterwards.
We may still need some improvements, like splitting the main pod so the UI is isolated from the scheduler, but for now it’s a good, scalable solution, because all the workers are deployed and killed on demand.
Of course, these workers only run Airflow sensors and operators, right? But we can make use of the KubernetesPodOperator to simplify the DAG implementations, defining the resources and Docker image to run for each task… (soon, a new story about DAGs and the KubernetesPodOperator).
Give me your opinion about this solution and contact me in case of any doubts.