Skip to content

Feature/dbt data quality#14

Open
CaioMelo25 wants to merge 9 commits into
mainfrom
feature/dbt-data-quality
Open

Feature/dbt data quality#14
CaioMelo25 wants to merge 9 commits into
mainfrom
feature/dbt-data-quality

Conversation

@CaioMelo25

Copy link
Copy Markdown
Collaborator

Pull Request: Modelagem Gold e Engenharia de Resiliência (Meta 5)

Objetivo Arquitetural

A Meta 5 mede o "primeiro acesso ao fomento à cultura": para cada proponente (PF, PJ ou coletivo) inscrito na LPG ou na PNAB, é preciso responder duas perguntas — ele já tinha acessado fomento antes? (veterania) e ele foi contemplado? — e cruzar isso de forma consistente entre dois programas que vivem em fontes, planilhas e formatos de identificador completamente diferentes. O dbt foi usado aqui não como um simples camadeamento Bronze/Silver/Gold cosmético, mas como o mecanismo de normalização de identidade entre fontes heterogêneas: a origem (planilhas TransfereGov ingeridas por extracao_planilhas.py) tem headers inconsistentes, caracteres invisíveis, CPFs mascarados por LGPD em uma fonte e completos em outra, e nenhuma chave de join confiável de fábrica. Cada camada do projeto resolve uma dessas inconsistências antes que ela chegue à camada de inteligência de negócio: Bronze elimina lixo na chave, Silver unifica vocabulário, e Gold absorve os dois workarounds mais delicados — schema drift por NBSP e bypass parcial de máscara de CPF — via Jinja executado em tempo de compilação do dbt.

Modelagem dbt: Camada por Camada (Deep Dive)

Bronze (Ingestão e Higienização Base)

Todos os modelos de agentes (lpg_agentes_pf, lpg_agentes_pj, lpg_agentes_coletivos, pnab_agentes_pf, pnab_agentes_pj) usam:

{{ config(
    materialized='incremental',
    unique_key='identificador_unico'
) }}

A materialização incremental com unique_key='identificador_unico' evita reprocessar a base completa de cada fonte (que pode ter dezenas de milhares de linhas vindas de planilhas) em toda execução da DAG — o dbt só insere/atualiza os registros cujo identificador ainda não está na tabela física.

Em lpg_agentes_pf.sql, a chave é construída assim:

SELECT
    LOWER(TRIM("nº do cpf")) AS identificador_unico,
    LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto,
    'LPG' AS programa_fomento
FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_fisica') }}
WHERE "nº do cpf" IS NOT NULL
  AND LOWER(TRIM("nº do cpf")) NOT IN ('', 'nan', 'none')

{% if is_incremental() %}
  AND LOWER(TRIM("nº do cpf")) NOT IN (
      SELECT identificador_unico FROM {{ this }}
  )
{% endif %}

LOWER(TRIM(...)) neutraliza variações de caixa e espaços em branco residuais de exportação de planilha — sem isso, o mesmo CPF poderia gerar duas linhas distintas ("123.456.789-00" vs " 123.456.789-00 ") e quebrar tanto a deduplicação incremental quanto os JOINs posteriores. O filtro WHERE ... IS NOT NULL AND ... NOT IN ('', 'nan', 'none') descarta literais que a leitura de planilha (pandas/openpyxl, via extracao_planilhas.py) materializa como string quando a célula está vazia — 'nan' e 'none' são strings literais nesse contexto, não os tipos nativos do SQL, então um IS NOT NULL isolado não pegaria esses casos. Esse filtro é aplicado tanto na carga full quanto dentro do is_incremental(), porque um CPF nulo/"nan" colado na primeira carga ficaria preso na tabela física para sempre — o filtro incremental por si só não teria como removê-lo depois.

lpg_agentes_pj.sql repete exatamente a mesma estrutura, troca a chave para "nº do cnpj" e a fonte para lpg_dados_pessoa_juridica. O padrão é deliberadamente repetitivo entre os 5 modelos Bronze — cada fonte tem nomes de coluna ligeiramente diferentes ("já acessou recursos públicos do fomento à cultura anteriorme" na LPG PF/PJ/Coletivos vs. "já acessou recursos públicos do fomento à cultura nos últim" na PNAB PF, headers truncados pelo limite de 63 caracteres de identificador do Postgres), então um modelo único parametrizado geraria SQL gerado dinamicamente sem ganho real de manutenibilidade frente a 5 arquivos curtos e explícitos.

Silver (Master Data e Padronização)

identificadores_agentes (materializada como view, sem custo de armazenamento) faz o trabalho de Master Data: empilha os 5 modelos Bronze via UNION ALL, alinhando-os em um schema comum de 4 colunas:

SELECT
    identificador_unico,
    historico_acesso_bruto,
    programa_fomento,
    'Pessoa Física' AS tipo_proponente
FROM {{ ref('lpg_agentes_pf') }}

UNION ALL
...
SELECT identificador_unico, historico_acesso_bruto, programa_fomento, 'Organização' AS tipo_proponente
FROM {{ ref('pnab_agentes_pj') }}

UNION ALL (não UNION) é proposital: a deduplicação cross-source não é responsabilidade dessa view — cada Bronze já é único dentro de si (via unique_key), e a granularidade que importa para o resto do pipeline é por (identificador × programa), não por identificador isolado. tipo_proponente é hardcoded por bloco de SELECT em vez de derivado de uma coluna, porque o tipo é uma propriedade da fonte, não do dado — não existe ambiguidade a resolver em runtime.

perfil_agentes_historico.sql consome essa view e padroniza o vocabulário de resposta (que vem em texto livre digitado/selecionado em formulário, com variações de acentuação e formatação) em uma CTE de higienização:

TRIM(
    REPLACE(
        REPLACE(
            REPLACE(
                historico_acesso_bruto,
                '.', ''
            ),
            ';', ''
        ),
        '"', ''
    )
) AS historico_acesso_limpo
FROM identificadores
WHERE historico_acesso_bruto IS NOT NULL
  AND LOWER(historico_acesso_bruto) != 'nan'
  AND TRIM(historico_acesso_bruto) != ''

Os três REPLACE encadeados removem pontuação que aparece como ruído de exportação (pontos finais de frase, ponto-e-vírgula de CSV mal escapado, aspas de campos quoted) sem alterar o conteúdo semântico da resposta. Em seguida, o CASE final mapeia esse texto limpo para 3 categorias fechadas:

CASE
    WHEN historico_acesso_limpo = 'sim' THEN 'Sim'
    WHEN historico_acesso_limpo IN ('não', 'nao', 'nâo') THEN 'Não'
    WHEN historico_acesso_limpo IN (
        'não sei informar', 'nao sei informar', 'não informado',
        'nao informado', 'nao_declarar', 'não sei', 'nao sei',
        'não sabe', 'nao sabe'
    ) THEN 'Não sabe/Não informou'
    ELSE 'Não sabe/Não informou'
END AS categoria_primeiro_acesso

Note o 'nâo' na lista de negativas — é um erro de digitação/encoding real observado na origem (acento circunflexo em vez de til), incluído explicitamente porque normalização de acentuação via função genérica (unaccent) traria efeitos colaterais sobre outras respostas livres do dataset.

Gold (Inteligência de Negócio e Workarounds)

1. Inferência de Veterania (perfil_acesso_fomento)

O ponto central é a window function que estabelece a ordem de ingresso do proponente nos programas:

ROW_NUMBER() OVER (
    PARTITION BY identificador_unico
    ORDER BY programa_fomento ASC
) AS sequencia_fomento

Como a granularidade de entrada é (identificador × programa), um proponente que aparece em LPG e PNAB gera 2 linhas com o mesmo identificador_unico. O PARTITION BY identificador_unico isola cada proponente; o ORDER BY programa_fomento ASC ordena alfabeticamente ('LPG' < 'PNAB'), então sequencia_fomento = 1 é sempre o primeiro programa em ordem alfabética em que aquele identificador aparece — usado como proxy temporal de ingresso na ausência de uma data de inscrição confiável e comparável entre as duas fontes.

Esse sequencia_fomento alimenta o CASE de classificação, que tem duas camadas: resposta declarada tem prioridade, e só cai para inferência por sequência quando o dado é nulo ou uma das variantes de "não sei informar":

WHEN historico_acesso_limpo = 'sim' THEN 'Confirmado - Primeira Vez'
WHEN historico_acesso_limpo IN ('não', 'nao', 'nâo') THEN 'Confirmado - Veterano'
WHEN (historico_acesso_limpo IS NULL OR historico_acesso_limpo IN (...)) AND sequencia_fomento = 1
    THEN 'Inferido - Primeira Vez (Estreante na base)'
WHEN (historico_acesso_limpo IS NULL OR historico_acesso_limpo IN (...)) AND sequencia_fomento > 1
    THEN 'Inferido - Veterano (Possui histórico)'
ELSE 'Não sabe/Não informou'

A lógica: se o proponente não respondeu (ou respondeu "não sei"), mas o identificador_unico dele já apareceu em um programa "anterior" (sequencia_fomento > 1), ele é classificado como veterano inferido — a própria presença na base de outro programa é evidência de acesso prévio, independente do que ele declarou.

2. Deduplicação (perfil_agentes_completo)

Esse modelo recebe perfil_acesso_fomento (1 linha por identificador × programa) e precisa entregar 1 linha por proponente. A unicidade é garantida em duas etapas:

contagem_programas AS (
    SELECT identificador_unico, COUNT(DISTINCT programa_fomento) AS qtd_programas
    FROM perfil_base
    GROUP BY identificador_unico
),

primeiro_registro AS (
    SELECT DISTINCT ON (identificador_unico)
        identificador_unico, tipo_proponente, programa_fomento,
        historico_acesso_limpo, perfil_original
    FROM perfil_base
    ORDER BY identificador_unico, sequencia_fomento ASC
)

DISTINCT ON (identificador_unico) (extensão Postgres) colapsa as N linhas de um proponente multi-programa em exatamente 1, mantendo a primeira por ordem de sequencia_fomento — ou seja, o registro canônico é sempre o do programa de ingresso mais antigo (proxy LPG-antes-de-PNAB já calculado na camada anterior). qtd_programas, calculado separadamente sobre a base não deduplicada, é reanexado via LEFT JOIN ao registro canônico para permitir a regra de veterania multi-programa no SELECT final:

WHEN qtd_programas > 1 AND perfil_original != 'Confirmado - Veterano'
    THEN 'Inferido'
...
WHEN qtd_programas > 1 THEN 'Veterano (Multi-Programa)'

Isto é, mesmo que o proponente tenha respondido "sim, primeira vez" em um programa, se ele aparece em ambos os programas isso é tratado como sinal mais forte que a resposta declarada — presença cruzada vira fonte de verdade sobre auto-relato.

3. Resiliência a Schema Drift (primeiro_acesso_contemplados)

A descoberta dinâmica de coluna roda em tempo de compilação do dbt, antes do SELECT principal ser montado:

{% set cpf_cnpj_cols_query %}
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').schema }}'
      AND table_name = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').identifier }}'
      AND column_name ILIKE '%cpf%cnpj%'
    ORDER BY column_name
{% endset %}
{% set cpf_cnpj_results = run_query(cpf_cnpj_cols_query) %}
{% set cpf_cnpj_cols = cpf_cnpj_results.columns[0].values() if execute else ['cpf ou cnpj'] %}

run_query executa essa query contra o catálogo do Postgres durante o dbt run, não como parte do SELECT compilado — o resultado (lista de nomes de coluna) é capturado em uma variável Jinja (cpf_cnpj_cols) e usado para gerar o SQL final via loop. Isso é necessário porque a ingestão dinâmica de planilhas não normaliza caracteres invisíveis nos headers: o cabeçalho "CPF ou CNPJ" de arquivos diferentes pode virar colunas distintas no Postgres ("cpf ou cnpj" vs. "cpf ou cnpj<NBSP>", onde o segundo carrega um non-breaking space invisível no nome). Um nome de coluna fixo no SQL veria só uma dessas colunas e perderia silenciosamente os registros que vieram pela outra. O ILIKE '%cpf%cnpj%' captura qualquer variante do nome, independente de espaço extra, NBSP ou outro lixo de encoding entre as palavras.

O if execute else [...] é um detalhe de compatibilidade com o dbt: durante dbt compile/dbt docs generate (parse-only, sem conexão ativa), execute é False e run_query não pode ser chamado — o fallback ['cpf ou cnpj'] garante que o parsing do projeto não quebre fora de uma execução real.

As colunas descobertas são então combinadas com COALESCE dentro de um loop Jinja:

COALESCE(
    {% for col in cpf_cnpj_cols %}
    NULLIF(LOWER(TRIM("{{ col }}")), 'nan')
    {%- if not loop.last %},
    {% endif %}
    {% endfor %}
) AS cpf_cnpj_bruto

Cada coluna candidata passa por NULLIF(LOWER(TRIM(...)), 'nan') (vira NULL se for a string "nan" pós-normalização) e o COALESCE pega o primeiro valor não-nulo entre todas elas — como cada linha da planilha só preenche uma das colunas fantasma (nunca as duas simultaneamente, já que vêm de arquivos diferentes ingeridos em momentos diferentes), o COALESCE reconstrói o valor real de CPF/CNPJ independente de qual variante de header aquele registro específico carregava.

4. Bypass de Anonimização LGPD (primeiro_acesso_contemplados)

A base de proponentes da LPG (lpg_agentes_pf/lpg_agentes_coletivos) traz CPF mascarado no formato ***.NNN.NNN-** por exigência de LGPD, enquanto lpg_contemplados traz o CPF completo — um JOIN exato nunca bateria para esses casos. A correção monta uma CTE com o "miolo" (os 6 dígitos centrais, exatamente a porção que a máscara deixa visível) de todo contemplado:

todos_contemplados_miolo AS (
    SELECT
        id_normalizado,
        programa_fomento,
        SUBSTRING(id_normalizado FROM 4 FOR 6) AS miolo_cpf
    FROM todos_contemplados
    WHERE LENGTH(id_normalizado) = 11
)

SUBSTRING(id_normalizado FROM 4 FOR 6) extrai os caracteres das posições 4 a 9 do CPF de 11 dígitos só-números — exatamente os 3 blocos centrais (NNN.NNN) que sobrevivem à máscara ***.NNN.NNN-**. O filtro WHERE LENGTH(id_normalizado) = 11 restringe essa CTE a CPFs (PF); CNPJ tem 14 dígitos e nunca aparece mascarado na base de proponentes, então aplicar esse match nele só aumentaria o risco de colisão sem ganho de recall.

O LEFT JOIN parcial entra como uma segunda tentativa de match, condicionado a o identificador do proponente estar de fato mascarado:

LEFT JOIN todos_contemplados_miolo tcm
    ON pb.identificador_unico LIKE '%*%'
    AND LENGTH(REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g')) = 6
    AND REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g') = tcm.miolo_cpf
    AND pb.programa_fomento = tcm.programa_fomento

pb.identificador_unico LIKE '%*%' é o gate: só ativa esse caminho de JOIN quando o identificador contém literalmente o caractere * da máscara. LENGTH(REGEXP_REPLACE(..., '[^0-9]', '', 'g')) = 6 confirma que, depois de remover tudo que não é dígito, restam exatamente 6 dígitos — o miolo exposto. Esses 6 dígitos são comparados contra miolo_cpf da CTE de contemplados, particionado também por programa_fomento para não cruzar miolo de CPF entre programas distintos. O resultado final do contemplado combina os dois caminhos de match:

CASE
    WHEN tc.id_normalizado IS NOT NULL OR tcm.id_normalizado IS NOT NULL THEN 'sim'
    ELSE 'não'
END AS contemplado

Match exato (tc) ou match parcial por miolo (tcm) já classifica como contemplado — assumindo o trade-off documentado em comentário no próprio modelo: esse match parcial tem risco teórico de colisão entre CPFs distintos que compartilham o mesmo miolo de 6 dígitos, risco aceito em troca de eliminar o falso negativo sistemático que existia antes.

Infraestrutura e Airflow (Cosmos)

Antes desta mudança, create_udfs() (que cria o schema de destino e as UDFs f_parse_dates/f_format_nc) era registrada em on-run-start no dbt_project.yml — disparada automaticamente a cada dbt run. O Cosmos, ao orquestrar o projeto no Airflow, materializa uma task por modelo e as executa em paralelo conforme o grafo de dependências permite. Cada task roda seu próprio processo dbt, e cada um desses processos reexecutava o on-run-start — múltiplas conexões executando CREATE OR REPLACE FUNCTION simultaneamente contra o mesmo catálogo pg_proc do Postgres, gerando o erro tuple concurrently updated (conflito de atualização concorrente em catálogo de sistema, que não tem proteção de lock turística contra DDL paralelo).

A correção removeu o hook do dbt_project.yml:

models:
  cultura:
    ...
# on-run-start: - '{{ create_udfs() }}'   <- removido

E isolou a criação em uma macro chamada explicitamente, usando run_query em vez de DDL solto no corpo da macro (o que permite reutilizar a macro também via dbt run-operation fora do fluxo normal de run-start):

{% macro create_udfs() %}
{% set create_schema_sql %}
  CREATE SCHEMA IF NOT EXISTS {{ target.schema }};
{% endset %}
{% do run_query(create_schema_sql) %}

{% set parse_date_sql %}{{ create_f_parse_dates() }}{% endset %}
{% do run_query(parse_date_sql) %}

{% set format_nc_sql %}{{ create_f_format_nc() }}{% endset %}
{% do run_query(format_nc_sql) %}
{% endmacro %}

A invocação agora é manual e única, via dbt run-operation create_udfs, eliminando a concorrência na origem (zero tasks paralelas tentando recriar a mesma função) em vez de mitigá-la com retry/lock applicativo. Ponto de atenção para follow-up: o cosmos_dag.py atual ainda não tem uma task explícita de run-operation create_udfs antes das tasks de modelo geradas pelo Cosmos — a chamada precisa ser disparada manualmente (ou via uma task dedicada a ser adicionada na DAG) antes da primeira execução em um ambiente novo, já que as UDFs (f_parse_dates, f_format_nc) são pré-requisito para os modelos que as utilizam.

Impacto nos Dados

Antes da combinação dos dois workarounds da camada Gold, o JOIN de contemplação para a LPG efetivamente zerava: a query buscava uma coluna fixa de CPF/CNPJ que só capturava parte dos registros (schema drift por NBSP) e, mesmo nos registros capturados, comparava dígito a dígito contra um identificador de proponente mascarado por LGPD — duas falhas independentes, cada uma suficiente isoladamente para produzir falso negativo total. Com a descoberta dinâmica de coluna via information_schema + COALESCE resolvendo o schema drift, e o LEFT JOIN por miolo de 6 dígitos resolvendo o bypass de máscara, a LPG saiu de zero para milhares de matches corretos de contemplação — fechando a lacuna que invalidava o indicador de Meta 5 para esse programa.

@CaioMelo25 CaioMelo25 self-assigned this Jun 18, 2026
@CaioMelo25 CaioMelo25 requested review from LuizaMaluf, Copilot and davi-aguiar-vieira and removed request for Copilot June 18, 2026 22:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant