Skip to content

Commit 8844bd9

Browse files
committed
feat: Add DagRun note to OL events
1 parent 0131897 commit 8844bd9

File tree

2 files changed

+21
-1
lines changed
  • providers/openlineage

2 files changed

+21
-1
lines changed

providers/openlineage/src/airflow/providers/openlineage/utils/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,7 @@ class DagRunInfo(InfoJsonEncodable):
740740
"execution_date", # Airflow 2
741741
"external_trigger", # Removed in Airflow 3, use run_type instead
742742
"logical_date", # Airflow 3
743+
"note", # Airflow 3.2+
743744
"run_after", # Airflow 3
744745
"run_id",
745746
"run_type",
@@ -837,6 +838,7 @@ class TaskInfo(InfoJsonEncodable):
837838
# Operator-specific useful attributes
838839
"trigger_dag_id", # TriggerDagRunOperator
839840
"trigger_run_id", # TriggerDagRunOperator
841+
"note", # TriggerDagRunOperator
840842
"external_dag_id", # ExternalTaskSensor and ExternalTaskMarker (if run, as it's EmptyOperator)
841843
"external_task_id", # ExternalTaskSensor and ExternalTaskMarker (if run, as it's EmptyOperator)
842844
"external_task_ids", # ExternalTaskSensor

providers/openlineage/tests/unit/openlineage/utils/test_utils.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@
8080
from tests_common.test_utils.compat import BashOperator, OperatorSerialization, PythonOperator
8181
from tests_common.test_utils.mock_operators import MockOperator
8282
from tests_common.test_utils.taskinstance import create_task_instance
83-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_3_PLUS, AIRFLOW_V_3_0_PLUS
83+
from tests_common.test_utils.version_compat import (
84+
AIRFLOW_V_3_0_3_PLUS,
85+
AIRFLOW_V_3_0_PLUS,
86+
AIRFLOW_V_3_2_PLUS,
87+
)
8488

8589
BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
8690
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
@@ -185,6 +189,7 @@ def test_get_airflow_dag_run_facet():
185189
dagrun_mock.end_date = datetime.datetime(2024, 6, 1, 1, 2, 14, 34172, tzinfo=datetime.timezone.utc)
186190
dagrun_mock.triggering_user_name = "user1"
187191
dagrun_mock.triggered_by = "something"
192+
dagrun_mock.note = "note"
188193
dagrun_mock.dag_versions = [
189194
MagicMock(
190195
bundle_name="bundle_name",
@@ -226,6 +231,7 @@ def test_get_airflow_dag_run_facet():
226231
"duration": 10.034172,
227232
"execution_date": "2024-06-01T01:02:04+00:00",
228233
"logical_date": "2024-06-01T01:02:04+00:00",
234+
"note": "note",
229235
"run_after": "2024-06-01T01:02:04+00:00",
230236
"dag_bundle_name": "bundle_name",
231237
"dag_bundle_version": "bundle_version",
@@ -2665,6 +2671,10 @@ def test_dagrun_info_af3(mocked_dag_versions):
26652671
dv2.bundle_name = "bundle_name"
26662672
dv2.bundle_version = "bundle_version"
26672673

2674+
optional_args = {}
2675+
if AIRFLOW_V_3_2_PLUS:
2676+
optional_args["note"] = "note"
2677+
26682678
mocked_dag_versions.return_value = [dv1, dv2]
26692679
dagrun = DagRun(
26702680
dag_id="dag_id",
@@ -2681,6 +2691,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
26812691
triggered_by=DagRunTriggeredByType.UI,
26822692
backfill_id=999,
26832693
bundle_version="bundle_version",
2694+
**optional_args,
26842695
)
26852696
assert dagrun.dag_versions == [dv1, dv2]
26862697
dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
@@ -2706,6 +2717,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
27062717
"dag_version_number": "version_number",
27072718
"triggered_by": DagRunTriggeredByType.UI,
27082719
"triggering_user_name": "my_user",
2720+
"note": optional_args.get("note"),
27092721
}
27102722

27112723

@@ -2728,6 +2740,7 @@ def test_dagrun_info_af2():
27282740
)
27292741
dagrun.start_date = date
27302742
dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
2743+
dagrun.note = "note"
27312744

27322745
result = DagRunInfo(dagrun)
27332746
assert dict(result) == {
@@ -2748,6 +2761,7 @@ def test_dagrun_info_af2():
27482761
"dag_bundle_version": None,
27492762
"dag_version_id": None,
27502763
"dag_version_number": None,
2764+
"note": "note",
27512765
}
27522766

27532767

@@ -2852,6 +2866,7 @@ def __init__(self, *args, **kwargs):
28522866
self.tol = "tol" # SQLValueCheckOperator
28532867
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
28542868
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
2869+
self.note = "note" # TriggerDagRunOperator
28552870
self.hitl_summary = "hitl_summary" # HITLOperator
28562871
super().__init__(*args, **kwargs)
28572872

@@ -2899,6 +2914,7 @@ def __init__(self, *args, **kwargs):
28992914
"max_active_tis_per_dagrun": None,
29002915
"max_retry_delay": None,
29012916
"multiple_outputs": False,
2917+
"note": "note",
29022918
"operator_class": "CustomOperator",
29032919
"operator_class_path": get_fully_qualified_class_name(task_10),
29042920
"operator_provider_version": None, # Custom operator doesn't have provider version
@@ -2979,6 +2995,7 @@ def __init__(self, *args, **kwargs):
29792995
self.tol = "tol" # SQLValueCheckOperator
29802996
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
29812997
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
2998+
self.note = "note" # TriggerDagRunOperator
29822999
self.hitl_summary = "hitl_summary" # HITLOperator
29833000
super().__init__(*args, **kwargs)
29843001

@@ -3029,6 +3046,7 @@ def __init__(self, *args, **kwargs):
30293046
"max_active_tis_per_dagrun": None,
30303047
"max_retry_delay": None,
30313048
"multiple_outputs": False,
3049+
"note": "note",
30323050
"operator_class": "CustomOperator",
30333051
"operator_class_path": get_fully_qualified_class_name(task_10),
30343052
"operator_provider_version": None, # Custom operator doesn't have provider version

0 commit comments

Comments
 (0)