Skip to content

Commit e1ab6f5

Browse files
David Cavazoslesv
David Cavazos
authored andcommitted
Dataflow SpannerIO sample (#931)
* Add Dataflow SpannerIO sample * Removed counts and just left the size estimation * Tidy up dependencies * Add sample to write to Spanner * Add dataflow module * Cleaned up dependencies * Added more comments and a description on how to run
1 parent d9ba182 commit e1ab6f5

File tree

4 files changed

+480
-0
lines changed

4 files changed

+480
-0
lines changed

dataflow/spanner-io/pom.xml

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<!--
2+
Copyright 2017 Google Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
-->
16+
<project xmlns="http://maven.apache.org/POM/4.0.0"
17+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
19+
<modelVersion>4.0.0</modelVersion>
20+
21+
<groupId>com.example.dataflow</groupId>
22+
<artifactId>dataflow-spanner</artifactId>
23+
<version>1.0-SNAPSHOT</version>
24+
<packaging>jar</packaging>
25+
26+
<properties>
27+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
28+
<java.version>1.8</java.version>
29+
<maven.compiler.source>1.8</maven.compiler.source>
30+
<maven.compiler.target>1.8</maven.compiler.target>
31+
<apache_beam.version>2.2.0</apache_beam.version>
32+
</properties>
33+
34+
<build>
35+
<plugins>
36+
<plugin>
37+
<groupId>org.apache.maven.plugins</groupId>
38+
<artifactId>maven-compiler-plugin</artifactId>
39+
<version>3.7.0</version>
40+
</plugin>
41+
</plugins>
42+
</build>
43+
44+
<dependencies>
45+
46+
<!-- Apache Beam -->
47+
<dependency>
48+
<groupId>org.apache.beam</groupId>
49+
<artifactId>beam-sdks-java-core</artifactId>
50+
<version>${apache_beam.version}</version>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.apache.beam</groupId>
55+
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
56+
<version>${apache_beam.version}</version>
57+
<exclusions>
58+
<exclusion>
59+
<groupId>com.google.api.grpc</groupId>
60+
<artifactId>grpc-google-common-protos</artifactId>
61+
</exclusion>
62+
</exclusions>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>org.apache.beam</groupId>
67+
<artifactId>beam-runners-direct-java</artifactId>
68+
<version>${apache_beam.version}</version>
69+
</dependency>
70+
71+
<dependency>
72+
<groupId>org.apache.beam</groupId>
73+
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
74+
<version>${apache_beam.version}</version>
75+
</dependency>
76+
77+
<!-- Google Cloud -->
78+
<dependency>
79+
<groupId>com.google.cloud</groupId>
80+
<artifactId>google-cloud-spanner</artifactId>
81+
<version>0.20.0-beta</version>
82+
</dependency>
83+
84+
<!-- Misc -->
85+
<dependency>
86+
<groupId>org.slf4j</groupId>
87+
<artifactId>slf4j-jdk14</artifactId>
88+
<version>1.7.25</version>
89+
</dependency>
90+
91+
</dependencies>
92+
93+
</project>
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
Copyright 2017, Google, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
import com.google.cloud.spanner.Struct;
20+
import org.apache.beam.sdk.Pipeline;
21+
import org.apache.beam.sdk.io.TextIO;
22+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
23+
import org.apache.beam.sdk.options.Description;
24+
import org.apache.beam.sdk.options.PipelineOptions;
25+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
26+
import org.apache.beam.sdk.options.Validation;
27+
import org.apache.beam.sdk.transforms.Count;
28+
import org.apache.beam.sdk.transforms.DoFn;
29+
import org.apache.beam.sdk.transforms.PTransform;
30+
import org.apache.beam.sdk.transforms.ParDo;
31+
import org.apache.beam.sdk.transforms.Sum;
32+
import org.apache.beam.sdk.transforms.ToString;
33+
import org.apache.beam.sdk.values.PCollection;
34+
35+
/*
36+
This sample demonstrates how to read from a Spanner table.
37+
38+
## Prerequisites
39+
* Maven installed
40+
* Set up GCP default credentials, one of the following:
41+
- export GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json
42+
- gcloud auth application-default login
43+
[https://developers.google.com/identity/protocols/application-default-credentials]
44+
* Create the Spanner table to read from, you'll need:
45+
- Instance ID
46+
- Database ID
47+
- Any table, preferably populated
48+
[https://cloud.google.com/spanner/docs/quickstart-console]
49+
50+
## How to run
51+
cd java-docs-samples/dataflow/spanner-io
52+
mvn clean
53+
mvn compile
54+
mvn exec:java \
55+
-Dexec.mainClass=com.example.dataflow.SpannerRead \
56+
-Dexec.args="--instanceId=my-instance-id \
57+
--databaseId=my-database-id \
58+
--table=my_table \
59+
--output=path/to/output_file"
60+
*/
61+
public class SpannerRead {
62+
63+
public interface Options extends PipelineOptions {
64+
65+
@Description("Spanner instance ID to query from")
66+
@Validation.Required
67+
String getInstanceId();
68+
void setInstanceId(String value);
69+
70+
@Description("Spanner database name to query from")
71+
@Validation.Required
72+
String getDatabaseId();
73+
void setDatabaseId(String value);
74+
75+
@Description("Spanner table name to query from")
76+
@Validation.Required
77+
String getTable();
78+
void setTable(String value);
79+
80+
@Description("Output filename for records size")
81+
@Validation.Required
82+
String getOutput();
83+
void setOutput(String value);
84+
}
85+
86+
/**
87+
* Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported.
88+
*/
89+
public static class EstimateStructSizeFn extends DoFn<Struct, Long> {
90+
91+
@ProcessElement
92+
public void processElement(ProcessContext c) throws Exception {
93+
Struct row = c.element();
94+
long sum = 0;
95+
for (int i = 0; i < row.getColumnCount(); i++) {
96+
if (row.isNull(i)) {
97+
continue;
98+
}
99+
100+
switch (row.getColumnType(i).getCode()) {
101+
case BOOL:
102+
sum += 1;
103+
break;
104+
case INT64:
105+
case FLOAT64:
106+
sum += 8;
107+
break;
108+
case TIMESTAMP:
109+
case DATE:
110+
sum += 12;
111+
break;
112+
case BYTES:
113+
sum += row.getBytes(i).length();
114+
break;
115+
case STRING:
116+
sum += row.getString(i).length();
117+
break;
118+
case ARRAY:
119+
throw new IllegalArgumentException("Arrays are not supported :(");
120+
case STRUCT:
121+
throw new IllegalArgumentException("Structs are not supported :(");
122+
}
123+
}
124+
c.output(sum);
125+
}
126+
}
127+
128+
public static void main(String[] args) {
129+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
130+
Pipeline p = Pipeline.create(options);
131+
132+
String instanceId = options.getInstanceId();
133+
String databaseId = options.getDatabaseId();
134+
String query = "SELECT * FROM " + options.getTable();
135+
136+
PCollection<Long> tableEstimatedSize = p
137+
// Query for all the columns and rows in the specified Spanner table
138+
.apply(SpannerIO.read()
139+
.withInstanceId(instanceId)
140+
.withDatabaseId(databaseId)
141+
.withQuery(query))
142+
// Estimate the size of every row
143+
.apply(ParDo.of(new EstimateStructSizeFn()))
144+
// Sum all the row sizes to get the total estimated size of the table
145+
.apply(Sum.longsGlobally());
146+
147+
// Write the total size to a file
148+
tableEstimatedSize
149+
.apply(ToString.elements())
150+
.apply(TextIO.write().to(options.getOutput()));
151+
152+
p.run().waitUntilFinish();
153+
}
154+
}

0 commit comments

Comments
 (0)