diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..cd1d3c19 --- /dev/null +++ b/.env.example @@ -0,0 +1,43 @@ +# <---------- Airflow ----------> + +AIRFLOW_IMAGE_NAME=apache/airflow:latest +AIRFLOW__CORE__FERNET_KEY= +AIRFLOW_HOME=/opt/airflow/ +_AIRFLOW_WWW_USER_USERNAME=airflow +_AIRFLOW_WWW_USER_PASSWORD=airflow +AIRFLOW_UID=50000 + + +# <---------- Postgres Airflow ----------> + +POSTGRES_USER=postgres +POSTGRES_PASSWORD=postgres +POSTGRES_DB=postgres + + +# <---------- Postgres Data Warehouse ----------> + +POSTGRES_USER_DW=postgres +POSTGRES_PASSWORD_DW=postgres +POSTGRES_DB_DW=postgres + + +# <---------- dbt Profile Variables (Cultura) ----------> + +DB_DW_HOST_CULTURA=localhost +DB_DW_PORT_CULTURA=5433 +DB_DW_USER_CULTURA=postgres +DB_DW_PASS_CULTURA=postgres +DB_DW_DATABASE_CULTURA=postgres +DB_DW_SCHEMA_CULTURA=cultura + + +# <---------- MinIO ----------> + +MINIO_ENDPOINT=minio:9000 +MINIO_ACCESS_KEY=minioadmin +MINIO_SECRET_KEY=minioadmin +MINIO_BUCKET=data-lake-ipea + + +PYTHONPATH=/opt/airflow/dags:/opt/airflow/plugins:/opt/airflow/helpers diff --git a/dbt/minc/dbt_project.yml b/dbt/minc/dbt_project.yml index e7969b73..101ccb44 100644 --- a/dbt/minc/dbt_project.yml +++ b/dbt/minc/dbt_project.yml @@ -1,16 +1,15 @@ -name: "minc" +name: 'cultura' -version: "1.0.0" +version: 1.0.0 config-version: 2 -profile: "minc" +profile: cultura model-paths: ["models"] analysis-paths: ["analyses"] -test-paths: ["tests"] seed-paths: ["seeds"] +test-paths: ["tests"] macro-paths: ["macros"] -snapshot-paths: ["snapshots"] clean-targets: - "target" @@ -18,8 +17,14 @@ clean-targets: - "logs" models: - minc: - +database: analytics + cultura: metadata: +materialized: incremental +schema: metadata + agentes_dbt: + +materialized: table + +schema: agentes + bronze: + +materialized: incremental + views: + +materialized: view diff --git a/dbt/minc/descriptions.yml b/dbt/minc/descriptions.yml new file mode 100644 index 00000000..3c7c007c --- /dev/null +++ b/dbt/minc/descriptions.yml @@ -0,0 +1,13 @@ +models: + - name: agentes_pf + description: Dados brutos de proponentes pessoa física da LPG + - name: agentes_pj + description: Dados brutos de proponentes pessoa jurídica da LPG + - name: agentes_coletivos + description: Dados brutos de proponentes coletivos da LPG + - name: identificadores_agentes + description: Consolidação dos identificadores de todos os tipos de proponentes + - name: perfil_agentes_historico + description: Perfil dos agentes com higienização do histórico de acesso a recursos de fomento + - name: primeiro_acesso_resumo + description: Resumo agregado do indicador de primeiro acesso a recursos de fomento (Meta 5) diff --git a/dbt/minc/macros/create_udfs.sql b/dbt/minc/macros/create_udfs.sql new file mode 100644 index 00000000..850b1e85 --- /dev/null +++ b/dbt/minc/macros/create_udfs.sql @@ -0,0 +1,33 @@ +{% macro create_udfs() %} +{# + Cria/atualiza as UDFs do projeto via run_query, isolada do on-run-start. + + Antes, esta macro era chamada em todo on-run-start do dbt — como o Cosmos + dispara um processo dbt por task/modelo em paralelo, várias conexões + executavam CREATE OR REPLACE FUNCTION ao mesmo tempo, causando + "tuple concurrently updated" no catálogo pg_proc do Postgres. + + Agora ela só deve ser chamada explicitamente, uma única vez, via: + dbt run-operation create_udfs + em uma task isolada do Airflow, executada antes das tasks de modelo + geradas pelo Cosmos — eliminando a concorrência na origem. +#} + +{% set create_schema_sql %} + CREATE SCHEMA IF NOT EXISTS {{ target.schema }}; +{% endset %} +{% do run_query(create_schema_sql) %} + +{% set parse_date_sql %} + {{ create_f_parse_dates() }} +{% endset %} +{% do run_query(parse_date_sql) %} + +{% set format_nc_sql %} + {{ create_f_format_nc() }} +{% endset %} +{% do run_query(format_nc_sql) %} + +{% do log("UDFs criadas/atualizadas no schema " ~ target.schema, info=True) %} + +{% endmacro %} diff --git a/dbt/minc/macros/data_quality/row_count_match.sql b/dbt/minc/macros/data_quality/row_count_match.sql new file mode 100644 index 00000000..f248e30c --- /dev/null +++ b/dbt/minc/macros/data_quality/row_count_match.sql @@ -0,0 +1,14 @@ +{% macro test_row_count_match(model, source_table, target_table) %} + with + source_count as (select count(*) as row_count from {{ source_table }}), + target_count as (select count(*) as row_count from {{ target_table }}), + comparison as ( + select + source_count.row_count as source_row_count, + target_count.row_count as target_row_count + from source_count, target_count + ) + select * + from comparison + where source_row_count != target_row_count +{% endmacro %} diff --git a/dbt/minc/macros/data_quality/verificacao_tipagem.sql b/dbt/minc/macros/data_quality/verificacao_tipagem.sql new file mode 100644 index 00000000..34c3d392 --- /dev/null +++ b/dbt/minc/macros/data_quality/verificacao_tipagem.sql @@ -0,0 +1,25 @@ +{% macro test_verificacao_tipagem(model, nome_tabela, nome_coluna, tipo_esperado) %} + with + column_info as ( + select + table_schema, + table_name, -- Nome real da coluna no information_schema + column_name, -- Nome real da coluna no information_schema + data_type + from information_schema.columns + where + table_schema || '.' || table_name = '{{ nome_tabela }}' + and column_name = '{{ nome_coluna }}' + ), + comparison as ( + select + '{{ nome_tabela }}' as nome_tabela, + '{{ nome_coluna }}' as nome_coluna, + '{{ tipo_esperado }}' as tipo_esperado, + data_type as actual_type + from column_info + ) + select * + from comparison + where actual_type != tipo_esperado +{% endmacro %} diff --git a/dbt/minc/macros/get_custom_schema.sql b/dbt/minc/macros/get_custom_schema.sql new file mode 100644 index 00000000..701964ca --- /dev/null +++ b/dbt/minc/macros/get_custom_schema.sql @@ -0,0 +1,4 @@ +-- built-in schema generator +{% macro generate_schema_name(custom_schema_name, node) -%} + {{ generate_schema_name_for_env(custom_schema_name, node) }} +{%- endmacro %} diff --git a/dbt/minc/macros/metadata/generate_metadata.sql b/dbt/minc/macros/metadata/generate_metadata.sql new file mode 100644 index 00000000..8bfb115b --- /dev/null +++ b/dbt/minc/macros/metadata/generate_metadata.sql @@ -0,0 +1,46 @@ +{% macro get_model_metadata() %} +{# + Esta macro retorna os metadados do modelo atual. + Pode ser usada em post-hooks para registrar metadados automaticamente. +#} + SELECT + '{{ this.schema }}' AS schema_name, + '{{ this.name }}' AS table_name, + '{{ this.database }}' AS database_name, + ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo') AS dt_transform, + '{{ invocation_id }}' AS run_id +{% endmacro %} + + +{% macro register_model_metadata() %} +{# + Esta macro registra os metadados do modelo em uma tabela central. + Deve ser usada como post-hook nos modelos que deseja rastrear. + + Uso no dbt_project.yml: + models: + ipea: + +post-hook: + - "{{ register_model_metadata() }}" +#} + + INSERT INTO {{ target.database }}.metadata.models_metadata ( + schema_name, + table_name, + database_name, + dt_transform, + run_id + ) + VALUES ( + '{{ this.schema }}', + '{{ this.name }}', + '{{ this.database }}', + ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo'), + '{{ invocation_id }}' + ) + ON CONFLICT (schema_name, table_name) + DO UPDATE SET + dt_transform = EXCLUDED.dt_transform, + run_id = EXCLUDED.run_id; + +{% endmacro %} diff --git a/dbt/minc/macros/parse_financial_value.sql b/dbt/minc/macros/parse_financial_value.sql new file mode 100644 index 00000000..437b673c --- /dev/null +++ b/dbt/minc/macros/parse_financial_value.sql @@ -0,0 +1,21 @@ +{% macro parse_financial_value(column_name) %} + + case + when {{ column_name }} is null or trim({{ column_name }}) = '' + then 0.00::numeric(15, 2) + when {{ column_name }} like '%NaN%' + then 0.00::numeric(15, 2) + when {{ column_name }} like '(%' + then + regexp_replace( + replace(coalesce({{ column_name }}, '0'), '.', ''), + '(\()?(\d+),(\d+)(\))?', + '-\2.\3' + )::numeric(15, 2) + else + replace( + replace(coalesce({{ column_name }}, '0'), '.', ''), ',', '.' + )::numeric(15, 2) + end + +{% endmacro %} diff --git a/dbt/minc/macros/schema.yml b/dbt/minc/macros/schema.yml new file mode 100644 index 00000000..694f3b23 --- /dev/null +++ b/dbt/minc/macros/schema.yml @@ -0,0 +1,24 @@ + +version: 2 + +macros: + - name: create_udfs + description: > + Função que cria as UDFs necessárias para o funcionamento do projeto. + Essa função deve ser chamada no início de cada run para garantir que todas as UDFs estejam disponíveis. + + - name: generate_schema_name + description: > + Função que gera o nome do schema a ser utilizado no projeto. + A função dentro desta macro é built-in do dbt. + + ## UDFS + - name: create_f_parse_dates + description: > + Função que cria a UDF f_parse_dates, que é utilizada para converter texto no formato MÊS(texto)/ANO(numero) em datas. + arguments: + - name: in_text + type: text + description: > + Texto a ser convertido em data. + O texto deve estar no formato MÊS(texto)/ANO(numero). Ex.: FEV/2024 -> 2024-02-01 \ No newline at end of file diff --git a/dbt/minc/macros/udfs/f_format_nc.sql b/dbt/minc/macros/udfs/f_format_nc.sql new file mode 100644 index 00000000..f7a06c86 --- /dev/null +++ b/dbt/minc/macros/udfs/f_format_nc.sql @@ -0,0 +1,19 @@ +{% macro create_f_format_nc() %} + create or replace function {{ target.schema }}.format_nc(in_text text) + returns text + as $$ + + with + + pre_process as ( + select left(in_text, 7) as prefix, + right(in_text, 4)::numeric as posfix + ) + + select concat(prefix, to_char(posfix, 'FM00000')) as result + from pre_process + + $$ + language sql + ; +{% endmacro %} diff --git a/dbt/minc/macros/udfs/f_parse_dates.sql b/dbt/minc/macros/udfs/f_parse_dates.sql new file mode 100644 index 00000000..3fd8693e --- /dev/null +++ b/dbt/minc/macros/udfs/f_parse_dates.sql @@ -0,0 +1,44 @@ +-- Essa fun +{% macro create_f_parse_dates() %} + + create or replace function {{ target.schema }}.parse_date(in_text text) + returns date + as + $$ + + with + + split_column as ( + select + split_part(in_text, '/', 1) as mes, + split_part(in_text, '/', 2) as ano + ), + + fixed_month as ( + select + ano, + case + when mes = 'JAN' then '01' + when mes = 'FEV' then '02' + when mes = 'MAR' then '03' + when mes = 'ABR' then '04' + when mes = 'MAI' then '05' + when mes = 'JUN' then '06' + when mes = 'JUL' then '07' + when mes = 'AGO' then '08' + when mes = 'SET' then '09' + when mes = 'OUT' then '10' + when mes = 'NOV' then '11' + when mes = 'DEZ' then '12' + else mes end as mes_num + from split_column + ) + + select + (to_date(ano::numeric - 1 || '-' || '12', 'YYYY-MM') + (mes_num || ' months')::interval)::date as result + from fixed_month + $$ + language sql + ; + +{% endmacro %} diff --git a/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql new file mode 100644 index 00000000..8900643c --- /dev/null +++ b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_coletivos.sql @@ -0,0 +1,16 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, + 'LPG' AS programa_fomento +FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_coletivos') }} + +{% if is_incremental() %} +WHERE LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) NOT IN ( + SELECT identificador_unico FROM {{ this }} +) +{% endif %} diff --git a/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pf.sql b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pf.sql new file mode 100644 index 00000000..d66d9aa0 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pf.sql @@ -0,0 +1,18 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cpf")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, + 'LPG' AS programa_fomento +FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_fisica') }} +WHERE "nº do cpf" IS NOT NULL + AND LOWER(TRIM("nº do cpf")) NOT IN ('', 'nan', 'none') + +{% if is_incremental() %} + AND LOWER(TRIM("nº do cpf")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) +{% endif %} \ No newline at end of file diff --git a/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pj.sql b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pj.sql new file mode 100644 index 00000000..21a3e396 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/bronze/lpg_agentes_pj.sql @@ -0,0 +1,18 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cnpj")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto, + 'LPG' AS programa_fomento +FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_pessoa_juridica') }} +WHERE "nº do cnpj" IS NOT NULL + AND LOWER(TRIM("nº do cnpj")) NOT IN ('', 'nan', 'none') + +{% if is_incremental() %} + AND LOWER(TRIM("nº do cnpj")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) +{% endif %} diff --git a/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pf.sql b/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pf.sql new file mode 100644 index 00000000..f1e97b75 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pf.sql @@ -0,0 +1,18 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cpf")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura nos últim")) AS historico_acesso_bruto, + 'PNAB' AS programa_fomento +FROM {{ source('transferegov_fundo_a_fundo', 'pnab_pessoas') }} +WHERE "nº do cpf" IS NOT NULL + AND LOWER(TRIM("nº do cpf")) NOT IN ('', 'nan', 'none') + +{% if is_incremental() %} + AND LOWER(TRIM("nº do cpf")) NOT IN ( + SELECT identificador_unico FROM {{ this }} + ) +{% endif %} diff --git a/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pj.sql b/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pj.sql new file mode 100644 index 00000000..e67a7131 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/bronze/pnab_agentes_pj.sql @@ -0,0 +1,16 @@ +{{ config( + materialized='incremental', + unique_key='identificador_unico' +) }} + +SELECT + LOWER(TRIM("nº do cnpj")) AS identificador_unico, + LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorment")) AS historico_acesso_bruto, + 'PNAB' AS programa_fomento +FROM {{ source('transferegov_fundo_a_fundo', 'pnab_organizacoes') }} + +{% if is_incremental() %} +WHERE LOWER(TRIM("nº do cnpj")) NOT IN ( + SELECT identificador_unico FROM {{ this }} +) +{% endif %} diff --git a/dbt/minc/models/agentes_dbt/bronze/schema.yml b/dbt/minc/models/agentes_dbt/bronze/schema.yml new file mode 100644 index 00000000..409d7b17 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/bronze/schema.yml @@ -0,0 +1,112 @@ +version: 2 + +models: + - name: lpg_agentes_pf + description: > + Camada Bronze — dados brutos de pessoas físicas da LPG (Lei Paulo Gustavo). + Mapeia o CPF como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura. + config: + meta: + tags: + - bronze + - agentes + - pessoa_fisica + - lpg + columns: + - name: identificador_unico + description: CPF do proponente pessoa física, normalizado para lowercase. + tests: + - not_null + - name: historico_acesso_bruto + description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — sempre 'LPG' neste modelo. + + - name: lpg_agentes_pj + description: > + Camada Bronze — dados brutos de pessoas jurídicas da LPG. + Mapeia o CNPJ como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura. + config: + meta: + tags: + - bronze + - agentes + - pessoa_juridica + - lpg + columns: + - name: identificador_unico + description: CNPJ do proponente pessoa jurídica, normalizado para lowercase. + tests: + - not_null + - name: historico_acesso_bruto + description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — sempre 'LPG' neste modelo. + + - name: lpg_agentes_coletivos + description: > + Camada Bronze — dados brutos de proponentes coletivos da LPG. + Mapeia o CPF do representante do grupo/coletivo como identificador único + e traz a coluna bruta de histórico de acesso a recursos públicos de fomento. + config: + meta: + tags: + - bronze + - agentes + - coletivo + - lpg + columns: + - name: identificador_unico + description: CPF do representante do grupo/coletivo, normalizado para lowercase. + tests: + - not_null + - name: historico_acesso_bruto + description: Resposta bruta da LPG sobre acesso anterior a recursos públicos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — sempre 'LPG' neste modelo. + + - name: pnab_agentes_pf + description: > + Camada Bronze — dados brutos de pessoas físicas da PNAB (Política Nacional Aldir Blanc). + Mapeia o CPF como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura nos últimos 5 anos. + config: + meta: + tags: + - bronze + - agentes + - pessoa_fisica + - pnab + columns: + - name: identificador_unico + description: CPF do proponente pessoa física, normalizado para lowercase. + tests: + - not_null + - name: historico_acesso_bruto + description: Resposta bruta da PNAB sobre acesso anterior a recursos públicos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — sempre 'PNAB' neste modelo. + + - name: pnab_agentes_pj + description: > + Camada Bronze — dados brutos de organizações da PNAB. + Mapeia o CNPJ como identificador único e traz a coluna bruta de histórico + de acesso a recursos públicos de fomento à cultura anteriormente. + config: + meta: + tags: + - bronze + - agentes + - pessoa_juridica + - pnab + columns: + - name: identificador_unico + description: CNPJ da organização, normalizado para lowercase. + tests: + - not_null + - name: historico_acesso_bruto + description: Resposta bruta da PNAB sobre acesso anterior a recursos públicos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — sempre 'PNAB' neste modelo. diff --git a/dbt/minc/models/agentes_dbt/gold/perfil_acesso_fomento.sql b/dbt/minc/models/agentes_dbt/gold/perfil_acesso_fomento.sql new file mode 100644 index 00000000..a599659a --- /dev/null +++ b/dbt/minc/models/agentes_dbt/gold/perfil_acesso_fomento.sql @@ -0,0 +1,84 @@ +{{ config( + materialized='table' +) }} + +WITH todos_proponentes AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_bruto + FROM {{ ref('identificadores_agentes') }} +), + +historico_limpo AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + CASE + WHEN historico_acesso_bruto IS NULL + OR LOWER(TRIM(historico_acesso_bruto)) IN ('', 'nan') + THEN NULL + ELSE TRIM( + REPLACE( + REPLACE( + REPLACE(historico_acesso_bruto, '.', ''), + ';', '' + ), + '"', '' + ) + ) + END AS historico_acesso_limpo + FROM todos_proponentes +), + +historico_ordenado AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + ROW_NUMBER() OVER ( + PARTITION BY identificador_unico + ORDER BY programa_fomento ASC + ) AS sequencia_fomento + FROM historico_limpo +) + +SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + sequencia_fomento, + CASE + WHEN historico_acesso_limpo = 'sim' + THEN 'Confirmado - Primeira Vez' + WHEN historico_acesso_limpo IN ('não', 'nao', 'nâo') + THEN 'Confirmado - Veterano' + WHEN ( + historico_acesso_limpo IS NULL + OR historico_acesso_limpo IN ( + 'não sei informar', 'nao sei informar', + 'não informado', 'nao informado', + 'nao_declarar', + 'não sei', 'nao sei', + 'não sabe', 'nao sabe' + ) + ) AND sequencia_fomento = 1 + THEN 'Inferido - Primeira Vez (Estreante na base)' + WHEN ( + historico_acesso_limpo IS NULL + OR historico_acesso_limpo IN ( + 'não sei informar', 'nao sei informar', + 'não informado', 'nao informado', + 'nao_declarar', + 'não sei', 'nao sei', + 'não sabe', 'nao sabe' + ) + ) AND sequencia_fomento > 1 + THEN 'Inferido - Veterano (Possui histórico)' + ELSE 'Não sabe/Não informou' + END AS perfil_acesso_fomento +FROM historico_ordenado diff --git a/dbt/minc/models/agentes_dbt/gold/perfil_agentes_completo.sql b/dbt/minc/models/agentes_dbt/gold/perfil_agentes_completo.sql new file mode 100644 index 00000000..03adf2e5 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/gold/perfil_agentes_completo.sql @@ -0,0 +1,97 @@ +{{ config(materialized='table') }} + +WITH perfil_base AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + sequencia_fomento, + perfil_acesso_fomento AS perfil_original + FROM {{ ref('perfil_acesso_fomento') }} +), + +-- Detecta veterania multi-programa: presença em > 1 programa confirma veterano independente +-- da resposta declarada +contagem_programas AS ( + SELECT + identificador_unico, + COUNT(DISTINCT programa_fomento) AS qtd_programas + FROM perfil_base + GROUP BY identificador_unico +), + +-- Registro canônico: primeiro programa de cada proponente (sequencia_fomento = 1) +-- Proxy temporal: LPG < PNAB alfabeticamente, refletindo a ordem de ingresso na base +primeiro_registro AS ( + SELECT DISTINCT ON (identificador_unico) + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + perfil_original + FROM perfil_base + ORDER BY identificador_unico, sequencia_fomento ASC +), + +-- historico_acesso_bruto vem direto da fonte bronze, alinhado ao primeiro programa +bruto AS ( + SELECT DISTINCT ON (identificador_unico) + identificador_unico, + historico_acesso_bruto + FROM {{ ref('identificadores_agentes') }} + ORDER BY identificador_unico, programa_fomento ASC +), + +consolidado AS ( + SELECT + pr.identificador_unico, + pr.tipo_proponente, + pr.programa_fomento, + b.historico_acesso_bruto, + pr.historico_acesso_limpo, + pr.perfil_original, + c.qtd_programas + FROM primeiro_registro pr + LEFT JOIN bruto b + ON pr.identificador_unico = b.identificador_unico + LEFT JOIN contagem_programas c + ON pr.identificador_unico = c.identificador_unico +) + +SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_bruto, + + -- status_origem: 'Inferido' quando a veterania vem da presença multi-programa e + -- não de uma resposta explícita; 'Confirmado' quando o proponente respondeu sim/não + CASE + WHEN qtd_programas > 1 AND perfil_original != 'Confirmado - Veterano' + THEN 'Inferido' + WHEN perfil_original LIKE 'Confirmado%' + THEN 'Confirmado' + WHEN perfil_original LIKE 'Inferido%' + THEN 'Inferido' + ELSE 'Não Informado' + END AS status_origem, + + -- perfil_classificacao: classificação final de veterania, consolidada em 1 linha + CASE + WHEN qtd_programas > 1 AND perfil_original = 'Confirmado - Veterano' + THEN 'Veterano' + WHEN qtd_programas > 1 + THEN 'Veterano (Multi-Programa)' + WHEN perfil_original = 'Confirmado - Primeira Vez' + THEN 'Primeira Vez' + WHEN perfil_original = 'Confirmado - Veterano' + THEN 'Veterano' + WHEN perfil_original = 'Inferido - Primeira Vez (Estreante na base)' + THEN 'Provável Primeira Vez' + WHEN perfil_original = 'Inferido - Veterano (Possui histórico)' + THEN 'Provável Veterano' + ELSE 'Indeterminado' + END AS perfil_classificacao + +FROM consolidado diff --git a/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql b/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql new file mode 100644 index 00000000..0e2c3861 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_contemplados.sql @@ -0,0 +1,164 @@ +{{ config(materialized='table') }} + +-- Normaliza identificadores para dígitos puro, eliminando diferenças de +-- formatação entre tabelas (CPF com pontos/traço vs. só dígitos, CNPJ com +-- pontos/barra vs. só dígitos). +-- Usada em ambos os lados do JOIN para garantir correspondência consistente. +-- +-- ATENÇÃO — CPF mascarado dos proponentes LPG: a base de proponentes +-- (lpg_agentes_pf/coletivos) traz o CPF anonimizado no formato +-- "***.NNN.NNN-**" (apenas os 6 dígitos centrais visíveis), enquanto +-- lpg_contemplados traz o CPF completo. Um match exato (dígito a dígito) +-- nunca ocorre para esses casos, gerando falso negativo sistemático de +-- 'contemplado' para LPG. Por isso, quando o identificador do proponente +-- vier mascarado, o JOIN usa um match parcial pelo "miolo" do CPF +-- (posições 4-9, os mesmos 6 dígitos centrais expostos pela máscara). +-- Esse match parcial tem risco de colisão entre CPFs com miolo igual. +-- +-- ATENÇÃO — colunas "fantasma" em lpg_contemplados: a ingestão dinâmica de +-- planilhas (extracao_planilhas.py) não normaliza espaços/caracteres +-- invisíveis (ex.: NBSP) nos nomes de coluna, então o header "CPF ou CNPJ" +-- de arquivos diferentes pode virar colunas distintas no Postgres +-- (ex.: "cpf ou cnpj" vs. "cpf ou cnpj"), cada uma com parte dos +-- dados. Por isso a coluna de CPF/CNPJ é resolvida dinamicamente via +-- information_schema (qualquer coluna cujo nome contenha "cpf" e "cnpj"), +-- em vez de um nome fixo — o que tornaria a maior parte dos contemplados +-- invisível para o JOIN. + +{% set cpf_cnpj_cols_query %} + SELECT column_name + FROM information_schema.columns + WHERE table_schema = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').schema }}' + AND table_name = '{{ source('transferegov_fundo_a_fundo', 'lpg_contemplados').identifier }}' + AND column_name ILIKE '%cpf%cnpj%' + ORDER BY column_name +{% endset %} +{% set cpf_cnpj_results = run_query(cpf_cnpj_cols_query) %} +{% set cpf_cnpj_cols = cpf_cnpj_results.columns[0].values() if execute else ['cpf ou cnpj'] %} + +WITH contemplados_lpg_raw AS ( + SELECT + COALESCE( + {% for col in cpf_cnpj_cols %} + NULLIF(LOWER(TRIM("{{ col }}")), 'nan') + {%- if not loop.last %}, + {% endif %} + {% endfor %} + ) AS cpf_cnpj_bruto + FROM {{ source('transferegov_fundo_a_fundo', 'lpg_contemplados') }} +), + +contemplados_lpg AS ( + SELECT DISTINCT + REGEXP_REPLACE(cpf_cnpj_bruto, '[^0-9]', '', 'g') AS id_normalizado, + 'LPG' AS programa_fomento + FROM contemplados_lpg_raw + WHERE cpf_cnpj_bruto IS NOT NULL + AND cpf_cnpj_bruto NOT IN ('', 'cpf ou cnpj') + AND LENGTH(REGEXP_REPLACE(cpf_cnpj_bruto, '[^0-9]', '', 'g')) >= 11 +), + +-- PNCV fornece CPF real dos contemplados PNAB (PF) +contemplados_pnab_pncv AS ( + SELECT DISTINCT + REGEXP_REPLACE(TRIM(cpf), '[^0-9]', '', 'g') AS id_normalizado, + 'PNAB' AS programa_fomento + FROM {{ source('transferegov_fundo_a_fundo', 'raw_pnab_lista_contemplados_pncv') }} + WHERE cpf IS NOT NULL + AND LOWER(TRIM(cpf)) NOT IN ('nan', '', 'cpf') + AND LENGTH(REGEXP_REPLACE(TRIM(cpf), '[^0-9]', '', 'g')) = 11 +), + +-- Lista geral PNAB: CPF está anonimizado (***XXXXXX**) — só o CNPJ é utilizável +-- Cobertura parcial: PJ contemplada via PNAB geral sem registro no PNCV fica de fora +contemplados_pnab_geral_pj AS ( + SELECT DISTINCT + REGEXP_REPLACE(TRIM(cnpj), '[^0-9]', '', 'g') AS id_normalizado, + 'PNAB' AS programa_fomento + FROM {{ source('transferegov_fundo_a_fundo', 'raw_pnab_lista_contemplados_geral') }} + WHERE cnpj IS NOT NULL + AND LOWER(TRIM(cnpj)) NOT IN ('nan', '', 'cnpj') + AND LENGTH(REGEXP_REPLACE(TRIM(cnpj), '[^0-9]', '', 'g')) = 14 +), + +todos_contemplados AS ( + SELECT id_normalizado, programa_fomento FROM contemplados_lpg + UNION + SELECT id_normalizado, programa_fomento FROM contemplados_pnab_pncv + UNION + SELECT id_normalizado, programa_fomento FROM contemplados_pnab_geral_pj +), + +-- Miolo (6 dígitos centrais) do CPF, usado para casar com identificadores +-- mascarados de proponentes ("***.NNN.NNN-**"). Só faz sentido para CPF +-- (11 dígitos); CNPJ (14 dígitos) não é mascarado na base de proponentes. +todos_contemplados_miolo AS ( + SELECT + id_normalizado, + programa_fomento, + SUBSTRING(id_normalizado FROM 4 FOR 6) AS miolo_cpf + FROM todos_contemplados + WHERE LENGTH(id_normalizado) = 11 +), + +-- Base: perfil_acesso_fomento tem 1 linha por (identificador × programa), +-- preservando a granularidade por programa para o JOIN de contemplação +perfil_base AS ( + SELECT + identificador_unico, + programa_fomento, + CASE + WHEN perfil_acesso_fomento IN ( + 'Confirmado - Primeira Vez', + 'Inferido - Primeira Vez (Estreante na base)' + ) THEN 'Sim' + WHEN perfil_acesso_fomento IN ( + 'Confirmado - Veterano', + 'Inferido - Veterano (Possui histórico)' + ) THEN 'Não' + ELSE 'Não sabe/Não informou' + END AS categoria_primeiro_acesso, + CASE + WHEN perfil_acesso_fomento LIKE 'Confirmado%' THEN 'Confirmado' + WHEN perfil_acesso_fomento LIKE 'Inferido%' THEN 'Inferido' + ELSE 'Não Informado' + END AS status_dado + FROM {{ ref('perfil_acesso_fomento') }} +), + +perfil_com_contemplado AS ( + SELECT + pb.identificador_unico, + pb.programa_fomento, + pb.categoria_primeiro_acesso, + pb.status_dado, + CASE + WHEN tc.id_normalizado IS NOT NULL OR tcm.id_normalizado IS NOT NULL THEN 'sim' + ELSE 'não' + END AS contemplado + FROM perfil_base pb + LEFT JOIN todos_contemplados tc + ON REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g') = tc.id_normalizado + AND pb.programa_fomento = tc.programa_fomento + LEFT JOIN todos_contemplados_miolo tcm + ON pb.identificador_unico LIKE '%*%' + AND LENGTH(REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g')) = 6 + AND REGEXP_REPLACE(pb.identificador_unico, '[^0-9]', '', 'g') = tcm.miolo_cpf + AND pb.programa_fomento = tcm.programa_fomento +) + +SELECT + programa_fomento, + categoria_primeiro_acesso, + contemplado, + COUNT(DISTINCT identificador_unico) AS total_proponentes, + COUNT(DISTINCT CASE WHEN status_dado = 'Confirmado' THEN identificador_unico END) AS total_campo_preenchido, + COUNT(DISTINCT CASE WHEN status_dado = 'Inferido' THEN identificador_unico END) AS total_inferido, + ROUND( + COUNT(DISTINCT identificador_unico)::NUMERIC + / SUM(COUNT(DISTINCT identificador_unico)) OVER (PARTITION BY programa_fomento, contemplado) + * 100, 2 + ) AS percentual +FROM perfil_com_contemplado +GROUP BY programa_fomento, categoria_primeiro_acesso, contemplado +ORDER BY programa_fomento, contemplado DESC, total_proponentes DESC diff --git a/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_resumo.sql b/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_resumo.sql new file mode 100644 index 00000000..c071e6f8 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/gold/primeiro_acesso_resumo.sql @@ -0,0 +1,25 @@ +{{ config( + materialized='table' +) }} + +WITH resumo AS ( + SELECT + programa_fomento, + categoria_primeiro_acesso, + COUNT(DISTINCT identificador_unico) AS total_proponentes + FROM {{ ref('perfil_agentes_historico') }} + GROUP BY programa_fomento, categoria_primeiro_acesso +) + +SELECT + programa_fomento, + categoria_primeiro_acesso, + total_proponentes, + ROUND( + (total_proponentes::NUMERIC + / SUM(total_proponentes) OVER (PARTITION BY programa_fomento)) + * 100, + 2 + ) AS percentual +FROM resumo +ORDER BY programa_fomento, total_proponentes DESC diff --git a/dbt/minc/models/agentes_dbt/gold/schema.yml b/dbt/minc/models/agentes_dbt/gold/schema.yml new file mode 100644 index 00000000..565d4c49 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/gold/schema.yml @@ -0,0 +1,224 @@ +version: 2 + +models: + - name: perfil_agentes_completo + description: > + Camada Gold — Master Data de proponentes (Meta 5). + Grain: 1 linha por identificador_unico (CPF/CNPJ). Consolida perfil_acesso_fomento + eliminando duplicatas entre programas: proponentes presentes em mais de um programa + têm veterania confirmada por presença multi-programa, sobrepondo a resposta declarada. + Serve como base para dois consumos: + - Relatório consolidado (sem inferência): filtrar WHERE status_origem = 'Confirmado' + - Visão analítica (com inferência): consumir todos os registros + config: + meta: + meta_objetivo: 'Meta 5 — Master Data de Proponentes' + origem_dados: 'perfil_acesso_fomento e identificadores_agentes' + tags: [gold, agentes, master_data, meta5, deduplicado] + columns: + - name: identificador_unico + description: CPF ou CNPJ do proponente (normalizado). Chave única do modelo. + tests: + - not_null + - unique + - name: tipo_proponente + description: > + Classificação do proponente — 'Pessoa Física', 'Pessoa Jurídica', + 'Coletivo' ou 'Organização'. + tests: + - not_null + - name: programa_fomento + description: > + Programa de origem do primeiro registro do proponente (proxy temporal: + LPG precede PNAB). Reflete o contexto da análise inicial do perfil. + tests: + - not_null + - name: historico_acesso_bruto + description: > + Resposta original do proponente sobre acesso anterior a recursos públicos + (dado cru, sem normalização), alinhado ao primeiro programa de ingresso. + - name: status_origem + description: > + Flag de qualidade: 'Confirmado' — proponente respondeu explicitamente sim/não; + 'Inferido' — classificação derivada da sequência de programas ou presença + multi-programa; 'Não Informado' — dado ausente ou inválido. + tests: + - not_null + - accepted_values: + values: ['Confirmado', 'Inferido', 'Não Informado'] + - name: perfil_classificacao + description: > + Classificação final consolidada (1 linha por proponente): + 'Primeira Vez' — confirmado pelo proponente; + 'Veterano' — confirmado pelo proponente ou por presença em >1 programa com resposta vet.; + 'Veterano (Multi-Programa)' — veterania inferida pela presença em múltiplos programas; + 'Provável Primeira Vez' — inferido por ser estreante na base; + 'Provável Veterano' — inferido por sequência histórica; + 'Indeterminado' — dado insuficiente para classificação. + tests: + - not_null + - accepted_values: + values: + - 'Primeira Vez' + - 'Veterano' + - 'Veterano (Multi-Programa)' + - 'Provável Primeira Vez' + - 'Provável Veterano' + - 'Indeterminado' + + - name: perfil_acesso_fomento + description: > + Camada Gold — Meta 5 (Mapeamento de agentes culturais acessando fomento + pela primeira vez). Classifica cada proponente por programa com a coluna + perfil_acesso_fomento. Respostas confirmadas ('Sim'/'Não') são mantidas; + respostas nulas ou omitidas recebem inferência cronológica via ROW_NUMBER() + particionado por CPF/CNPJ e ordenado por programa_fomento (proxy temporal: + LPG precede PNAB). Um proponente com sequencia_fomento > 1 acessou mais + de um programa, evidenciando veterania. + config: + meta: + meta_objetivo: 'Meta 5 — Inferência de Primeiro Acesso' + origem_dados: 'LPG e PNAB (via identificadores_agentes)' + tags: [gold, agentes, primeiro_acesso, meta5, inferencia] + columns: + - name: identificador_unico + description: CPF ou CNPJ do proponente (já normalizado em minúsculas e sem espaços). + tests: + - not_null + - name: tipo_proponente + description: Classificação do proponente — 'Pessoa Física', 'Pessoa Jurídica', 'Coletivo' ou 'Organização'. + tests: + - not_null + - name: programa_fomento + description: Programa de fomento de origem — 'LPG' ou 'PNAB'. + tests: + - not_null + - name: historico_acesso_limpo + description: Resposta bruta normalizada (lowercase, sem pontuação). NULL quando omitida ou inválida. + - name: sequencia_fomento + description: > + Ordem de acesso do proponente dentro de seu histórico, particionado por + identificador_unico e ordenado por programa_fomento (proxy temporal). + sequencia_fomento = 1 indica o primeiro registro; > 1 indica veterano. + tests: + - not_null + - name: perfil_acesso_fomento + description: > + Classificação final para a Meta 5: + 'Confirmado - Primeira Vez' | 'Confirmado - Veterano' | + 'Inferido - Primeira Vez (Estreante na base)' | + 'Inferido - Veterano (Possui histórico)' | + 'Não sabe/Não informou'. + tests: + - not_null + - accepted_values: + values: + - 'Confirmado - Primeira Vez' + - 'Confirmado - Veterano' + - 'Inferido - Primeira Vez (Estreante na base)' + - 'Inferido - Veterano (Possui histórico)' + - 'Não sabe/Não informou' + + - name: primeiro_acesso_contemplados + description: > + Camada Gold — Meta 5 com flag de contemplação. Mesma estrutura de + primeiro_acesso_resumo com duas dimensões adicionais: se o proponente + foi contemplado em edital e a discriminação entre dados confirmados + (resposta declarada no formulário) vs. inferidos (derivados por sequência + ou presença multi-programa). Grain: 1 linha por + (programa_fomento × categoria_primeiro_acesso × contemplado). + + Cobertura de contemplados: + - LPG: CPF e CNPJ completos via lpg_contemplados — cobertura total. + - PNAB PNCV: CPF real via raw_pnab_lista_contemplados_pncv — cobertura PF. + - PNAB Geral: apenas CNPJ via raw_pnab_lista_contemplados_geral — CPF + anonimizado (***XXXXXX**), PF da lista geral fora do PNCV não é rastreável. + config: + meta: + meta_objetivo: 'Meta 5 — Primeiro Acesso com Contemplação' + origem_dados: 'perfil_acesso_fomento + lpg_contemplados + pnab_contemplados' + tags: [gold, agentes, meta5, contemplados, primeiro_acesso] + columns: + - name: programa_fomento + description: Programa de fomento — 'LPG' ou 'PNAB'. + tests: + - not_null + - accepted_values: + values: ['LPG', 'PNAB'] + - name: categoria_primeiro_acesso + description: > + Categoria de acesso derivada do perfil_acesso_fomento: + 'Sim' (Primeira Vez), 'Não' (Veterano) ou 'Não sabe/Não informou'. + tests: + - not_null + - accepted_values: + values: ['Sim', 'Não', 'Não sabe/Não informou'] + - name: contemplado + description: > + Flag de contemplação em edital público — 'sim' se o identificador + aparece em alguma lista de contemplados do respectivo programa, + 'não' caso contrário. + tests: + - not_null + - accepted_values: + values: ['sim', 'não'] + - name: total_proponentes + description: Contagem distinta de proponentes no cruzamento programa × categoria × contemplado. + tests: + - not_null + - name: total_campo_preenchido + description: > + Proponentes onde a categoria foi determinada por resposta declarada + no formulário (status_dado = 'Confirmado'). Exclui inferências. + tests: + - not_null + - name: total_inferido + description: > + Proponentes onde a categoria foi inferida por sequência de programas + ou presença multi-programa (status_dado = 'Inferido'). + tests: + - not_null + - name: percentual + description: > + Percentual da linha sobre o total de proponentes do mesmo + programa_fomento e status de contemplação. Cada combinação + (programa × contemplado) soma 100% independentemente. + tests: + - not_null + + - name: primeiro_acesso_resumo + description: > + Camada Gold — resumo agregado da Meta 5 (Indicador de Primeiro Acesso). + Conta proponentes distintos por programa de fomento e categoria de acesso + anterior a recursos públicos. Calcula o percentual de cada categoria + sobre o total do respectivo programa (cada programa soma 100%). + config: + meta: + meta_objetivo: 'Meta 5 — Indicador de Primeiro Acesso' + origem_dados: 'LPG e PNAB' + tags: [gold, agentes, primeiro_acesso, meta5] + columns: + - name: programa_fomento + description: Programa de fomento — 'LPG' ou 'PNAB'. + tests: + - not_null + - accepted_values: + values: ['LPG', 'PNAB'] + - name: categoria_primeiro_acesso + description: > + Categoria padronizada da resposta sobre acesso anterior a recursos + de fomento — 'Sim', 'Não' ou 'Não sabe/Não informou'. + tests: + - not_null + - accepted_values: + values: ['Sim', 'Não', 'Não sabe/Não informou'] + - name: total_proponentes + description: Contagem distinta de proponentes na categoria dentro do programa. + tests: + - not_null + - name: percentual + description: > + Percentual da categoria sobre o total de proponentes do mesmo programa + (cada programa soma 100% independentemente). + tests: + - not_null diff --git a/dbt/minc/models/agentes_dbt/silver/perfil_agentes_historico.sql b/dbt/minc/models/agentes_dbt/silver/perfil_agentes_historico.sql new file mode 100644 index 00000000..8eb5e300 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/silver/perfil_agentes_historico.sql @@ -0,0 +1,58 @@ +{{ config( + materialized='table' +) }} + +WITH identificadores AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_bruto + FROM {{ ref('identificadores_agentes') }} +), + +higienizados AS ( + SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + TRIM( + REPLACE( + REPLACE( + REPLACE( + historico_acesso_bruto, + '.', '' + ), + ';', '' + ), + '"', '' + ) + ) AS historico_acesso_limpo + FROM identificadores + WHERE historico_acesso_bruto IS NOT NULL + AND LOWER(historico_acesso_bruto) != 'nan' + AND TRIM(historico_acesso_bruto) != '' +) + +SELECT + identificador_unico, + tipo_proponente, + programa_fomento, + historico_acesso_limpo, + CASE + WHEN historico_acesso_limpo = 'sim' THEN 'Sim' + WHEN historico_acesso_limpo IN ('não', 'nao', 'nâo') THEN 'Não' + WHEN historico_acesso_limpo IN ( + 'não sei informar', + 'nao sei informar', + 'não informado', + 'nao informado', + 'nao_declarar', + 'não sei', + 'nao sei', + 'não sabe', + 'nao sabe' + ) THEN 'Não sabe/Não informou' + ELSE 'Não sabe/Não informou' + END AS categoria_primeiro_acesso +FROM higienizados diff --git a/dbt/minc/models/agentes_dbt/silver/schema.yml b/dbt/minc/models/agentes_dbt/silver/schema.yml new file mode 100644 index 00000000..1aad7a68 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/silver/schema.yml @@ -0,0 +1,36 @@ +version: 2 + +models: + - name: perfil_agentes_historico + description: > + Camada Silver — perfil dos agentes com higienização da coluna de histórico + de acesso a recursos públicos. Remove valores nulos, 'nan' e caracteres + inválidos (ponto, ponto-e-vírgula). Classifica as respostas em três + categorias padronizadas: 'Sim', 'Não' e 'Não sabe/Não informou'. + Propaga a coluna programa_fomento para rastreabilidade por programa. + config: + meta: + tags: [silver, agentes, higienizacao] + columns: + - name: identificador_unico + description: Identificador único do proponente (CPF, CNPJ ou CPF do representante). + tests: + - not_null + - name: tipo_proponente + description: > + Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica', + 'Coletivo' ou 'Organização'. + - name: programa_fomento + description: Programa de fomento de origem — 'LPG' ou 'PNAB'. + tests: + - accepted_values: + values: ['LPG', 'PNAB'] + - name: historico_acesso_limpo + description: Texto limpo da resposta bruta sobre acesso anterior a recursos de fomento. + - name: categoria_primeiro_acesso + description: > + Categoria padronizada da resposta: 'Sim', 'Não' ou 'Não sabe/Não informou'. + tests: + - not_null + - accepted_values: + values: ['Sim', 'Não', 'Não sabe/Não informou'] diff --git a/dbt/minc/models/agentes_dbt/views/identificadores_agentes.sql b/dbt/minc/models/agentes_dbt/views/identificadores_agentes.sql new file mode 100644 index 00000000..c5a90864 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/views/identificadores_agentes.sql @@ -0,0 +1,44 @@ +{{ config(materialized='view') }} + +SELECT + identificador_unico, + historico_acesso_bruto, + programa_fomento, + 'Pessoa Física' AS tipo_proponente +FROM {{ ref('lpg_agentes_pf') }} + +UNION ALL + +SELECT + identificador_unico, + historico_acesso_bruto, + programa_fomento, + 'Pessoa Jurídica' AS tipo_proponente +FROM {{ ref('lpg_agentes_pj') }} + +UNION ALL + +SELECT + identificador_unico, + historico_acesso_bruto, + programa_fomento, + 'Coletivo' AS tipo_proponente +FROM {{ ref('lpg_agentes_coletivos') }} + +UNION ALL + +SELECT + identificador_unico, + historico_acesso_bruto, + programa_fomento, + 'Pessoa Física' AS tipo_proponente +FROM {{ ref('pnab_agentes_pf') }} + +UNION ALL + +SELECT + identificador_unico, + historico_acesso_bruto, + programa_fomento, + 'Organização' AS tipo_proponente +FROM {{ ref('pnab_agentes_pj') }} diff --git a/dbt/minc/models/agentes_dbt/views/schema.yml b/dbt/minc/models/agentes_dbt/views/schema.yml new file mode 100644 index 00000000..5a29a421 --- /dev/null +++ b/dbt/minc/models/agentes_dbt/views/schema.yml @@ -0,0 +1,26 @@ +version: 2 + +models: + - name: identificadores_agentes + description: > + Camada Views — consolidação dos cinco modelos bronze (3 LPG + 2 PNAB) + em um único dataset via UNION ALL. Padroniza identificador_unico e adiciona + as colunas literais programa_fomento e tipo_proponente para rastrear + a origem de cada registro. + config: + meta: + tags: [views, agentes, identificadores] + columns: + - name: identificador_unico + description: Identificador único do proponente (CPF, CNPJ ou CPF do representante). + - name: historico_acesso_bruto + description: Resposta bruta sobre acesso anterior a recursos de fomento. + - name: programa_fomento + description: Programa de fomento de origem — 'LPG' ou 'PNAB'. + tests: + - accepted_values: + values: ['LPG', 'PNAB'] + - name: tipo_proponente + description: > + Tipo do proponente — 'Pessoa Física', 'Pessoa Jurídica', + 'Coletivo' ou 'Organização'. diff --git a/dbt/minc/models/metadata/models_metadata.sql b/dbt/minc/models/metadata/models_metadata.sql index 46d5400b..f1981994 100644 --- a/dbt/minc/models/metadata/models_metadata.sql +++ b/dbt/minc/models/metadata/models_metadata.sql @@ -1,46 +1,62 @@ {{ config( - materialized="incremental", - unique_key=["schema_name", "table_name"], - on_schema_change="sync_all_columns", + materialized='incremental', + unique_key=['schema_name', 'table_name'], + on_schema_change='sync_all_columns' ) }} -with dbt_models as ( - {% set models_data = [] %} +{# + Tabela de Metadados dos Modelos dbt + =================================== + + Esta tabela armazena metadados de todos os modelos executados no dbt. + + Campos principais: + - schema_name: Schema do modelo + - table_name: Nome da tabela/modelo + - dt_transform: Data da última transformação (quando o modelo foi executado) + - run_id: ID único da execução do dbt + + A tabela é atualizada de forma incremental, mantendo apenas o registro + mais recente para cada combinação de schema + table_name. +#} +WITH dbt_models AS ( + {# + Usando a função graph do dbt para iterar sobre todos os modelos do projeto. + Isso garante que capturamos metadados de todos os modelos definidos. + #} + {% set models_data = [] %} + {% for node in graph.nodes.values() %} - {% if node.resource_type == "model" %} + {% if node.resource_type == 'model' %} {% do models_data.append({ - "schema_name": node.schema, - "table_name": node.name, - "database_name": node.database, - "materialization": node.config.materialized, - "description": node.description | default("") | replace("'", "''"), + 'schema_name': node.schema, + 'table_name': node.name, + 'database_name': node.database, + 'materialization': node.config.materialized, + 'description': node.description | default('') | replace("'", "''") }) %} {% endif %} {% endfor %} {% for model in models_data %} - select - '{{ model.schema_name }}' as schema_name, - '{{ model.table_name }}' as table_name, - '{{ model.database_name }}' as database_name, - '{{ model.materialization }}' as materialization, - '{{ model.description[:500] }}' as description, - ( - '{{ run_started_at }}'::timestamp - at time zone 'UTC' - at time zone 'America/Sao_Paulo' - ) as dt_transform, - '{{ invocation_id }}' as run_id + SELECT + '{{ model.schema_name }}' AS schema_name, + '{{ model.table_name }}' AS table_name, + '{{ model.database_name }}' AS database_name, + '{{ model.materialization }}' AS materialization, + '{{ model.description[:500] }}' AS description, + ('{{ run_started_at }}'::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Sao_Paulo') AS dt_transform, + '{{ invocation_id }}' AS run_id {% if not loop.last %} - union all + UNION ALL {% endif %} {% endfor %} ) -select +SELECT schema_name, table_name, database_name, @@ -48,4 +64,4 @@ select description, dt_transform, run_id -from dbt_models +FROM dbt_models diff --git a/dbt/minc/models/metadata/schema.yml b/dbt/minc/models/metadata/schema.yml index 1c8498cf..bb85b1e5 100644 --- a/dbt/minc/models/metadata/schema.yml +++ b/dbt/minc/models/metadata/schema.yml @@ -2,18 +2,44 @@ version: 2 models: - name: models_metadata - description: Metadados dos modelos dbt do projeto MinC. + description: > + Tabela central de metadados que armazena informações sobre todos os modelos dbt executados. + Cada linha representa um modelo único, identificado pela combinação de schema e table_name. + A tabela é atualizada de forma incremental, mantendo histórico das execuções. + config: + meta: + tags: + - metadata + - governance columns: - name: schema_name + description: Nome do schema onde o modelo está localizado. tests: - not_null + - name: table_name + description: Nome da tabela/modelo. tests: - not_null + - name: database_name + description: Nome do banco de dados onde o modelo está materializado. + - name: materialization + description: Tipo de materialização do modelo (table, view, incremental, etc). + - name: description + description: Descrição do modelo extraída do schema.yml. + - name: dt_transform + description: > + Data e hora em que o modelo foi transformado/executado pela última vez. + Corresponde ao momento em que a execução do dbt foi iniciada (run_started_at). + Timezone: America/Sao_Paulo (UTC-3). tests: - not_null + - name: run_id + description: > + Identificador único da execução do dbt (invocation_id). + Permite rastrear qual execução gerou a transformação. diff --git a/dbt/minc/models/sources.yml b/dbt/minc/models/sources.yml index 096cf1d5..d0a090ef 100644 --- a/dbt/minc/models/sources.yml +++ b/dbt/minc/models/sources.yml @@ -4,7 +4,11 @@ sources: - name: transferegov_fundo_a_fundo schema: transferegov_fundo_a_fundo tables: - - name: raw_programas - - name: raw_planos_acao - - name: relatorios_gestao - - name: anexos_relatorios + - name: lpg_dados_pessoa_fisica + - name: lpg_dados_pessoa_juridica + - name: lpg_dados_coletivos + - name: pnab_pessoas + - name: pnab_organizacoes + - name: lpg_contemplados + - name: raw_pnab_lista_contemplados_geral + - name: raw_pnab_lista_contemplados_pncv diff --git a/dbt/minc/profiles.yml b/dbt/minc/profiles.yml index 490f95d8..78e03022 100644 --- a/dbt/minc/profiles.yml +++ b/dbt/minc/profiles.yml @@ -1,11 +1,12 @@ -minc: +cultura: target: prod outputs: prod: type: postgres - host: "{{ env_var('DB_DW_HOST', 'postgres') }}" - user: "{{ env_var('DB_DW_USER', 'postgres_dw') }}" - password: "{{ env_var('DB_DW_PASSWORD', 'postgres_dw') }}" - port: "{{ env_var('DB_DW_PORT', '5432') | int }}" - dbname: "{{ env_var('DB_DW_DBNAME', 'data_warehouse') }}" - schema: "{{ env_var('DB_DW_SCHEMA', 'minc') }}" + host: "{{ env_var('DB_DW_HOST_CULTURA', 'dummy_host') }}" + port: "{{ env_var('DB_DW_PORT_CULTURA', '5432') | int }}" + user: "{{ env_var('DB_DW_USER_CULTURA', 'dummy_user') }}" + password: "{{ env_var('DB_DW_PASS_CULTURA', 'dummy_pass') }}" + dbname: "{{ env_var('DB_DW_DATABASE_CULTURA', 'dummy_db') }}" + schema: "{{ env_var('DB_DW_SCHEMA_CULTURA', 'cultura') }}" + threads: 4 \ No newline at end of file diff --git a/plugins/extracao_planilhas.py b/plugins/extracao_planilhas.py index 3ccd9d6b..64791c23 100644 --- a/plugins/extracao_planilhas.py +++ b/plugins/extracao_planilhas.py @@ -54,8 +54,15 @@ def _handler(signum, frame): "operacionalizacao": "raw_pnab_operacionalizacao", "lista de contemplados geral": "raw_pnab_lista_contemplados_geral", "lista contemplados pncv": "raw_pnab_lista_contemplados_pncv", + "pessoas": "pnab_pessoas", + "organizacoes": "pnab_organizacoes", } +# Abas de proponentes da PNAB +_ABAS_PROPONENTES_PNAB = {"pessoas", "organizacoes"} +# Aba "Organizações" tem células mescladas no topo — exige leitura dinâmica +_ABA_ORGANIZACOES_PNAB = {"organizacoes"} + def _norm_texto(s: str) -> str: """Normaliza texto para comparação tolerante de nomes de abas: @@ -102,19 +109,18 @@ def normalizar_nome(nome: Any) -> str: ".xlsm": "calamine", ".xls": "calamine", ".xlsb": "calamine", - # .ods intencionalmente REMOVIDO — a engine odf (fallback) carrega - # o DOM XML inteiro em memória e causa OOM Kills em planilhas grandes. - # Calamine pode ler .ods, mas se falhar o fallback é destrutivo. + ".ods": "calamine", } # Extensões proibidas: causam OOM no Worker do Airflow -_EXTENSOES_PROIBIDAS = {".ods"} +_EXTENSOES_PROIBIDAS: set[str] = set() def detectar_engine(file_path: str | Path) -> Optional[str]: """Retorna a engine pandas adequada para a extensão do arquivo. - Retorna ``None`` para extensões proibidas (.ods), emitindo warning. + Extensões proibidas (``_EXTENSOES_PROIBIDAS``) retornam ``None`` + com warning, mas atualmente nenhuma extensão é proibida. """ ext = Path(file_path).suffix.lower() if ext in _EXTENSOES_PROIBIDAS: @@ -546,6 +552,10 @@ def extrair_pnab( separadas por linhas de categoria (nome do edital). Cada DataFrame gerado recebe ``id_anexo`` na primeira posição e ``tipo_edital``. + Abas de proponentes (``Pessoas`` / ``Organizações``) são lidas com + ``header=1`` (linha 0 = título, linha 1 = cabeçalho, linha 2+ = dados), + seguindo o mesmo padrão dos "Dados Básicos" da LPG. + Parameters ---------- file_buffer : io.BytesIO @@ -592,6 +602,45 @@ def extrair_pnab( tabela_destino, ) + # ── Abas de proponentes (Pessoas / Organizações) ── + # Seguem o padrão "Dados Básicos" da LPG: linha 0 = título, + # linha 1 = header, linha 2+ = dados + linha de instruções. + aba_norm = _norm_texto(aba) + if aba_norm in _ABAS_PROPONENTES_PNAB: + try: + df = _com_timeout( + pd.read_excel, xls, sheet_name=aba, header=1, dtype=str + ) + except TimeoutLeituraError: + raise + except Exception as exc: + log.warning( + "[extracao_planilhas.py] Erro ao ler aba de " + "proponentes '%s' de '%s': %s", + aba, + file_name, + exc, + ) + continue + + if df.empty: + continue + + # Drop da linha de instruções (primeira linha após header) + df = df.iloc[1:].copy() + df = df.reset_index(drop=True) + df.insert(0, "id_anexo", id_anexo) + df = df.astype(str) + + resultados.append({ + "nome_tabela_destino": tabela_destino, + "dataframe": df, + }) + del df + gc.collect() + continue + + # ── Abas padrão (editais / contemplados / operacionalização) ── try: df = _com_timeout(pd.read_excel, xls, sheet_name=aba, dtype=str) except TimeoutLeituraError: diff --git a/pyproject.toml b/pyproject.toml index e7c4c427..0aa723f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,8 @@ requests = "*" zeep = "*" imap-tools = "*" astronomer-cosmos = "1.14.2" +odfpy = "^1.4.1" +python-calamine = "^0.6.2" [tool.poetry.group.dev.dependencies] black = "*"