Skip to content

Commit 08424fe

Browse files
SameerMesiah97Lee-WSameer Mesiah
authored
Raise ValueError instead of KeyError when cancel_previous_runs=True and no job identifier is provided (#62393)
Co-authored-by: Wei Lee <weilee.rx@gmail.com> Co-authored-by: Sameer Mesiah <smesiah971@gmail.com>
1 parent 1fd89e0 commit 08424fe

File tree

2 files changed

+42
-3
lines changed

2 files changed

+42
-3
lines changed

providers/databricks/src/airflow/providers/databricks/operators/databricks.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -921,8 +921,13 @@ def execute(self, context: Context):
921921
self.json["job_id"] = job_id
922922
del self.json["job_name"]
923923

924-
if self.cancel_previous_runs and self.json["job_id"] is not None:
925-
hook.cancel_all_runs(self.json["job_id"])
924+
if self.cancel_previous_runs:
925+
if (job_id := self.json.get("job_id")) is None:
926+
raise ValueError(
927+
"cancel_previous_runs=True requires either job_id or job_name to be provided."
928+
)
929+
930+
hook.cancel_all_runs(job_id)
926931

927932
self.run_id = hook.run_now(self.json)
928933
if self.deferrable:

providers/databricks/tests/unit/databricks/operators/test_databricks.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from datetime import datetime, timedelta
2222
from typing import Any
2323
from unittest import mock
24-
from unittest.mock import MagicMock
24+
from unittest.mock import MagicMock, call
2525

2626
import pytest
2727

@@ -1634,6 +1634,40 @@ def test_no_cancel_previous_runs(self, db_mock_class):
16341634
db_mock.get_run_page_url.assert_called_once_with(RUN_ID)
16351635
db_mock.get_run.assert_not_called()
16361636

1637+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
1638+
def test_cancel_previous_runs_without_job_id_raises(self, db_mock_class):
1639+
run = {
1640+
"notebook_params": NOTEBOOK_PARAMS,
1641+
"notebook_task": NOTEBOOK_TASK,
1642+
"jar_params": JAR_PARAMS,
1643+
}
1644+
1645+
op = DatabricksRunNowOperator(
1646+
task_id=TASK_ID,
1647+
json=run,
1648+
cancel_previous_runs=True,
1649+
)
1650+
1651+
db_mock = db_mock_class.return_value
1652+
1653+
with pytest.raises(
1654+
ValueError,
1655+
match="cancel_previous_runs=True requires either job_id or job_name",
1656+
):
1657+
op.execute(None)
1658+
1659+
assert db_mock_class.mock_calls == [
1660+
call(
1661+
DEFAULT_CONN_ID,
1662+
retry_limit=op.databricks_retry_limit,
1663+
retry_delay=op.databricks_retry_delay,
1664+
retry_args=None,
1665+
caller="DatabricksRunNowOperator",
1666+
)
1667+
]
1668+
assert db_mock.cancel_all_runs.mock_calls == []
1669+
assert db_mock.run_now.mock_calls == []
1670+
16371671
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
16381672
def test_execute_task_deferred(self, db_mock_class):
16391673
"""

0 commit comments

Comments
 (0)