Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# <---------- Airflow ---------->

AIRFLOW_IMAGE_NAME=apache/airflow:latest
AIRFLOW__CORE__FERNET_KEY=
AIRFLOW_HOME=/opt/airflow/
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
AIRFLOW_UID=50000


# <---------- Postgres Airflow ---------->

POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DB=postgres


# <---------- Postgres Data Warehouse ---------->

POSTGRES_USER_DW=postgres
POSTGRES_PASSWORD_DW=postgres
POSTGRES_DB_DW=postgres


# <---------- dbt Profile Variables (Cultura) ---------->

DB_DW_HOST_CULTURA=localhost
DB_DW_PORT_CULTURA=5433
DB_DW_USER_CULTURA=postgres
DB_DW_PASS_CULTURA=postgres
DB_DW_DATABASE_CULTURA=postgres
DB_DW_SCHEMA_CULTURA=cultura


# <---------- MinIO ---------->

MINIO_ENDPOINT=minio:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_BUCKET=data-lake-ipea


PYTHONPATH=/opt/airflow/dags:/opt/airflow/plugins:/opt/airflow/helpers
1 change: 1 addition & 0 deletions airflow_lappis/dags/dbt/cultura/.user.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
id: 7c7b491c-e2c6-4636-9e72-b0ae2d207a9b
33 changes: 33 additions & 0 deletions airflow_lappis/dags/dbt/cultura/cosmos_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os
from datetime import datetime
from cosmos.profiles import PostgresUserPasswordProfileMapping
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.constants import DBT_LOG_PATH_ENVVAR

# Point dbt's log output at /tmp/dbt_logs. The directory is created (if missing)
# and the environment variable is set at import time, i.e. every time Airflow
# parses this DAG file.
dbt_log_path = "/tmp/dbt_logs"
os.makedirs(dbt_log_path, exist_ok=True)
os.environ[DBT_LOG_PATH_ENVVAR] = dbt_log_path

# dbt profile "cultura" / target "prod", built by Cosmos from the Airflow
# connection "postgres_default"; the profile's schema is overridden to "cultura".
profile_config = ProfileConfig(
profile_name="cultura",
target_name="prod",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="postgres_default",
profile_args={"schema": "cultura"},
)
)

# DAG "cultura_cosmos_dag": runs the dbt project under
# $AIRFLOW_REPO_BASE/dags/dbt/cultura on the cron schedule "0 1 * * *",
# without catchup, with 2 retries per task.
# NOTE: AIRFLOW_REPO_BASE is read with os.environ[...], so importing this file
# raises KeyError when the variable is not set.
my_cosmos_dag = DbtDag(
project_config=ProjectConfig(
f"{os.environ['AIRFLOW_REPO_BASE']}/dags/dbt/cultura"
),
profile_config=profile_config,
execution_config=ExecutionConfig(
# dbt executable expected at $AIRFLOW_REPO_BASE/.local/bin/dbt
dbt_executable_path=f"{os.environ['AIRFLOW_REPO_BASE']}/.local/bin/dbt",
),
schedule_interval="0 1 * * *",
start_date=datetime(2025, 1, 1),
catchup=False,
dag_id="cultura_cosmos_dag",
default_args={"retries": 2},
)
30 changes: 30 additions & 0 deletions airflow_lappis/dags/dbt/cultura/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
name: 'cultura'

version: 1.0.0
config-version: 2

profile: cultura

model-paths: ["models"]
analysis-paths: ["analyses"]
seed-paths: ["seeds"]
test-paths: ["tests"]
macro-paths: ["macros"]

clean-targets:
- "target"
- "dbt_packages"
- "logs"

models:
cultura:
metadata:
+materialized: incremental
+schema: metadata
agentes_dbt:
+materialized: table
+schema: agentes
bronze:
+materialized: incremental
views:
+materialized: view
13 changes: 13 additions & 0 deletions airflow_lappis/dags/dbt/cultura/descriptions.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
models:
- name: agentes_pf
description: Dados brutos de proponentes pessoa física da LPG
- name: agentes_pj
description: Dados brutos de proponentes pessoa jurídica da LPG
- name: agentes_coletivos
description: Dados brutos de proponentes coletivos da LPG
- name: identificadores_agentes
description: Consolidação dos identificadores de todos os tipos de proponentes
- name: perfil_agentes_historico
description: Perfil dos agentes com higienização do histórico de acesso a recursos de fomento
- name: primeiro_acesso_resumo
description: Resumo agregado do indicador de primeiro acesso a recursos de fomento (Meta 5)
33 changes: 33 additions & 0 deletions airflow_lappis/dags/dbt/cultura/macros/create_udfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{% macro create_udfs() %}
    {#
        Creates or replaces the project's UDFs through run_query, decoupled
        from on-run-start.

        This macro used to run on every dbt on-run-start. Because Cosmos
        launches one dbt process per task/model in parallel, several
        connections executed CREATE OR REPLACE FUNCTION at the same time,
        which caused "tuple concurrently updated" errors on Postgres'
        pg_proc catalog.

        It must now be invoked explicitly, exactly once, with:
            dbt run-operation create_udfs
        from an isolated Airflow task that runs before the model tasks
        generated by Cosmos, removing the concurrency at its source.
    #}

    {# Make sure the target schema exists before any function is created in it. #}
    {% do run_query("CREATE SCHEMA IF NOT EXISTS " ~ target.schema ~ ";") %}

    {# Each helper macro renders one CREATE OR REPLACE FUNCTION statement. #}
    {% do run_query(create_f_parse_dates()) %}
    {% do run_query(create_f_format_nc()) %}

    {% do log("UDFs criadas/atualizadas no schema " ~ target.schema, info=True) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% macro test_row_count_match(model, source_table, target_table) %}
    {#
        Row-count parity check between two relations.

        Arguments:
            model         - accepted but not referenced by the query.
            source_table  - relation whose rows are counted as the source.
            target_table  - relation whose rows are counted as the target.

        Returns one row (source_row_count, target_row_count) when the two
        counts differ, and no rows when they match.
    #}
    select
        source_count.row_count as source_row_count,
        target_count.row_count as target_row_count
    from (
        select count(*) as row_count from {{ source_table }}
    ) as source_count
    -- both sides are single-row aggregates, so the cross join yields one row
    cross join (
        select count(*) as row_count from {{ target_table }}
    ) as target_count
    where source_count.row_count <> target_count.row_count
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{% macro test_verificacao_tipagem(model, nome_tabela, nome_coluna, tipo_esperado) %}
    {#
        Checks that a column has the expected data type according to
        information_schema.columns.

        Arguments:
            model          - accepted but not referenced by the query.
            nome_tabela    - schema-qualified table name, "schema.table".
            nome_coluna    - column name as stored in information_schema.
            tipo_esperado  - expected value of information_schema's data_type.

        Returns one row when the actual type differs from the expected one.
        A row is also returned (with actual_type NULL) when the table/column
        is not found in the catalog, so a missing or misspelled column fails
        the check instead of passing vacuously.
    #}
    with
    expected as (
        -- single-row anchor so the check still yields a row when the
        -- catalog lookup below finds nothing
        select
            '{{ nome_tabela }}' as nome_tabela,
            '{{ nome_coluna }}' as nome_coluna,
            '{{ tipo_esperado }}' as tipo_esperado
    ),
    column_info as (
        select
            table_schema,
            table_name,   -- actual table name in information_schema
            column_name,  -- actual column name in information_schema
            data_type
        from information_schema.columns
        where
            table_schema || '.' || table_name = '{{ nome_tabela }}'
            and column_name = '{{ nome_coluna }}'
    ),
    comparison as (
        select
            expected.nome_tabela,
            expected.nome_coluna,
            expected.tipo_esperado,
            column_info.data_type as actual_type
        from expected
        left join column_info on true
    )
    select *
    from comparison
    -- null-safe inequality: a NULL actual_type (column not found) counts as a mismatch
    where actual_type is distinct from tipo_esperado
{% endmacro %}
4 changes: 4 additions & 0 deletions airflow_lappis/dags/dbt/cultura/macros/get_custom_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Overrides dbt's generate_schema_name hook by delegating, with the same
-- arguments, to the built-in generate_schema_name_for_env macro.
-- NOTE(review): per the dbt documentation, that built-in applies the custom
-- schema only on the "prod" target and falls back to the target schema
-- elsewhere -- confirm against the installed dbt version.
{% macro generate_schema_name(custom_schema_name, node) -%}
{{ generate_schema_name_for_env(custom_schema_name, node) }}
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{% macro get_model_metadata() %}
    {#
        Renders a single-row SELECT with the metadata of the current model
        (`this`): schema, table name, database, run start time and the dbt
        invocation id. Can be used from post-hooks to record metadata
        automatically.

        dt_transform is run_started_at read as UTC and converted to
        America/Sao_Paulo local time.
    #}
    SELECT
        '{{ this.schema }}' AS schema_name,
        '{{ this.name }}' AS table_name,
        '{{ this.database }}' AS database_name,
        (
            CAST('{{ run_started_at }}' AS TIMESTAMP)
            AT TIME ZONE 'UTC'
            AT TIME ZONE 'America/Sao_Paulo'
        ) AS dt_transform,
        '{{ invocation_id }}' AS run_id
{% endmacro %}


{% macro register_model_metadata() %}
    {#
        Upserts the current model's metadata into the central table
        <target.database>.metadata.models_metadata. Intended to be used as a
        post-hook on the models that should be tracked.

        Usage in dbt_project.yml:
            models:
              cultura:
                +post-hook:
                  - "{{ register_model_metadata() }}"

        One row is kept per (schema_name, table_name): on conflict only
        dt_transform and run_id are refreshed.
        NOTE(review): ON CONFLICT (schema_name, table_name) needs a matching
        unique constraint/index on the metadata table, which is defined
        outside this file -- confirm it exists.
    #}
    INSERT INTO {{ target.database }}.metadata.models_metadata (
        schema_name,
        table_name,
        database_name,
        dt_transform,
        run_id
    )
    VALUES (
        '{{ this.schema }}',
        '{{ this.name }}',
        '{{ this.database }}',
        -- run start read as UTC, stored as America/Sao_Paulo local time
        (
            CAST('{{ run_started_at }}' AS TIMESTAMP)
            AT TIME ZONE 'UTC'
            AT TIME ZONE 'America/Sao_Paulo'
        ),
        '{{ invocation_id }}'
    )
    ON CONFLICT (schema_name, table_name)
    DO UPDATE SET
        dt_transform = EXCLUDED.dt_transform,
        run_id = EXCLUDED.run_id;
{% endmacro %}
21 changes: 21 additions & 0 deletions airflow_lappis/dags/dbt/cultura/macros/parse_financial_value.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{% macro parse_financial_value(column_name) %}
    {#
        Renders a CASE expression that converts a text amount written with
        '.' as thousands separator and ',' as decimal separator into
        numeric(15, 2).

        Argument:
            column_name - text column/expression to convert.

        Rules, in order:
            - NULL, blank, or containing 'NaN'  -> 0.00
            - starting with '(' (parenthesised) -> negative value
            - anything else                     -> dots removed, comma turned
                                                   into the decimal point
    #}
    case
        when
            {{ column_name }} is null
            or trim({{ column_name }}) = ''
            or {{ column_name }} like '%NaN%'
        then 0.00::numeric(15, 2)
        when {{ column_name }} like '(%'
        then
            -- "(1234,56)" -> "-1234.56" once the thousands dots are stripped
            regexp_replace(
                replace({{ column_name }}, '.', ''),
                '(\()?(\d+),(\d+)(\))?',
                '-\2.\3'
            )::numeric(15, 2)
        else
            replace(
                replace({{ column_name }}, '.', ''), ',', '.'
            )::numeric(15, 2)
    end
{% endmacro %}
24 changes: 24 additions & 0 deletions airflow_lappis/dags/dbt/cultura/macros/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

version: 2

macros:
- name: create_udfs
description: >
Função que cria as UDFs necessárias para o funcionamento do projeto.
Deve ser chamada explicitamente, uma única vez, via `dbt run-operation create_udfs`, antes das tasks de modelo (e não no on-run-start), para evitar execuções concorrentes de CREATE OR REPLACE FUNCTION.

- name: generate_schema_name
description: >
Função que gera o nome do schema a ser utilizado no projeto.
A função dentro desta macro é built-in do dbt.

## UDFS
- name: create_f_parse_dates
description: >
Função que cria a UDF parse_date, que é utilizada para converter texto no formato MÊS(texto)/ANO(numero) em datas.
arguments:
- name: in_text
type: text
description: >
Texto a ser convertido em data.
O texto deve estar no formato MÊS(texto)/ANO(numero). Ex.: FEV/2024 -> 2024-02-01
19 changes: 19 additions & 0 deletions airflow_lappis/dags/dbt/cultura/macros/udfs/f_format_nc.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{% macro create_f_format_nc() %}
{#
    Renders the DDL for <target.schema>.format_nc(in_text text) returns text.

    The function keeps the first 7 characters of in_text and appends the last
    4 characters, read as a number, zero-padded to 5 digits.
    Example: 'ABCDEFG0012' -> 'ABCDEFG00012'.
#}
create or replace function {{ target.schema }}.format_nc(in_text text)
returns text
language sql
as $$
    select concat(
        left(in_text, 7),
        to_char(right(in_text, 4)::numeric, 'FM00000')
    )
$$;
{% endmacro %}
44 changes: 44 additions & 0 deletions airflow_lappis/dags/dbt/cultura/macros/udfs/f_parse_dates.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
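-- Defines the create_f_parse_dates macro, which creates the parse_date UDF (month-abbreviation/year text -> date).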
{% macro create_f_parse_dates() %}
{#
    Renders the DDL for <target.schema>.parse_date(in_text text) returns date.

    in_text is split on '/': the first part is the month, the second the year.
    Portuguese month abbreviations (JAN, FEV, MAR, ABR, MAI, JUN, JUL, AGO,
    SET, OUT, NOV, DEZ) are mapped to '01'..'12'; any other month text is used
    as-is. The result is the first day of that month,
    e.g. 'FEV/2024' -> 2024-02-01.
#}
create or replace function {{ target.schema }}.parse_date(in_text text)
returns date
language sql
as $$
    with partes as (
        select
            split_part(in_text, '/', 1) as mes,
            split_part(in_text, '/', 2) as ano
    )
    select
        (
            -- start at December of the previous year, then add N months
            to_date((ano::numeric - 1) || '-' || '12', 'YYYY-MM')
            + (
                case mes
                    when 'JAN' then '01'
                    when 'FEV' then '02'
                    when 'MAR' then '03'
                    when 'ABR' then '04'
                    when 'MAI' then '05'
                    when 'JUN' then '06'
                    when 'JUL' then '07'
                    when 'AGO' then '08'
                    when 'SET' then '09'
                    when 'OUT' then '10'
                    when 'NOV' then '11'
                    when 'DEZ' then '12'
                    else mes
                end || ' months'
            )::interval
        )::date as result
    from partes
$$;
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{#
    Incremental model with the identifiers of LPG collective proponents.

    Reads source transferegov_fundo_a_fundo.lpg_dados_coletivos and emits:
        identificador_unico     - representative's CPF column, trimmed and lower-cased
        historico_acesso_bruto  - raw "already accessed public funding" answer,
                                  trimmed and lower-cased
        programa_fomento        - constant 'LPG'

    On incremental runs only identifiers not yet present in the target are
    selected. The anti-join uses NOT EXISTS with a null-safe comparison
    instead of NOT IN: with NOT IN, a single NULL identificador_unico already
    stored in the target makes the predicate evaluate to NULL for every row,
    so no new rows would ever be loaded.
#}
{{ config(
    materialized='incremental',
    unique_key='identificador_unico'
) }}

WITH proponentes AS (
    SELECT
        LOWER(TRIM("nº do cpf do representante do grupo/coletivo")) AS identificador_unico,
        LOWER(TRIM("já acessou recursos públicos do fomento à cultura anteriorme")) AS historico_acesso_bruto,
        'LPG' AS programa_fomento
    FROM {{ source('transferegov_fundo_a_fundo', 'lpg_dados_coletivos') }}
)

SELECT
    proponentes.identificador_unico,
    proponentes.historico_acesso_bruto,
    proponentes.programa_fomento
FROM proponentes

{% if is_incremental() %}
WHERE NOT EXISTS (
    SELECT 1
    FROM {{ this }} AS existentes
    -- null-safe equality: NULL identifiers match each other, so they are
    -- neither re-inserted on every run nor able to block the whole load
    WHERE existentes.identificador_unico IS NOT DISTINCT FROM proponentes.identificador_unico
)
{% endif %}
Loading
Loading