Skip to content

Commit c4a106e

Browse files
authored
Create GKESuspendJobOperator and GKEResumeJobOperator operators (#38677)
1 parent 3b3ae96 commit c4a106e

File tree

4 files changed

+485
-1
lines changed

4 files changed

+485
-1
lines changed

airflow/providers/google/cloud/operators/kubernetes_engine.py

Lines changed: 210 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from deprecated import deprecated
3030
from google.api_core.exceptions import AlreadyExists
3131
from google.cloud.container_v1.types import Cluster
32-
from kubernetes.client import V1JobList
32+
from kubernetes.client import V1JobList, models as k8s
3333
from kubernetes.utils.create_from_yaml import FailToCreateError
3434
from packaging.version import parse as parse_version
3535

@@ -47,6 +47,7 @@
4747
GKEDeploymentHook,
4848
GKEHook,
4949
GKEJobHook,
50+
GKEKubernetesHook,
5051
GKEPodHook,
5152
)
5253
from airflow.providers.google.cloud.links.kubernetes_engine import (
@@ -1494,3 +1495,211 @@ def execute(self, context: Context):
14941495
).fetch_cluster_info()
14951496

14961497
return super().execute(context)
1498+
1499+
1500+
class GKESuspendJobOperator(GoogleCloudBaseOperator):
1501+
"""
1502+
Suspend Job by given name.
1503+
1504+
.. seealso::
1505+
For more information on how to use this operator, take a look at the guide:
1506+
:ref:`howto/operator:GKESuspendJobOperator`
1507+
1508+
:param name: The name of the Job to suspend
1509+
:param project_id: The Google Developers Console project id.
1510+
:param location: The name of the Google Kubernetes Engine zone or region in which the cluster
1511+
resides.
1512+
:param cluster_name: The name of the Google Kubernetes Engine cluster.
1513+
:param namespace: The name of the Google Kubernetes Engine namespace.
1514+
:param use_internal_ip: Use the internal IP address as the endpoint.
1515+
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
1516+
:param impersonation_chain: Optional service account to impersonate using short-term
1517+
credentials, or chained list of accounts required to get the access_token
1518+
of the last account in the list, which will be impersonated in the request.
1519+
If set as a string, the account must grant the originating account
1520+
the Service Account Token Creator IAM role.
1521+
If set as a sequence, the identities from the list must grant
1522+
Service Account Token Creator IAM role to the directly preceding identity, with first
1523+
account from the list granting this role to the originating account (templated).
1524+
"""
1525+
1526+
template_fields: Sequence[str] = (
1527+
"project_id",
1528+
"gcp_conn_id",
1529+
"name",
1530+
"namespace",
1531+
"cluster_name",
1532+
"location",
1533+
"impersonation_chain",
1534+
)
1535+
operator_extra_links = (KubernetesEngineJobLink(),)
1536+
1537+
def __init__(
1538+
self,
1539+
*,
1540+
name: str,
1541+
location: str,
1542+
namespace: str,
1543+
cluster_name: str,
1544+
project_id: str | None = None,
1545+
use_internal_ip: bool = False,
1546+
gcp_conn_id: str = "google_cloud_default",
1547+
impersonation_chain: str | Sequence[str] | None = None,
1548+
**kwargs,
1549+
) -> None:
1550+
super().__init__(**kwargs)
1551+
1552+
self.project_id = project_id
1553+
self.gcp_conn_id = gcp_conn_id
1554+
self.location = location
1555+
self.name = name
1556+
self.namespace = namespace
1557+
self.cluster_name = cluster_name
1558+
self.use_internal_ip = use_internal_ip
1559+
self.impersonation_chain = impersonation_chain
1560+
1561+
self.job: V1Job | None = None
1562+
self._ssl_ca_cert: str
1563+
self._cluster_url: str
1564+
1565+
@cached_property
1566+
def cluster_hook(self) -> GKEHook:
1567+
return GKEHook(
1568+
gcp_conn_id=self.gcp_conn_id,
1569+
location=self.location,
1570+
impersonation_chain=self.impersonation_chain,
1571+
)
1572+
1573+
@cached_property
1574+
def hook(self) -> GKEKubernetesHook:
1575+
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
1576+
cluster_name=self.cluster_name,
1577+
project_id=self.project_id,
1578+
use_internal_ip=self.use_internal_ip,
1579+
cluster_hook=self.cluster_hook,
1580+
).fetch_cluster_info()
1581+
1582+
return GKEKubernetesHook(
1583+
gcp_conn_id=self.gcp_conn_id,
1584+
cluster_url=self._cluster_url,
1585+
ssl_ca_cert=self._ssl_ca_cert,
1586+
)
1587+
1588+
def execute(self, context: Context) -> None:
1589+
self.job = self.hook.patch_namespaced_job(
1590+
job_name=self.name,
1591+
namespace=self.namespace,
1592+
body={"spec": {"suspend": True}},
1593+
)
1594+
self.log.info(
1595+
"Job %s from cluster %s was suspended.",
1596+
self.name,
1597+
self.cluster_name,
1598+
)
1599+
KubernetesEngineJobLink.persist(context=context, task_instance=self)
1600+
1601+
return k8s.V1Job.to_dict(self.job)
1602+
1603+
1604+
class GKEResumeJobOperator(GoogleCloudBaseOperator):
1605+
"""
1606+
Resume Job by given name.
1607+
1608+
.. seealso::
1609+
For more information on how to use this operator, take a look at the guide:
1610+
:ref:`howto/operator:GKEResumeJobOperator`
1611+
1612+
:param name: The name of the Job to resume
1613+
:param project_id: The Google Developers Console project id.
1614+
:param location: The name of the Google Kubernetes Engine zone or region in which the cluster
1615+
resides.
1616+
:param cluster_name: The name of the Google Kubernetes Engine cluster.
1617+
:param namespace: The name of the Google Kubernetes Engine namespace.
1618+
:param use_internal_ip: Use the internal IP address as the endpoint.
1619+
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
1620+
:param impersonation_chain: Optional service account to impersonate using short-term
1621+
credentials, or chained list of accounts required to get the access_token
1622+
of the last account in the list, which will be impersonated in the request.
1623+
If set as a string, the account must grant the originating account
1624+
the Service Account Token Creator IAM role.
1625+
If set as a sequence, the identities from the list must grant
1626+
Service Account Token Creator IAM role to the directly preceding identity, with first
1627+
account from the list granting this role to the originating account (templated).
1628+
"""
1629+
1630+
template_fields: Sequence[str] = (
1631+
"project_id",
1632+
"gcp_conn_id",
1633+
"name",
1634+
"namespace",
1635+
"cluster_name",
1636+
"location",
1637+
"impersonation_chain",
1638+
)
1639+
operator_extra_links = (KubernetesEngineJobLink(),)
1640+
1641+
def __init__(
1642+
self,
1643+
*,
1644+
name: str,
1645+
location: str,
1646+
namespace: str,
1647+
cluster_name: str,
1648+
project_id: str | None = None,
1649+
use_internal_ip: bool = False,
1650+
gcp_conn_id: str = "google_cloud_default",
1651+
impersonation_chain: str | Sequence[str] | None = None,
1652+
**kwargs,
1653+
) -> None:
1654+
super().__init__(**kwargs)
1655+
1656+
self.project_id = project_id
1657+
self.gcp_conn_id = gcp_conn_id
1658+
self.location = location
1659+
self.name = name
1660+
self.namespace = namespace
1661+
self.cluster_name = cluster_name
1662+
self.use_internal_ip = use_internal_ip
1663+
self.impersonation_chain = impersonation_chain
1664+
1665+
self.job: V1Job | None = None
1666+
self._ssl_ca_cert: str
1667+
self._cluster_url: str
1668+
1669+
@cached_property
1670+
def cluster_hook(self) -> GKEHook:
1671+
return GKEHook(
1672+
gcp_conn_id=self.gcp_conn_id,
1673+
location=self.location,
1674+
impersonation_chain=self.impersonation_chain,
1675+
)
1676+
1677+
@cached_property
1678+
def hook(self) -> GKEKubernetesHook:
1679+
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
1680+
cluster_name=self.cluster_name,
1681+
project_id=self.project_id,
1682+
use_internal_ip=self.use_internal_ip,
1683+
cluster_hook=self.cluster_hook,
1684+
).fetch_cluster_info()
1685+
1686+
return GKEKubernetesHook(
1687+
gcp_conn_id=self.gcp_conn_id,
1688+
cluster_url=self._cluster_url,
1689+
ssl_ca_cert=self._ssl_ca_cert,
1690+
)
1691+
1692+
def execute(self, context: Context) -> None:
1693+
self.job = self.hook.patch_namespaced_job(
1694+
job_name=self.name,
1695+
namespace=self.namespace,
1696+
body={"spec": {"suspend": False}},
1697+
)
1698+
self.log.info(
1699+
"Job %s from cluster %s was resumed.",
1700+
self.name,
1701+
self.cluster_name,
1702+
)
1703+
KubernetesEngineJobLink.persist(context=context, task_instance=self)
1704+
1705+
return k8s.V1Job.to_dict(self.job)

docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,36 @@ delete resource in the specified Google Kubernetes Engine cluster.
312312
:start-after: [START howto_operator_gke_delete_resource]
313313
:end-before: [END howto_operator_gke_delete_resource]
314314

315+
316+
.. _howto/operator:GKESuspendJobOperator:
317+
318+
Suspend a Job on a GKE cluster
319+
""""""""""""""""""""""""""""""
320+
321+
You can use :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKESuspendJobOperator` to
322+
suspend Job in the specified Google Kubernetes Engine cluster.
323+
324+
.. exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
325+
:language: python
326+
:dedent: 4
327+
:start-after: [START howto_operator_gke_suspend_job]
328+
:end-before: [END howto_operator_gke_suspend_job]
329+
330+
331+
.. _howto/operator:GKEResumeJobOperator:
332+
333+
Resume a Job on a GKE cluster
334+
"""""""""""""""""""""""""""""
335+
336+
You can use :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEResumeJobOperator` to
337+
resume Job in the specified Google Kubernetes Engine cluster.
338+
339+
.. exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
340+
:language: python
341+
:dedent: 4
342+
:start-after: [START howto_operator_gke_resume_job]
343+
:end-before: [END howto_operator_gke_resume_job]
344+
315345
Reference
316346
^^^^^^^^^
317347

0 commit comments

Comments
 (0)