Airflow com Dags em YAML Dags e Kubernetes Operator

Rodrigo Lazarini Gil
Tech@Grupo ZAP
Published in
4 min readMay 14, 2020

--

Simplificando a criação de DAGs

Posted by fatty119 on Reddit

No primeiro artigo sobre a arquitetura com airflow (https://medium.com/@nbrgil/scalable-airflow-with-kubernetes-git-sync-63c34d0edfc3), eu expliquei como usar o airflow com Kubernetes Executor.

Isso permitiu que nós tivessemos um airflow executores escaláveis, mas ainda temos problemas como esse. Esse artigo irá mostrar como:

  1. Usar o operador Kubernetes do airflow para isolar todas as regras de negócio dos fluxos do airflow;
  2. Criar DAGs em YAML usando validadores de schema para simplificar o uso do airflow para alguns usuários;
  3. Definir um padrão nos fluxos criados

Kubernetes Pod Operator

Conseguimos fazer isso usando somente o Kubernetes Pod Operator, de forma que os usuários mantenham todo seu código (e as regras de negócio) no seu respectivo repositório/imagem docker.

Claro que dessa maneira perdemos ótimos operadores existentes no airflow, mas dessa maneira, mantemos nossos fluxos mais fáceis de ler e sem nenhuma dependência com o airflow.

Simplificando as imagens docker

Quando decidimos usar o Kubernetes pod operator, nós temos que considerar que alguns usuários estarão trabalhando com imagens docker. Isso pode criar outros problemas:

  • Falta de conhecimento em docker;
  • Grande quantidade de imagens similares no seu repositório docker;

Para resolver isso, usamos uma separação entre uma imagem customizada e um imagem “base”:

  • Toda vez que alguém quer usar sua própria imagem docker, só precisa criar uma task custom ;
  • Mas, se o usuário tem um código python em qualquer repositório da empresa, ele pode usar a imagem base.

A imagem base funciona dessa maneira:

  • É uma imagem docker baseada em python, com git instalado e algumas bibliotecas muito comuns em python, como por exemplo, pandas;
  • Ela recebe como parâmetro em seu entrypoint algo como “qual arquivo python devo rodar?” ou “preciso instalar algum outro requerimento em python?”;
  • O entrypoint da image faz um git clone do repositório, instala os requerimentos e executa o código python.

Exemplo dessa task :

name: base-pod 
base:
repository: docker-test
workdir: sample_project
main_file: exec.py
requirements: False
args: [“ — agg”, “1”, “ — name”, “teste”]

Nesse caso, a imagem vai fazer um “sparse checkout” do diretóriosample_project do repositório docker-test e executa o exec.py dessa maneira:

python exec.py --agg 1 --name test

Usando YAML para criar DAGs

Uma coisa boa das DAGs escritas em python no airflow é a flexibilidade de usar seu próprio código para criar um fluxo.

Mas:

  • Não podemos definir uma regra que limite o uso de determinados operadores no airflow;
  • Muitas regras de negócio acabam sendo escritas dentro das DAGs;
  • Isso cria uma quantidade de DAGs bem diferentes e sem padrão;
  • Existem configurações padrão de alguns operadores como image pull policy ou kubernetes annotations que seria bom abstrair do usuário.

O conceito criado aqui é bem simples:

  1. A pessoa adiciona/atualiza o arquivo YAML com todas as informações necessárias para criar a DAG;
  2. O YAML é validado e criado usando uma biblioteca python chamada cerberus;
  3. O arquivo python gerado é enviado para o branch master;

YAML Dags Validation

A biblioteca cerberus foi usada para conferir se o “schema” do yaml está correto:

  • Definição de campos obrigatórios;
  • Aplicar regras de validação por regex;
  • Definir campos como list e dicionários;
  • Definir tipo dos campos;

Outro exemplo de validação que podemos fazer é verificar se o próprio airflow considera o arquivo gerado como válido. Isso pode ser feito executando comando do CLI do airflow como “list dags” e verificar se aparecem algumas mensagens de erro.

Gerador de dags em Python

A dag é gerada usando a biblioteca jinja2. Como um site em HTML, você pode criar um template e converter o arquivo YAML em python. A biblioteca em si já possui funções para fazer esse parse e conversão.

Deploy

Em que momento nós criamos esses arquivos python? No nosso caso, os scripts são executado durante o período de CI/CD.

Depois de criar os novos arquivos, eles são enviados para um repositório git de onde o airflow faz o git sync.

Exemplo

Aqui vou mostrar um exemplo de uma DAG definida usando YAML e sua conversão. Algumas das funções estão abstraídas em uma biblioteca utils, de maneira que os arquivos gerados fiquem menores e não mostrem alguns parâmetros padrão (como a URL do cluster do k8s).

YAML DAG
Python DAG

Conclusão

Para um time ou empresa que já conhece bem do uso do airflow, esse artigo parece mais como uma “remoção” de algumas features do airflow.

Mas em nosso caso, isso simplificou o processo de entrada de novos usuários e permitiu algumas vantagens:

  • O fluxo em YAML é uma descrição do job a ser executado sem nenhuma dependência com o airflow;
  • Nós podemos mudar o comportamento do airflow alterando o gerador de DAGs para recriar todos os fluxos, graças a essa nova camada criada;
  • Todas as nossas DAGs são muito similares e todos são aplicações com docker;
  • Para pessoas que não conhecem docker, nós podemos ter mais de uma imagem base para rodar o código delas;
  • E a parte mais importante, airflow foi aceito rapidamente por muitos usuários da nossa stack de dados.

Obrigado :)

--

--

Rodrigo Lazarini Gil
Tech@Grupo ZAP

Working through the years with SQL, data modeling, data platform and engineering. Currently focused on data platform and spark jobs with python.