Commit c5e5b2e

Dataflow version bump and project separation (#805)
* Dataflow version bump and project separation
* fixed the description of projectId
1 parent 796e8d8 commit c5e5b2e

File tree

3 files changed: +21 −11 lines changed


docs/reverse-replication/ReverseReplicationUserGuide.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -111,6 +111,7 @@ The Dataflow job that writes to source database exposes the following per shard
 |replication_lag_in_seconds_\<logical shard name\>| Replication lag min,max and count value for the shard|
 | metadata_file_create_lag_retry_\<logical shard name\> | Count of file lookup retries done when the job that writes to GCS is lagging |
 | mySQL_retry_\<logical shard name\> | Number of retries done when MySQL is not reachable|
+| shard_failed_\<logical shard name\> | Published when there is a failure while processing the shard |
 
 These can be used to track the pipeline progress.
 However, there is a limit of 100 on the total number of metrics per project. So if this limit is exhausted, the Dataflow job will give a message like so:
```
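These per-shard counters are published by the Dataflow jobs themselves. As a rough illustration of the naming scheme only, here is a minimal sketch using the Apache Beam Go SDK — the actual reader and writer are prebuilt Dataflow flex templates, and the `reverse_replication` namespace and helper functions below are invented for the example:

```go
package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// processShard stands in for the writer's per-shard work; the real jobs are
// prebuilt flex templates, not this function.
func processShard(ctx context.Context, logicalShard string) {
	// One counter per logical shard, following the shard_failed_<logical shard name> scheme.
	shardFailed := beam.NewCounter("reverse_replication", "shard_failed_"+logicalShard)

	if err := writeShard(logicalShard); err != nil {
		// Incrementing the counter is what surfaces the failure as a
		// per-shard metric in the Dataflow UI.
		shardFailed.Inc(ctx, 1)
	}
}

// writeShard is a placeholder for the actual write to the source database.
func writeShard(shard string) error { return nil }

func main() {
	// Counters only report when incremented inside a running pipeline;
	// this call merely shows the shape of the API.
	processShard(context.Background(), "shardA")
}
```

The 100-metrics-per-project cap mentioned above is why a large shard count can exhaust the quota: each logical shard contributes several such counters.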
```diff
@@ -199,6 +200,7 @@ In this case, check if you observe the following:
 2. The primary key value was not present in the change stream data
 3. When there is no data written to Spanner for a given interval for a given shard, no file is created in GCS. In such a case, the interval is skipped by the writer Dataflow job. This can be verified in the logs by searching for the text ```skipping the file```. If a file is marked as skipped in the logs but it exists in GCS - this indicates a data loss scenario - please raise a bug.
 4. Check the shard_file_process_progress table in the metadata database. If it is lagging, then wait for the pipeline to catch up so that data gets reverse replicated.
+5. Check if the shard_failed_\<logical shard name\> metric is present; this indicates there was a failure while processing the shard. Look at the logs for the failure details.
 
 
 #### There is higher load than the expected QPS on spanner instance post cutover
```
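For step 4, the progress table can be inspected directly with the Cloud Spanner client library. A minimal sketch, assuming the table name from step 4, no metadataTableSuffix, and an invented database URI — the exact columns to compare depend on the metadata schema, so a row count here is only a starting point:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/spanner"
)

func main() {
	ctx := context.Background()

	// Hypothetical metadata database URI; substitute your own project,
	// instance, and metadata database names.
	db := "projects/my-spanner-project/instances/metadata-instance/databases/change-stream-metadata"

	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Peek at the progress table to confirm the writer is making progress.
	iter := client.Single().Query(ctx, spanner.Statement{
		SQL: "SELECT COUNT(*) FROM shard_file_process_progress",
	})
	defer iter.Stop()

	row, err := iter.Next()
	if err != nil {
		log.Fatal(err)
	}
	var n int64
	if err := row.Columns(&n); err != nil {
		log.Fatal(err)
	}
	fmt.Println("rows in shard_file_process_progress:", n)
}
```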

docs/reverse-replication/RunnigReverseReplication.md

Lines changed: 2 additions & 1 deletion
```diff
@@ -45,14 +45,15 @@ The script takes in multiple arguments to orchestrate the pipeline. They are:
 - `metadataInstance`: Spanner instance name to store changestream metadata. Defaults to target spanner instance id.
 - `metadataTableSuffix`: The suffix to apply when creating metadata tables. Helpful in case of multiple runs. Default is no suffix.
 - `networkTags`: network tags added to the Dataflow jobs worker and launcher VMs.
-- `projectId`: Project id of the Spanner instance.
+- `projectId`: projectId for Dataflow jobs. If spannerProjectId is not specified, this value is used for the Cloud Spanner project id as well.
 - `sessionFilePath`: GCS file path for session file generated via Spanner migration tool.
 - `serviceAccountEmail`: the email address of the service account to run the job as.
 - `skipChangeStreamCreation`: whether to skip the change stream creation. Default is false.
 - `skipMetadataDatabaseCreation`: whether to skip Metadata database creation. Default is false.
 - `sourceDbTimezoneOffset`: the timezone offset with respect to UTC for the source database. Defaults to +00:00.
 - `sourceShardsFilePath`: GCS file path for file containing shard info. Details on structure mentioned later.
 - `sourceWriterTemplateLocation`: the dataflow template location for the Source writer job.
+- `spannerProjectId`: the project id where Cloud Spanner resides, for the use case when Cloud Spanner is in a different project than where Dataflow would run.
 - `spannerReaderTemplateLocation`: the dataflow template location for the Spanner reader job
 - `startTimestamp`: Timestamp from which the changestream should start reading changes in RFC 3339 format, defaults to empty string which is equivalent to the current timestamp.
 - `readerMaxWorkers`: Number of maximum workers for the reader job.
```
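Together, `projectId` and `spannerProjectId` let the Dataflow jobs run in a different project from the Spanner instance. A hypothetical invocation (invented project ids; the other required flags above are omitted for brevity):

```sh
go run reverse-replication-runner.go \
  -projectId=my-dataflow-project \
  -spannerProjectId=my-spanner-project \
  -dataflowRegion=us-central1
```

If `-spannerProjectId` is omitted, the runner falls back to `-projectId` for all Spanner resources, preserving the previous single-project behavior.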

reverse_replication/reverse-replication-runner.go

Lines changed: 17 additions & 10 deletions
```diff
@@ -57,14 +57,15 @@ var (
 	networkTags                  string
 	runIdentifier                string
 	readerMaxWorkers             int
+	spannerProjectId             string
 )
 
 const (
 	ALREADY_EXISTS_ERROR = "code = AlreadyExists"
 )
 
 func setupGlobalFlags() {
-	flag.StringVar(&projectId, "projectId", "", "ProjectId.")
+	flag.StringVar(&projectId, "projectId", "", "ProjectId for Dataflow jobs. If spannerProjectId is not specified, this value is used for Cloud Spanner project id as well.")
 	flag.StringVar(&dataflowRegion, "dataflowRegion", "", "Region for dataflow jobs.")
 	flag.StringVar(&jobNamePrefix, "jobNamePrefix", "smt-reverse-replication", "Job name prefix for the dataflow jobs, defaults to reverse-rep. Automatically converted to lower case due to Dataflow name constraints.")
 	flag.StringVar(&changeStreamName, "changeStreamName", "reverseReplicationStream", "Change stream name, defaults to reverseReplicationStream.")
@@ -90,8 +91,8 @@ func setupGlobalFlags() {
 	flag.StringVar(&serviceAccountEmail, "serviceAccountEmail", "", "The email address of the service account to run the job as.")
 	flag.IntVar(&readerWorkers, "readerWorkers", 5, "Number of workers for reader job.")
 	flag.IntVar(&writerWorkers, "writerWorkers", 5, "Number of workers for writer job.")
-	flag.StringVar(&spannerReaderTemplateLocation, "spannerReaderTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-06-00_RC00/flex/Spanner_Change_Streams_to_Sharded_File_Sink", "The dataflow template location for the Spanner reader job.")
-	flag.StringVar(&sourceWriterTemplateLocation, "sourceWriterTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-06-00_RC00/flex/GCS_to_Sourcedb", "The dataflow template location for the Source writer job.")
+	flag.StringVar(&spannerReaderTemplateLocation, "spannerReaderTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-27-00_RC00/flex/Spanner_Change_Streams_to_Sharded_File_Sink", "The dataflow template location for the Spanner reader job.")
+	flag.StringVar(&sourceWriterTemplateLocation, "sourceWriterTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-27-00_RC00/flex/GCS_to_Sourcedb", "The dataflow template location for the Source writer job.")
 	flag.StringVar(&jobsToLaunch, "jobsToLaunch", "both", "Whether to launch the spanner reader job or the source writer job or both. Default is both. Supported values are both, reader, writer.")
 	flag.BoolVar(&skipChangeStreamCreation, "skipChangeStreamCreation", false, "Whether to skip the change stream creation. Default is false.")
 	flag.BoolVar(&skipMetadataDatabaseCreation, "skipMetadataDatabaseCreation", false, "Whether to skip Metadata database creation. Default is false.")
@@ -101,6 +102,7 @@ func setupGlobalFlags() {
 	flag.StringVar(&runIdentifier, "runIdentifier", "", "The run identifier for the Dataflow jobs.")
 	flag.StringVar(&readerShardingCustomParameters, "readerShardingCustomParameters", "", "Any custom parameters to be supplied to custom sharding class.")
 	flag.IntVar(&readerMaxWorkers, "readerMaxWorkers", 20, "Number of max workers for reader job.")
+	flag.StringVar(&spannerProjectId, "spannerProjectId", "", "The project id where Cloud Spanner resides, for the use case when Cloud Spanner is in a different project than where Dataflow would run.")
 
 }
 
@@ -175,6 +177,11 @@ func prechecks() error {
 		return fmt.Errorf("please specify a valid GCS path for readerShardingCustomJarPath, like gs://<>")
 	}
 
+	if spannerProjectId == "" {
+		fmt.Println("Setting the Spanner Project Id to Dataflow project id: ", projectId)
+		spannerProjectId = projectId
+	}
+
 	return nil
 }
 
@@ -190,7 +197,7 @@ func main() {
 		return
 	}
 
-	dbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, dbName)
+	dbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, instanceId, dbName)
 
 	ctx := context.Background()
 	adminClient, _ := database.NewDatabaseAdminClient(ctx)
@@ -223,7 +230,7 @@ func main() {
 
 	if !skipMetadataDatabaseCreation {
 		createDbReq := &adminpb.CreateDatabaseRequest{
-			Parent:          fmt.Sprintf("projects/%s/instances/%s", projectId, metadataInstance),
+			Parent:          fmt.Sprintf("projects/%s/instances/%s", spannerProjectId, metadataInstance),
 			CreateStatement: fmt.Sprintf("CREATE DATABASE `%s`", metadataDatabase),
 		}
 
@@ -233,18 +240,18 @@ func main() {
 			fmt.Printf("Cannot submit create database request for metadata db: %v\n", err)
 			return
 		} else {
-			fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, metadataInstance, metadataDatabase))
+			fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase))
 		}
 	} else {
 		if _, err := createDbOp.Wait(ctx); err != nil {
 			if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
 				fmt.Printf("create database request failed for metadata db: %v\n", err)
 				return
 			} else {
-				fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, metadataInstance, metadataDatabase))
+				fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase))
 			}
 		} else {
-			fmt.Println("Created metadata db", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, metadataInstance, metadataDatabase))
+			fmt.Println("Created metadata db", fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase))
 		}
 	}
 }
@@ -288,7 +295,7 @@ func main() {
 		"changeStreamName": changeStreamName,
 		"instanceId":       instanceId,
 		"databaseId":       dbName,
-		"spannerProjectId": projectId,
+		"spannerProjectId": spannerProjectId,
 		"metadataInstance": metadataInstance,
 		"metadataDatabase": metadataDatabase,
 		"startTimestamp":   startTimestamp,
@@ -356,7 +363,7 @@ func main() {
 		"sourceDbTimezoneOffset": sourceDbTimezoneOffset,
 		"metadataTableSuffix":    metadataTableSuffix,
 		"GCSInputDirectoryPath":  gcsPath,
-		"spannerProjectId":       projectId,
+		"spannerProjectId":       spannerProjectId,
 		"metadataInstance":       metadataInstance,
 		"metadataDatabase":       metadataDatabase,
 		"runMode":                writerRunMode,
```
