
Commit 1126d8e

fix: pass Sfn parameters using S3 during migration (#7131)
1 parent f52d9fa commit 1126d8e

5 files changed: +237 -142 lines changed


.happy/terraform/modules/schema_migration/main.tf

Lines changed: 28 additions & 14 deletions
```diff
@@ -313,7 +313,7 @@ resource aws_sfn_state_machine sfn_schema_migration {
             "ErrorEquals": [
               "States.ALL"
             ],
-            "ResultPath": "$.error",
+            "ResultPath": null,
             "Next": "CollectionPublish"
           }
         ]
@@ -322,8 +322,8 @@ resource aws_sfn_state_machine sfn_schema_migration {
         "Type": "Choice",
         "Choices": [
           {
-            "Variable": "$.no_datasets",
-            "IsPresent": true,
+            "Variable": "$.key_name",
+            "IsPresent": false,
             "Next": "CollectionPublish"
           }
         ],
@@ -353,10 +353,6 @@ resource aws_sfn_state_machine sfn_schema_migration {
             "Name": "COLLECTION_VERSION_ID",
             "Value.$": "$.collection_version_id"
           },
-          {
-            "Name": "CAN_PUBLISH",
-            "Value.$": "$.can_publish"
-          },
           {
             "Name": "TASK_TOKEN",
             "Value.$": "$$.Task.Token"
@@ -381,7 +377,7 @@ resource aws_sfn_state_machine sfn_schema_migration {
             "States.ALL"
           ],
           "Next": "CollectionError",
-          "ResultPath": "$.error"
+          "ResultPath": null
         }
       ]
     },
@@ -454,7 +450,7 @@ resource aws_sfn_state_machine sfn_schema_migration {
            "States.ALL"
          ],
          "Next": "DatasetError",
-         "ResultPath": "$.error"
+         "ResultPath": null
        }
      ],
      "ResultPath": "$.result"
@@ -484,14 +480,23 @@ resource aws_sfn_state_machine sfn_schema_migration {
              "States.ALL"
            ],
            "Next": "DatasetError",
-           "ResultPath": "$.error"
+           "ResultPath": null
         }
       ],
-      "ResultPath": "$.result"
+      "ResultPath": null
      }
    }
  },
- "ItemsPath": "$.datasets",
+ "ItemReader": {
+   "Resource": "arn:aws:states:::s3:getObject",
+   "ReaderConfig": {
+     "InputType": "JSON"
+   },
+   "Parameters": {
+     "Bucket": "${var.artifact_bucket}",
+     "Key.$": "$.key_name"
+   }
+ },
  "Next": "CollectionPublish",
  "MaxConcurrency": 32,
  "Catch": [
@@ -500,7 +505,7 @@ resource aws_sfn_state_machine sfn_schema_migration {
        "States.ALL"
      ],
      "Next": "CollectionPublish",
-     "ResultPath": "$.error"
+     "ResultPath": null
    }
  ],
  "OutputPath": "$[0]",
@@ -512,7 +517,16 @@ resource aws_sfn_state_machine sfn_schema_migration {
      }
    }
  },
- "ItemsPath": "$",
+ "ItemReader": {
+   "Resource": "arn:aws:states:::s3:getObject",
+   "ReaderConfig": {
+     "InputType": "JSON"
+   },
+   "Parameters": {
+     "Bucket": "${var.artifact_bucket}",
+     "Key.$": "$.key_name"
+   }
+ },
  "MaxConcurrency": 40,
  "Next": "report",
  "Catch": [
```

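Both Map states previously received their item lists inline ("ItemsPath": "$.datasets" and "ItemsPath": "$"), which counts against Step Functions' 256 KB cap on state input and output. The new `ItemReader` blocks turn them into Distributed Maps that fetch a JSON array straight from S3, so only a small `{"key_name": ...}` pointer travels through the state machine. A minimal sketch of the producer side, using plain boto3 rather than the repo's helpers; the bucket and key below are illustrative:

```python
import json

import boto3

s3 = boto3.client("s3")

def store_map_items(bucket: str, key: str, items: list) -> dict:
    """Upload a Map state's items as a single JSON array.

    A Distributed Map whose ItemReader uses s3:getObject with
    InputType JSON reads this object back at run time, so the state
    machine itself only ever carries the small pointer returned below.
    """
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(items).encode("utf-8"))
    return {"key_name": key}

# e.g. store_map_items("my-artifact-bucket",
#                      "schema_migration/<execution_id>/span_collections/collections.json",
#                      [{"collection_id": "...", "collection_version_id": "..."}])
```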
backend/layers/processing/schema_migration.py

Lines changed: 67 additions & 47 deletions
```diff
@@ -3,7 +3,7 @@
 import logging
 import os
 import random
-from typing import Any, Dict, Iterable, List
+from typing import Dict, Iterable, List, Tuple
 
 from backend.common.corpora_config import CorporaConfig
 from backend.common.utils.json import CustomJSONEncoder
@@ -43,10 +43,11 @@ def fetch_collections(self) -> Iterable[CollectionVersion]:
         unpublished_collections = [*self.business_logic.get_collections(CollectionQueryFilter(is_published=False))]
         return itertools.chain(unpublished_collections, published_collections)
 
-    def gather_collections(self, auto_publish: bool) -> List[Dict[str, str]]:
+    def gather_collections(self, auto_publish: bool) -> Tuple[Dict[str, str], List[Dict[str, str]]]:
         """
         This function is used to gather all the collections and their datasets that will be migrated
-        :return: A dictionary with the following structure:
+        A JSON file is created and uploaded to S3 with the list of collections and datasets that will be migrated. It
+        has the following structure:
         [
             {"can_publish": "true", "collection_id": "<collection_id>", "collection_version_id":
             "<collection_version_id>"},
@@ -57,8 +58,9 @@ def gather_collections(self, auto_publish: bool) -> List[Dict[str, str]]:
 
         :param auto_publish: bool - if False, coerce can_publish to False for all collections. if True, determine
         can_publish on a collection-by-collection basis based on business logic
+        :return: the response returned to the step function and the list of collections to be migrated
         """
-        response = []
+        response_for_span_collections = []
 
         has_revision = set()
         # iterates over unpublished collections first, so published versions are skipped if there is an active revision
@@ -86,14 +88,15 @@ def gather_collections(self, auto_publish: bool) -> List[Dict[str, str]]:
                 collection_version_id=collection.version_id.id,
                 execution_id=self.execution_id,
             )
-            response.append(_resp)
+            response_for_span_collections.append(_resp)
 
         # For testing purposes, only migrate a randomly sampled subset of the collections gathered
         limit = int(self.limit_migration) if isinstance(self.limit_migration, str) else self.limit_migration
         if limit > 0:
-            response = random.sample(response, limit)
-
-        return response
+            response_for_span_collections = random.sample(response_for_span_collections, limit)
+        key_name = self._store_sfn_response("span_collections", "collections", response_for_span_collections)
+        response_for_sfn = {"key_name": key_name}
+        return response_for_sfn, response_for_span_collections
 
     def dataset_migrate(
         self, collection_version_id: str, collection_id: str, dataset_id: str, dataset_version_id: str
```
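`gather_collections` now returns a pair: a slim pointer for the state machine and the full collection list, which stays useful for callers and tests. The list itself is what lands in S3 and is read back by the outer Distributed Map. A small illustration of the two shapes; the values and key path are assumptions based on the layout used elsewhere in this commit:

```python
import json

# Shape of the manifest uploaded by _store_sfn_response("span_collections", "collections", ...):
collections = [
    {"can_publish": "true", "collection_id": "abc", "collection_version_id": "def"},
    {"can_publish": "false", "collection_id": "ghi", "collection_version_id": "jkl"},
]
manifest_body = json.dumps(collections)  # body of .../span_collections/collections.json

# The only thing handed back to Step Functions:
response_for_sfn = {"key_name": "schema_migration/<execution_id>/span_collections/collections.json"}
```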
```diff
@@ -133,15 +136,26 @@ def dataset_migrate(
             "execution_id": self.execution_id,
         }
 
-    def collection_migrate(self, collection_id: str, collection_version_id: str, can_publish: bool) -> Dict[str, Any]:
+    def collection_migrate(
+        self, collection_id: str, collection_version_id: str, can_publish: bool
+    ) -> Tuple[Dict[str, str], Dict[str, str], List[Dict[str, str]]]:
+        """
+        This function is used to migrate a collection and its datasets to the latest schema version.
+
+        :param collection_id: the canonical collection id
+        :param collection_version_id: the collection version to migrate
+        :param can_publish: if True, the collection will be published after migration
+        :return: the response returned to the step function, the response for the publish_and_cleanup step function,
+        and the list of datasets to be migrated
+        """
         # Get datasets from collection
         version = self.business_logic.get_collection_version(CollectionVersionId(collection_version_id))
+        # Filter out datasets that are already on the current schema version
         datasets = [dataset for dataset in version.datasets if not self.check_dataset_is_latest_schema_version(dataset)]
 
         # Generate canonical collection url
         collection_url = self.business_logic.get_collection_url(version.collection_id.id)
 
-        # Filter out datasets that are already on the current schema version
         if not datasets:
             # Handles the case where the collection has no datasets or all datasets are already migrated.
             if len(version.datasets) == 0:
@@ -150,14 +164,13 @@ def collection_migrate(self, collection_id: str, collection_version_id: str, can
             self.logger.info(
                 "All datasets in the collection have been migrated", extra={"dataset_count": len(version.datasets)}
             )
-            response = {
+            response_for_dataset_migrate = []
+            response_for_publish_and_cleanup = {
                 "can_publish": str(False),  # skip publishing, because the collection is already published and no
                 # revision is created, or the collection is private or a revision.
                 "collection_version_id": collection_version_id,
-                "collection_url": collection_url,
-                "datasets": [],
-                "no_datasets": str(True),
             }
+            response_for_sfn = {"collection_version_id": collection_version_id}
         else:
             if version.is_published():
                 # Create a new collection version(revision) if the collection is already published
```
```diff
@@ -166,37 +179,37 @@ def collection_migrate(self, collection_id: str, collection_version_id: str, can
                 ).version_id.id
             else:
                 private_collection_version_id = collection_version_id
-
-            response = {
+            response_for_dataset_migrate = [
+                {
+                    "collection_id": collection_id,
+                    "collection_url": collection_url,
+                    "collection_version_id": private_collection_version_id,
+                    "dataset_id": dataset.dataset_id.id,
+                    "dataset_version_id": dataset.version_id.id,
+                    "execution_id": self.execution_id,
+                }
+                for dataset in datasets
+                if dataset.status.processing_status == DatasetProcessingStatus.SUCCESS
+                # Filter out datasets that are not successfully processed
+            ]
+            response_for_publish_and_cleanup = {
                 "can_publish": str(can_publish),
                 "collection_version_id": private_collection_version_id,
-                "collection_url": collection_url,
-                # ^^^ The top level fields are used for handling error cases in the AWS SFN.
-                "datasets": [
-                    {
-                        "can_publish": str(can_publish),
-                        "collection_id": collection_id,
-                        "collection_url": collection_url,
-                        "collection_version_id": private_collection_version_id,
-                        "dataset_id": dataset.dataset_id.id,
-                        "dataset_version_id": dataset.version_id.id,
-                        "execution_id": self.execution_id,
-                    }
-                    for dataset in datasets
-                    if dataset.status.processing_status == DatasetProcessingStatus.SUCCESS
-                    # Filter out datasets that are not successfully processed
-                ],
-                # The repeated fields in datasets is required for the AWS SFN job that uses it.
             }
+            response_for_sfn = {"collection_version_id": private_collection_version_id}
+            response_for_publish_and_cleanup["datasets"] = response_for_dataset_migrate
+            response_for_publish_and_cleanup["collection_url"] = collection_url
 
-        if not response["datasets"]:
-            # Handles the case were the collection has no processed datasets
-            response["no_datasets"] = str(True)
-        response["execution_id"] = self.execution_id
-        self._store_sfn_response("publish_and_cleanup", version.collection_id.id, response)
-        return response
+        response_for_sfn["execution_id"] = self.execution_id
 
-    def publish_and_cleanup(self, collection_version_id: str, can_publish: bool) -> list:
+        self._store_sfn_response("publish_and_cleanup", version.collection_id.id, response_for_publish_and_cleanup)
+
+        if response_for_dataset_migrate:
+            key_name = self._store_sfn_response("span_datasets", version.collection_id.id, response_for_dataset_migrate)
+            response_for_sfn["key_name"] = key_name
+        return (response_for_sfn, response_for_publish_and_cleanup, response_for_dataset_migrate)
+
+    def publish_and_cleanup(self, collection_version_id: str) -> list:
         errors = []
         collection_version = self.business_logic.get_collection_version(CollectionVersionId(collection_version_id))
         object_keys_to_delete = []
```
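With `collection_migrate` returning three values, only the first goes back to Step Functions; the dataset list is uploaded under `span_datasets` and surfaces solely as `key_name` when it is non-empty. That is exactly what the Choice state in the Terraform above keys on. A hypothetical Python mirror of that routing:

```python
def next_state(collection_migrate_output: dict) -> str:
    """Mirror of the '"Variable": "$.key_name", "IsPresent": false' Choice."""
    if "key_name" not in collection_migrate_output:
        # collection_migrate stored nothing under span_datasets, so there is
        # nothing to fan out over: go straight to publish.
        return "CollectionPublish"
    return "SpanDatasets"  # downstream Map state name assumed for illustration

assert next_state({"collection_version_id": "v1", "execution_id": "e1"}) == "CollectionPublish"
```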
```diff
@@ -259,7 +272,7 @@ def publish_and_cleanup(self, collection_version_id: str, can_publish: bool) ->
         self.s3_provider.delete_files(self.artifact_bucket, object_keys_to_delete)
         if errors:
             self._store_sfn_response("report/errors", collection_version_id, errors)
-        elif can_publish:
+        elif extra_info["can_publish"].lower() == "true":
             self.business_logic.publish_collection_version(collection_version.version_id)
         return errors
```

```diff
@@ -278,6 +291,7 @@ def _store_sfn_response(self, directory: str, file_name: str, response: Dict[str
         self.logger.info(
             "Uploaded to S3", extra={"file_name": local_file, "bucket": self.artifact_bucket, "key": key_name}
         )
+        return key_name
 
     def _retrieve_sfn_response(self, directory: str, file_name: str):
         """
@@ -292,6 +306,10 @@ def _retrieve_sfn_response(self, directory: str, file_name: str):
         self.s3_provider.download_file(self.artifact_bucket, key_name, local_file)
         with open(local_file, "r") as f:
             data = json.load(f)
+        self.logger.info(
+            "Downloaded from S3",
+            extra={"file_name": local_file, "bucket": self.artifact_bucket, "key": key_name, "data": data},
+        )
         self.s3_provider.delete_files(self.artifact_bucket, [key_name])  # delete after reading.
         return data
```
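`_store_sfn_response` now returns the key it wrote, and `_retrieve_sfn_response` logs what it fetched before deleting it, which together give the pipeline a write-once, read-once S3 mailbox. A standalone sketch of the same round trip in plain boto3; the repo's helpers stage through a local file and its `s3_provider`, so this is illustrative only:

```python
import json

import boto3

s3 = boto3.client("s3")
BUCKET = "my-artifact-bucket"  # stands in for self.artifact_bucket

def store_response(key_name: str, payload) -> str:
    """Write-once half: upload the JSON payload and return the key,
    e.g. for use as the 'key_name' an ItemReader dereferences later."""
    s3.put_object(Bucket=BUCKET, Key=key_name, Body=json.dumps(payload).encode("utf-8"))
    return key_name

def retrieve_response(key_name: str):
    """Read-once half: fetch, parse, then delete so nothing lingers."""
    data = json.loads(s3.get_object(Bucket=BUCKET, Key=key_name)["Body"].read())
    s3.delete_object(Bucket=BUCKET, Key=key_name)  # delete after reading
    return data
```

This mailbox is presumably also how `publish_and_cleanup` now learns `can_publish`: the flag rides along in the stored publish_and_cleanup payload (the `extra_info` lookup in the earlier hunk) instead of arriving through a CAN_PUBLISH environment variable, which the Terraform no longer injects.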
```diff
@@ -334,9 +352,12 @@ def retrieve_report_files_from_s3(message_type: str):
             self.logger.info("Report", extra=report)
             report_str = json.dumps(report, indent=4, sort_keys=True, cls=CustomJSONEncoder)
             report_message = f"Schema migration results ({os.environ['DEPLOYMENT_STAGE']} env)"
-            # if report["errors"]:
-            #     report_message += " @sc-oncall-eng"
             self._upload_to_slack("schema_migration_report.json", report_str, report_message)
+            # Cleanup leftover schema migration files
+            self.s3_provider.delete_prefix(
+                self.artifact_bucket, self.get_key_prefix(f"schema_migration/{self.execution_id}")
+            )
+
             return report
         except Exception as e:
             self.logger.exception("Failed to generate report")
```
```diff
@@ -356,13 +377,13 @@ def migrate(self, step_name) -> bool:
         if step_name == "gather_collections":
             gather_collections = self.error_wrapper(self.gather_collections, "gather_collections")
             auto_publish = os.environ["AUTO_PUBLISH"].lower() == "true"
-            response = gather_collections(auto_publish)
+            response, _ = gather_collections(auto_publish)
         elif step_name == "collection_migrate":
             collection_id = os.environ["COLLECTION_ID"]
             collection_version_id = os.environ["COLLECTION_VERSION_ID"]
             can_publish = os.environ["CAN_PUBLISH"].lower() == "true"
             collection_migrate = self.error_wrapper(self.collection_migrate, collection_id)
-            response = collection_migrate(
+            response, _, _ = collection_migrate(
                 collection_id=collection_id,
                 collection_version_id=collection_version_id,
                 can_publish=can_publish,
@@ -381,9 +402,8 @@ def migrate(self, step_name) -> bool:
             )
         elif step_name == "collection_publish":
             collection_version_id = os.environ["COLLECTION_VERSION_ID"]
-            can_publish = os.environ["CAN_PUBLISH"].lower() == "true"
             publish_and_cleanup = self.error_wrapper(self.publish_and_cleanup, collection_version_id)
-            response = publish_and_cleanup(collection_version_id=collection_version_id, can_publish=can_publish)
+            response = publish_and_cleanup(collection_version_id=collection_version_id)
         elif step_name == "report":
             response = self.report()
         self.logger.info("output", extra={"response": response})
```
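`migrate` unpacks the new multi-valued returns and forwards only the slim first element to Step Functions. A runnable toy showing why the `response, _ = ...` pattern keeps the payload bounded; all names here are hypothetical stand-ins:

```python
import json

def collection_migrate_stub():
    """Toy stand-in for the new (slim, publish_info, datasets) contract."""
    datasets = [{"dataset_id": str(i)} for i in range(10_000)]  # large enough to threaten the inline limit
    slim = {"collection_version_id": "v1", "key_name": "span_datasets/v1.json"}
    return slim, {"can_publish": "false"}, datasets

response, _, _ = collection_migrate_stub()  # what migrate() forwards to the SFN
assert len(json.dumps(response)) < 256 * 1024  # comfortably under the 256 KB cap
```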
