Airflow escalável com Kubernetes + Git Sync

Rodrigo Lazarini Gil
Tech@Grupo ZAP
Published in
4 min readJan 21, 2020

--

Usando airflow para prover uma solução para múltiplos times

Aqui no GrupoZap, tivemos algumas necessidades que acabamos solucionando implementando uma versão do Airflow na nossa stack de dados. Precisavamos de uma ferramenta que pudessemos delegar um ambiente onde todos os times pudessem criar e monitorar seus fluxos de dados.

Antes de começar alguns termos que usarei durante esse texto:

Exemplo de uma DAG
  • DAG: Directed Acyclic Graph — É um grafo finito direcionado e acíclico, isto é, ele vai definir o conjunto de tarefas a serem executadas em uma ordem a ser seguida
  • Workers: Pods criados no airflow para execução de tarefas
  • Airflow UI: Interface web para monitorar e executar as DAGs
  • Airflow Scheduler: Processo que controla a fila e a criação de workers;
  • Git-Sync: Container baseado na imagem https://github.com/kubernetes/git-sync para auxiliar no compartilhamento de arquivos de um repositório git.

Pronto. Agora podemos começar. A imagem abaixo mostra uma arquitetura com os seguintes detalhes:

  • Airflow com workers e executores escaláveis com pods no kubernetes;
  • Airflow UI e Scheduler também dentro do kubernetes;
  • Adição/Alteração de Dags através de git-sync para permitir que usuários criem e alterem fluxos de dados sem necessidade de reiniciar o airflow
Airflow kubernetes architecture

Airflow docker image

Você pode ter qualquer imagem docker com airflow. O importante é que sua instalação tenha o kubernetes instalado:

apache-airflow[kubernetes]==1.10.6

O entrypoint dessa imagem vai iniciar o banco de dados de metadados, a interface web e o scheduler. Eu acredito que podemos melhorar esse processo colocando o webserver separado do scheduler, mas por enquanto esse container terá múltiplos processos rodando.

#!/bin/bash
airflow initdb
airflow webserver -p 8080 &
airflow scheduler

Repositório git de Dags

Você precisa criar um repositório para manter suas DAGs. Pode ser o mesmo com seu Dockerfile e arquivos de deploy do kubernetes, mas nesse caso, prefiro criar um repositório independente para isolar o código do airflow das DAGs.

Para esse exemplo, vou chamar esse repo dedags-airflow

Airflow Deployment

Agora que temos uma imagem do airflow pronta, podemos fazer o deploy:

Alguns pontos importantes aqui:

  • Esse pod terá 2 containeres. Um com o airflow e um com o gitsync. Git sync é um container que busca novas dags de dags-airflow e compartilha em um volume para serem acessadas pelo airflow. Desse jeito a interface do airflow está sempre atualizada em relação a alteração e criação de novas DAGs.
  • Os parâmetros para o gitsync funcionar são enviados por um configmap e por uma secret (no caso de senhas):
GIT_SYNC_REPO: “https://github.com/xyz/dags-airflow"
GIT_SYNC_BRANCH: “master”
GIT_SYNC_ROOT: “/git”
GIT_SYNC_DEST: “sync”
GIT_SYNC_DEPTH: “1”
GIT_SYNC_ONE_TIME: “false”
GIT_SYNC_WAIT: “60”
GIT_SYNC_USERNAME: “git_username”
GIT_KNOWN_HOSTS: “false”
GIT_PASSWORD: “242452”

Airflow Config

O arquivo de configuraçãoairflow.cfg determina como todo o airflow irá funcionar. Aqui vou mencionar as principais propriedades alteradas:

  • Kubernetes: É necessário alterar o executor, definir qual a imagem do airflow que será usada para os workers , definir se os workers serão ou não excluídos após a execução e qual service_name + namespace para determinar onde os pods serão criados. Também foi definida a propriedade in_cluster para que todos os pods fiquem no mesmo cluster.
executor = KubernetesExecutor
worker_container_repository = airflow
worker_container_tag = 1.2
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = True
namespace = dataanalytics
worker_service_account_name = k8s-cronjob-autoscaler
in_cluster = True
  • Git: Aqui precisamos definir novamente os parâmetros do git. Isso porque o gitsync que mostramos acima só funciona para manter o airflow principal atualizado. Assim que novos workers são criados para executar tarefas, eles precisam obter o código atualizado do git.
git_repo = https://github.com/xyz/dags-airflow
git_branch = master
git_subpath = dags
git_user = git_username
git_password = 242452
git_sync_root = /git
git_sync_dest = sync
git_sync_depth = 1
git_dags_folder_mount_point = /opt/airflow/dags
git_sync_container_repository = k8s.gcr.io/git-sync
git_sync_container_tag = v3.1.2
git_sync_init_container_name = git-sync-clone
  • Remote logging: Como cada worker é destruído após a execução das tarefas, podemos salvar os logs em um storage (s3 nesse caso). Se você ativar essa opção, o log ficará disponível assim que a tarefa for finalizada.
remote_logging = True
remote_log_conn_id = s3_connection
remote_base_log_folder = s3://${ENVIRONMENT}-dataplatform-logs/airflow
  • Remote logging para workers: O problema de definir o arquivo de configuração por configmap é que o airflow não aplica essas configurações para todos os workers… Então tivemos que definir os parâmetros de remote logging usando a sessão kubernetes_environment_variables. Essa sessão permite enviar variáveis de ambiente para os workers.
AIRFLOW__CORE__REMOTE_LOGGING = True
AIRFLOW__CORE__REMOTE_LOG_CONN_ID = s3_connection
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER = s3://${ENVIRONMENT}-dataplatform-logs/airflow
AIRFLOW__CORE__ENCRYPT_S3_LOGS = False

Conclusão

Com essas configurações temos:

  • Somente um pod fica sempre ativo no kubernetes (UI + Scheduler);
  • Esse pod tem um timer para buscar e atualizar as DAGs;
  • Quando uma DAG é disparada para executar (manualmente ou pela agenda), 1 worker é criado para cada tarefa;
  • Esse worker irá obter a DAG do git, run o airflow com um executor local, salvar os logs no s3 e se auto destrói.

Podemos melhorar esse processo, como no caso de separar o scheduler ou permitir que ele seja facilmente escalável, mas por enquanto é uma boa solução escalável, graças a criação dos workers sob demanda.

Claro que sendo o airflow, esses worker somente executariam sensores e operadores, certo? Mas para isso podemos usar o KubernetesPodOperator que permite:

  • Isolar regras de negócio;
  • Definir quantidade de recursos do pod;
  • Definir imagens próprias para cada tarefa;

Mas isso é assunto para outra estória: (link no futuro)

Por enquanto, essa ferramenta está sendo usado pelos times de data (data analytics e data science), mas aos poucos vamos espalhando isso pela empresa.

Avaliem e dêem opiniões para melhorar esse conteúdo.

--

--

Rodrigo Lazarini Gil
Tech@Grupo ZAP

Working through the years with SQL, data modeling, data platform and engineering. Currently focused on data platform and spark jobs with python.