diff --git a/datasets/aster/aster.py b/datasets/aster/aster.py
index 08c7ced0..2b90d8bb 100644
--- a/datasets/aster/aster.py
+++ b/datasets/aster/aster.py
@@ -92,18 +92,10 @@ def run(self, input: CreateChunksInput, context: TaskContext) -> CreateChunksOut
         item_collection = read_item_collection(
             asset["href"], storage_options=asset["table:storage_options"]
         )
-        chunks = []
-        chunk = []
-        for item in itertools.islice(item_collection, input.limit):
-            chunk.append(fix_dict(item.to_dict(include_self_link=False)))
-            if len(chunk) >= input.chunk_size:
-                chunks.append(chunk)
-                chunk = []
-        if chunk:
-            chunks.append(chunk)
         output = []
-        for i, chunk in enumerate(chunks):
-            uri = f"{input.dst_uri}/{asset['partition_number']}/{i}.ndjson"
+
+        def write_chunk(chunk: List[Item], chunk_number: int) -> int:
+            uri = f"{input.dst_uri}/{asset['partition_number']}/{chunk_number}.ndjson"
             storage, path = context.storage_factory.get_storage_for_file(uri)
             storage.write_text(
                 path, "\n".join(orjson.dumps(item).decode("utf-8") for item in chunk)
@@ -111,10 +103,23 @@ def run(self, input: CreateChunksInput, context: TaskContext) -> CreateChunksOut
             output.append(
                 Chunk(
                     uri=storage.get_uri(path),
-                    id=str(i),
+                    id=str(chunk_number),
                     partition_number=str(asset["partition_number"]),
                 )
             )
+            return chunk_number + 1
+
+        chunk = []
+        chunk_number = 0
+        for item in itertools.islice(item_collection, input.limit):
+            chunk.append(
+                fix_dict(item.to_dict(include_self_link=False, transform_hrefs=False))
+            )
+            if len(chunk) >= input.chunk_size:
+                chunk_number = write_chunk(chunk, chunk_number)
+                chunk = []
+        if chunk:
+            write_chunk(chunk, chunk_number)
         return CreateChunksOutput(chunks=output)
@@ -147,9 +152,17 @@ def run(self, input: UpdateItemsInput, context: TaskContext) -> UpdateItemsOutpu
                 new_item = sign_and_update(item, input.simplify_tolerance)
             except Exception as e:
                 logger.error(e)
-                error_items.append(fix_dict(item.to_dict(include_self_link=False)))
+                error_items.append(
+                    fix_dict(
+                        item.to_dict(include_self_link=False, transform_hrefs=False)
+                    )
+                )
             else:
-                items.append(fix_dict(new_item.to_dict(include_self_link=False)))
+                items.append(
+                    fix_dict(
+                        new_item.to_dict(include_self_link=False, transform_hrefs=False)
+                    )
+                )
         logger.info(f"{len(items)} items updated, {len(error_items)} errors")
         storage, path = context.storage_factory.get_storage_for_file(
             f"{input.item_chunkset_uri}/{input.partition_number}/{input.chunk_id}.ndjson"
diff --git a/datasets/aster/dataset.yaml b/datasets/aster/dataset.yaml
index 6adf77a3..d7f45bbe 100644
--- a/datasets/aster/dataset.yaml
+++ b/datasets/aster/dataset.yaml
@@ -1,6 +1,6 @@
 # TODO actually implement this -- this is currently a placeholder just to upload the collection
 id: aster
-image: ${{ args.registry }}/pctasks-task-base:latest
+image: ${{ args.registry }}/pctasks-aster:latest
 
 args:
 - registry
diff --git a/datasets/aster/update-geometries-ingest.yaml b/datasets/aster/update-geometries-ingest.yaml
new file mode 100644
index 00000000..c859d125
--- /dev/null
+++ b/datasets/aster/update-geometries-ingest.yaml
@@ -0,0 +1,28 @@
+name: Ingest NDJsons from blob://astersa/aster-etl-data/items/update-geometries
+jobs:
+  ingest-items:
+    id: ingest-items
+    tasks:
+      - id: ingest-ndjson
+        image_key: ingest
+        task: pctasks.ingest_task.task:ingest_task
+        args:
+          content:
+            type: Ndjson
+            ndjson_folder:
+              uri: blob://astersa/aster-etl-data/items/update-geometries
+              extensions:
+                - .ndjson
+              matches: \d+.ndjson
+          options:
+            insert_group_size: 5000
+            insert_only: false
+        environment:
+          AZURE_TENANT_ID: ${{ secrets.task-tenant-id }}
+          AZURE_CLIENT_ID: ${{ secrets.task-client-id }}
+          AZURE_CLIENT_SECRET: ${{ secrets.task-client-secret }}
+        schema_version: 1.0.0
+schema_version: 1.0.0
+id: aster-update-geometries-ingest
+dataset: microsoft/aster-l1t
+
diff --git a/datasets/aster/update-geometries.yaml b/datasets/aster/update-geometries.yaml
index 72225b4c..6cf544f8 100644
--- a/datasets/aster/update-geometries.yaml
+++ b/datasets/aster/update-geometries.yaml
@@ -32,12 +32,14 @@ jobs:
         task: aster:create_chunks_task
         args:
           asset: ${{ item }}
-          dst_uri: blob://astersa/aster-etl-data/chunks/update-geometries
+          dst_uri: blob://astersa/aster-etl-data/chunks/update-geometries-2023-01-23-00
           chunk_size: 5000
         environment:
           AZURE_TENANT_ID: ${{ secrets.task-tenant-id }}
           AZURE_CLIENT_ID: ${{ secrets.task-client-id }}
           AZURE_CLIENT_SECRET: ${{ secrets.task-client-secret }}
+        tags:
+          batch_pool_id: high_memory_pool
   update-items:
     foreach:
       items: ${{ jobs.create-chunks.tasks.create-chunks.output.chunks }}
@@ -54,7 +56,7 @@ jobs:
           partition_number: ${{ item.partition_number }}
           chunk_uri: ${{ item.uri }}
           chunk_id: ${{ item.id }}
-          item_chunkset_uri: blob://astersa/aster-etl-data/items/update-geometries
+          item_chunkset_uri: blob://astersa/aster-etl-data/items/update-geometries-2023-01-23-00
           simplify_tolerance: 0.001
         environment:
           AZURE_TENANT_ID: ${{ secrets.task-tenant-id }}
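
Note on the aster.py refactor above: the old code accumulated every chunk in a `chunks` list before writing anything, so all of a partition's items sat in memory at once; the new code serializes and uploads each chunk as soon as it fills, holding at most `chunk_size` items at a time (complementing the `high_memory_pool` batch pool tag added for the remaining large partitions). Below is a minimal, self-contained sketch of the same flush-as-you-go pattern. The names here (`write_text`, `write_chunks`, `dst_prefix`) are illustrative stand-ins, not pctasks APIs, and local files stand in for blob storage.

import itertools
import json
import os
from typing import Any, Dict, Iterable, List, Optional


def write_text(uri: str, text: str) -> None:
    # Stand-in for the task's storage.write_text(); just writes a local file.
    with open(uri, "w") as f:
        f.write(text)


def write_chunks(
    items: Iterable[Dict[str, Any]],
    dst_prefix: str,
    chunk_size: int,
    limit: Optional[int] = None,
) -> List[str]:
    os.makedirs(dst_prefix, exist_ok=True)
    uris: List[str] = []

    def flush(chunk: List[Dict[str, Any]], chunk_number: int) -> int:
        # One record per line: newline-delimited JSON (NDJSON).
        uri = f"{dst_prefix}/{chunk_number}.ndjson"
        write_text(uri, "\n".join(json.dumps(item) for item in chunk))
        uris.append(uri)
        return chunk_number + 1

    chunk: List[Dict[str, Any]] = []
    chunk_number = 0
    # islice(items, None) iterates everything; otherwise stop after `limit` items,
    # mirroring input.limit in the diff.
    for item in itertools.islice(items, limit):
        chunk.append(item)
        if len(chunk) >= chunk_size:
            # Flush as soon as the buffer fills, so at most chunk_size items
            # are ever held in memory.
            chunk_number = flush(chunk, chunk_number)
            chunk = []
    if chunk:
        # Don't forget the final, partially filled chunk.
        flush(chunk, chunk_number)
    return uris


# Example: 25 items with chunk_size=10 -> 0.ndjson, 1.ndjson, 2.ndjson.
print(write_chunks(({"id": i} for i in range(25)), "example-chunks", 10))

The diff has the same shape: write_chunk both uploads the chunk and returns the next chunk number, so the loop body stays a single assignment and the trailing `if chunk:` flush handles the final partial chunk.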