Feature/dbt data quality#14
Open
CaioMelo25 wants to merge 9 commits into
Open
Conversation
…igir parsing da DAG
…ente no banco e barra nulos nos modelos incrementais
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Pull Request: Modelagem Gold e Engenharia de Resiliência (Meta 5)
Objetivo Arquitetural
A Meta 5 mede o "primeiro acesso ao fomento à cultura": para cada proponente (PF, PJ ou coletivo) inscrito na LPG ou na PNAB, é preciso responder duas perguntas — ele já tinha acessado fomento antes? (veterania) e ele foi contemplado? — e cruzar isso de forma consistente entre dois programas que vivem em fontes, planilhas e formatos de identificador completamente diferentes. O dbt foi usado aqui não como um simples camadeamento Bronze/Silver/Gold cosmético, mas como o mecanismo de normalização de identidade entre fontes heterogêneas: a origem (planilhas TransfereGov ingeridas por
extracao_planilhas.py) tem headers inconsistentes, caracteres invisíveis, CPFs mascarados por LGPD em uma fonte e completos em outra, e nenhuma chave de join confiável de fábrica. Cada camada do projeto resolve uma dessas inconsistências antes que ela chegue à camada de inteligência de negócio: Bronze elimina lixo na chave, Silver unifica vocabulário, e Gold absorve os dois workarounds mais delicados — schema drift por NBSP e bypass parcial de máscara de CPF — via Jinja executado em tempo de compilação do dbt.Modelagem dbt: Camada por Camada (Deep Dive)
Bronze (Ingestão e Higienização Base)
Todos os modelos de agentes (
lpg_agentes_pf,lpg_agentes_pj,lpg_agentes_coletivos,pnab_agentes_pf,pnab_agentes_pj) usam:{{ config( materialized='incremental', unique_key='identificador_unico' ) }}A materialização
incrementalcomunique_key='identificador_unico'evita reprocessar a base completa de cada fonte (que pode ter dezenas de milhares de linhas vindas de planilhas) em toda execução da DAG — o dbt só insere/atualiza os registros cujo identificador ainda não está na tabela física.Em
lpg_agentes_pf.sql, a chave é construída assim:LOWER(TRIM(...))neutraliza variações de caixa e espaços em branco residuais de exportação de planilha — sem isso, o mesmo CPF poderia gerar duas linhas distintas ("123.456.789-00"vs" 123.456.789-00 ") e quebrar tanto a deduplicação incremental quanto os JOINs posteriores. O filtroWHERE ... IS NOT NULL AND ... NOT IN ('', 'nan', 'none')descarta literais que a leitura de planilha (pandas/openpyxl, viaextracao_planilhas.py) materializa como string quando a célula está vazia —'nan'e'none'são strings literais nesse contexto, não os tipos nativos do SQL, então umIS NOT NULLisolado não pegaria esses casos. Esse filtro é aplicado tanto na carga full quanto dentro dois_incremental(), porque um CPF nulo/"nan" colado na primeira carga ficaria preso na tabela física para sempre — o filtro incremental por si só não teria como removê-lo depois.lpg_agentes_pj.sqlrepete exatamente a mesma estrutura, troca a chave para"nº do cnpj"e a fonte paralpg_dados_pessoa_juridica. O padrão é deliberadamente repetitivo entre os 5 modelos Bronze — cada fonte tem nomes de coluna ligeiramente diferentes ("já acessou recursos públicos do fomento à cultura anteriorme"na LPG PF/PJ/Coletivos vs."já acessou recursos públicos do fomento à cultura nos últim"na PNAB PF, headers truncados pelo limite de 63 caracteres de identificador do Postgres), então um modelo único parametrizado geraria SQL gerado dinamicamente sem ganho real de manutenibilidade frente a 5 arquivos curtos e explícitos.Silver (Master Data e Padronização)
identificadores_agentes(materializada comoview, sem custo de armazenamento) faz o trabalho de Master Data: empilha os 5 modelos Bronze viaUNION ALL, alinhando-os em um schema comum de 4 colunas:UNION ALL(nãoUNION) é proposital: a deduplicação cross-source não é responsabilidade dessa view — cada Bronze já é único dentro de si (viaunique_key), e a granularidade que importa para o resto do pipeline é por (identificador × programa), não por identificador isolado.tipo_proponenteé hardcoded por bloco deSELECTem vez de derivado de uma coluna, porque o tipo é uma propriedade da fonte, não do dado — não existe ambiguidade a resolver em runtime.perfil_agentes_historico.sqlconsome essa view e padroniza o vocabulário de resposta (que vem em texto livre digitado/selecionado em formulário, com variações de acentuação e formatação) em uma CTE de higienização:Os três
REPLACEencadeados removem pontuação que aparece como ruído de exportação (pontos finais de frase, ponto-e-vírgula de CSV mal escapado, aspas de campos quoted) sem alterar o conteúdo semântico da resposta. Em seguida, oCASEfinal mapeia esse texto limpo para 3 categorias fechadas:CASE WHEN historico_acesso_limpo = 'sim' THEN 'Sim' WHEN historico_acesso_limpo IN ('não', 'nao', 'nâo') THEN 'Não' WHEN historico_acesso_limpo IN ( 'não sei informar', 'nao sei informar', 'não informado', 'nao informado', 'nao_declarar', 'não sei', 'nao sei', 'não sabe', 'nao sabe' ) THEN 'Não sabe/Não informou' ELSE 'Não sabe/Não informou' END AS categoria_primeiro_acessoNote o
'nâo'na lista de negativas — é um erro de digitação/encoding real observado na origem (acento circunflexo em vez de til), incluído explicitamente porque normalização de acentuação via função genérica (unaccent) traria efeitos colaterais sobre outras respostas livres do dataset.Gold (Inteligência de Negócio e Workarounds)
1. Inferência de Veterania (
perfil_acesso_fomento)O ponto central é a window function que estabelece a ordem de ingresso do proponente nos programas:
ROW_NUMBER() OVER ( PARTITION BY identificador_unico ORDER BY programa_fomento ASC ) AS sequencia_fomentoComo a granularidade de entrada é (identificador × programa), um proponente que aparece em LPG e PNAB gera 2 linhas com o mesmo
identificador_unico. OPARTITION BY identificador_unicoisola cada proponente; oORDER BY programa_fomento ASCordena alfabeticamente ('LPG' < 'PNAB'), entãosequencia_fomento = 1é sempre o primeiro programa em ordem alfabética em que aquele identificador aparece — usado como proxy temporal de ingresso na ausência de uma data de inscrição confiável e comparável entre as duas fontes.Esse
sequencia_fomentoalimenta oCASEde classificação, que tem duas camadas: resposta declarada tem prioridade, e só cai para inferência por sequência quando o dado é nulo ou uma das variantes de "não sei informar":A lógica: se o proponente não respondeu (ou respondeu "não sei"), mas o
identificador_unicodele já apareceu em um programa "anterior" (sequencia_fomento > 1), ele é classificado como veterano inferido — a própria presença na base de outro programa é evidência de acesso prévio, independente do que ele declarou.2. Deduplicação (
perfil_agentes_completo)Esse modelo recebe
perfil_acesso_fomento(1 linha por identificador × programa) e precisa entregar 1 linha por proponente. A unicidade é garantida em duas etapas:DISTINCT ON (identificador_unico)(extensão Postgres) colapsa as N linhas de um proponente multi-programa em exatamente 1, mantendo a primeira por ordem desequencia_fomento— ou seja, o registro canônico é sempre o do programa de ingresso mais antigo (proxy LPG-antes-de-PNAB já calculado na camada anterior).qtd_programas, calculado separadamente sobre a base não deduplicada, é reanexado viaLEFT JOINao registro canônico para permitir a regra de veterania multi-programa noSELECTfinal:Isto é, mesmo que o proponente tenha respondido "sim, primeira vez" em um programa, se ele aparece em ambos os programas isso é tratado como sinal mais forte que a resposta declarada — presença cruzada vira fonte de verdade sobre auto-relato.
3. Resiliência a Schema Drift (
primeiro_acesso_contemplados)A descoberta dinâmica de coluna roda em tempo de compilação do dbt, antes do SELECT principal ser montado:
{% set cpf_cnpj_cols_query %} SELECT column_name FROM information_schema.columns WHERE table_schema = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').schema }}' AND table_name = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').identifier }}' AND column_name ILIKE '%cpf%cnpj%' ORDER BY column_name {% endset %} {% set cpf_cnpj_results = run_query(cpf_cnpj_cols_query) %} {% set cpf_cnpj_cols = cpf_cnpj_results.columns[0].values() if execute else ['cpf ou cnpj'] %}run_queryexecuta essa query contra o catálogo do Postgres durante odbt run, não como parte do SELECT compilado — o resultado (lista de nomes de coluna) é capturado em uma variável Jinja (cpf_cnpj_cols) e usado para gerar o SQL final via loop. Isso é necessário porque a ingestão dinâmica de planilhas não normaliza caracteres invisíveis nos headers: o cabeçalho "CPF ou CNPJ" de arquivos diferentes pode virar colunas distintas no Postgres ("cpf ou cnpj"vs."cpf ou cnpj<NBSP>", onde o segundo carrega um non-breaking space invisível no nome). Um nome de coluna fixo no SQL veria só uma dessas colunas e perderia silenciosamente os registros que vieram pela outra. OILIKE '%cpf%cnpj%'captura qualquer variante do nome, independente de espaço extra, NBSP ou outro lixo de encoding entre as palavras.O
if execute else [...]é um detalhe de compatibilidade com o dbt: durantedbt compile/dbt docs generate(parse-only, sem conexão ativa),executeéFalseerun_querynão pode ser chamado — o fallback['cpf ou cnpj']garante que o parsing do projeto não quebre fora de uma execução real.As colunas descobertas são então combinadas com
COALESCEdentro de um loop Jinja:COALESCE( {% for col in cpf_cnpj_cols %} NULLIF(LOWER(TRIM("{{ col }}")), 'nan') {%- if not loop.last %}, {% endif %} {% endfor %} ) AS cpf_cnpj_brutoCada coluna candidata passa por
NULLIF(LOWER(TRIM(...)), 'nan')(viraNULLse for a string "nan" pós-normalização) e oCOALESCEpega o primeiro valor não-nulo entre todas elas — como cada linha da planilha só preenche uma das colunas fantasma (nunca as duas simultaneamente, já que vêm de arquivos diferentes ingeridos em momentos diferentes), oCOALESCEreconstrói o valor real de CPF/CNPJ independente de qual variante de header aquele registro específico carregava.4. Bypass de Anonimização LGPD (
primeiro_acesso_contemplados)A base de proponentes da LPG (
lpg_agentes_pf/lpg_agentes_coletivos) traz CPF mascarado no formato***.NNN.NNN-**por exigência de LGPD, enquantolpg_contempladostraz o CPF completo — um JOIN exato nunca bateria para esses casos. A correção monta uma CTE com o "miolo" (os 6 dígitos centrais, exatamente a porção que a máscara deixa visível) de todo contemplado:SUBSTRING(id_normalizado FROM 4 FOR 6)extrai os caracteres das posições 4 a 9 do CPF de 11 dígitos só-números — exatamente os 3 blocos centrais (NNN.NNN) que sobrevivem à máscara***.NNN.NNN-**. O filtroWHERE LENGTH(id_normalizado) = 11restringe essa CTE a CPFs (PF); CNPJ tem 14 dígitos e nunca aparece mascarado na base de proponentes, então aplicar esse match nele só aumentaria o risco de colisão sem ganho de recall.O
LEFT JOINparcial entra como uma segunda tentativa de match, condicionado a o identificador do proponente estar de fato mascarado:pb.identificador_unico LIKE '%*%'é o gate: só ativa esse caminho de JOIN quando o identificador contém literalmente o caractere*da máscara.LENGTH(REGEXP_REPLACE(..., '[^0-9]', '', 'g')) = 6confirma que, depois de remover tudo que não é dígito, restam exatamente 6 dígitos — o miolo exposto. Esses 6 dígitos são comparados contramiolo_cpfda CTE de contemplados, particionado também porprograma_fomentopara não cruzar miolo de CPF entre programas distintos. O resultado final docontempladocombina os dois caminhos de match:CASE WHEN tc.id_normalizado IS NOT NULL OR tcm.id_normalizado IS NOT NULL THEN 'sim' ELSE 'não' END AS contempladoMatch exato (
tc) ou match parcial por miolo (tcm) já classifica como contemplado — assumindo o trade-off documentado em comentário no próprio modelo: esse match parcial tem risco teórico de colisão entre CPFs distintos que compartilham o mesmo miolo de 6 dígitos, risco aceito em troca de eliminar o falso negativo sistemático que existia antes.Infraestrutura e Airflow (Cosmos)
Antes desta mudança,
create_udfs()(que cria o schema de destino e as UDFsf_parse_dates/f_format_nc) era registrada emon-run-startnodbt_project.yml— disparada automaticamente a cadadbt run. O Cosmos, ao orquestrar o projeto no Airflow, materializa uma task por modelo e as executa em paralelo conforme o grafo de dependências permite. Cada task roda seu próprio processodbt, e cada um desses processos reexecutava oon-run-start— múltiplas conexões executandoCREATE OR REPLACE FUNCTIONsimultaneamente contra o mesmo catálogopg_procdo Postgres, gerando o errotuple concurrently updated(conflito de atualização concorrente em catálogo de sistema, que não tem proteção de lock turística contra DDL paralelo).A correção removeu o hook do
dbt_project.yml:E isolou a criação em uma macro chamada explicitamente, usando
run_queryem vez de DDL solto no corpo da macro (o que permite reutilizar a macro também viadbt run-operationfora do fluxo normal derun-start):{% macro create_udfs() %} {% set create_schema_sql %} CREATE SCHEMA IF NOT EXISTS {{ target.schema }}; {% endset %} {% do run_query(create_schema_sql) %} {% set parse_date_sql %}{{ create_f_parse_dates() }}{% endset %} {% do run_query(parse_date_sql) %} {% set format_nc_sql %}{{ create_f_format_nc() }}{% endset %} {% do run_query(format_nc_sql) %} {% endmacro %}A invocação agora é manual e única, via
dbt run-operation create_udfs, eliminando a concorrência na origem (zero tasks paralelas tentando recriar a mesma função) em vez de mitigá-la com retry/lock applicativo. Ponto de atenção para follow-up: ocosmos_dag.pyatual ainda não tem uma task explícita derun-operation create_udfsantes das tasks de modelo geradas pelo Cosmos — a chamada precisa ser disparada manualmente (ou via uma task dedicada a ser adicionada na DAG) antes da primeira execução em um ambiente novo, já que as UDFs (f_parse_dates,f_format_nc) são pré-requisito para os modelos que as utilizam.Impacto nos Dados
Antes da combinação dos dois workarounds da camada Gold, o JOIN de contemplação para a LPG efetivamente zerava: a query buscava uma coluna fixa de CPF/CNPJ que só capturava parte dos registros (schema drift por NBSP) e, mesmo nos registros capturados, comparava dígito a dígito contra um identificador de proponente mascarado por LGPD — duas falhas independentes, cada uma suficiente isoladamente para produzir falso negativo total. Com a descoberta dinâmica de coluna via
information_schema+COALESCEresolvendo o schema drift, e oLEFT JOINpor miolo de 6 dígitos resolvendo o bypass de máscara, a LPG saiu de zero para milhares de matches corretos de contemplação — fechando a lacuna que invalidava o indicador de Meta 5 para esse programa.