Run independent dbt selects for a @dbt_assets declaration with many models #23665
I'm wondering if there is a way to break up the dbt run statement for a `@dbt_assets` declaration with many models. In Dagster this creates a single run whose dbt command selects all of the models at once.
Replies: 2 comments 1 reply
Up! Any ideas? 🤔
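Hi @ssillaots-boku - When using dbt assets, the run launched by Dagster uses a dbt CLI invocation with the `--select` flag, which means that a single Dagster run selecting multiple dbt assets will always be invoked with a command like `dbt run --select some_model another_model last_model`.

To break your run statement into three, you'll need to create your own asset selections and jobs to launch individual Dagster runs. You can use schedules and sensors to orchestrate everything:

from dagster import (
    DagsterRunStatus,
    RunRequest,
    ScheduleDefinition,
    define_asset_job,
    run_status_sensor,
)
from dagster_dbt import build_dbt_asset_selection, dbt_assets

@dbt_assets(manifest=manifest)
def my_dbt_assets(...):
    yield from dbt.cli(...)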

# Create the asset selections and jobs
some_model_job = define_asset_job(
    name="some_model_job",
    selection=build_dbt_asset_selection(
        [my_dbt_assets],
        dbt_select="some_model"
    ),
)
another_model_job = define_asset_job(
    name="another_model_job",
    selection=build_dbt_asset_selection(
        [my_dbt_assets],
        dbt_select="another_model"
    ),
)
last_model_job = define_asset_job(
    name="last_model_job",
    selection=build_dbt_asset_selection(
        [my_dbt_assets],
        dbt_select="last_model"
    ),
)

# Create a daily schedule for `some_model`, the most upstream model
some_model_schedule = ScheduleDefinition(
    name="some_model_schedule",
    cron_schedule="@daily",
    job=some_model_job,
)

# Create a sensor for `another_model`
# If `some_model_job` succeeds, a run for `another_model_job` is launched
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=another_model_job,
    monitor_jobs=[some_model_job]
)
def another_model_sensor(context):
    yield RunRequest(...)

# Create a sensor for `last_model`
# If `another_model_job` succeeds, a run for `last_model_job` is launched
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=last_model_job,
    monitor_jobs=[another_model_job]
)
def last_model_sensor(context):
    yield RunRequest(...)

Note that in this pattern, you will launch N Dagster jobs, where N is the total number of dbt models that you want to select. If one specific model fails more often and you want to isolate it while reducing the number of Dagster runs being launched, you can update your asset selections to group your dbt models:

some_model_and_upstream_selection = build_dbt_asset_selection(
    [my_dbt_assets],
    dbt_select="some_model"
).upstream()
isolated_model_selection = build_dbt_asset_selection(
    [my_dbt_assets],
    dbt_select="isolated_model"
)
another_model_and_downstream_selection = build_dbt_asset_selection(
    [my_dbt_assets],
    dbt_select="another_model"
).downstream()
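
To make the grouping idea concrete, here is a small plain-Python sketch (no Dagster or dbt required) of how three grouped selections could map to three separate dbt invocations. Everything in it is an assumption for illustration: the `PARENTS` lineage, the `raw_orders` model, and the helper functions are hypothetical, and the traversal is not Dagster's implementation. It also assumes that `.upstream()` and `.downstream()` include the selected model itself, and that each run ends up as a `dbt run --select ...` command; the exact command depends on what you pass to `dbt.cli(...)`.

```python
from collections import deque

# Hypothetical dbt lineage: each model maps to the models it depends on.
PARENTS = {
    "raw_orders": [],
    "some_model": ["raw_orders"],
    "isolated_model": ["some_model"],
    "another_model": ["isolated_model"],
    "last_model": ["another_model"],
}


def _walk(start, edges):
    """Return `start` plus every model reachable from it through `edges`."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def upstream(model):
    """The model and all of its ancestors (assumed include-self behavior)."""
    return _walk(model, PARENTS)


def downstream(model):
    """The model and all of its descendants (assumed include-self behavior)."""
    children = {}
    for child, parents in PARENTS.items():
        for parent in parents:
            children.setdefault(parent, []).append(child)
    return _walk(model, children)


def dbt_command(models):
    """Illustrative command string for one run over the given models."""
    return "dbt run --select " + " ".join(sorted(models))


# Three groups, mirroring the three selections in the answer.
groups = {
    "some_model_and_upstream": upstream("some_model"),
    "isolated_model": {"isolated_model"},
    "another_model_and_downstream": downstream("another_model"),
}

commands = {name: dbt_command(models) for name, models in groups.items()}
for name, command in commands.items():
    print(f"{name}: {command}")
```

With this toy lineage, five models are covered by three runs instead of five, and a failure in `isolated_model` only affects its own run. Each selection would still need its own `define_asset_job`, plus a schedule or sensor to chain it to the previous one.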