
Usage

1. Adding relationship hints for fact tables

To generate fact tables, first add relationship hints to your pipeline. Each table must have a primary key defined, because relationships are based on these keys:

import dlt


@dlt.resource(name="customers", primary_key="id")
def customers():
    ...

To add relationship hints, use the relationship adapter:

import dlt
from dlt_plus.dbt_generator.utils import table_reference_adapter


# Example countries table
@dlt.resource(name="countries", primary_key="id", write_disposition="merge")
def countries():
    yield from [
        {"id": 1, "name": "USA"},
        {"id": 2, "name": "Germany"},
    ]


# Example companies table which references country
@dlt.resource(name="companies", primary_key="id", write_disposition="merge")
def companies():
    yield from [
        {"id": 1, "name": "GiggleTech", "country_id": 2},
        {"id": 2, "name": "HappyHacks", "country_id": 1},
    ]


# Example customers table which references company
@dlt.resource(name="customers", primary_key="id", write_disposition="merge")
def customers():
    yield from [
        {"id": 1, "name": "andrea", "company_id": 1},
        {"id": 2, "name": "violetta", "company_id": 2},
        {"id": 3, "name": "marcin", "company_id": 1},
    ]


# Example orders table which references customer
@dlt.resource(name="orders", primary_key="id", write_disposition="merge")
def orders():
    yield from [
        {"id": 1, "date": "1-2-2020", "customer_id": 1},
        {"id": 2, "date": "14-2-2020", "customer_id": 2},
        {"id": 3, "date": "18-2-2020", "customer_id": 1},
        {"id": 4, "date": "1-3-2020", "customer_id": 3},
        {"id": 5, "date": "2-3-2020", "customer_id": 3},
    ]


# Run your pipeline
p = dlt.pipeline(pipeline_name="example_shop", destination="duckdb")
p.run([customers(), companies(), orders(), countries()])

# Define relationships in your schema
table_reference_adapter(
    p,
    "companies",
    references=[
        {
            "referenced_table": "countries",
            "columns": ["country_id"],
            "referenced_columns": ["id"],
        }
    ],
)

table_reference_adapter(
    p,
    "customers",
    references=[
        {
            "referenced_table": "companies",
            "columns": ["company_id"],
            "referenced_columns": ["id"],
        }
    ],
)

table_reference_adapter(
    p,
    "orders",
    references=[
        {
            "referenced_table": "customers",
            "columns": ["customer_id"],
            "referenced_columns": ["id"],
        }
    ],
)

Note: Only the relationships that the pipeline is not aware of need to be passed to the adapter explicitly. You don't need to define the parent-child relationships that dlt creates during the normalization stage, because the pipeline already knows about them.
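The three adapter calls in the example repeat the same dictionary shape. As a minimal sketch, a small helper can build that shape. `single_column_reference` and `REFERENCES` are hypothetical names, not part of dlt or dlt+, and the helper assumes single-column foreign keys that point at an `id` primary key, as in the example.

```python
from typing import Dict, List, Union

Reference = Dict[str, Union[str, List[str]]]


def single_column_reference(
    referenced_table: str, column: str, referenced_column: str = "id"
) -> Reference:
    """Build one entry for the `references` list used in the example (hypothetical helper)."""
    return {
        "referenced_table": referenced_table,
        "columns": [column],
        "referenced_columns": [referenced_column],
    }


# Same relationships as in the example: table name -> list of references.
REFERENCES = {
    "companies": [single_column_reference("countries", "country_id")],
    "customers": [single_column_reference("companies", "company_id")],
    "orders": [single_column_reference("customers", "customer_id")],
}
```

Each entry could then be passed to the adapter in a loop, for example `table_reference_adapter(p, table_name, references=refs)`, matching the calls shown in the example.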

2. Generating your baseline project

Ensure that your dlt pipeline has been run at least once locally or restored from the destination. Then, navigate to the directory where your pipeline is located and, using its name, execute the following command to create a baseline dbt project with dimensional tables for all existing pipeline tables:

dlt-dbt-generator <pipeline-name>

This command generates a new folder named dbt_<pipeline-name>, which contains the project with the following structure:

dbt_<pipeline-name>/
├── analysis/
├── macros/
├── models/
│ ├── marts/
│ │ ├── dim_<pipeline-name>__<table1>.sql
│ │ ├── dim_<pipeline-name>__<table2>.sql
│ │ └── dim_<pipeline-name>__<tableN>.sql
│ ├── staging/
│ │ ├── sources.yml
│ │ ├── stg_<pipeline-name>__<table1>.sql
│ │ ├── stg_<pipeline-name>__<table2>.sql
│ │ └── stg_<pipeline-name>__<tableN>.sql
│ ├── <pipeline-name>_dlt_active_load_ids.sql # Used for incremental processing of data
│ └── <pipeline-name>_dlt_processed_load.sql # Used for incremental processing of data
├── tests/
├── dbt_project.yml
└── requirements.txt

Additionally, in the directory where you ran the generator, you will find a new Python file named run_<pipeline-name>_dbt.py, which you can execute to run the project.
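The naming pattern in the tree can be summarized in a few lines of Python. This is only an illustrative sketch: `expected_model_paths` is a hypothetical helper that mirrors the layout shown above. It does not call the generator, and it leaves out the two load-ID models.

```python
from pathlib import PurePosixPath
from typing import List


def expected_model_paths(pipeline_name: str, tables: List[str]) -> List[str]:
    """List the dim_ and stg_ model paths the tree above implies (hypothetical helper)."""
    root = PurePosixPath(f"dbt_{pipeline_name}") / "models"
    paths: List[str] = []
    for table in tables:
        # One dimensional model and one staging model per pipeline table.
        paths.append(str(root / "marts" / f"dim_{pipeline_name}__{table}.sql"))
        paths.append(str(root / "staging" / f"stg_{pipeline_name}__{table}.sql"))
    return paths
```

Comparing this list with the files on disk is one way to check that every pipeline table received both models.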

3. Generating fact tables

After creating the base project with dimensional tables, you can create fact tables that will use the previously added relationship hints by running:

dlt-dbt-generator <pipeline-name> --fact <fact_table_name>

The <fact_table_name> you provide should be the name of the base table from which the relationships are resolved. The fact table automatically joins the IDs of all related tables discovered through dlt parent-child relationships, as well as any relationship IDs added manually through the adapter. You can then select and add further fields in the generated model.

For the example above, we can run this for the orders table. This will generate the fact_<pipeline-name>__orders.sql model in the marts folder of the dbt project:

dbt_<pipeline-name>/
├── analysis/
├── macros/
├── models/
│ ├── marts/
│ │ ├── dim_<pipeline-name>__<table1>.sql
│ │ ├── dim_<pipeline-name>__<table2>.sql
│ │ ├── dim_<pipeline-name>__<tableN>.sql
│ │ └── fact_<pipeline-name>__orders.sql # <-- This is the fact table model
│ ├── staging/
│ │ ├── sources.yml
│ │ ├── stg_<pipeline-name>__<table1>.sql
│ │ ├── stg_<pipeline-name>__<table2>.sql
│ │ └── stg_<pipeline-name>__<tableN>.sql
│ ├── <pipeline-name>_dlt_active_load_ids.sql # Used for incremental processing of data
│ └── <pipeline-name>_dlt_processed_load.sql # Used for incremental processing of data
├── tests/
├── dbt_project.yml
└── requirements.txt
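If you want to script both generator steps, the following sketch wraps the two commands shown above with `subprocess`. The function names are hypothetical, and it assumes `dlt-dbt-generator` is on your PATH and that you run it from the pipeline directory. Only the positional pipeline name and the `--fact` option from this page are used.

```python
import subprocess
from typing import List, Optional


def generator_command(pipeline_name: str, fact_table: Optional[str] = None) -> List[str]:
    """Build the argument list for dlt-dbt-generator (hypothetical helper)."""
    cmd = ["dlt-dbt-generator", pipeline_name]
    if fact_table is not None:
        cmd += ["--fact", fact_table]
    return cmd


def generate_project(pipeline_name: str, fact_tables: List[str]) -> None:
    """Generate the baseline project, then one fact model per listed table."""
    subprocess.run(generator_command(pipeline_name), check=True)
    for table in fact_tables:
        subprocess.run(generator_command(pipeline_name, table), check=True)
```

For the example pipeline, `generate_project("example_shop", ["orders"])` would issue the baseline command followed by the `--fact orders` command.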

4. Running your dbt project

You can run your dbt project with the previously mentioned script that was generated by dlt-dbt-generator <pipeline-name>:

python run_<pipeline-name>_dbt.py

This script executes your dbt transformations, loads the results into a new dataset named <original-dataset>_transformed and runs the dbt tests. If needed, you can adjust the dataset name directly in the script.

To see the output of the dbt run command, increase the logging level. For example:

RUNTIME__LOG_LEVEL=INFO python run_<pipeline-name>_dbt.py

or set it in config.toml:

[runtime]
log_level="INFO"
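The environment-variable variant can also be driven from Python, for instance from a scheduler. This is a sketch under assumptions: `env_with_log_level` and `run_dbt_script` are hypothetical names, and the script is assumed to sit in the current working directory.

```python
import os
import subprocess
import sys
from typing import Dict, Optional


def env_with_log_level(
    level: str = "INFO", base: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Return a copy of the environment with RUNTIME__LOG_LEVEL set."""
    env = dict(os.environ if base is None else base)
    env["RUNTIME__LOG_LEVEL"] = level
    return env


def run_dbt_script(pipeline_name: str, level: str = "INFO") -> None:
    """Run the generated runner script with the given log level."""
    script = f"run_{pipeline_name}_dbt.py"
    subprocess.run([sys.executable, script], env=env_with_log_level(level), check=True)
```

Copying the environment rather than mutating `os.environ` keeps the log level change local to the child process.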

5. Running dbt package directly

If you'd like to run your dbt package without a pipeline instance, please refer to our dbt runner docs.

6. Understanding incremental processing

dlt generates unique IDs for load packages, which are stored in the _dlt_load_id column of all tables in the dataset. This column indicates the specific load package to which each row belongs.

The generated dbt project uses these load IDs to process data incrementally. To manage this process, the project includes two key tables that track the status of load packages:

  • <pipeline_name>_dlt_active_load_ids: At the start of each dbt run, this table is populated with all load IDs that were successful and have not yet been processed in previous dbt runs, referred to as active load IDs. The staging tables are then populated only with rows associated with these active load IDs.

  • <pipeline_name>_dlt_processed_load_ids: At the end of each dbt run, the active load IDs are recorded in this table, along with a timestamp. This allows you to track when each load ID was processed.
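The bookkeeping described above boils down to a set difference followed by a filter. The sketch below shows that logic in plain Python. It is not the SQL the generator emits, and the function names and sample load IDs are made up for illustration.

```python
from typing import Dict, Iterable, List


def active_load_ids(successful: Iterable[str], processed: Iterable[str]) -> List[str]:
    """Load IDs that succeeded but were not handled by an earlier dbt run."""
    done = set(processed)
    return sorted(load_id for load_id in set(successful) if load_id not in done)


def rows_for_staging(rows: List[Dict], active: Iterable[str]) -> List[Dict]:
    """Keep only rows whose _dlt_load_id belongs to an active load."""
    active_set = set(active)
    return [row for row in rows if row["_dlt_load_id"] in active_set]


# Illustrative data: three successful loads, one already processed.
successful_loads = ["load_1", "load_2", "load_3"]
processed_loads = ["load_1"]
orders_rows = [
    {"id": 1, "_dlt_load_id": "load_1"},
    {"id": 2, "_dlt_load_id": "load_2"},
    {"id": 3, "_dlt_load_id": "load_3"},
]

active = active_load_ids(successful_loads, processed_loads)
staged = rows_for_staging(orders_rows, active)
```

After a run finishes, the IDs in `active` would be appended to the processed list, so the next run starts from an empty active set unless new loads have arrived.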
