Skip to content

Commit ae07932

Browse files
committed
feat: Add DagRun note to OL events
1 parent 0131897 commit ae07932

File tree

2 files changed

+15
-1
lines changed
  • providers/openlineage

2 files changed

+15
-1
lines changed

providers/openlineage/src/airflow/providers/openlineage/utils/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,7 @@ class DagRunInfo(InfoJsonEncodable):
740740
"execution_date", # Airflow 2
741741
"external_trigger", # Removed in Airflow 3, use run_type instead
742742
"logical_date", # Airflow 3
743+
"note", # Airflow 3.2+
743744
"run_after", # Airflow 3
744745
"run_id",
745746
"run_type",
@@ -837,6 +838,7 @@ class TaskInfo(InfoJsonEncodable):
837838
# Operator-specific useful attributes
838839
"trigger_dag_id", # TriggerDagRunOperator
839840
"trigger_run_id", # TriggerDagRunOperator
841+
"note", # TriggerDagRunOperator
840842
"external_dag_id", # ExternalTaskSensor and ExternalTaskMarker (if run, as it's EmptyOperator)
841843
"external_task_id", # ExternalTaskSensor and ExternalTaskMarker (if run, as it's EmptyOperator)
842844
"external_task_ids", # ExternalTaskSensor

providers/openlineage/tests/unit/openlineage/utils/test_utils.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
from tests_common.test_utils.compat import BashOperator, OperatorSerialization, PythonOperator
8181
from tests_common.test_utils.mock_operators import MockOperator
8282
from tests_common.test_utils.taskinstance import create_task_instance
83-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_3_PLUS, AIRFLOW_V_3_0_PLUS
83+
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_3_PLUS, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
8484

8585
BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
8686
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
@@ -185,6 +185,7 @@ def test_get_airflow_dag_run_facet():
185185
dagrun_mock.end_date = datetime.datetime(2024, 6, 1, 1, 2, 14, 34172, tzinfo=datetime.timezone.utc)
186186
dagrun_mock.triggering_user_name = "user1"
187187
dagrun_mock.triggered_by = "something"
188+
dagrun_mock.note = "note"
188189
dagrun_mock.dag_versions = [
189190
MagicMock(
190191
bundle_name="bundle_name",
@@ -226,6 +227,7 @@ def test_get_airflow_dag_run_facet():
226227
"duration": 10.034172,
227228
"execution_date": "2024-06-01T01:02:04+00:00",
228229
"logical_date": "2024-06-01T01:02:04+00:00",
230+
"note": "note",
229231
"run_after": "2024-06-01T01:02:04+00:00",
230232
"dag_bundle_name": "bundle_name",
231233
"dag_bundle_version": "bundle_version",
@@ -2665,6 +2667,10 @@ def test_dagrun_info_af3(mocked_dag_versions):
26652667
dv2.bundle_name = "bundle_name"
26662668
dv2.bundle_version = "bundle_version"
26672669

2670+
optional_args = {}
2671+
if AIRFLOW_V_3_2_PLUS:
2672+
optional_args["note"] = "note"
2673+
26682674
mocked_dag_versions.return_value = [dv1, dv2]
26692675
dagrun = DagRun(
26702676
dag_id="dag_id",
@@ -2681,6 +2687,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
26812687
triggered_by=DagRunTriggeredByType.UI,
26822688
backfill_id=999,
26832689
bundle_version="bundle_version",
2690+
**optional_args
26842691
)
26852692
assert dagrun.dag_versions == [dv1, dv2]
26862693
dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
@@ -2706,6 +2713,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
27062713
"dag_version_number": "version_number",
27072714
"triggered_by": DagRunTriggeredByType.UI,
27082715
"triggering_user_name": "my_user",
2716+
**optional_args
27092717
}
27102718

27112719

@@ -2852,6 +2860,7 @@ def __init__(self, *args, **kwargs):
28522860
self.tol = "tol" # SQLValueCheckOperator
28532861
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
28542862
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
2863+
self.note = "note" # TriggerDagRunOperator
28552864
self.hitl_summary = "hitl_summary" # HITLOperator
28562865
super().__init__(*args, **kwargs)
28572866

@@ -2899,6 +2908,7 @@ def __init__(self, *args, **kwargs):
28992908
"max_active_tis_per_dagrun": None,
29002909
"max_retry_delay": None,
29012910
"multiple_outputs": False,
2911+
"note": "note",
29022912
"operator_class": "CustomOperator",
29032913
"operator_class_path": get_fully_qualified_class_name(task_10),
29042914
"operator_provider_version": None, # Custom operator doesn't have provider version
@@ -2979,6 +2989,7 @@ def __init__(self, *args, **kwargs):
29792989
self.tol = "tol" # SQLValueCheckOperator
29802990
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
29812991
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
2992+
self.note = "note" # TriggerDagRunOperator
29822993
self.hitl_summary = "hitl_summary" # HITLOperator
29832994
super().__init__(*args, **kwargs)
29842995

@@ -3029,6 +3040,7 @@ def __init__(self, *args, **kwargs):
30293040
"max_active_tis_per_dagrun": None,
30303041
"max_retry_delay": None,
30313042
"multiple_outputs": False,
3043+
"note": "note",
30323044
"operator_class": "CustomOperator",
30333045
"operator_class_path": get_fully_qualified_class_name(task_10),
30343046
"operator_provider_version": None, # Custom operator doesn't have provider version

0 commit comments

Comments
 (0)