Skip to content

Commit b80895e

Browse files
authored
Add samples for using GCP connections in an Airflow DAG (#1590)
1 parent: f645db7 · commit: b80895e

File tree

7 files changed

+88
-5
lines changed

7 files changed

+88
-5
lines changed

composer/workflows/bq_notify.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
}
8181

8282
with models.DAG(
83-
'bq_notify',
83+
'composer_sample_bq_notify',
8484
schedule_interval=datetime.timedelta(weeks=4),
8585
default_args=default_dag_args) as dag:
8686
# [END composer_notify_failure]

composer/workflows/connections.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Copyright 2018 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Demonstrates how to use connections in an Airflow DAG."""
16+
17+
import datetime
18+
19+
from airflow import models
20+
from airflow.contrib.operators import bigquery_operator
21+
22+
23+
yesterday = datetime.datetime.combine(
24+
datetime.datetime.today() - datetime.timedelta(1),
25+
datetime.datetime.min.time())
26+
27+
default_dag_args = {
28+
# Setting start date as yesterday starts the DAG immediately when it is
29+
# detected in the Cloud Storage bucket.
30+
'start_date': yesterday,
31+
}
32+
33+
# Define a DAG (directed acyclic graph) of tasks.
34+
# Any task you create within the context manager is automatically added to the
35+
# DAG object.
36+
with models.DAG(
37+
'composer_sample_connections',
38+
schedule_interval=datetime.timedelta(days=1),
39+
default_args=default_dag_args) as dag:
40+
# [START composer_connections_default]
41+
task_default = bigquery_operator.BigQueryOperator(
42+
task_id='task_default_connection',
43+
bql='SELECT 1', use_legacy_sql=False)
44+
# [END composer_connections_default]
45+
# [START composer_connections_explicit]
46+
task_explicit = bigquery_operator.BigQueryOperator(
47+
task_id='task_explicit_connection',
48+
bql='SELECT 1', use_legacy_sql=False,
49+
# Composer creates a 'google_cloud_default' connection by default.
50+
bigquery_conn_id='google_cloud_default')
51+
# [END composer_connections_explicit]
52+
# [START composer_connections_custom]
53+
task_custom = bigquery_operator.BigQueryOperator(
54+
task_id='task_custom_connection',
55+
bql='SELECT 1', use_legacy_sql=False,
56+
# Set a connection ID to use a connection that you have created.
57+
bigquery_conn_id='my_gcp_connection')
58+
# [END composer_connections_custom]
composer/workflows/connections_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2018 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+

def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a recommended sanity check by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    # Importing is sufficient: DAG definition files execute at import time,
    # so a successful import proves the DAG file parses without error.
    # The import is kept inside the test so collection of this test module
    # does not itself require Airflow to be importable.
    from . import connections  # noqa

composer/workflows/kubernetes_pod_operator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
# will show the task as failed, as well as contain all of the task logs
6161
# required to debug.
6262
with models.DAG(
63-
dag_id='kubernetes-pod-example',
63+
dag_id='composer_sample_kubernetes_pod',
6464
schedule_interval=datetime.timedelta(days=1),
6565
start_date=YESTERDAY) as dag:
6666
# Only name, namespace, image, and task_id are required to create a

composer/workflows/quickstart.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464

6565
# [START composer_quickstart_schedule]
6666
with models.DAG(
67-
'quickstart',
67+
'composer_sample_quickstart',
6868
# Continue to run DAG once per day
6969
schedule_interval=datetime.timedelta(days=1),
7070
default_args=default_dag_args) as dag:

composer/workflows/simple.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
# Any task you create within the context manager is automatically added to the
4242
# DAG object.
4343
with models.DAG(
44-
'simple_greeting',
44+
'composer_sample_simple_greeting',
4545
schedule_interval=datetime.timedelta(days=1),
4646
default_args=default_dag_args) as dag:
4747
# [END composer_simple_define_dag]

composer/workflows/use_local_deps.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
datetime.datetime.min.time()),
3131
}
3232

33-
with airflow.DAG('dependencies_dag', default_args=default_args) as dag:
33+
with airflow.DAG(
34+
'composer_sample_dependencies_dag',
35+
default_args=default_args) as dag:
3436
t1 = bash_operator.BashOperator(
3537
task_id='print_coin_result',
3638
bash_command='echo "{0}"'.format(coin_module.flip_coin()),

0 commit comments

Comments
 (0)