From a980c0bd8b143f406a8f89801858e1910f032c36 Mon Sep 17 00:00:00 2001 From: Caio Melo Borges Date: Tue, 9 Jun 2026 19:22:37 -0300 Subject: [PATCH 1/8] feat: adiciona pipeline dbt para meta 5 de agentes (bronze a gold) --- airflow_lappis/dags/dbt/cultura/.user.yml | 1 + airflow_lappis/dags/dbt/cultura/cosmos_dag.py | 30 +++++++ .../dags/dbt/cultura/dbt_project.yml | 33 +++++++ .../dags/dbt/cultura/descriptions.yml | 13 +++ .../dags/dbt/cultura/macros/create_udfs.sql | 10 +++ .../macros/data_quality/row_count_match.sql | 14 +++ .../data_quality/verificacao_tipagem.sql | 25 ++++++ .../dbt/cultura/macros/get_custom_schema.sql | 4 + .../macros/metadata/generate_metadata.sql | 46 ++++++++++ .../cultura/macros/parse_financial_value.sql | 21 +++++ .../dags/dbt/cultura/macros/schema.yml | 24 +++++ .../dbt/cultura/macros/udfs/f_format_nc.sql | 19 ++++ .../dbt/cultura/macros/udfs/f_parse_dates.sql | 44 +++++++++ .../agentes_dbt/bronze/agentes_coletivos.sql | 15 ++++ .../models/agentes_dbt/bronze/agentes_pf.sql | 15 ++++ .../models/agentes_dbt/bronze/agentes_pj.sql | 15 ++++ .../models/agentes_dbt/bronze/schema.yml | 89 +++++++++++++++++++ .../gold/primeiro_acesso_resumo.sql | 21 +++++ .../models/agentes_dbt/gold/schema.yml | 33 +++++++ .../silver/perfil_agentes_historico.sql | 55 ++++++++++++ .../models/agentes_dbt/silver/schema.yml | 29 ++++++ .../views/identificadores_agentes.sql | 25 ++++++ .../models/agentes_dbt/views/schema.yml | 18 ++++ .../models/metadata/models_metadata.sql | 67 ++++++++++++++ .../dbt/cultura/models/metadata/schema.yml | 45 ++++++++++ .../dags/dbt/cultura/models/sources.yml | 9 ++ airflow_lappis/dags/dbt/cultura/profiles.yml | 12 +++ 27 files changed, 732 insertions(+) create mode 100644 airflow_lappis/dags/dbt/cultura/.user.yml create mode 100644 airflow_lappis/dags/dbt/cultura/cosmos_dag.py create mode 100644 airflow_lappis/dags/dbt/cultura/dbt_project.yml create mode 100644 airflow_lappis/dags/dbt/cultura/descriptions.yml create mode 100644 airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql create mode 100644 airflow_lappis/dags/dbt/cultura/macros/data_quality/row_count_match.sql create mode 100644 airflow_lappis/dags/dbt/cultura/macros/data_quality/verificacao_tipagem.sql create mode 100644 airflow_lappis/dags/dbt/cultura/macros/get_custom_schema.sql create mode 100644 airflow_lappis/dags/dbt/cultura/macros/metadata/generate_metadata.sql create mode 100644 airflow_lappis/dags/dbt/cultura/macros/parse_financial_value.sql create mode 100644 airflow_lappis/dags/dbt/cultura/macros/schema.yml create mode 100644 airflow_lappis/dags/dbt/cultura/macros/udfs/f_format_nc.sql create mode 100644 airflow_lappis/dags/dbt/cultura/macros/udfs/f_parse_dates.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_coletivos.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pf.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pj.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml create mode 100644 airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml create mode 100644 airflow_lappis/dags/dbt/cultura/models/sources.yml create mode 100644 airflow_lappis/dags/dbt/cultura/profiles.yml diff --git a/airflow_lappis/dags/dbt/cultura/.user.yml b/airflow_lappis/dags/dbt/cultura/.user.yml new file mode 100644 index 00000000..24b47f34 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/.user.yml @@ -0,0 +1 @@ +id: 7c7b491c-e2c6-4636-9e72-b0ae2d207a9b diff --git a/airflow_lappis/dags/dbt/cultura/cosmos_dag.py b/airflow_lappis/dags/dbt/cultura/cosmos_dag.py new file mode 100644 index 00000000..9ffdcc70 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/cosmos_dag.py @@ -0,0 +1,30 @@ +import os +from datetime import datetime + +from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig +from cosmos.constants import DBT_LOG_PATH_ENVVAR + +dbt_log_path = "/tmp/dbt_logs" +os.makedirs(dbt_log_path, exist_ok=True) +os.environ[DBT_LOG_PATH_ENVVAR] = dbt_log_path + +profile_config = ProfileConfig( + profiles_yml_filepath=f"{os.environ['AIRFLOW_REPO_BASE']}/dags/dbt/cultura/profiles.yml", + profile_name="cultura", + target_name="prod", +) + +my_cosmos_dag = DbtDag( + project_config=ProjectConfig( + f"{os.environ['AIRFLOW_REPO_BASE']}/dags/dbt/cultura" + ), + profile_config=profile_config, + execution_config=ExecutionConfig( + dbt_executable_path=f"{os.environ['AIRFLOW_REPO_BASE']}/.local/bin/dbt", + ), + schedule_interval="0 1 * * *", + start_date=datetime(2025, 1, 1), + catchup=False, + dag_id="cultura_cosmos_dag", + default_args={"retries": 2}, +) diff --git a/airflow_lappis/dags/dbt/cultura/dbt_project.yml b/airflow_lappis/dags/dbt/cultura/dbt_project.yml new file mode 100644 index 00000000..fc956772 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/dbt_project.yml @@ -0,0 +1,33 @@ +name: 'cultura' + +version: 1.0.0 +config-version: 2 + +profile: cultura + +model-paths: ["models"] +analysis-paths: ["analyses"] +seed-paths: ["seeds"] +test-paths: ["tests"] +macro-paths: ["macros"] + +clean-targets: + - "target" + - "dbt_packages" + - "logs" + +models: + cultura: + metadata: + +materialized: incremental + +schema: metadata + agentes_dbt: + +materialized: table + +schema: agentes + bronze: + +materialized: incremental + views: + +materialized: view + +on-run-start: + - '{{ create_udfs() }}' diff --git a/airflow_lappis/dags/dbt/cultura/descriptions.yml b/airflow_lappis/dags/dbt/cultura/descriptions.yml new file mode 100644 index 00000000..3c7c007c --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/descriptions.yml @@ -0,0 +1,13 @@ +models: + - name: agentes_pf + description: Dados brutos de proponentes pessoa física da LPG + - name: agentes_pj + description: Dados brutos de proponentes pessoa jurídica da LPG + - name: agentes_coletivos + description: Dados brutos de proponentes coletivos da LPG + - name: identificadores_agentes + description: Consolidação dos identificadores de todos os tipos de proponentes + - name: perfil_agentes_historico + description: Perfil dos agentes com higienização do histórico de acesso a recursos de fomento + - name: primeiro_acesso_resumo + description: Resumo agregado do indicador de primeiro acesso a recursos de fomento (Meta 5) diff --git a/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql b/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql new file mode 100644 index 00000000..dd230cf5 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql @@ -0,0 +1,10 @@ +{% macro create_udfs() %} + +create schema if not exists {{ target.schema }}; + + {{ create_f_parse_dates() }} + ; + {{ create_f_format_nc() }} + ; + +{% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/macros/data_quality/row_count_match.sql b/airflow_lappis/dags/dbt/cultura/macros/data_quality/row_count_match.sql new file mode 100644 index 00000000..f248e30c --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/data_quality/row_count_match.sql @@ -0,0 +1,14 @@ +{% macro test_row_count_match(model, source_table, target_table) %} + with + source_count as (select count(*) as row_count from {{ source_table }}), + target_count as (select count(*) as row_count from {{ target_table }}), + comparison as ( + select + source_count.row_count as source_row_count, + target_count.row_count as target_row_count + from source_count, target_count + ) + select * + from comparison + where source_row_count != target_row_count +{% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/macros/data_quality/verificacao_tipagem.sql b/airflow_lappis/dags/dbt/cultura/macros/data_quality/verificacao_tipagem.sql new file mode 100644 index 00000000..34c3d392 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/data_quality/verificacao_tipagem.sql @@ -0,0 +1,25 @@ +{% macro test_verificacao_tipagem(model, nome_tabela, nome_coluna, tipo_esperado) %} + with + column_info as ( + select + table_schema, + table_name, -- Nome real da coluna no information_schema + column_name, -- Nome real da coluna no information_schema + data_type + from information_schema.columns + where + table_schema || '.' || table_name = '{{ nome_tabela }}' + and column_name = '{{ nome_coluna }}' + ), + comparison as ( + select + '{{ nome_tabela }}' as nome_tabela, + '{{ nome_coluna }}' as nome_coluna, + '{{ tipo_esperado }}' as tipo_esperado, + data_type as actual_type + from column_info + ) + select * + from comparison + where actual_type != tipo_esperado +{% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/macros/get_custom_schema.sql b/airflow_lappis/dags/dbt/cultura/macros/get_custom_schema.sql new file mode 100644 index 00000000..701964ca --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/get_custom_schema.sql @@ -0,0 +1,4 @@ +-- built-in schema generator +{% macro generate_schema_name(custom_schema_name, node) -%} + {{ generate_schema_name_for_env(custom_schema_name, node) }} +{%- endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/macros/metadata/generate_metadata.sql b/airflow_lappis/dags/dbt/cultura/macros/metadata/generate_metadata.sql new file mode 100644 index 00000000..8bfb115b --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/metadata/generate_metadata.sql @@ -0,0 +1,46 @@ +{% macro get_model_metadata() %} +{# + Esta macro retorna os metadados do modelo atual. + Pode ser usada em post-hooks para registrar metadados automaticamente. +#} + SELECT + '{{ this.schema }}' AS schema_name, + '{{ this.name }}' AS table_name, + '{{ this.database }}' AS database_name, + ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo') AS dt_transform, + '{{ invocation_id }}' AS run_id +{% endmacro %} + + +{% macro register_model_metadata() %} +{# + Esta macro registra os metadados do modelo em uma tabela central. + Deve ser usada como post-hook nos modelos que deseja rastrear. + + Uso no dbt_project.yml: + models: + ipea: + +post-hook: + - "{{ register_model_metadata() }}" +#} + + INSERT INTO {{ target.database }}.metadata.models_metadata ( + schema_name, + table_name, + database_name, + dt_transform, + run_id + ) + VALUES ( + '{{ this.schema }}', + '{{ this.name }}', + '{{ this.database }}', + ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo'), + '{{ invocation_id }}' + ) + ON CONFLICT (schema_name, table_name) + DO UPDATE SET + dt_transform = EXCLUDED.dt_transform, + run_id = EXCLUDED.run_id; + +{% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/macros/parse_financial_value.sql b/airflow_lappis/dags/dbt/cultura/macros/parse_financial_value.sql new file mode 100644 index 00000000..437b673c --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/parse_financial_value.sql @@ -0,0 +1,21 @@ +{% macro parse_financial_value(column_name) %} + + case + when {{ column_name }} is null or trim({{ column_name }}) = '' + then 0.00::numeric(15, 2) + when {{ column_name }} like '%NaN%' + then 0.00::numeric(15, 2) + when {{ column_name }} like '(%' + then + regexp_replace( + replace(coalesce({{ column_name }}, '0'), '.', ''), + '(\()?(\d+),(\d+)(\))?', + '-\2.\3' + )::numeric(15, 2) + else + replace( + replace(coalesce({{ column_name }}, '0'), '.', ''), ',', '.' + )::numeric(15, 2) + end + +{% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/macros/schema.yml b/airflow_lappis/dags/dbt/cultura/macros/schema.yml new file mode 100644 index 00000000..694f3b23 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/schema.yml @@ -0,0 +1,24 @@ + +version: 2 + +macros: + - name: create_udfs + description: > + Função que cria as UDFs necessárias para o funcionamento do projeto. + Essa função deve ser chamada no início de cada run para garantir que todas as UDFs estejam disponíveis. + + - name: generate_schema_name + description: > + Função que gera o nome do schema a ser utilizado no projeto. + A função dentro desta macro é built-in do dbt. + + ## UDFS + - name: create_f_parse_dates + description: > + Função que cria a UDF f_parse_dates, que é utilizada para converter texto no formato MÊS(texto)/ANO(numero) em datas. + arguments: + - name: in_text + type: text + description: > + Texto a ser convertido em data. + O texto deve estar no formato MÊS(texto)/ANO(numero). Ex.: FEV/2024 -> 2024-02-01 \ No newline at end of file diff --git a/airflow_lappis/dags/dbt/cultura/macros/udfs/f_format_nc.sql b/airflow_lappis/dags/dbt/cultura/macros/udfs/f_format_nc.sql new file mode 100644 index 00000000..f7a06c86 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/udfs/f_format_nc.sql @@ -0,0 +1,19 @@ +{% macro create_f_format_nc() %} + create or replace function {{ target.schema }}.format_nc(in_text text) + returns text + as $$ + + with + + pre_process as ( + select left(in_text, 7) as prefix, + right(in_text, 4)::numeric as posfix + ) + + select concat(prefix, to_char(posfix, 'FM00000')) as result + from pre_process + + $$ + language sql + ; +{% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/macros/udfs/f_parse_dates.sql b/airflow_lappis/dags/dbt/cultura/macros/udfs/f_parse_dates.sql new file mode 100644 index 00000000..3fd8693e --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/macros/udfs/f_parse_dates.sql @@ -0,0 +1,44 @@ +-- Essa fun +{% macro create_f_parse_dates() %} + + create or replace function {{ target.schema }}.parse_date(in_text text) + returns date + as + $$ + + with + + split_column as ( + select + split_part(in_text, '/', 1) as mes, + split_part(in_text, '/', 2) as ano + ), + + fixed_month as ( + select + ano, + case + when mes = 'JAN' then '01' + when mes = 'FEV' then '02' + when mes = 'MAR' then '03' + when mes = 'ABR' then '04' + when mes = 'MAI' then '05' + when mes = 'JUN' then '06' + when mes = 'JUL' then '07' + when mes = 'AGO' then '08' + when mes = 'SET' then '09' + when mes = 'OUT' then '10' + when mes = 'NOV' then '11' + when mes = 'DEZ' then '12' + else mes end as mes_num + from split_column + ) + + select + (to_date(ano::numeric - 1 || '-' || '12', 'YYYY-MM') + (mes_num || ' months')::interval)::date as result + from fixed_month + $$ + language sql + ; + +{% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_coletivos.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_coletivos.sql new file mode 100644 index 00000000..8089ae16 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_coletivos.sql @@ -0,0 +1,15 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS ja_acessou_recursos_bruto +FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_coletivos') }} + +{% if is_incremental() %} + WHERE LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) +{% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pf.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pf.sql new file mode 100644 index 00000000..75a33256 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pf.sql @@ -0,0 +1,15 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cpf")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS ja_acessou_recursos_bruto +FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_fisica') }} + +{% if is_incremental() %} + WHERE LOWER(TRIM("nº do cpf")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) +{% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pj.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pj.sql new file mode 100644 index 00000000..ad9a013b --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pj.sql @@ -0,0 +1,15 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cnpj")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS ja_acessou_recursos_bruto +FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_juridica') }} + +{% if is_incremental() %} + WHERE LOWER(TRIM("nº do cnpj")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) +{% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml new file mode 100644 index 00000000..ca4b3fe8 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml @@ -0,0 +1,89 @@ +version: 2 + +models: + - name: agentes_pf + description: > + Camada Bronze — dados brutos de pessoas físicas da LPG (Lei Paulo Gustavo). + Mapeia o CPF como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura. + config: + meta: + tags: + - bronze + - agentes + - pessoa_fisica + columns: + - name: identificador_unico + description: CPF do proponente pessoa física, normalizado para lowercase. + tests: + - not_null + - name: ja_acessou_recursos_bruto + description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. + data_tests: + - row_count_match: + arguments: + source_table: transferegov_fundo_a_fundo.lpg_dados_pessoa_fisica + target_table: agentes.agentes_pf + - verificacao_tipagem: + arguments: + nome_tabela: 'agentes.agentes_pf' + nome_coluna: 'identificador_unico' + tipo_esperado: 'text' + + - name: agentes_pj + description: > + Camada Bronze — dados brutos de pessoas jurídicas da LPG. + Mapeia o CNPJ como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura. + config: + meta: + tags: + - bronze + - agentes + - pessoa_juridica + columns: + - name: identificador_unico + description: CNPJ do proponente pessoa jurídica, normalizado para lowercase. + tests: + - not_null + - name: ja_acessou_recursos_bruto + description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. + data_tests: + - row_count_match: + arguments: + source_table: transferegov_fundo_a_fundo.lpg_dados_pessoa_juridica + target_table: agentes.agentes_pj + - verificacao_tipagem: + arguments: + nome_tabela: 'agentes.agentes_pj' + nome_coluna: 'identificador_unico' + tipo_esperado: 'text' + + - name: agentes_coletivos + description: > + Camada Bronze — dados brutos de proponentes coletivos da LPG. + Mapeia o CPF do representante como identificador único e traz a coluna bruta + de histórico de acesso a recursos públicos de fomento à cultura. + config: + meta: + tags: + - bronze + - agentes + - coletivo + columns: + - name: identificador_unico + description: CPF do representante do coletivo, normalizado para lowercase. + tests: + - not_null + - name: ja_acessou_recursos_bruto + description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. + data_tests: + - row_count_match: + arguments: + source_table: transferegov_fundo_a_fundo.lpg_dados_coletivos + target_table: agentes.agentes_coletivos + - verificacao_tipagem: + arguments: + nome_tabela: 'agentes.agentes_coletivos' + nome_coluna: 'identificador_unico' + tipo_esperado: 'text' diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql new file mode 100644 index 00000000..228ee055 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql @@ -0,0 +1,21 @@ +{{ config( + materialized='table' +) }} + +WITH resumo AS ( + SELECT + categoria_primeiro_acesso, + COUNT(DISTINCT identificador_unico) AS total_proponentes + FROM {{ ref('perfil_agentes_historico') }} + GROUP BY categoria_primeiro_acesso +) + +SELECT + categoria_primeiro_acesso, + total_proponentes, + ROUND( + (total_proponentes * 100.0) / SUM(total_proponentes) OVER (), + 2 + ) AS percentual +FROM resumo +ORDER BY total_proponentes DESC diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml new file mode 100644 index 00000000..00b3e419 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml @@ -0,0 +1,33 @@ +version: 2 + +models: + - name: primeiro_acesso_resumo + description: > + Camada Gold — resumo agregado da Meta 5 (Indicador de Primeiro Acesso). + Conta proponentes distintos por categoria de acesso anterior a recursos + públicos de fomento à cultura e calcula o percentual de cada categoria + sobre o total geral. + config: + meta: + meta_objetivo: 'Meta 5 — Indicador de Primeiro Acesso' + origem_dados: 'LPG — Lei Paulo Gustavo' + tags: [gold, agentes, primeiro_acesso, meta5] + columns: + - name: categoria_primeiro_acesso + description: > + Categoria padronizada da resposta sobre acesso anterior a recursos + de fomento — 'Sim', 'Não' ou 'Não sabe/Não informou'. + tests: + - not_null + - unique + - accepted_values: + arguments: + values: ['Sim', 'Não', 'Não sabe/Não informou'] + - name: total_proponentes + description: Contagem distinta de proponentes na categoria. + tests: + - not_null + - name: percentual + description: Percentual da categoria sobre o total geral de proponentes. + tests: + - not_null diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql new file mode 100644 index 00000000..8c896169 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql @@ -0,0 +1,55 @@ +{{ config( + materialized='table' +) }} + +WITH identificadores AS ( + SELECT + identificador_unico, + tipo_proponente, + ja_acessou_recursos_bruto + FROM {{ ref('identificadores_agentes') }} +), + +higienizados AS ( + SELECT + identificador_unico, + tipo_proponente, + TRIM( + REPLACE( + REPLACE( + REPLACE( + ja_acessou_recursos_bruto, + '.', '' + ), + ';', '' + ), + '"', '' + ) + ) AS ja_acessou_recursos_limpo + FROM identificadores + WHERE ja_acessou_recursos_bruto IS NOT NULL + AND LOWER(ja_acessou_recursos_bruto) != 'nan' + AND TRIM(ja_acessou_recursos_bruto) != '' +) + +SELECT + identificador_unico, + tipo_proponente, + ja_acessou_recursos_limpo, + CASE + WHEN ja_acessou_recursos_limpo = 'sim' THEN 'Sim' + WHEN ja_acessou_recursos_limpo IN ('não', 'nao', 'nâo') THEN 'Não' + WHEN ja_acessou_recursos_limpo IN ( + 'não sei informar', + 'nao sei informar', + 'não informado', + 'nao informado', + 'nao_declarar', + 'não sei', + 'nao sei', + 'não sabe', + 'nao sabe' + ) THEN 'Não sabe/Não informou' + ELSE 'Não sabe/Não informou' + END AS categoria_primeiro_acesso +FROM higienizados diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml new file mode 100644 index 00000000..d3023b6a --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml @@ -0,0 +1,29 @@ +version: 2 + +models: + - name: perfil_agentes_historico + description: > + Camada Silver — perfil dos agentes com higienização da coluna de histórico + de acesso a recursos públicos. Remove valores nulos, 'nan' e caracteres + inválidos (ponto, ponto-e-vírgula). Classifica as respostas em três + categorias padronizadas: 'Sim', 'Não' e 'Não sabe/Não informou'. + config: + meta: + tags: [silver, agentes, higienizacao] + columns: + - name: identificador_unico + description: Identificador único do proponente (CPF, CNPJ ou CPF do representante). + tests: + - not_null + - name: tipo_proponente + description: Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica' ou 'Coletivo'. + - name: ja_acessou_recursos_limpo + description: Texto limpo da resposta bruta sobre acesso anterior a recursos de fomento. + - name: categoria_primeiro_acesso + description: > + Categoria padronizada da resposta: 'Sim', 'Não' ou 'Não sabe/Não informou'. + tests: + - not_null + - accepted_values: + arguments: + values: ['Sim', 'Não', 'Não sabe/Não informou'] diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql new file mode 100644 index 00000000..7c2fff83 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql @@ -0,0 +1,25 @@ +{{ config( + materialized='view' +) }} + +SELECT + identificador_unico, + ja_acessou_recursos_bruto, + 'Pessoa Física' AS tipo_proponente +FROM {{ ref('agentes_pf') }} + +UNION ALL + +SELECT + identificador_unico, + ja_acessou_recursos_bruto, + 'Pessoa Jurídica' AS tipo_proponente +FROM {{ ref('agentes_pj') }} + +UNION ALL + +SELECT + identificador_unico, + ja_acessou_recursos_bruto, + 'Coletivo' AS tipo_proponente +FROM {{ ref('agentes_coletivos') }} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml new file mode 100644 index 00000000..49326f21 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml @@ -0,0 +1,18 @@ +version: 2 + +models: + - name: identificadores_agentes + description: > + Camada Views — consolidação dos três modelos bronze (PF, PJ e Coletivos) + em um único dataset via UNION ALL. Padroniza identificador_unico e adiciona + a coluna literal tipo_proponente para rastrear a origem de cada registro. + config: + meta: + tags: [views, agentes, identificadores] + columns: + - name: identificador_unico + description: Identificador único do proponente (CPF, CNPJ ou CPF do representante). + - name: ja_acessou_recursos_bruto + description: Resposta bruta sobre acesso anterior a recursos de fomento. + - name: tipo_proponente + description: Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica' ou 'Coletivo'. diff --git a/airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql b/airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql new file mode 100644 index 00000000..f1981994 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql @@ -0,0 +1,67 @@ +{{ + config( + materialized='incremental', + unique_key=['schema_name', 'table_name'], + on_schema_change='sync_all_columns' + ) +}} + +{# + Tabela de Metadados dos Modelos dbt + =================================== + + Esta tabela armazena metadados de todos os modelos executados no dbt. + + Campos principais: + - schema_name: Schema do modelo + - table_name: Nome da tabela/modelo + - dt_transform: Data da última transformação (quando o modelo foi executado) + - run_id: ID único da execução do dbt + + A tabela é atualizada de forma incremental, mantendo apenas o registro + mais recente para cada combinação de schema + table_name. +#} + +WITH dbt_models AS ( + {# + Usando a função graph do dbt para iterar sobre todos os modelos do projeto. + Isso garante que capturamos metadados de todos os modelos definidos. + #} + {% set models_data = [] %} + + {% for node in graph.nodes.values() %} + {% if node.resource_type == 'model' %} + {% do models_data.append({ + 'schema_name': node.schema, + 'table_name': node.name, + 'database_name': node.database, + 'materialization': node.config.materialized, + 'description': node.description | default('') | replace("'", "''") + }) %} + {% endif %} + {% endfor %} + + {% for model in models_data %} + SELECT + '{{ model.schema_name }}' AS schema_name, + '{{ model.table_name }}' AS table_name, + '{{ model.database_name }}' AS database_name, + '{{ model.materialization }}' AS materialization, + '{{ model.description[:500] }}' AS description, + ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo') AS dt_transform, + '{{ invocation_id }}' AS run_id + {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %} +) + +SELECT + schema_name, + table_name, + database_name, + materialization, + description, + dt_transform, + run_id +FROM dbt_models diff --git a/airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml b/airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml new file mode 100644 index 00000000..bb85b1e5 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml @@ -0,0 +1,45 @@ +version: 2 + +models: + - name: models_metadata + description: > + Tabela central de metadados que armazena informações sobre todos os modelos dbt executados. + Cada linha representa um modelo único, identificado pela combinação de schema e table_name. + A tabela é atualizada de forma incremental, mantendo histórico das execuções. + config: + meta: + tags: + - metadata + - governance + columns: + - name: schema_name + description: Nome do schema onde o modelo está localizado. + tests: + - not_null + + - name: table_name + description: Nome da tabela/modelo. + tests: + - not_null + + - name: database_name + description: Nome do banco de dados onde o modelo está materializado. + + - name: materialization + description: Tipo de materialização do modelo (table, view, incremental, etc). + + - name: description + description: Descrição do modelo extraída do schema.yml. + + - name: dt_transform + description: > + Data e hora em que o modelo foi transformado/executado pela última vez. + Corresponde ao momento em que a execução do dbt foi iniciada (run_started_at). + Timezone: America/Sao_Paulo (UTC-3). + tests: + - not_null + + - name: run_id + description: > + Identificador único da execução do dbt (invocation_id). + Permite rastrear qual execução gerou a transformação. diff --git a/airflow_lappis/dags/dbt/cultura/models/sources.yml b/airflow_lappis/dags/dbt/cultura/models/sources.yml new file mode 100644 index 00000000..8b7992a2 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/sources.yml @@ -0,0 +1,9 @@ +version: 2 + +sources: + - name: transferegov_fundo_a_fundo + schema: transferegov_fundo_a_fundo + tables: + - name: lpg_dados_pessoa_fisica + - name: lpg_dados_pessoa_juridica + - name: lpg_dados_coletivos diff --git a/airflow_lappis/dags/dbt/cultura/profiles.yml b/airflow_lappis/dags/dbt/cultura/profiles.yml new file mode 100644 index 00000000..caa63f5d --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/profiles.yml @@ -0,0 +1,12 @@ +cultura: + target: prod + outputs: + prod: + type: postgres + host: "{{ env_var('DB_DW_HOST_CULTURA', 'postgres') }}" + user: "{{ env_var('DB_DW_USER_CULTURA', 'postgres_dw') }}" + password: "{{ env_var('DB_DW_PASSWORD_CULTURA', 'postgres_dw') }}" + port: "{{ env_var('DB_DW_PORT_CULTURA', '5432') | int }}" + dbname: "{{ env_var('DB_DW_DBNAME_CULTURA', 'data_warehouse') }}" + schema: "{{ env_var('DB_DW_SCHEMA_CULTURA', 'cultura') }}" + threads: 4 From c4fea61fbaf6d6b7959b97d819a5b2f40383a6d8 Mon Sep 17 00:00:00 2001 From: Caio Melo Borges Date: Thu, 11 Jun 2026 06:14:36 -0300 Subject: [PATCH 2/8] feat(analytics): unifica dados de proponentes LPG e PNAB para Meta 5 --- ...oletivos.sql => lpg_agentes_coletivos.sql} | 11 +- .../{agentes_pf.sql => lpg_agentes_pf.sql} | 11 +- .../{agentes_pj.sql => lpg_agentes_pj.sql} | 11 +- .../agentes_dbt/bronze/pnab_agentes_pf.sql | 16 +++ .../agentes_dbt/bronze/pnab_agentes_pj.sql | 16 +++ .../models/agentes_dbt/bronze/schema.yml | 101 +++++++++++------- .../gold/primeiro_acesso_resumo.sql | 10 +- .../models/agentes_dbt/gold/schema.yml | 24 +++-- .../silver/perfil_agentes_historico.sql | 23 ++-- .../models/agentes_dbt/silver/schema.yml | 15 ++- .../views/identificadores_agentes.sql | 37 +++++-- .../models/agentes_dbt/views/schema.yml | 16 ++- .../dags/dbt/cultura/models/sources.yml | 2 + airflow_lappis/dags/dbt/cultura/profiles.yml | 10 +- airflow_lappis/plugins/extracao_planilhas.py | 50 +++++++++ 15 files changed, 255 insertions(+), 98 deletions(-) rename airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/{agentes_coletivos.sql => lpg_agentes_coletivos.sql} (59%) rename airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/{agentes_pf.sql => lpg_agentes_pf.sql} (53%) rename airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/{agentes_pj.sql => lpg_agentes_pj.sql} (53%) create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pj.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_coletivos.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql similarity index 59% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_coletivos.sql rename to airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql index 8089ae16..8900643c 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_coletivos.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql @@ -4,12 +4,13 @@ ) }} SELECT - LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) AS identificador_unico, - LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS ja_acessou_recursos_bruto + LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, + 'LPG' AS programa_fomento FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_coletivos') }} {% if is_incremental() %} - WHERE LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) NOT IN ( - SELECT identificador_unico FROM {{ this }} - ) +WHERE LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) NOT IN ( + SELECT identificador_unico FROM {{ this }} +) {% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pf.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql similarity index 53% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pf.sql rename to airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql index 75a33256..85ad8368 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pf.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql @@ -4,12 +4,13 @@ ) }} SELECT - LOWER(TRIM("nº do cpf")) AS identificador_unico, - LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS ja_acessou_recursos_bruto + LOWER(TRIM("nº do cpf")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, + 'LPG' AS programa_fomento FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_fisica') }} {% if is_incremental() %} - WHERE LOWER(TRIM("nº do cpf")) NOT IN ( - SELECT identificador_unico FROM {{ this }} - ) +WHERE LOWER(TRIM("nº do cpf")) NOT IN ( + SELECT identificador_unico FROM {{ this }} +) {% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pj.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql similarity index 53% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pj.sql rename to airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql index ad9a013b..8bdb07f2 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/agentes_pj.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql @@ -4,12 +4,13 @@ ) }} SELECT - LOWER(TRIM("nº do cnpj")) AS identificador_unico, - LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS ja_acessou_recursos_bruto + LOWER(TRIM("nº do cnpj")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, + 'LPG' AS programa_fomento FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_juridica') }} {% if is_incremental() %} - WHERE LOWER(TRIM("nº do cnpj")) NOT IN ( - SELECT identificador_unico FROM {{ this }} - ) +WHERE LOWER(TRIM("nº do cnpj")) NOT IN ( + SELECT identificador_unico FROM {{ this }} +) {% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql new file mode 100644 index 00000000..a5670205 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql @@ -0,0 +1,16 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cpf")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura nos últim")) AS historico_acesso_bruto, + 'PNAB' AS programa_fomento +FROM {{ source('transferegov_fundo_a_fundo', 'pnab_pessoas') }} + +{% if is_incremental() %} +WHERE LOWER(TRIM("nº do cpf")) NOT IN ( + SELECT identificador_unico FROM {{ this }} +) +{% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pj.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pj.sql new file mode 100644 index 00000000..e67a7131 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pj.sql @@ -0,0 +1,16 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cnpj")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorment")) AS historico_acesso_bruto, + 'PNAB' AS programa_fomento +FROM {{ source('transferegov_fundo_a_fundo', 'pnab_organizacoes') }} + +{% if is_incremental() %} +WHERE LOWER(TRIM("nº do cnpj")) NOT IN ( + SELECT identificador_unico FROM {{ this }} +) +{% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml index ca4b3fe8..409d7b17 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml @@ -1,7 +1,7 @@ version: 2 models: - - name: agentes_pf + - name: lpg_agentes_pf description: > Camada Bronze — dados brutos de pessoas físicas da LPG (Lei Paulo Gustavo). Mapeia o CPF como identificador único e traz a coluna bruta de histórico @@ -12,25 +12,18 @@ models: - bronze - agentes - pessoa_fisica + - lpg columns: - name: identificador_unico description: CPF do proponente pessoa física, normalizado para lowercase. tests: - not_null - - name: ja_acessou_recursos_bruto + - name: historico_acesso_bruto description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. - data_tests: - - row_count_match: - arguments: - source_table: transferegov_fundo_a_fundo.lpg_dados_pessoa_fisica - target_table: agentes.agentes_pf - - verificacao_tipagem: - arguments: - nome_tabela: 'agentes.agentes_pf' - nome_coluna: 'identificador_unico' - tipo_esperado: 'text' + - name: programa_fomento + description: Programa de fomento de origem — sempre 'LPG' neste modelo. - - name: agentes_pj + - name: lpg_agentes_pj description: > Camada Bronze — dados brutos de pessoas jurídicas da LPG. Mapeia o CNPJ como identificador único e traz a coluna bruta de histórico @@ -41,49 +34,79 @@ models: - bronze - agentes - pessoa_juridica + - lpg columns: - name: identificador_unico description: CNPJ do proponente pessoa jurídica, normalizado para lowercase. tests: - not_null - - name: ja_acessou_recursos_bruto + - name: historico_acesso_bruto description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. - data_tests: - - row_count_match: - arguments: - source_table: transferegov_fundo_a_fundo.lpg_dados_pessoa_juridica - target_table: agentes.agentes_pj - - verificacao_tipagem: - arguments: - nome_tabela: 'agentes.agentes_pj' - nome_coluna: 'identificador_unico' - tipo_esperado: 'text' + - name: programa_fomento + description: Programa de fomento de origem — sempre 'LPG' neste modelo. - - name: agentes_coletivos + - name: lpg_agentes_coletivos description: > Camada Bronze — dados brutos de proponentes coletivos da LPG. - Mapeia o CPF do representante como identificador único e traz a coluna bruta - de histórico de acesso a recursos públicos de fomento à cultura. + Mapeia o CPF do representante do grupo/coletivo como identificador único + e traz a coluna bruta de histórico de acesso a recursos públicos de fomento. config: meta: tags: - bronze - agentes - coletivo + - lpg columns: - name: identificador_unico - description: CPF do representante do coletivo, normalizado para lowercase. + description: CPF do representante do grupo/coletivo, normalizado para lowercase. tests: - not_null - - name: ja_acessou_recursos_bruto + - name: historico_acesso_bruto description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. - data_tests: - - row_count_match: - arguments: - source_table: transferegov_fundo_a_fundo.lpg_dados_coletivos - target_table: agentes.agentes_coletivos - - verificacao_tipagem: - arguments: - nome_tabela: 'agentes.agentes_coletivos' - nome_coluna: 'identificador_unico' - tipo_esperado: 'text' + - name: programa_fomento + description: Programa de fomento de origem — sempre 'LPG' neste modelo. + + - name: pnab_agentes_pf + description: > + Camada Bronze — dados brutos de pessoas físicas da PNAB (Política Nacional Aldir Blanc). + Mapeia o CPF como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura nos últimos 5 anos. + config: + meta: + tags: + - bronze + - agentes + - pessoa_fisica + - pnab + columns: + - name: identificador_unico + description: CPF do proponente pessoa física, normalizado para lowercase. + tests: + - not_null + - name: historico_acesso_bruto + description: Resposta bruta da PNAB sobre acesso anterior a recursos públicos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — sempre 'PNAB' neste modelo. + + - name: pnab_agentes_pj + description: > + Camada Bronze — dados brutos de organizações da PNAB. + Mapeia o CNPJ como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura anteriormente. + config: + meta: + tags: + - bronze + - agentes + - pessoa_juridica + - pnab + columns: + - name: identificador_unico + description: CNPJ da organização, normalizado para lowercase. + tests: + - not_null + - name: historico_acesso_bruto + description: Resposta bruta da PNAB sobre acesso anterior a recursos públicos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — sempre 'PNAB' neste modelo. diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql index 228ee055..c071e6f8 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql @@ -4,18 +4,22 @@ WITH resumo AS ( SELECT + programa_fomento, categoria_primeiro_acesso, COUNT(DISTINCT identificador_unico) AS total_proponentes FROM {{ ref('perfil_agentes_historico') }} - GROUP BY categoria_primeiro_acesso + GROUP BY programa_fomento, categoria_primeiro_acesso ) SELECT + programa_fomento, categoria_primeiro_acesso, total_proponentes, ROUND( - (total_proponentes * 100.0) / SUM(total_proponentes) OVER (), + (total_proponentes::NUMERIC + / SUM(total_proponentes) OVER (PARTITION BY programa_fomento)) + * 100, 2 ) AS percentual FROM resumo -ORDER BY total_proponentes DESC +ORDER BY programa_fomento, total_proponentes DESC diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml index 00b3e419..fd56c6b2 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml @@ -4,30 +4,36 @@ models: - name: primeiro_acesso_resumo description: > Camada Gold — resumo agregado da Meta 5 (Indicador de Primeiro Acesso). - Conta proponentes distintos por categoria de acesso anterior a recursos - públicos de fomento à cultura e calcula o percentual de cada categoria - sobre o total geral. + Conta proponentes distintos por programa de fomento e categoria de acesso + anterior a recursos públicos. Calcula o percentual de cada categoria + sobre o total do respectivo programa (cada programa soma 100%). config: meta: meta_objetivo: 'Meta 5 — Indicador de Primeiro Acesso' - origem_dados: 'LPG — Lei Paulo Gustavo' + origem_dados: 'LPG e PNAB' tags: [gold, agentes, primeiro_acesso, meta5] columns: + - name: programa_fomento + description: Programa de fomento — 'LPG' ou 'PNAB'. + tests: + - not_null + - accepted_values: + values: ['LPG', 'PNAB'] - name: categoria_primeiro_acesso description: > Categoria padronizada da resposta sobre acesso anterior a recursos de fomento — 'Sim', 'Não' ou 'Não sabe/Não informou'. tests: - not_null - - unique - accepted_values: - arguments: - values: ['Sim', 'Não', 'Não sabe/Não informou'] + values: ['Sim', 'Não', 'Não sabe/Não informou'] - name: total_proponentes - description: Contagem distinta de proponentes na categoria. + description: Contagem distinta de proponentes na categoria dentro do programa. tests: - not_null - name: percentual - description: Percentual da categoria sobre o total geral de proponentes. + description: > + Percentual da categoria sobre o total de proponentes do mesmo programa + (cada programa soma 100% independentemente). tests: - not_null diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql index 8c896169..8eb5e300 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql @@ -6,7 +6,8 @@ WITH identificadores AS ( SELECT identificador_unico, tipo_proponente, - ja_acessou_recursos_bruto + programa_fomento, + historico_acesso_bruto FROM {{ ref('identificadores_agentes') }} ), @@ -14,32 +15,34 @@ higienizados AS ( SELECT identificador_unico, tipo_proponente, + programa_fomento, TRIM( REPLACE( REPLACE( REPLACE( - ja_acessou_recursos_bruto, + historico_acesso_bruto, '.', '' ), ';', '' ), '"', '' ) - ) AS ja_acessou_recursos_limpo + ) AS historico_acesso_limpo FROM identificadores - WHERE ja_acessou_recursos_bruto IS NOT NULL - AND LOWER(ja_acessou_recursos_bruto) != 'nan' - AND TRIM(ja_acessou_recursos_bruto) != '' + WHERE historico_acesso_bruto IS NOT NULL + AND LOWER(historico_acesso_bruto) != 'nan' + AND TRIM(historico_acesso_bruto) != '' ) SELECT identificador_unico, tipo_proponente, - ja_acessou_recursos_limpo, + programa_fomento, + historico_acesso_limpo, CASE - WHEN ja_acessou_recursos_limpo = 'sim' THEN 'Sim' - WHEN ja_acessou_recursos_limpo IN ('não', 'nao', 'nâo') THEN 'Não' - WHEN ja_acessou_recursos_limpo IN ( + WHEN historico_acesso_limpo = 'sim' THEN 'Sim' + WHEN historico_acesso_limpo IN ('não', 'nao', 'nâo') THEN 'Não' + WHEN historico_acesso_limpo IN ( 'não sei informar', 'nao sei informar', 'não informado', diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml index d3023b6a..1aad7a68 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml @@ -7,6 +7,7 @@ models: de acesso a recursos públicos. Remove valores nulos, 'nan' e caracteres inválidos (ponto, ponto-e-vírgula). Classifica as respostas em três categorias padronizadas: 'Sim', 'Não' e 'Não sabe/Não informou'. + Propaga a coluna programa_fomento para rastreabilidade por programa. config: meta: tags: [silver, agentes, higienizacao] @@ -16,8 +17,15 @@ models: tests: - not_null - name: tipo_proponente - description: Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica' ou 'Coletivo'. - - name: ja_acessou_recursos_limpo + description: > + Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica', + 'Coletivo' ou 'Organização'. + - name: programa_fomento + description: Programa de fomento de origem — 'LPG' ou 'PNAB'. + tests: + - accepted_values: + values: ['LPG', 'PNAB'] + - name: historico_acesso_limpo description: Texto limpo da resposta bruta sobre acesso anterior a recursos de fomento. - name: categoria_primeiro_acesso description: > @@ -25,5 +33,4 @@ models: tests: - not_null - accepted_values: - arguments: - values: ['Sim', 'Não', 'Não sabe/Não informou'] + values: ['Sim', 'Não', 'Não sabe/Não informou'] diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql index 7c2fff83..c5a90864 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql @@ -1,25 +1,44 @@ -{{ config( - materialized='view' -) }} +{{ config(materialized='view') }} SELECT identificador_unico, - ja_acessou_recursos_bruto, + historico_acesso_bruto, + programa_fomento, 'Pessoa Física' AS tipo_proponente -FROM {{ ref('agentes_pf') }} +FROM {{ ref('lpg_agentes_pf') }} UNION ALL SELECT identificador_unico, - ja_acessou_recursos_bruto, + historico_acesso_bruto, + programa_fomento, 'Pessoa Jurídica' AS tipo_proponente -FROM {{ ref('agentes_pj') }} +FROM {{ ref('lpg_agentes_pj') }} UNION ALL SELECT identificador_unico, - ja_acessou_recursos_bruto, + historico_acesso_bruto, + programa_fomento, 'Coletivo' AS tipo_proponente -FROM {{ ref('agentes_coletivos') }} +FROM {{ ref('lpg_agentes_coletivos') }} + +UNION ALL + +SELECT + identificador_unico, + historico_acesso_bruto, + programa_fomento, + 'Pessoa Física' AS tipo_proponente +FROM {{ ref('pnab_agentes_pf') }} + +UNION ALL + +SELECT + identificador_unico, + historico_acesso_bruto, + programa_fomento, + 'Organização' AS tipo_proponente +FROM {{ ref('pnab_agentes_pj') }} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml index 49326f21..5a29a421 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml @@ -3,16 +3,24 @@ version: 2 models: - name: identificadores_agentes description: > - Camada Views — consolidação dos três modelos bronze (PF, PJ e Coletivos) + Camada Views — consolidação dos cinco modelos bronze (3 LPG + 2 PNAB) em um único dataset via UNION ALL. Padroniza identificador_unico e adiciona - a coluna literal tipo_proponente para rastrear a origem de cada registro. + as colunas literais programa_fomento e tipo_proponente para rastrear + a origem de cada registro. config: meta: tags: [views, agentes, identificadores] columns: - name: identificador_unico description: Identificador único do proponente (CPF, CNPJ ou CPF do representante). - - name: ja_acessou_recursos_bruto + - name: historico_acesso_bruto description: Resposta bruta sobre acesso anterior a recursos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — 'LPG' ou 'PNAB'. + tests: + - accepted_values: + values: ['LPG', 'PNAB'] - name: tipo_proponente - description: Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica' ou 'Coletivo'. + description: > + Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica', + 'Coletivo' ou 'Organização'. diff --git a/airflow_lappis/dags/dbt/cultura/models/sources.yml b/airflow_lappis/dags/dbt/cultura/models/sources.yml index 8b7992a2..96411804 100644 --- a/airflow_lappis/dags/dbt/cultura/models/sources.yml +++ b/airflow_lappis/dags/dbt/cultura/models/sources.yml @@ -7,3 +7,5 @@ sources: - name: lpg_dados_pessoa_fisica - name: lpg_dados_pessoa_juridica - name: lpg_dados_coletivos + - name: pnab_pessoas + - name: pnab_organizacoes diff --git a/airflow_lappis/dags/dbt/cultura/profiles.yml b/airflow_lappis/dags/dbt/cultura/profiles.yml index caa63f5d..ce58d3db 100644 --- a/airflow_lappis/dags/dbt/cultura/profiles.yml +++ b/airflow_lappis/dags/dbt/cultura/profiles.yml @@ -3,10 +3,10 @@ cultura: outputs: prod: type: postgres - host: "{{ env_var('DB_DW_HOST_CULTURA', 'postgres') }}" - user: "{{ env_var('DB_DW_USER_CULTURA', 'postgres_dw') }}" - password: "{{ env_var('DB_DW_PASSWORD_CULTURA', 'postgres_dw') }}" - port: "{{ env_var('DB_DW_PORT_CULTURA', '5432') | int }}" - dbname: "{{ env_var('DB_DW_DBNAME_CULTURA', 'data_warehouse') }}" + host: "{{ env_var('DB_DW_HOST_CULTURA') }}" + port: "{{ env_var('DB_DW_PORT_CULTURA') | int }}" + user: "{{ env_var('DB_DW_USER_CULTURA') }}" + password: "{{ env_var('DB_DW_PASS_CULTURA') }}" + dbname: "{{ env_var('DB_DW_DATABASE_CULTURA') }}" schema: "{{ env_var('DB_DW_SCHEMA_CULTURA', 'cultura') }}" threads: 4 diff --git a/airflow_lappis/plugins/extracao_planilhas.py b/airflow_lappis/plugins/extracao_planilhas.py index 3ccd9d6b..a1d4b0ff 100644 --- a/airflow_lappis/plugins/extracao_planilhas.py +++ b/airflow_lappis/plugins/extracao_planilhas.py @@ -54,8 +54,15 @@ def _handler(signum, frame): "operacionalizacao": "raw_pnab_operacionalizacao", "lista de contemplados geral": "raw_pnab_lista_contemplados_geral", "lista contemplados pncv": "raw_pnab_lista_contemplados_pncv", + "pessoas": "pnab_pessoas", + "organizacoes": "pnab_organizacoes", } +# Abas de proponentes da PNAB +_ABAS_PROPONENTES_PNAB = {"pessoas", "organizacoes"} +# Aba "Organizações" tem células mescladas no topo — exige leitura dinâmica +_ABA_ORGANIZACOES_PNAB = {"organizacoes"} + def _norm_texto(s: str) -> str: """Normaliza texto para comparação tolerante de nomes de abas: @@ -546,6 +553,10 @@ def extrair_pnab( separadas por linhas de categoria (nome do edital). Cada DataFrame gerado recebe ``id_anexo`` na primeira posição e ``tipo_edital``. + Abas de proponentes (``Pessoas`` / ``Organizações``) são lidas com + ``header=1`` (linha 0 = título, linha 1 = cabeçalho, linha 2+ = dados), + seguindo o mesmo padrão dos "Dados Básicos" da LPG. + Parameters ---------- file_buffer : io.BytesIO @@ -592,6 +603,45 @@ def extrair_pnab( tabela_destino, ) + # ── Abas de proponentes (Pessoas / Organizações) ── + # Seguem o padrão "Dados Básicos" da LPG: linha 0 = título, + # linha 1 = header, linha 2+ = dados + linha de instruções. + aba_norm = _norm_texto(aba) + if aba_norm in _ABAS_PROPONENTES_PNAB: + try: + df = _com_timeout( + pd.read_excel, xls, sheet_name=aba, header=1, dtype=str + ) + except TimeoutLeituraError: + raise + except Exception as exc: + log.warning( + "[extracao_planilhas.py] Erro ao ler aba de " + "proponentes '%s' de '%s': %s", + aba, + file_name, + exc, + ) + continue + + if df.empty: + continue + + # Drop da linha de instruções (primeira linha após header) + df = df.iloc[1:].copy() + df = df.reset_index(drop=True) + df.insert(0, "id_anexo", id_anexo) + df = df.astype(str) + + resultados.append({ + "nome_tabela_destino": tabela_destino, + "dataframe": df, + }) + del df + gc.collect() + continue + + # ── Abas padrão (editais / contemplados / operacionalização) ── try: df = _com_timeout(pd.read_excel, xls, sheet_name=aba, dtype=str) except TimeoutLeituraError: From 36a1a8861192b2ecdab5235aed6a1e9446c2e7f3 Mon Sep 17 00:00:00 2001 From: Caio Melo Borges Date: Thu, 11 Jun 2026 06:16:14 -0300 Subject: [PATCH 3/8] =?UTF-8?q?refactor:=20refatora=20env.example=20para?= =?UTF-8?q?=20conex=C3=A3o=20com=20o=20banco=20para=20o=20dbt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 .env.example diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..cd1d3c19 --- /dev/null +++ b/.env.example @@ -0,0 +1,43 @@ +# <---------- Airflow ----------> + +AIRFLOW_IMAGE_NAME=apache/airflow:latest +AIRFLOW__CORE__FERNET_KEY= +AIRFLOW_HOME=/opt/airflow/ +_AIRFLOW_WWW_USER_USERNAME=airflow +_AIRFLOW_WWW_USER_PASSWORD=airflow +AIRFLOW_UID=50000 + + +# <---------- Postgres Airflow ----------> + +POSTGRES_USER=postgres +POSTGRES_PASSWORD=postgres +POSTGRES_DB=postgres + + +# <---------- Postgres Data Warehouse ----------> + +POSTGRES_USER_DW=postgres +POSTGRES_PASSWORD_DW=postgres +POSTGRES_DB_DW=postgres + + +# <---------- dbt Profile Variables (Cultura) ----------> + +DB_DW_HOST_CULTURA=localhost +DB_DW_PORT_CULTURA=5433 +DB_DW_USER_CULTURA=postgres +DB_DW_PASS_CULTURA=postgres +DB_DW_DATABASE_CULTURA=postgres +DB_DW_SCHEMA_CULTURA=cultura + + +# <---------- MinIO ----------> + +MINIO_ENDPOINT=minio:9000 +MINIO_ACCESS_KEY=minioadmin +MINIO_SECRET_KEY=minioadmin +MINIO_BUCKET=data-lake-ipea + + +PYTHONPATH=/opt/airflow/dags:/opt/airflow/plugins:/opt/airflow/helpers From 8be698be8f48ce61b6ad4880ee326482eabc7e2b Mon Sep 17 00:00:00 2001 From: Caio Melo Borges Date: Thu, 11 Jun 2026 08:00:14 -0300 Subject: [PATCH 4/8] fix(cosmos): adiciona fallback nas env_vars do profiles.yml para corrigir parsing da DAG --- airflow_lappis/dags/dbt/cultura/cosmos_dag.py | 7 +++++-- airflow_lappis/dags/dbt/cultura/profiles.yml | 12 ++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/airflow_lappis/dags/dbt/cultura/cosmos_dag.py b/airflow_lappis/dags/dbt/cultura/cosmos_dag.py index 9ffdcc70..d5e0975f 100644 --- a/airflow_lappis/dags/dbt/cultura/cosmos_dag.py +++ b/airflow_lappis/dags/dbt/cultura/cosmos_dag.py @@ -1,6 +1,6 @@ import os from datetime import datetime - +from cosmos.profiles import PostgresUserPasswordProfileMapping from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig from cosmos.constants import DBT_LOG_PATH_ENVVAR @@ -9,9 +9,12 @@ os.environ[DBT_LOG_PATH_ENVVAR] = dbt_log_path profile_config = ProfileConfig( - profiles_yml_filepath=f"{os.environ['AIRFLOW_REPO_BASE']}/dags/dbt/cultura/profiles.yml", profile_name="cultura", target_name="prod", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="postgres_default", + profile_args={"schema": "cultura"}, + ) ) my_cosmos_dag = DbtDag( diff --git a/airflow_lappis/dags/dbt/cultura/profiles.yml b/airflow_lappis/dags/dbt/cultura/profiles.yml index ce58d3db..78e03022 100644 --- a/airflow_lappis/dags/dbt/cultura/profiles.yml +++ b/airflow_lappis/dags/dbt/cultura/profiles.yml @@ -3,10 +3,10 @@ cultura: outputs: prod: type: postgres - host: "{{ env_var('DB_DW_HOST_CULTURA') }}" - port: "{{ env_var('DB_DW_PORT_CULTURA') | int }}" - user: "{{ env_var('DB_DW_USER_CULTURA') }}" - password: "{{ env_var('DB_DW_PASS_CULTURA') }}" - dbname: "{{ env_var('DB_DW_DATABASE_CULTURA') }}" + host: "{{ env_var('DB_DW_HOST_CULTURA', 'dummy_host') }}" + port: "{{ env_var('DB_DW_PORT_CULTURA', '5432') | int }}" + user: "{{ env_var('DB_DW_USER_CULTURA', 'dummy_user') }}" + password: "{{ env_var('DB_DW_PASS_CULTURA', 'dummy_pass') }}" + dbname: "{{ env_var('DB_DW_DATABASE_CULTURA', 'dummy_db') }}" schema: "{{ env_var('DB_DW_SCHEMA_CULTURA', 'cultura') }}" - threads: 4 + threads: 4 \ No newline at end of file From 5f643b6b0c87b0c252f6ec06df9b2b55954158ca Mon Sep 17 00:00:00 2001 From: Caio Melo Borges Date: Tue, 16 Jun 2026 19:52:58 -0300 Subject: [PATCH 5/8] =?UTF-8?q?feat(meta5):=20implementa=20infer=C3=AAncia?= =?UTF-8?q?=20de=20veterania,=20resolve=20lock=20concorrente=20no=20banco?= =?UTF-8?q?=20e=20barra=20nulos=20nos=20modelos=20incrementais?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dags/dbt/cultura/macros/create_udfs.sql | 22 ++- .../agentes_dbt/bronze/lpg_agentes_pf.sql | 10 +- .../agentes_dbt/bronze/lpg_agentes_pj.sql | 8 +- .../agentes_dbt/bronze/pnab_agentes_pf.sql | 8 +- .../gold/perfil_acesso_fomento.sql | 84 ++++++++ .../gold/perfil_agentes_completo.sql | 97 +++++++++ .../gold/primeiro_acesso_contemplados.sql | 104 ++++++++++ .../models/agentes_dbt/gold/schema.yml | 185 ++++++++++++++++++ .../dags/dbt/cultura/models/sources.yml | 3 + airflow_lappis/plugins/extracao_planilhas.py | 9 +- pyproject.toml | 2 + 11 files changed, 512 insertions(+), 20 deletions(-) create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_acesso_fomento.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_agentes_completo.sql create mode 100644 airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql b/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql index dd230cf5..e459f9b2 100644 --- a/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql +++ b/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql @@ -1,10 +1,22 @@ {% macro create_udfs() %} -create schema if not exists {{ target.schema }}; +CREATE SCHEMA IF NOT EXISTS {{ target.schema }}; - {{ create_f_parse_dates() }} - ; - {{ create_f_format_nc() }} - ; +DO $do$ +BEGIN + -- Serializa a criação das UDFs entre threads paralelas do dbt/Cosmos. + -- Sem isso, múltiplas conexões tentam CREATE OR REPLACE FUNCTION ao mesmo + -- tempo, causando "tuple concurrently updated" no catálogo pg_proc. + PERFORM pg_advisory_lock(hashtext('{{ target.schema }}_create_udfs')); + + {{ create_f_parse_dates() }} + {{ create_f_format_nc() }} + + PERFORM pg_advisory_unlock(hashtext('{{ target.schema }}_create_udfs')); +EXCEPTION WHEN OTHERS THEN + PERFORM pg_advisory_unlock(hashtext('{{ target.schema }}_create_udfs')); + RAISE; +END; +$do$; {% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql index 85ad8368..d66d9aa0 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql @@ -8,9 +8,11 @@ SELECT LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, 'LPG' AS programa_fomento FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_fisica') }} +WHERE "nº do cpf" IS NOT NULL + AND LOWER(TRIM("nº do cpf")) NOT IN ('', 'nan', 'none') {% if is_incremental() %} -WHERE LOWER(TRIM("nº do cpf")) NOT IN ( - SELECT identificador_unico FROM {{ this }} -) -{% endif %} + AND LOWER(TRIM("nº do cpf")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) +{% endif %} \ No newline at end of file diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql index 8bdb07f2..21a3e396 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql @@ -8,9 +8,11 @@ SELECT LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, 'LPG' AS programa_fomento FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_juridica') }} +WHERE "nº do cnpj" IS NOT NULL + AND LOWER(TRIM("nº do cnpj")) NOT IN ('', 'nan', 'none') {% if is_incremental() %} -WHERE LOWER(TRIM("nº do cnpj")) NOT IN ( - SELECT identificador_unico FROM {{ this }} -) + AND LOWER(TRIM("nº do cnpj")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) {% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql index a5670205..f1e97b75 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql @@ -8,9 +8,11 @@ SELECT LOWER(TRIM("já acessou recursos públicos do fomento à cultura nos últim")) AS historico_acesso_bruto, 'PNAB' AS programa_fomento FROM {{ source('transferegov_fundo_a_fundo', 'pnab_pessoas') }} +WHERE "nº do cpf" IS NOT NULL + AND LOWER(TRIM("nº do cpf")) NOT IN ('', 'nan', 'none') {% if is_incremental() %} -WHERE LOWER(TRIM("nº do cpf")) NOT IN ( - SELECT identificador_unico FROM {{ this }} -) + AND LOWER(TRIM("nº do cpf")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) {% endif %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_acesso_fomento.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_acesso_fomento.sql new file mode 100644 index 00000000..a599659a --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_acesso_fomento.sql @@ -0,0 +1,84 @@ +{{ config( + materialized='table' +) }} + +WITH todos_proponentes AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_bruto + FROM {{ ref('identificadores_agentes') }} +), + +historico_limpo AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + CASE + WHEN historico_acesso_bruto IS NULL + OR LOWER(TRIM(historico_acesso_bruto)) IN ('', 'nan') + THEN NULL + ELSE TRIM( + REPLACE( + REPLACE( + REPLACE(historico_acesso_bruto, '.', ''), + ';', '' + ), + '"', '' + ) + ) + END AS historico_acesso_limpo + FROM todos_proponentes +), + +historico_ordenado AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + ROW_NUMBER() OVER ( + PARTITION BY identificador_unico + ORDER BY programa_fomento ASC + ) AS sequencia_fomento + FROM historico_limpo +) + +SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + sequencia_fomento, + CASE + WHEN historico_acesso_limpo = 'sim' + THEN 'Confirmado - Primeira Vez' + WHEN historico_acesso_limpo IN ('não', 'nao', 'nâo') + THEN 'Confirmado - Veterano' + WHEN ( + historico_acesso_limpo IS NULL + OR historico_acesso_limpo IN ( + 'não sei informar', 'nao sei informar', + 'não informado', 'nao informado', + 'nao_declarar', + 'não sei', 'nao sei', + 'não sabe', 'nao sabe' + ) + ) AND sequencia_fomento = 1 + THEN 'Inferido - Primeira Vez (Estreante na base)' + WHEN ( + historico_acesso_limpo IS NULL + OR historico_acesso_limpo IN ( + 'não sei informar', 'nao sei informar', + 'não informado', 'nao informado', + 'nao_declarar', + 'não sei', 'nao sei', + 'não sabe', 'nao sabe' + ) + ) AND sequencia_fomento > 1 + THEN 'Inferido - Veterano (Possui histórico)' + ELSE 'Não sabe/Não informou' + END AS perfil_acesso_fomento +FROM historico_ordenado diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_agentes_completo.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_agentes_completo.sql new file mode 100644 index 00000000..03adf2e5 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_agentes_completo.sql @@ -0,0 +1,97 @@ +{{ config(materialized='table') }} + +WITH perfil_base AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + sequencia_fomento, + perfil_acesso_fomento AS perfil_original + FROM {{ ref('perfil_acesso_fomento') }} +), + +-- Detecta veterania multi-programa: presença em > 1 programa confirma veterano independente +-- da resposta declarada +contagem_programas AS ( + SELECT + identificador_unico, + COUNT(DISTINCT programa_fomento) AS qtd_programas + FROM perfil_base + GROUP BY identificador_unico +), + +-- Registro canônico: primeiro programa de cada proponente (sequencia_fomento = 1) +-- Proxy temporal: LPG < PNAB alfabeticamente, refletindo a ordem de ingresso na base +primeiro_registro AS ( + SELECT DISTINCT ON (identificador_unico) + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + perfil_original + FROM perfil_base + ORDER BY identificador_unico, sequencia_fomento ASC +), + +-- historico_acesso_bruto vem direto da fonte bronze, alinhado ao primeiro programa +bruto AS ( + SELECT DISTINCT ON (identificador_unico) + identificador_unico, + historico_acesso_bruto + FROM {{ ref('identificadores_agentes') }} + ORDER BY identificador_unico, programa_fomento ASC +), + +consolidado AS ( + SELECT + pr.identificador_unico, + pr.tipo_proponente, + pr.programa_fomento, + b.historico_acesso_bruto, + pr.historico_acesso_limpo, + pr.perfil_original, + c.qtd_programas + FROM primeiro_registro pr + LEFT JOIN bruto b + ON pr.identificador_unico = b.identificador_unico + LEFT JOIN contagem_programas c + ON pr.identificador_unico = c.identificador_unico +) + +SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_bruto, + + -- status_origem: 'Inferido' quando a veterania vem da presença multi-programa e + -- não de uma resposta explícita; 'Confirmado' quando o proponente respondeu sim/não + CASE + WHEN qtd_programas > 1 AND perfil_original != 'Confirmado - Veterano' + THEN 'Inferido' + WHEN perfil_original LIKE 'Confirmado%' + THEN 'Confirmado' + WHEN perfil_original LIKE 'Inferido%' + THEN 'Inferido' + ELSE 'Não Informado' + END AS status_origem, + + -- perfil_classificacao: classificação final de veterania, consolidada em 1 linha + CASE + WHEN qtd_programas > 1 AND perfil_original = 'Confirmado - Veterano' + THEN 'Veterano' + WHEN qtd_programas > 1 + THEN 'Veterano (Multi-Programa)' + WHEN perfil_original = 'Confirmado - Primeira Vez' + THEN 'Primeira Vez' + WHEN perfil_original = 'Confirmado - Veterano' + THEN 'Veterano' + WHEN perfil_original = 'Inferido - Primeira Vez (Estreante na base)' + THEN 'Provável Primeira Vez' + WHEN perfil_original = 'Inferido - Veterano (Possui histórico)' + THEN 'Provável Veterano' + ELSE 'Indeterminado' + END AS perfil_classificacao + +FROM consolidado diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql new file mode 100644 index 00000000..974218a0 --- /dev/null +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql @@ -0,0 +1,104 @@ +{{ config(materialized='table') }} + +-- Normaliza identificadores para dígitos puro, eliminando diferenças de +-- formatação entre tabelas (CPF com pontos/traço vs. só dígitos, CNPJ com +-- pontos/barra vs. só dígitos). +-- Usada em ambos os lados do JOIN para garantir correspondência consistente. + +WITH contemplados_lpg AS ( + SELECT DISTINCT + REGEXP_REPLACE(LOWER(TRIM("cpf ou cnpj")), '[^0-9]', '', 'g') AS id_normalizado, + 'LPG' AS programa_fomento + FROM {{ source('transferegov_fundo_a_fundo', 'lpg_contemplados') }} + WHERE "cpf ou cnpj" IS NOT NULL + AND LOWER(TRIM("cpf ou cnpj")) NOT IN ('nan', '', 'cpf ou cnpj') + AND LENGTH(REGEXP_REPLACE(TRIM("cpf ou cnpj"), '[^0-9]', '', 'g')) >= 11 +), + +-- PNCV fornece CPF real dos contemplados PNAB (PF) +contemplados_pnab_pncv AS ( + SELECT DISTINCT + REGEXP_REPLACE(TRIM(cpf), '[^0-9]', '', 'g') AS id_normalizado, + 'PNAB' AS programa_fomento + FROM {{ source('transferegov_fundo_a_fundo', 'raw_pnab_lista_contemplados_pncv') }} + WHERE cpf IS NOT NULL + AND LOWER(TRIM(cpf)) NOT IN ('nan', '', 'cpf') + AND LENGTH(REGEXP_REPLACE(TRIM(cpf), '[^0-9]', '', 'g')) = 11 +), + +-- Lista geral PNAB: CPF está anonimizado (***XXXXXX**) — só o CNPJ é utilizável +-- Cobertura parcial: PJ contemplada via PNAB geral sem registro no PNCV fica de fora +contemplados_pnab_geral_pj AS ( + SELECT DISTINCT + REGEXP_REPLACE(TRIM(cnpj), '[^0-9]', '', 'g') AS id_normalizado, + 'PNAB' AS programa_fomento + FROM {{ source('transferegov_fundo_a_fundo', 'raw_pnab_lista_contemplados_geral') }} + WHERE cnpj IS NOT NULL + AND LOWER(TRIM(cnpj)) NOT IN ('nan', '', 'cnpj') + AND LENGTH(REGEXP_REPLACE(TRIM(cnpj), '[^0-9]', '', 'g')) = 14 +), + +todos_contemplados AS ( + SELECT id_normalizado, programa_fomento FROM contemplados_lpg + UNION + SELECT id_normalizado, programa_fomento FROM contemplados_pnab_pncv + UNION + SELECT id_normalizado, programa_fomento FROM contemplados_pnab_geral_pj +), + +-- Base: perfil_acesso_fomento tem 1 linha por (identificador × programa), +-- preservando a granularidade por programa para o JOIN de contemplação +perfil_base AS ( + SELECT + identificador_unico, + programa_fomento, + CASE + WHEN perfil_acesso_fomento IN ( + 'Confirmado - Primeira Vez', + 'Inferido - Primeira Vez (Estreante na base)' + ) THEN 'Sim' + WHEN perfil_acesso_fomento IN ( + 'Confirmado - Veterano', + 'Inferido - Veterano (Possui histórico)' + ) THEN 'Não' + ELSE 'Não sabe/Não informou' + END AS categoria_primeiro_acesso, + CASE + WHEN perfil_acesso_fomento LIKE 'Confirmado%' THEN 'Confirmado' + WHEN perfil_acesso_fomento LIKE 'Inferido%' THEN 'Inferido' + ELSE 'Não Informado' + END AS status_dado + FROM {{ ref('perfil_acesso_fomento') }} +), + +perfil_com_contemplado AS ( + SELECT + pb.identificador_unico, + pb.programa_fomento, + pb.categoria_primeiro_acesso, + pb.status_dado, + CASE + WHEN tc.id_normalizado IS NOT NULL THEN 'sim' + ELSE 'não' + END AS contemplado + FROM perfil_base pb + LEFT JOIN todos_contemplados tc + ON REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g') = tc.id_normalizado + AND pb.programa_fomento = tc.programa_fomento +) + +SELECT + programa_fomento, + categoria_primeiro_acesso, + contemplado, + COUNT(DISTINCT identificador_unico) AS total_proponentes, + COUNT(DISTINCT CASE WHEN status_dado = 'Confirmado' THEN identificador_unico END) AS total_campo_preenchido, + COUNT(DISTINCT CASE WHEN status_dado = 'Inferido' THEN identificador_unico END) AS total_inferido, + ROUND( + COUNT(DISTINCT identificador_unico)::NUMERIC + / SUM(COUNT(DISTINCT identificador_unico)) OVER (PARTITION BY programa_fomento, contemplado) + * 100, 2 + ) AS percentual +FROM perfil_com_contemplado +GROUP BY programa_fomento, categoria_primeiro_acesso, contemplado +ORDER BY programa_fomento, contemplado DESC, total_proponentes DESC diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml index fd56c6b2..565d4c49 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml @@ -1,6 +1,191 @@ version: 2 models: + - name: perfil_agentes_completo + description: > + Camada Gold — Master Data de proponentes (Meta 5). + Grain: 1 linha por identificador_unico (CPF/CNPJ). Consolida perfil_acesso_fomento + eliminando duplicatas entre programas: proponentes presentes em mais de um programa + têm veterania confirmada por presença multi-programa, sobrepondo a resposta declarada. + Serve como base para dois consumos: + - Relatório consolidado (sem inferência): filtrar WHERE status_origem = 'Confirmado' + - Visão analítica (com inferência): consumir todos os registros + config: + meta: + meta_objetivo: 'Meta 5 — Master Data de Proponentes' + origem_dados: 'perfil_acesso_fomento e identificadores_agentes' + tags: [gold, agentes, master_data, meta5, deduplicado] + columns: + - name: identificador_unico + description: CPF ou CNPJ do proponente (normalizado). Chave única do modelo. + tests: + - not_null + - unique + - name: tipo_proponente + description: > + Classificação do proponente — 'Pessoa Física', 'Pessoa Jurídica', + 'Coletivo' ou 'Organização'. + tests: + - not_null + - name: programa_fomento + description: > + Programa de origem do primeiro registro do proponente (proxy temporal: + LPG precede PNAB). Reflete o contexto da análise inicial do perfil. + tests: + - not_null + - name: historico_acesso_bruto + description: > + Resposta original do proponente sobre acesso anterior a recursos públicos + (dado cru, sem normalização), alinhado ao primeiro programa de ingresso. + - name: status_origem + description: > + Flag de qualidade: 'Confirmado' — proponente respondeu explicitamente sim/não; + 'Inferido' — classificação derivada da sequência de programas ou presença + multi-programa; 'Não Informado' — dado ausente ou inválido. + tests: + - not_null + - accepted_values: + values: ['Confirmado', 'Inferido', 'Não Informado'] + - name: perfil_classificacao + description: > + Classificação final consolidada (1 linha por proponente): + 'Primeira Vez' — confirmado pelo proponente; + 'Veterano' — confirmado pelo proponente ou por presença em >1 programa com resposta vet.; + 'Veterano (Multi-Programa)' — veterania inferida pela presença em múltiplos programas; + 'Provável Primeira Vez' — inferido por ser estreante na base; + 'Provável Veterano' — inferido por sequência histórica; + 'Indeterminado' — dado insuficiente para classificação. + tests: + - not_null + - accepted_values: + values: + - 'Primeira Vez' + - 'Veterano' + - 'Veterano (Multi-Programa)' + - 'Provável Primeira Vez' + - 'Provável Veterano' + - 'Indeterminado' + + - name: perfil_acesso_fomento + description: > + Camada Gold — Meta 5 (Mapeamento de agentes culturais acessando fomento + pela primeira vez). Classifica cada proponente por programa com a coluna + perfil_acesso_fomento. Respostas confirmadas ('Sim'/'Não') são mantidas; + respostas nulas ou omitidas recebem inferência cronológica via ROW_NUMBER() + particionado por CPF/CNPJ e ordenado por programa_fomento (proxy temporal: + LPG precede PNAB). Um proponente com sequencia_fomento > 1 acessou mais + de um programa, evidenciando veterania. + config: + meta: + meta_objetivo: 'Meta 5 — Inferência de Primeiro Acesso' + origem_dados: 'LPG e PNAB (via identificadores_agentes)' + tags: [gold, agentes, primeiro_acesso, meta5, inferencia] + columns: + - name: identificador_unico + description: CPF ou CNPJ do proponente (já normalizado em minúsculas e sem espaços). + tests: + - not_null + - name: tipo_proponente + description: Classificação do proponente — 'Pessoa Física', 'Pessoa Jurídica', 'Coletivo' ou 'Organização'. + tests: + - not_null + - name: programa_fomento + description: Programa de fomento de origem — 'LPG' ou 'PNAB'. + tests: + - not_null + - name: historico_acesso_limpo + description: Resposta bruta normalizada (lowercase, sem pontuação). NULL quando omitida ou inválida. + - name: sequencia_fomento + description: > + Ordem de acesso do proponente dentro de seu histórico, particionado por + identificador_unico e ordenado por programa_fomento (proxy temporal). + sequencia_fomento = 1 indica o primeiro registro; > 1 indica veterano. + tests: + - not_null + - name: perfil_acesso_fomento + description: > + Classificação final para a Meta 5: + 'Confirmado - Primeira Vez' | 'Confirmado - Veterano' | + 'Inferido - Primeira Vez (Estreante na base)' | + 'Inferido - Veterano (Possui histórico)' | + 'Não sabe/Não informou'. + tests: + - not_null + - accepted_values: + values: + - 'Confirmado - Primeira Vez' + - 'Confirmado - Veterano' + - 'Inferido - Primeira Vez (Estreante na base)' + - 'Inferido - Veterano (Possui histórico)' + - 'Não sabe/Não informou' + + - name: primeiro_acesso_contemplados + description: > + Camada Gold — Meta 5 com flag de contemplação. Mesma estrutura de + primeiro_acesso_resumo com duas dimensões adicionais: se o proponente + foi contemplado em edital e a discriminação entre dados confirmados + (resposta declarada no formulário) vs. inferidos (derivados por sequência + ou presença multi-programa). Grain: 1 linha por + (programa_fomento × categoria_primeiro_acesso × contemplado). + + Cobertura de contemplados: + - LPG: CPF e CNPJ completos via lpg_contemplados — cobertura total. + - PNAB PNCV: CPF real via raw_pnab_lista_contemplados_pncv — cobertura PF. + - PNAB Geral: apenas CNPJ via raw_pnab_lista_contemplados_geral — CPF + anonimizado (***XXXXXX**), PF da lista geral fora do PNCV não é rastreável. + config: + meta: + meta_objetivo: 'Meta 5 — Primeiro Acesso com Contemplação' + origem_dados: 'perfil_acesso_fomento + lpg_contemplados + pnab_contemplados' + tags: [gold, agentes, meta5, contemplados, primeiro_acesso] + columns: + - name: programa_fomento + description: Programa de fomento — 'LPG' ou 'PNAB'. + tests: + - not_null + - accepted_values: + values: ['LPG', 'PNAB'] + - name: categoria_primeiro_acesso + description: > + Categoria de acesso derivada do perfil_acesso_fomento: + 'Sim' (Primeira Vez), 'Não' (Veterano) ou 'Não sabe/Não informou'. + tests: + - not_null + - accepted_values: + values: ['Sim', 'Não', 'Não sabe/Não informou'] + - name: contemplado + description: > + Flag de contemplação em edital público — 'sim' se o identificador + aparece em alguma lista de contemplados do respectivo programa, + 'não' caso contrário. + tests: + - not_null + - accepted_values: + values: ['sim', 'não'] + - name: total_proponentes + description: Contagem distinta de proponentes no cruzamento programa × categoria × contemplado. + tests: + - not_null + - name: total_campo_preenchido + description: > + Proponentes onde a categoria foi determinada por resposta declarada + no formulário (status_dado = 'Confirmado'). Exclui inferências. + tests: + - not_null + - name: total_inferido + description: > + Proponentes onde a categoria foi inferida por sequência de programas + ou presença multi-programa (status_dado = 'Inferido'). + tests: + - not_null + - name: percentual + description: > + Percentual da linha sobre o total de proponentes do mesmo + programa_fomento e status de contemplação. Cada combinação + (programa × contemplado) soma 100% independentemente. + tests: + - not_null + - name: primeiro_acesso_resumo description: > Camada Gold — resumo agregado da Meta 5 (Indicador de Primeiro Acesso). diff --git a/airflow_lappis/dags/dbt/cultura/models/sources.yml b/airflow_lappis/dags/dbt/cultura/models/sources.yml index 96411804..d0a090ef 100644 --- a/airflow_lappis/dags/dbt/cultura/models/sources.yml +++ b/airflow_lappis/dags/dbt/cultura/models/sources.yml @@ -9,3 +9,6 @@ sources: - name: lpg_dados_coletivos - name: pnab_pessoas - name: pnab_organizacoes + - name: lpg_contemplados + - name: raw_pnab_lista_contemplados_geral + - name: raw_pnab_lista_contemplados_pncv diff --git a/airflow_lappis/plugins/extracao_planilhas.py b/airflow_lappis/plugins/extracao_planilhas.py index a1d4b0ff..64791c23 100644 --- a/airflow_lappis/plugins/extracao_planilhas.py +++ b/airflow_lappis/plugins/extracao_planilhas.py @@ -109,19 +109,18 @@ def normalizar_nome(nome: Any) -> str: ".xlsm": "calamine", ".xls": "calamine", ".xlsb": "calamine", - # .ods intencionalmente REMOVIDO — a engine odf (fallback) carrega - # o DOM XML inteiro em memória e causa OOM Kills em planilhas grandes. - # Calamine pode ler .ods, mas se falhar o fallback é destrutivo. + ".ods": "calamine", } # Extensões proibidas: causam OOM no Worker do Airflow -_EXTENSOES_PROIBIDAS = {".ods"} +_EXTENSOES_PROIBIDAS: set[str] = set() def detectar_engine(file_path: str | Path) -> Optional[str]: """Retorna a engine pandas adequada para a extensão do arquivo. - Retorna ``None`` para extensões proibidas (.ods), emitindo warning. + Extensões proibidas (``_EXTENSOES_PROIBIDAS``) retornam ``None`` + com warning, mas atualmente nenhuma extensão é proibida. """ ext = Path(file_path).suffix.lower() if ext in _EXTENSOES_PROIBIDAS: diff --git a/pyproject.toml b/pyproject.toml index 74d883aa..3d9c3cad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,8 @@ sqlalchemy = "*" zeep = "*" imap-tools = "*" astronomer-cosmos = "*" +odfpy = "^1.4.1" +python-calamine = "^0.6.2" [tool.poetry.group.dev.dependencies] black = "*" From 20a2b8d6f97c9847afc99f361c3aca3dcfd8a034 Mon Sep 17 00:00:00 2001 From: Caio Melo Borges Date: Wed, 17 Jun 2026 13:10:32 -0300 Subject: [PATCH 6/8] fix(meta5): corrige falso negativo de contemplados na LPG --- .../dags/dbt/cultura/dbt_project.yml | 3 - .../dags/dbt/cultura/macros/create_udfs.sql | 37 ++++++---- .../gold/primeiro_acesso_contemplados.sql | 74 +++++++++++++++++-- 3 files changed, 91 insertions(+), 23 deletions(-) diff --git a/airflow_lappis/dags/dbt/cultura/dbt_project.yml b/airflow_lappis/dags/dbt/cultura/dbt_project.yml index fc956772..101ccb44 100644 --- a/airflow_lappis/dags/dbt/cultura/dbt_project.yml +++ b/airflow_lappis/dags/dbt/cultura/dbt_project.yml @@ -28,6 +28,3 @@ models: +materialized: incremental views: +materialized: view - -on-run-start: - - '{{ create_udfs() }}' diff --git a/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql b/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql index e459f9b2..850b1e85 100644 --- a/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql +++ b/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql @@ -1,22 +1,33 @@ {% macro create_udfs() %} +{# + Cria/atualiza as UDFs do projeto via run_query, isolada do on-run-start. -CREATE SCHEMA IF NOT EXISTS {{ target.schema }}; + Antes, esta macro era chamada em todo on-run-start do dbt — como o Cosmos + dispara um processo dbt por task/modelo em paralelo, várias conexões + executavam CREATE OR REPLACE FUNCTION ao mesmo tempo, causando + "tuple concurrently updated" no catálogo pg_proc do Postgres. -DO $do$ -BEGIN - -- Serializa a criação das UDFs entre threads paralelas do dbt/Cosmos. - -- Sem isso, múltiplas conexões tentam CREATE OR REPLACE FUNCTION ao mesmo - -- tempo, causando "tuple concurrently updated" no catálogo pg_proc. - PERFORM pg_advisory_lock(hashtext('{{ target.schema }}_create_udfs')); + Agora ela só deve ser chamada explicitamente, uma única vez, via: + dbt run-operation create_udfs + em uma task isolada do Airflow, executada antes das tasks de modelo + geradas pelo Cosmos — eliminando a concorrência na origem. +#} +{% set create_schema_sql %} + CREATE SCHEMA IF NOT EXISTS {{ target.schema }}; +{% endset %} +{% do run_query(create_schema_sql) %} + +{% set parse_date_sql %} {{ create_f_parse_dates() }} +{% endset %} +{% do run_query(parse_date_sql) %} + +{% set format_nc_sql %} {{ create_f_format_nc() }} +{% endset %} +{% do run_query(format_nc_sql) %} - PERFORM pg_advisory_unlock(hashtext('{{ target.schema }}_create_udfs')); -EXCEPTION WHEN OTHERS THEN - PERFORM pg_advisory_unlock(hashtext('{{ target.schema }}_create_udfs')); - RAISE; -END; -$do$; +{% do log("UDFs criadas/atualizadas no schema " ~ target.schema, info=True) %} {% endmacro %} diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql index 974218a0..0e2c3861 100644 --- a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql +++ b/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql @@ -4,15 +4,58 @@ -- formatação entre tabelas (CPF com pontos/traço vs. só dígitos, CNPJ com -- pontos/barra vs. só dígitos). -- Usada em ambos os lados do JOIN para garantir correspondência consistente. +-- +-- ATENÇÃO — CPF mascarado dos proponentes LPG: a base de proponentes +-- (lpg_agentes_pf/coletivos) traz o CPF anonimizado no formato +-- "***.NNN.NNN-**" (apenas os 6 dígitos centrais visíveis), enquanto +-- lpg_contemplados traz o CPF completo. Um match exato (dígito a dígito) +-- nunca ocorre para esses casos, gerando falso negativo sistemático de +-- 'contemplado' para LPG. Por isso, quando o identificador do proponente +-- vier mascarado, o JOIN usa um match parcial pelo "miolo" do CPF +-- (posições 4-9, os mesmos 6 dígitos centrais expostos pela máscara). +-- Esse match parcial tem risco de colisão entre CPFs com miolo igual. +-- +-- ATENÇÃO — colunas "fantasma" em lpg_contemplados: a ingestão dinâmica de +-- planilhas (extracao_planilhas.py) não normaliza espaços/caracteres +-- invisíveis (ex.: NBSP) nos nomes de coluna, então o header "CPF ou CNPJ" +-- de arquivos diferentes pode virar colunas distintas no Postgres +-- (ex.: "cpf ou cnpj" vs. "cpf ou cnpj"), cada uma com parte dos +-- dados. Por isso a coluna de CPF/CNPJ é resolvida dinamicamente via +-- information_schema (qualquer coluna cujo nome contenha "cpf" e "cnpj"), +-- em vez de um nome fixo — o que tornaria a maior parte dos contemplados +-- invisível para o JOIN. -WITH contemplados_lpg AS ( +{% set cpf_cnpj_cols_query %} + SELECT column_name + FROM information_schema.columns + WHERE table_schema = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').schema }}' + AND table_name = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').identifier }}' + AND column_name ILIKE '%cpf%cnpj%' + ORDER BY column_name +{% endset %} +{% set cpf_cnpj_results = run_query(cpf_cnpj_cols_query) %} +{% set cpf_cnpj_cols = cpf_cnpj_results.columns[0].values() if execute else ['cpf ou cnpj'] %} + +WITH contemplados_lpg_raw AS ( + SELECT + COALESCE( + {% for col in cpf_cnpj_cols %} + NULLIF(LOWER(TRIM("{{ col }}")), 'nan') + {%- if not loop.last %}, + {% endif %} + {% endfor %} + ) AS cpf_cnpj_bruto + FROM {{ source('transferegov_fundo_a_fundo', 'lpg_contemplados') }} +), + +contemplados_lpg AS ( SELECT DISTINCT - REGEXP_REPLACE(LOWER(TRIM("cpf ou cnpj")), '[^0-9]', '', 'g') AS id_normalizado, + REGEXP_REPLACE(cpf_cnpj_bruto, '[^0-9]', '', 'g') AS id_normalizado, 'LPG' AS programa_fomento - FROM {{ source('transferegov_fundo_a_fundo', 'lpg_contemplados') }} - WHERE "cpf ou cnpj" IS NOT NULL - AND LOWER(TRIM("cpf ou cnpj")) NOT IN ('nan', '', 'cpf ou cnpj') - AND LENGTH(REGEXP_REPLACE(TRIM("cpf ou cnpj"), '[^0-9]', '', 'g')) >= 11 + FROM contemplados_lpg_raw + WHERE cpf_cnpj_bruto IS NOT NULL + AND cpf_cnpj_bruto NOT IN ('', 'cpf ou cnpj') + AND LENGTH(REGEXP_REPLACE(cpf_cnpj_bruto, '[^0-9]', '', 'g')) >= 11 ), -- PNCV fornece CPF real dos contemplados PNAB (PF) @@ -46,6 +89,18 @@ todos_contemplados AS ( SELECT id_normalizado, programa_fomento FROM contemplados_pnab_geral_pj ), +-- Miolo (6 dígitos centrais) do CPF, usado para casar com identificadores +-- mascarados de proponentes ("***.NNN.NNN-**"). Só faz sentido para CPF +-- (11 dígitos); CNPJ (14 dígitos) não é mascarado na base de proponentes. +todos_contemplados_miolo AS ( + SELECT + id_normalizado, + programa_fomento, + SUBSTRING(id_normalizado FROM 4 FOR 6) AS miolo_cpf + FROM todos_contemplados + WHERE LENGTH(id_normalizado) = 11 +), + -- Base: perfil_acesso_fomento tem 1 linha por (identificador × programa), -- preservando a granularidade por programa para o JOIN de contemplação perfil_base AS ( @@ -78,13 +133,18 @@ perfil_com_contemplado AS ( pb.categoria_primeiro_acesso, pb.status_dado, CASE - WHEN tc.id_normalizado IS NOT NULL THEN 'sim' + WHEN tc.id_normalizado IS NOT NULL OR tcm.id_normalizado IS NOT NULL THEN 'sim' ELSE 'não' END AS contemplado FROM perfil_base pb LEFT JOIN todos_contemplados tc ON REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g') = tc.id_normalizado AND pb.programa_fomento = tc.programa_fomento + LEFT JOIN todos_contemplados_miolo tcm + ON pb.identificador_unico LIKE '%*%' + AND LENGTH(REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g')) = 6 + AND REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g') = tcm.miolo_cpf + AND pb.programa_fomento = tcm.programa_fomento ) SELECT From 872ecdf5defd73758943c35421a0f84a340a8cb1 Mon Sep 17 00:00:00 2001 From: Caio Melo Borges Date: Thu, 18 Jun 2026 19:10:15 -0300 Subject: [PATCH 7/8] fix: resolve conflito de dependencias no pyproject.toml --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c8dedc11..0aa723f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,10 +19,9 @@ pandas = "*" requests = "*" zeep = "*" imap-tools = "*" -astronomer-cosmos = "*" +astronomer-cosmos = "1.14.2" odfpy = "^1.4.1" python-calamine = "^0.6.2" -astronomer-cosmos = "1.14.2" [tool.poetry.group.dev.dependencies] black = "*" From a77d1d0dd2f7cca3378f917ef34781f261bdce90 Mon Sep 17 00:00:00 2001 From: devwallyson Date: Wed, 1 Jul 2026 15:48:29 -0300 Subject: [PATCH 8/8] Refactor dbt file --- airflow_lappis/dags/dbt/cultura/.user.yml | 1 - airflow_lappis/dags/dbt/cultura/cosmos_dag.py | 33 --------- .../dags/dbt/cultura/dbt_project.yml | 30 -------- .../models/metadata/models_metadata.sql | 67 ------------------ .../dbt/cultura/models/metadata/schema.yml | 45 ------------ .../dags/dbt/cultura/models/sources.yml | 14 ---- airflow_lappis/dags/dbt/cultura/profiles.yml | 12 ---- dbt/minc/dbt_project.yml | 19 ++++-- .../dbt/cultura => dbt/minc}/descriptions.yml | 0 .../minc}/macros/create_udfs.sql | 0 .../macros/data_quality/row_count_match.sql | 0 .../data_quality/verificacao_tipagem.sql | 0 .../minc}/macros/get_custom_schema.sql | 0 .../macros/metadata/generate_metadata.sql | 0 .../minc}/macros/parse_financial_value.sql | 0 .../cultura => dbt/minc}/macros/schema.yml | 0 .../minc}/macros/udfs/f_format_nc.sql | 0 .../minc}/macros/udfs/f_parse_dates.sql | 0 .../bronze/lpg_agentes_coletivos.sql | 0 .../agentes_dbt/bronze/lpg_agentes_pf.sql | 0 .../agentes_dbt/bronze/lpg_agentes_pj.sql | 0 .../agentes_dbt/bronze/pnab_agentes_pf.sql | 0 .../agentes_dbt/bronze/pnab_agentes_pj.sql | 0 .../models/agentes_dbt/bronze/schema.yml | 0 .../gold/perfil_acesso_fomento.sql | 0 .../gold/perfil_agentes_completo.sql | 0 .../gold/primeiro_acesso_contemplados.sql | 0 .../gold/primeiro_acesso_resumo.sql | 0 .../minc}/models/agentes_dbt/gold/schema.yml | 0 .../silver/perfil_agentes_historico.sql | 0 .../models/agentes_dbt/silver/schema.yml | 0 .../views/identificadores_agentes.sql | 0 .../minc}/models/agentes_dbt/views/schema.yml | 0 dbt/minc/models/metadata/models_metadata.sql | 68 ++++++++++++------- dbt/minc/models/metadata/schema.yml | 28 +++++++- dbt/minc/models/sources.yml | 12 ++-- dbt/minc/profiles.yml | 15 ++-- 37 files changed, 97 insertions(+), 247 deletions(-) delete mode 100644 airflow_lappis/dags/dbt/cultura/.user.yml delete mode 100644 airflow_lappis/dags/dbt/cultura/cosmos_dag.py delete mode 100644 airflow_lappis/dags/dbt/cultura/dbt_project.yml delete mode 100644 airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql delete mode 100644 airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml delete mode 100644 airflow_lappis/dags/dbt/cultura/models/sources.yml delete mode 100644 airflow_lappis/dags/dbt/cultura/profiles.yml rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/descriptions.yml (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/create_udfs.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/data_quality/row_count_match.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/data_quality/verificacao_tipagem.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/get_custom_schema.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/metadata/generate_metadata.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/parse_financial_value.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/schema.yml (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/udfs/f_format_nc.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/macros/udfs/f_parse_dates.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/bronze/lpg_agentes_pf.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/bronze/lpg_agentes_pj.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/bronze/pnab_agentes_pf.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/bronze/pnab_agentes_pj.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/bronze/schema.yml (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/gold/perfil_acesso_fomento.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/gold/perfil_agentes_completo.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/gold/primeiro_acesso_resumo.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/gold/schema.yml (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/silver/perfil_agentes_historico.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/silver/schema.yml (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/views/identificadores_agentes.sql (100%) rename {airflow_lappis/dags/dbt/cultura => dbt/minc}/models/agentes_dbt/views/schema.yml (100%) diff --git a/airflow_lappis/dags/dbt/cultura/.user.yml b/airflow_lappis/dags/dbt/cultura/.user.yml deleted file mode 100644 index 24b47f34..00000000 --- a/airflow_lappis/dags/dbt/cultura/.user.yml +++ /dev/null @@ -1 +0,0 @@ -id: 7c7b491c-e2c6-4636-9e72-b0ae2d207a9b diff --git a/airflow_lappis/dags/dbt/cultura/cosmos_dag.py b/airflow_lappis/dags/dbt/cultura/cosmos_dag.py deleted file mode 100644 index d5e0975f..00000000 --- a/airflow_lappis/dags/dbt/cultura/cosmos_dag.py +++ /dev/null @@ -1,33 +0,0 @@ -import os -from datetime import datetime -from cosmos.profiles import PostgresUserPasswordProfileMapping -from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig -from cosmos.constants import DBT_LOG_PATH_ENVVAR - -dbt_log_path = "/tmp/dbt_logs" -os.makedirs(dbt_log_path, exist_ok=True) -os.environ[DBT_LOG_PATH_ENVVAR] = dbt_log_path - -profile_config = ProfileConfig( - profile_name="cultura", - target_name="prod", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="postgres_default", - profile_args={"schema": "cultura"}, - ) -) - -my_cosmos_dag = DbtDag( - project_config=ProjectConfig( - f"{os.environ['AIRFLOW_REPO_BASE']}/dags/dbt/cultura" - ), - profile_config=profile_config, - execution_config=ExecutionConfig( - dbt_executable_path=f"{os.environ['AIRFLOW_REPO_BASE']}/.local/bin/dbt", - ), - schedule_interval="0 1 * * *", - start_date=datetime(2025, 1, 1), - catchup=False, - dag_id="cultura_cosmos_dag", - default_args={"retries": 2}, -) diff --git a/airflow_lappis/dags/dbt/cultura/dbt_project.yml b/airflow_lappis/dags/dbt/cultura/dbt_project.yml deleted file mode 100644 index 101ccb44..00000000 --- a/airflow_lappis/dags/dbt/cultura/dbt_project.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: 'cultura' - -version: 1.0.0 -config-version: 2 - -profile: cultura - -model-paths: ["models"] -analysis-paths: ["analyses"] -seed-paths: ["seeds"] -test-paths: ["tests"] -macro-paths: ["macros"] - -clean-targets: - - "target" - - "dbt_packages" - - "logs" - -models: - cultura: - metadata: - +materialized: incremental - +schema: metadata - agentes_dbt: - +materialized: table - +schema: agentes - bronze: - +materialized: incremental - views: - +materialized: view diff --git a/airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql b/airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql deleted file mode 100644 index f1981994..00000000 --- a/airflow_lappis/dags/dbt/cultura/models/metadata/models_metadata.sql +++ /dev/null @@ -1,67 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key=['schema_name', 'table_name'], - on_schema_change='sync_all_columns' - ) -}} - -{# - Tabela de Metadados dos Modelos dbt - =================================== - - Esta tabela armazena metadados de todos os modelos executados no dbt. - - Campos principais: - - schema_name: Schema do modelo - - table_name: Nome da tabela/modelo - - dt_transform: Data da última transformação (quando o modelo foi executado) - - run_id: ID único da execução do dbt - - A tabela é atualizada de forma incremental, mantendo apenas o registro - mais recente para cada combinação de schema + table_name. -#} - -WITH dbt_models AS ( - {# - Usando a função graph do dbt para iterar sobre todos os modelos do projeto. - Isso garante que capturamos metadados de todos os modelos definidos. - #} - {% set models_data = [] %} - - {% for node in graph.nodes.values() %} - {% if node.resource_type == 'model' %} - {% do models_data.append({ - 'schema_name': node.schema, - 'table_name': node.name, - 'database_name': node.database, - 'materialization': node.config.materialized, - 'description': node.description | default('') | replace("'", "''") - }) %} - {% endif %} - {% endfor %} - - {% for model in models_data %} - SELECT - '{{ model.schema_name }}' AS schema_name, - '{{ model.table_name }}' AS table_name, - '{{ model.database_name }}' AS database_name, - '{{ model.materialization }}' AS materialization, - '{{ model.description[:500] }}' AS description, - ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo') AS dt_transform, - '{{ invocation_id }}' AS run_id - {% if not loop.last %} - UNION ALL - {% endif %} - {% endfor %} -) - -SELECT - schema_name, - table_name, - database_name, - materialization, - description, - dt_transform, - run_id -FROM dbt_models diff --git a/airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml b/airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml deleted file mode 100644 index bb85b1e5..00000000 --- a/airflow_lappis/dags/dbt/cultura/models/metadata/schema.yml +++ /dev/null @@ -1,45 +0,0 @@ -version: 2 - -models: - - name: models_metadata - description: > - Tabela central de metadados que armazena informações sobre todos os modelos dbt executados. - Cada linha representa um modelo único, identificado pela combinação de schema e table_name. - A tabela é atualizada de forma incremental, mantendo histórico das execuções. - config: - meta: - tags: - - metadata - - governance - columns: - - name: schema_name - description: Nome do schema onde o modelo está localizado. - tests: - - not_null - - - name: table_name - description: Nome da tabela/modelo. - tests: - - not_null - - - name: database_name - description: Nome do banco de dados onde o modelo está materializado. - - - name: materialization - description: Tipo de materialização do modelo (table, view, incremental, etc). - - - name: description - description: Descrição do modelo extraída do schema.yml. - - - name: dt_transform - description: > - Data e hora em que o modelo foi transformado/executado pela última vez. - Corresponde ao momento em que a execução do dbt foi iniciada (run_started_at). - Timezone: America/Sao_Paulo (UTC-3). - tests: - - not_null - - - name: run_id - description: > - Identificador único da execução do dbt (invocation_id). - Permite rastrear qual execução gerou a transformação. diff --git a/airflow_lappis/dags/dbt/cultura/models/sources.yml b/airflow_lappis/dags/dbt/cultura/models/sources.yml deleted file mode 100644 index d0a090ef..00000000 --- a/airflow_lappis/dags/dbt/cultura/models/sources.yml +++ /dev/null @@ -1,14 +0,0 @@ -version: 2 - -sources: - - name: transferegov_fundo_a_fundo - schema: transferegov_fundo_a_fundo - tables: - - name: lpg_dados_pessoa_fisica - - name: lpg_dados_pessoa_juridica - - name: lpg_dados_coletivos - - name: pnab_pessoas - - name: pnab_organizacoes - - name: lpg_contemplados - - name: raw_pnab_lista_contemplados_geral - - name: raw_pnab_lista_contemplados_pncv diff --git a/airflow_lappis/dags/dbt/cultura/profiles.yml b/airflow_lappis/dags/dbt/cultura/profiles.yml deleted file mode 100644 index 78e03022..00000000 --- a/airflow_lappis/dags/dbt/cultura/profiles.yml +++ /dev/null @@ -1,12 +0,0 @@ -cultura: - target: prod - outputs: - prod: - type: postgres - host: "{{ env_var('DB_DW_HOST_CULTURA', 'dummy_host') }}" - port: "{{ env_var('DB_DW_PORT_CULTURA', '5432') | int }}" - user: "{{ env_var('DB_DW_USER_CULTURA', 'dummy_user') }}" - password: "{{ env_var('DB_DW_PASS_CULTURA', 'dummy_pass') }}" - dbname: "{{ env_var('DB_DW_DATABASE_CULTURA', 'dummy_db') }}" - schema: "{{ env_var('DB_DW_SCHEMA_CULTURA', 'cultura') }}" - threads: 4 \ No newline at end of file diff --git a/dbt/minc/dbt_project.yml b/dbt/minc/dbt_project.yml index e7969b73..101ccb44 100644 --- a/dbt/minc/dbt_project.yml +++ b/dbt/minc/dbt_project.yml @@ -1,16 +1,15 @@ -name: "minc" +name: 'cultura' -version: "1.0.0" +version: 1.0.0 config-version: 2 -profile: "minc" +profile: cultura model-paths: ["models"] analysis-paths: ["analyses"] -test-paths: ["tests"] seed-paths: ["seeds"] +test-paths: ["tests"] macro-paths: ["macros"] -snapshot-paths: ["snapshots"] clean-targets: - "target" @@ -18,8 +17,14 @@ clean-targets: - "logs" models: - minc: - +database: analytics + cultura: metadata: +materialized: incremental +schema: metadata + agentes_dbt: + +materialized: table + +schema: agentes + bronze: + +materialized: incremental + views: + +materialized: view diff --git a/airflow_lappis/dags/dbt/cultura/descriptions.yml b/dbt/minc/descriptions.yml similarity index 100% rename from airflow_lappis/dags/dbt/cultura/descriptions.yml rename to dbt/minc/descriptions.yml diff --git a/airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql b/dbt/minc/macros/create_udfs.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql rename to dbt/minc/macros/create_udfs.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/data_quality/row_count_match.sql b/dbt/minc/macros/data_quality/row_count_match.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/data_quality/row_count_match.sql rename to dbt/minc/macros/data_quality/row_count_match.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/data_quality/verificacao_tipagem.sql b/dbt/minc/macros/data_quality/verificacao_tipagem.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/data_quality/verificacao_tipagem.sql rename to dbt/minc/macros/data_quality/verificacao_tipagem.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/get_custom_schema.sql b/dbt/minc/macros/get_custom_schema.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/get_custom_schema.sql rename to dbt/minc/macros/get_custom_schema.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/metadata/generate_metadata.sql b/dbt/minc/macros/metadata/generate_metadata.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/metadata/generate_metadata.sql rename to dbt/minc/macros/metadata/generate_metadata.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/parse_financial_value.sql b/dbt/minc/macros/parse_financial_value.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/parse_financial_value.sql rename to dbt/minc/macros/parse_financial_value.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/schema.yml b/dbt/minc/macros/schema.yml similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/schema.yml rename to dbt/minc/macros/schema.yml diff --git a/airflow_lappis/dags/dbt/cultura/macros/udfs/f_format_nc.sql b/dbt/minc/macros/udfs/f_format_nc.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/udfs/f_format_nc.sql rename to dbt/minc/macros/udfs/f_format_nc.sql diff --git a/airflow_lappis/dags/dbt/cultura/macros/udfs/f_parse_dates.sql b/dbt/minc/macros/udfs/f_parse_dates.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/macros/udfs/f_parse_dates.sql rename to dbt/minc/macros/udfs/f_parse_dates.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql rename to dbt/minc/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pf.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pf.sql rename to dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pf.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pj.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/lpg_agentes_pj.sql rename to dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pj.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql b/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pf.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pf.sql rename to dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pf.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pj.sql b/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pj.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/pnab_agentes_pj.sql rename to dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pj.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml b/dbt/minc/models/agentes_dbt/bronze/schema.yml similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/bronze/schema.yml rename to dbt/minc/models/agentes_dbt/bronze/schema.yml diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_acesso_fomento.sql b/dbt/minc/models/agentes_dbt/gold/perfil_acesso_fomento.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_acesso_fomento.sql rename to dbt/minc/models/agentes_dbt/gold/perfil_acesso_fomento.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_agentes_completo.sql b/dbt/minc/models/agentes_dbt/gold/perfil_agentes_completo.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/perfil_agentes_completo.sql rename to dbt/minc/models/agentes_dbt/gold/perfil_agentes_completo.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql b/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql rename to dbt/minc/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql b/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_resumo.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/primeiro_acesso_resumo.sql rename to dbt/minc/models/agentes_dbt/gold/primeiro_acesso_resumo.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml b/dbt/minc/models/agentes_dbt/gold/schema.yml similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/gold/schema.yml rename to dbt/minc/models/agentes_dbt/gold/schema.yml diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql b/dbt/minc/models/agentes_dbt/silver/perfil_agentes_historico.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/perfil_agentes_historico.sql rename to dbt/minc/models/agentes_dbt/silver/perfil_agentes_historico.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml b/dbt/minc/models/agentes_dbt/silver/schema.yml similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/silver/schema.yml rename to dbt/minc/models/agentes_dbt/silver/schema.yml diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql b/dbt/minc/models/agentes_dbt/views/identificadores_agentes.sql similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/identificadores_agentes.sql rename to dbt/minc/models/agentes_dbt/views/identificadores_agentes.sql diff --git a/airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml b/dbt/minc/models/agentes_dbt/views/schema.yml similarity index 100% rename from airflow_lappis/dags/dbt/cultura/models/agentes_dbt/views/schema.yml rename to dbt/minc/models/agentes_dbt/views/schema.yml diff --git a/dbt/minc/models/metadata/models_metadata.sql b/dbt/minc/models/metadata/models_metadata.sql index 46d5400b..f1981994 100644 --- a/dbt/minc/models/metadata/models_metadata.sql +++ b/dbt/minc/models/metadata/models_metadata.sql @@ -1,46 +1,62 @@ {{ config( - materialized="incremental", - unique_key=["schema_name", "table_name"], - on_schema_change="sync_all_columns", + materialized='incremental', + unique_key=['schema_name', 'table_name'], + on_schema_change='sync_all_columns' ) }} -with dbt_models as ( - {% set models_data = [] %} +{# + Tabela de Metadados dos Modelos dbt + =================================== + + Esta tabela armazena metadados de todos os modelos executados no dbt. + + Campos principais: + - schema_name: Schema do modelo + - table_name: Nome da tabela/modelo + - dt_transform: Data da última transformação (quando o modelo foi executado) + - run_id: ID único da execução do dbt + + A tabela é atualizada de forma incremental, mantendo apenas o registro + mais recente para cada combinação de schema + table_name. +#} +WITH dbt_models AS ( + {# + Usando a função graph do dbt para iterar sobre todos os modelos do projeto. + Isso garante que capturamos metadados de todos os modelos definidos. + #} + {% set models_data = [] %} + {% for node in graph.nodes.values() %} - {% if node.resource_type == "model" %} + {% if node.resource_type == 'model' %} {% do models_data.append({ - "schema_name": node.schema, - "table_name": node.name, - "database_name": node.database, - "materialization": node.config.materialized, - "description": node.description | default("") | replace("'", "''"), + 'schema_name': node.schema, + 'table_name': node.name, + 'database_name': node.database, + 'materialization': node.config.materialized, + 'description': node.description | default('') | replace("'", "''") }) %} {% endif %} {% endfor %} {% for model in models_data %} - select - '{{ model.schema_name }}' as schema_name, - '{{ model.table_name }}' as table_name, - '{{ model.database_name }}' as database_name, - '{{ model.materialization }}' as materialization, - '{{ model.description[:500] }}' as description, - ( - '{{ run_started_at }}'::timestamp - at time zone 'UTC' - at time zone 'America/Sao_Paulo' - ) as dt_transform, - '{{ invocation_id }}' as run_id + SELECT + '{{ model.schema_name }}' AS schema_name, + '{{ model.table_name }}' AS table_name, + '{{ model.database_name }}' AS database_name, + '{{ model.materialization }}' AS materialization, + '{{ model.description[:500] }}' AS description, + ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo') AS dt_transform, + '{{ invocation_id }}' AS run_id {% if not loop.last %} - union all + UNION ALL {% endif %} {% endfor %} ) -select +SELECT schema_name, table_name, database_name, @@ -48,4 +64,4 @@ select description, dt_transform, run_id -from dbt_models +FROM dbt_models diff --git a/dbt/minc/models/metadata/schema.yml b/dbt/minc/models/metadata/schema.yml index 1c8498cf..bb85b1e5 100644 --- a/dbt/minc/models/metadata/schema.yml +++ b/dbt/minc/models/metadata/schema.yml @@ -2,18 +2,44 @@ version: 2 models: - name: models_metadata - description: Metadados dos modelos dbt do projeto MinC. + description: > + Tabela central de metadados que armazena informações sobre todos os modelos dbt executados. + Cada linha representa um modelo único, identificado pela combinação de schema e table_name. + A tabela é atualizada de forma incremental, mantendo histórico das execuções. + config: + meta: + tags: + - metadata + - governance columns: - name: schema_name + description: Nome do schema onde o modelo está localizado. tests: - not_null + - name: table_name + description: Nome da tabela/modelo. tests: - not_null + - name: database_name + description: Nome do banco de dados onde o modelo está materializado. + - name: materialization + description: Tipo de materialização do modelo (table, view, incremental, etc). + - name: description + description: Descrição do modelo extraída do schema.yml. + - name: dt_transform + description: > + Data e hora em que o modelo foi transformado/executado pela última vez. + Corresponde ao momento em que a execução do dbt foi iniciada (run_started_at). + Timezone: America/Sao_Paulo (UTC-3). tests: - not_null + - name: run_id + description: > + Identificador único da execução do dbt (invocation_id). + Permite rastrear qual execução gerou a transformação. diff --git a/dbt/minc/models/sources.yml b/dbt/minc/models/sources.yml index 096cf1d5..d0a090ef 100644 --- a/dbt/minc/models/sources.yml +++ b/dbt/minc/models/sources.yml @@ -4,7 +4,11 @@ sources: - name: transferegov_fundo_a_fundo schema: transferegov_fundo_a_fundo tables: - - name: raw_programas - - name: raw_planos_acao - - name: relatorios_gestao - - name: anexos_relatorios + - name: lpg_dados_pessoa_fisica + - name: lpg_dados_pessoa_juridica + - name: lpg_dados_coletivos + - name: pnab_pessoas + - name: pnab_organizacoes + - name: lpg_contemplados + - name: raw_pnab_lista_contemplados_geral + - name: raw_pnab_lista_contemplados_pncv diff --git a/dbt/minc/profiles.yml b/dbt/minc/profiles.yml index 490f95d8..78e03022 100644 --- a/dbt/minc/profiles.yml +++ b/dbt/minc/profiles.yml @@ -1,11 +1,12 @@ -minc: +cultura: target: prod outputs: prod: type: postgres - host: "{{ env_var('DB_DW_HOST', 'postgres') }}" - user: "{{ env_var('DB_DW_USER', 'postgres_dw') }}" - password: "{{ env_var('DB_DW_PASSWORD', 'postgres_dw') }}" - port: "{{ env_var('DB_DW_PORT', '5432') | int }}" - dbname: "{{ env_var('DB_DW_DBNAME', 'data_warehouse') }}" - schema: "{{ env_var('DB_DW_SCHEMA', 'minc') }}" + host: "{{ env_var('DB_DW_HOST_CULTURA', 'dummy_host') }}" + port: "{{ env_var('DB_DW_PORT_CULTURA', '5432') | int }}" + user: "{{ env_var('DB_DW_USER_CULTURA', 'dummy_user') }}" + password: "{{ env_var('DB_DW_PASS_CULTURA', 'dummy_pass') }}" + dbname: "{{ env_var('DB_DW_DATABASE_CULTURA', 'dummy_db') }}" + schema: "{{ env_var('DB_DW_SCHEMA_CULTURA', 'cultura') }}" + threads: 4 \ No newline at end of file