Commit 7704b72

feat: adding submit_job samples (#88)
Adding submit_job samples and updating quickstart samples.
1 parent ef0ea6c commit 7704b72
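
Both the updated quickstart and the new submit_job sample hinge on the same API change: the hand-rolled polling loop around submit_job is replaced by submit_job_as_operation, which returns a long-running operation that resolves once the job reaches a terminal state and exposes the driver output location. A minimal sketch of that flow, with placeholder project, region, cluster, and job-file values:

    from google.cloud import dataproc_v1 as dataproc

    # Region-scoped job client (placeholder region).
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
    )

    job = {
        "placement": {"cluster_name": "my-cluster"},  # placeholder cluster
        "pyspark_job": {"main_python_file_uri": "gs://my-bucket/job.py"},  # placeholder job file
    }

    # Submit and block until the job finishes, fails, or is cancelled.
    operation = job_client.submit_job_as_operation(
        request={"project_id": "my-project", "region": "us-central1", "job": job}
    )
    response = operation.result()
    print(response.driver_output_resource_uri)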

4 files changed: +159, -55 lines


dataproc/snippets/quickstart/quickstart.py

Lines changed: 12 additions & 53 deletions
@@ -27,7 +27,7 @@
 """

 import argparse
-import time
+import re

 from google.cloud import dataproc_v1 as dataproc
 from google.cloud import storage
@@ -68,64 +68,23 @@ def quickstart(project_id, region, cluster_name, job_file_path):
         "pyspark_job": {"main_python_file_uri": job_file_path},
     }

-    job_response = job_client.submit_job(
+    operation = job_client.submit_job_as_operation(
         request={"project_id": project_id, "region": region, "job": job}
     )
-    job_id = job_response.reference.job_id
+    response = operation.result()

-    print('Submitted job "{}".'.format(job_id))
+    # Dataproc job output gets saved to the Google Cloud Storage bucket
+    # allocated to the job. Use a regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

-    # Termimal states for a job.
-    terminal_states = {
-        dataproc.JobStatus.State.ERROR,
-        dataproc.JobStatus.State.CANCELLED,
-        dataproc.JobStatus.State.DONE,
-    }
-
-    # Create a timeout such that the job gets cancelled if not in a
-    # terminal state after a fixed period of time.
-    timeout_seconds = 600
-    time_start = time.time()
-
-    # Wait for the job to complete.
-    while job_response.status.state not in terminal_states:
-        if time.time() > time_start + timeout_seconds:
-            job_client.cancel_job(
-                request={"project_id": project_id, "region": region, "job_id": job_id}
-            )
-            print(
-                "Job {} timed out after threshold of {} seconds.".format(
-                    job_id, timeout_seconds
-                )
-            )
-
-        # Poll for job termination once a second.
-        time.sleep(1)
-        job_response = job_client.get_job(
-            request={"project_id": project_id, "region": region, "job_id": job_id}
-        )
-
-    # Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
-    cluster_info = cluster_client.get_cluster(
-        request={
-            "project_id": project_id,
-            "region": region,
-            "cluster_name": cluster_name,
-        }
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_string()
     )

-    storage_client = storage.Client()
-    bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
-    output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
-        cluster_info.cluster_uuid, job_id
-    )
-    output = bucket.blob(output_blob).download_as_string()
-
-    print(
-        "Job {} finished with state {}:\n{}".format(
-            job_id, job_response.status.state.name, output
-        )
-    )
+    print(f"Job finished successfully: {output}")

     # Delete the cluster once the job has terminated.
     operation = cluster_client.delete_cluster(
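
The regex above splits driver_output_resource_uri into its bucket and object prefix; appending ".000000000" selects the first numbered chunk of driver output, the same driveroutput.000000000 object the deleted code built by hand from the cluster's staging bucket and job ID. A small sketch of just that parsing step, using a made-up URI:

    import re

    # Placeholder value shaped like response.driver_output_resource_uri.
    uri = "gs://my-staging-bucket/google-cloud-dataproc-metainfo/1234/jobs/abcd/driveroutput"
    matches = re.match("gs://(.*?)/(.*)", uri)
    bucket_name, blob_prefix = matches.group(1), matches.group(2)

    print(bucket_name)                  # my-staging-bucket
    print(f"{blob_prefix}.000000000")   # first driver-output chunk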

dataproc/snippets/quickstart/quickstart_test.py

Lines changed: 1 addition & 2 deletions
@@ -74,6 +74,5 @@ def test_quickstart(capsys):
     out, _ = capsys.readouterr()

     assert "Cluster created successfully" in out
-    assert "Submitted job" in out
-    assert "finished with state DONE:" in out
+    assert "Job finished successfully" in out
     assert "successfully deleted" in out

dataproc/snippets/submit_job.py

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This sample walks a user through submitting a Spark job using the Dataproc
+# client library.
+
+# Usage:
+#     python submit_job.py --project_id <PROJECT_ID> --region <REGION> \
+#         --cluster_name <CLUSTER_NAME>
+
+# [START dataproc_submit_job]
+import re
+# [END dataproc_submit_job]
+import sys
+# [START dataproc_submit_job]
+
+from google.cloud import dataproc_v1 as dataproc
+from google.cloud import storage
+
+
+def submit_job(project_id, region, cluster_name):
+    # Create the job client.
+    job_client = dataproc.JobControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
+    })
+
+    # Create the job config. 'main_jar_file_uri' can also be a
+    # Google Cloud Storage URL.
+    job = {
+        'placement': {
+            'cluster_name': cluster_name
+        },
+        'spark_job': {
+            'main_class': 'org.apache.spark.examples.SparkPi',
+            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
+            'args': ['1000']
+        }
+    }
+
+    operation = job_client.submit_job_as_operation(
+        request={"project_id": project_id, "region": region, "job": job}
+    )
+    response = operation.result()
+
+    # Dataproc job output gets saved to the Google Cloud Storage bucket
+    # allocated to the job. Use a regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
+
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_string()
+    )
+
+    print(f"Job finished successfully: {output}")
+    # [END dataproc_submit_job]
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 3:
+        sys.exit('python submit_job.py project_id region cluster_name')
+
+    project_id = sys.argv[1]
+    region = sys.argv[2]
+    cluster_name = sys.argv[3]
+    submit_job(project_id, region, cluster_name)
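
Note that although the usage comment shows flag-style arguments, the __main__ block reads sys.argv positionally, so the sample is actually invoked with bare positional arguments, for example (placeholder values):

    python submit_job.py my-project us-central1 my-cluster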

dataproc/snippets/submit_job_test.py

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+from google.cloud import dataproc_v1 as dataproc
+import pytest
+
+import submit_job
+
+
+PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
+REGION = 'us-central1'
+CLUSTER_NAME = 'py-sj-test-{}'.format(str(uuid.uuid4()))
+CLUSTER = {
+    'project_id': PROJECT_ID,
+    'cluster_name': CLUSTER_NAME,
+    'config': {
+        'master_config': {
+            'num_instances': 1,
+            'machine_type_uri': 'n1-standard-1'
+        },
+        'worker_config': {
+            'num_instances': 2,
+            'machine_type_uri': 'n1-standard-1'
+        }
+    }
+}
+
+
+@pytest.fixture(autouse=True)
+def setup_teardown():
+    cluster_client = dataproc.ClusterControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
+    })
+
+    # Create the cluster.
+    operation = cluster_client.create_cluster(
+        request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+    )
+    operation.result()
+
+    yield
+
+    cluster_client.delete_cluster(request={
+        "project_id": PROJECT_ID, "region": REGION, "cluster_name": CLUSTER_NAME
+    })
+
+
+def test_submit_job(capsys):
+    submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
+    out, _ = capsys.readouterr()
+
+    assert 'Job finished successfully' in out
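
Because the autouse fixture provisions and tears down a real two-worker cluster around the test, running it requires Google Cloud credentials and a project; one way to run it locally, assuming application-default credentials are configured:

    GOOGLE_CLOUD_PROJECT=my-project pytest submit_job_test.py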
