Skip to content

Commit 3f68184

Browse files
authored
adding submit job sample and updating submit job in quickstart (#284)
1 parent c8fcaa3 commit 3f68184

File tree

4 files changed

+225
-31
lines changed

4 files changed

+225
-31
lines changed

dataproc/snippets/src/main/java/Quickstart.java

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.cloud.dataproc.v1.Job;
3939
import com.google.cloud.dataproc.v1.JobControllerClient;
4040
import com.google.cloud.dataproc.v1.JobControllerSettings;
41+
import com.google.cloud.dataproc.v1.JobMetadata;
4142
import com.google.cloud.dataproc.v1.JobPlacement;
4243
import com.google.cloud.dataproc.v1.PySparkJob;
4344
import com.google.cloud.storage.Blob;
@@ -49,6 +50,8 @@
4950
import java.util.concurrent.ExecutionException;
5051
import java.util.concurrent.TimeUnit;
5152
import java.util.concurrent.TimeoutException;
53+
import java.util.regex.Matcher;
54+
import java.util.regex.Pattern;
5255

5356
public class Quickstart {
5457

@@ -117,9 +120,9 @@ public static void quickstart(
117120
// Create the Cloud Dataproc cluster.
118121
OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
119122
clusterControllerClient.createClusterAsync(projectId, region, cluster);
120-
Cluster response = createClusterAsyncRequest.get();
123+
Cluster clusterResponse = createClusterAsyncRequest.get();
121124
System.out.println(
122-
String.format("Cluster created successfully: %s", response.getClusterName()));
125+
String.format("Cluster created successfully: %s", clusterResponse.getClusterName()));
123126

124127
// Configure the settings for our job.
125128
JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
@@ -129,34 +132,26 @@ public static void quickstart(
129132
// Submit an asynchronous request to execute the job.
130133
Job request = jobControllerClient.submitJob(projectId, region, job);
131134
String jobId = request.getReference().getJobId();
132-
System.out.println(String.format("Submitted job \"%s\"", jobId));
135+
System.out.println(String.format("Submitting job \"%s\"", jobId));
133136

134137
// Wait for the job to finish.
135-
CompletableFuture<Job> finishedJobFuture =
136-
CompletableFuture.supplyAsync(
137-
() -> waitForJobCompletion(jobControllerClient, projectId, region, jobId));
138-
int timeout = 10;
139-
try {
140-
Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES);
141-
System.out.println(String.format("Job %s finished successfully.", jobId));
142-
143-
// Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
144-
Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName);
145-
Storage storage = StorageOptions.getDefaultInstance().getService();
146-
Blob blob =
147-
storage.get(
148-
clusterInfo.getConfig().getConfigBucket(),
149-
String.format(
150-
"google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput.000000000",
151-
clusterInfo.getClusterUuid(), jobId));
152-
System.out.println(
153-
String.format(
154-
"Job \"%s\" finished with state %s:\n%s",
155-
jobId, jobInfo.getStatus().getState(), new String(blob.getContent())));
156-
} catch (TimeoutException e) {
157-
System.err.println(
158-
String.format("Job timed out after %d minutes: %s", timeout, e.getMessage()));
159-
}
138+
System.out.println(String.format("Job %s finished successfully.", jobId));
139+
140+
OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
141+
jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
142+
143+
Job jobResponse = submitJobAsOperationAsyncRequest.get();
144+
145+
// Print output from Google Cloud Storage.
146+
Matcher matches =
147+
Pattern.compile("gs://(.*?)/(.*)").matcher(jobResponse.getDriverOutputResourceUri());
148+
matches.matches();
149+
150+
Storage storage = StorageOptions.getDefaultInstance().getService();
151+
Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
152+
153+
System.out.println(
154+
String.format("Job finished successfully: %s", new String(blob.getContent())));
160155

161156
// Delete the cluster.
162157
OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
@@ -165,7 +160,7 @@ public static void quickstart(
165160
System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));
166161

167162
} catch (ExecutionException e) {
168-
System.err.println(String.format("Error executing quickstart: %s ", e.getMessage()));
163+
System.err.println(String.format("quickstart: %s ", e.getMessage()));
169164
}
170165
}
171166

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// [START dataproc_submit_job]
18+
19+
import com.google.api.gax.longrunning.OperationFuture;
20+
import com.google.cloud.dataproc.v1.HadoopJob;
21+
import com.google.cloud.dataproc.v1.Job;
22+
import com.google.cloud.dataproc.v1.JobControllerClient;
23+
import com.google.cloud.dataproc.v1.JobControllerSettings;
24+
import com.google.cloud.dataproc.v1.JobMetadata;
25+
import com.google.cloud.dataproc.v1.JobPlacement;
26+
import com.google.cloud.dataproc.v1.SparkJob;
27+
import com.google.cloud.storage.Blob;
28+
import com.google.cloud.storage.Storage;
29+
import com.google.cloud.storage.StorageOptions;
30+
import java.io.IOException;
31+
import java.util.ArrayList;
32+
import java.util.Arrays;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.regex.Matcher;
35+
import java.util.regex.Pattern;
36+
37+
public class SubmitJob {
38+
39+
public static void submitJob() throws IOException, InterruptedException {
40+
// TODO(developer): Replace these variables before running the sample.
41+
String projectId = "your-project-id";
42+
String region = "your-project-region";
43+
String clusterName = "your-cluster-name";
44+
submitJob(projectId, region, clusterName);
45+
}
46+
47+
public static void submitJob(
48+
String projectId, String region, String clusterName)
49+
throws IOException, InterruptedException {
50+
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
51+
52+
// Configure the settings for the job controller client.
53+
JobControllerSettings jobControllerSettings =
54+
JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
55+
56+
// Create a job controller client with the configured settings. Using a try-with-resources
57+
// closes the client,
58+
// but this can also be done manually with the .close() method.
59+
try (JobControllerClient jobControllerClient =
60+
JobControllerClient.create(jobControllerSettings)) {
61+
62+
// Configure cluster placement for the job.
63+
JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
64+
65+
// Configure Spark job settings.
66+
SparkJob sparkJob =
67+
SparkJob.newBuilder()
68+
.setMainClass("org.apache.spark.examples.SparkPi")
69+
.addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
70+
.addArgs("1000")
71+
.build();
72+
73+
Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
74+
75+
// Submit an asynchronous request to execute the job.
76+
OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
77+
jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
78+
79+
Job response = submitJobAsOperationAsyncRequest.get();
80+
81+
// Print output from Google Cloud Storage.
82+
Matcher matches =
83+
Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
84+
matches.matches();
85+
86+
Storage storage = StorageOptions.getDefaultInstance().getService();
87+
Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
88+
89+
System.out.println(
90+
String.format("Job finished successfully: %s", new String(blob.getContent())));
91+
92+
} catch (ExecutionException e) {
93+
// If the job does not complete successfully, print the error message.
94+
System.err.println(String.format("submitJob: %s ", e.getMessage()));
95+
}
96+
}
97+
}
98+
// [END dataproc_submit_job]

dataproc/snippets/src/test/java/QuickstartTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ public void quickstartTest() throws IOException, InterruptedException {
9292
String output = bout.toString();
9393

9494
assertThat(output, CoreMatchers.containsString("Cluster created successfully"));
95-
assertThat(output, CoreMatchers.containsString("Submitted job"));
96-
assertThat(output, CoreMatchers.containsString("finished with state DONE:"));
95+
assertThat(output, CoreMatchers.containsString("Submitting job"));
96+
assertThat(output, CoreMatchers.containsString("Job finished successfully:"));
9797
assertThat(output, CoreMatchers.containsString("successfully deleted"));
9898
}
9999

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import static junit.framework.TestCase.assertNotNull;
18+
import static org.hamcrest.MatcherAssert.assertThat;
19+
20+
import com.google.api.gax.longrunning.OperationFuture;
21+
import com.google.cloud.dataproc.v1.Cluster;
22+
import com.google.cloud.dataproc.v1.ClusterControllerClient;
23+
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
24+
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
25+
import com.google.protobuf.Empty;
26+
import java.io.ByteArrayOutputStream;
27+
import java.io.IOException;
28+
import java.io.PrintStream;
29+
import java.util.UUID;
30+
import java.util.concurrent.ExecutionException;
31+
import org.hamcrest.CoreMatchers;
32+
import org.junit.After;
33+
import org.junit.Before;
34+
import org.junit.BeforeClass;
35+
import org.junit.Test;
36+
import org.junit.runner.RunWith;
37+
import org.junit.runners.JUnit4;
38+
39+
@RunWith(JUnit4.class)
40+
public class SubmitJobTest {
41+
42+
private static final String CLUSTER_NAME =
43+
String.format("java-sj-test--%s", UUID.randomUUID().toString());
44+
private static final String REGION = "us-central1";
45+
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
46+
private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION);
47+
48+
private ByteArrayOutputStream bout;
49+
50+
private static void requireEnv(String varName) {
51+
assertNotNull(
52+
String.format("Environment variable '%s' is required to perform these tests.", varName),
53+
System.getenv(varName));
54+
}
55+
56+
@BeforeClass
57+
public static void checkRequirements() {
58+
requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
59+
requireEnv("GOOGLE_CLOUD_PROJECT");
60+
}
61+
62+
@Before
63+
public void setUp() throws IOException, ExecutionException, InterruptedException {
64+
bout = new ByteArrayOutputStream();
65+
System.setOut(new PrintStream(bout));
66+
67+
ClusterControllerSettings clusterControllerSettings =
68+
ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
69+
70+
try (ClusterControllerClient clusterControllerClient =
71+
ClusterControllerClient.create(clusterControllerSettings)) {
72+
// Create the Dataproc cluster.
73+
Cluster cluster = Cluster.newBuilder().setClusterName(CLUSTER_NAME).build();
74+
OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
75+
clusterControllerClient.createClusterAsync(PROJECT_ID, REGION, cluster);
76+
createClusterAsyncRequest.get();
77+
}
78+
}
79+
80+
@Test
81+
public void submitJobTest() throws IOException, InterruptedException {
82+
SubmitJob.submitJob(PROJECT_ID, REGION, CLUSTER_NAME);
83+
String output = bout.toString();
84+
85+
assertThat(output, CoreMatchers.containsString("Job finished successfully"));
86+
}
87+
88+
@After
89+
public void tearDown() throws IOException, InterruptedException, ExecutionException {
90+
91+
ClusterControllerSettings clusterControllerSettings =
92+
ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
93+
94+
try (ClusterControllerClient clusterControllerClient =
95+
ClusterControllerClient.create(clusterControllerSettings)) {
96+
OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
97+
clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
98+
deleteClusterAsyncRequest.get();
99+
}
100+
}
101+
}

0 commit comments

Comments (0)