|
| 1 | +""" |
| 2 | +
|
| 3 | +Ce dag permet d'exécuter des commandes dbt. |
| 4 | +
|
| 5 | +""" |
| 6 | + |
| 7 | +from datetime import datetime, timedelta |
| 8 | + |
| 9 | +from airflow.models.param import Param |
| 10 | +from airflow.operators.bash import BashOperator |
| 11 | +from include.utils import get_dbt_command_from_directory |
| 12 | + |
| 13 | +from airflow import DAG |
| 14 | + |
| 15 | +default_args = { |
| 16 | + "owner": "airflow", |
| 17 | + "depends_on_past": False, |
| 18 | + "email_on_failure": True, |
| 19 | + "email_on_retry": False, |
| 20 | + "retries": 1, |
| 21 | + "retry_delay": timedelta(minutes=5), |
| 22 | +} |
| 23 | + |
| 24 | +with DAG( |
| 25 | + "dbt_selector_dag", |
| 26 | + default_args=default_args, |
| 27 | + description="Exécute des commandes DBT avec un sélecteur", |
| 28 | + schedule_interval=None, |
| 29 | + start_date=datetime(2024, 1, 1), |
| 30 | + catchup=False, |
| 31 | + params={ |
| 32 | + "select": Param(default="default", type="string", description="Sélecteur DBT à utiliser pour l'exécution"), |
| 33 | + "command": Param( |
| 34 | + default="build", |
| 35 | + type="string", |
| 36 | + description="Commande DBT à exécuter (build, run, test, etc.)", |
| 37 | + enum=["build", "run", "test", "compile", "debug", "deps", "clean"], |
| 38 | + ), |
| 39 | + }, |
| 40 | +) as dag: |
| 41 | + # Validation du sélecteur |
| 42 | + validate_selector = BashOperator( |
| 43 | + task_id="validate_selector", |
| 44 | + bash_command=get_dbt_command_from_directory("dbt ls --select {{ params.select }} || exit 1"), |
| 45 | + ) |
| 46 | + |
| 47 | + # Exécution de dbt avec le sélecteur |
| 48 | + execute_dbt = BashOperator( |
| 49 | + task_id="execute_dbt", |
| 50 | + bash_command=get_dbt_command_from_directory("dbt {{ params.command }} --select {{ params.select }}"), |
| 51 | + ) |
| 52 | + |
| 53 | + validate_selector >> execute_dbt |
0 commit comments