Airflow com Dags em YAML Dags e Kubernetes Operator
Simplificando a criação de DAGs
No primeiro artigo sobre a arquitetura com airflow (https://medium.com/@nbrgil/scalable-airflow-with-kubernetes-git-sync-63c34d0edfc3), eu expliquei como usar o airflow com Kubernetes Executor.
Isso permitiu que nós tivessemos um airflow executores escaláveis, mas ainda temos problemas como esse. Esse artigo irá mostrar como:
- Usar o operador Kubernetes do airflow para isolar todas as regras de negócio dos fluxos do airflow;
- Criar DAGs em YAML usando validadores de schema para simplificar o uso do airflow para alguns usuários;
- Definir um padrão nos fluxos criados
Kubernetes Pod Operator
Conseguimos fazer isso usando somente o Kubernetes Pod Operator, de forma que os usuários mantenham todo seu código (e as regras de negócio) no seu respectivo repositório/imagem docker.
Claro que dessa maneira perdemos ótimos operadores existentes no airflow, mas dessa maneira, mantemos nossos fluxos mais fáceis de ler e sem nenhuma dependência com o airflow.
Simplificando as imagens docker
Quando decidimos usar o Kubernetes pod operator, nós temos que considerar que alguns usuários estarão trabalhando com imagens docker. Isso pode criar outros problemas:
- Falta de conhecimento em docker;
- Grande quantidade de imagens similares no seu repositório docker;
Para resolver isso, usamos uma separação entre uma imagem customizada e um imagem “base”:
- Toda vez que alguém quer usar sua própria imagem docker, só precisa criar uma task
custom
; - Mas, se o usuário tem um código python em qualquer repositório da empresa, ele pode usar a imagem
base
.
A imagem base funciona dessa maneira:
- É uma imagem docker baseada em python, com git instalado e algumas bibliotecas muito comuns em python, como por exemplo, pandas;
- Ela recebe como parâmetro em seu entrypoint algo como “qual arquivo python devo rodar?” ou “preciso instalar algum outro requerimento em python?”;
- O entrypoint da image faz um git clone do repositório, instala os requerimentos e executa o código python.
Exemplo dessa task
:
name: base-pod
base:
repository: docker-test
workdir: sample_project
main_file: exec.py
requirements: False
args: [“ — agg”, “1”, “ — name”, “teste”]
Nesse caso, a imagem vai fazer um “sparse checkout” do diretóriosample_project
do repositório docker-test
e executa o exec.py
dessa maneira:
python exec.py --agg 1 --name test
Usando YAML para criar DAGs
Uma coisa boa das DAGs escritas em python no airflow é a flexibilidade de usar seu próprio código para criar um fluxo.
Mas:
- Não podemos definir uma regra que limite o uso de determinados operadores no airflow;
- Muitas regras de negócio acabam sendo escritas dentro das DAGs;
- Isso cria uma quantidade de DAGs bem diferentes e sem padrão;
- Existem configurações padrão de alguns operadores como
image pull policy
oukubernetes annotations
que seria bom abstrair do usuário.
O conceito criado aqui é bem simples:
- A pessoa adiciona/atualiza o arquivo YAML com todas as informações necessárias para criar a DAG;
- O YAML é validado e criado usando uma biblioteca python chamada cerberus;
- O arquivo python gerado é enviado para o branch master;
YAML Dags Validation
A biblioteca cerberus foi usada para conferir se o “schema” do yaml está correto:
- Definição de campos obrigatórios;
- Aplicar regras de validação por regex;
- Definir campos como list e dicionários;
- Definir tipo dos campos;
Outro exemplo de validação que podemos fazer é verificar se o próprio airflow considera o arquivo gerado como válido. Isso pode ser feito executando comando do CLI do airflow como “list dags” e verificar se aparecem algumas mensagens de erro.
Gerador de dags em Python
A dag é gerada usando a biblioteca jinja2. Como um site em HTML, você pode criar um template e converter o arquivo YAML em python. A biblioteca em si já possui funções para fazer esse parse e conversão.
Deploy
Em que momento nós criamos esses arquivos python? No nosso caso, os scripts são executado durante o período de CI/CD.
Depois de criar os novos arquivos, eles são enviados para um repositório git de onde o airflow faz o git sync.
Exemplo
Aqui vou mostrar um exemplo de uma DAG definida usando YAML e sua conversão. Algumas das funções estão abstraídas em uma biblioteca utils
, de maneira que os arquivos gerados fiquem menores e não mostrem alguns parâmetros padrão (como a URL do cluster do k8s).
Conclusão
Para um time ou empresa que já conhece bem do uso do airflow, esse artigo parece mais como uma “remoção” de algumas features do airflow.
Mas em nosso caso, isso simplificou o processo de entrada de novos usuários e permitiu algumas vantagens:
- O fluxo em YAML é uma descrição do job a ser executado sem nenhuma dependência com o airflow;
- Nós podemos mudar o comportamento do airflow alterando o gerador de DAGs para recriar todos os fluxos, graças a essa nova camada criada;
- Todas as nossas DAGs são muito similares e todos são aplicações com docker;
- Para pessoas que não conhecem docker, nós podemos ter mais de uma imagem
base
para rodar o código delas; - E a parte mais importante, airflow foi aceito rapidamente por muitos usuários da nossa stack de dados.
Obrigado :)