Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions merino/configs/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ gcs_bucket = ""
# CDN hostname used for public URL
cdn_hostname = ""

# MERINO_GAMES_PARTICLE__GREEN_DEPLOYMENT_FOLDER
# "folder" name (file prefix) used for staging green deployments
green_deployment_folder = "green_deployment"

[default.engagement]
# MERINO_ENGAGEMENT__GCS_STORAGE_BUCKET
gcs_storage_bucket = ""
Expand Down
8 changes: 6 additions & 2 deletions merino/providers/games/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
_gcs_project = settings.games_particle_gcs.gcs_project
_gcs_bucket = settings.games_particle_gcs.gcs_bucket
_gcs_cdn_hostname = settings.games_particle_gcs.cdn_hostname
_gcs_green_deployment_folder = settings.games_particle_gcs.green_deployment_folder
_manifest_gcs_file_name = settings.games_providers.particle.manifest_gcs_file_name
_manifest_schema_file_path = (
f"{_app_root_dir}/{settings.games_providers.particle.manifest_schema_file_path}"
Expand Down Expand Up @@ -54,12 +55,15 @@ async def init_providers() -> None:
local_file_manager = ParticleLocalFileManager(_manifest_schema_file_path)

# manages particle files in gcs
remote_file_manager = ParticleRemoteFileManager(gcs_uploader, _manifest_gcs_file_name)
remote_file_manager = ParticleRemoteFileManager(
gcs_client=gcs_uploader,
manifest_file_name=_manifest_gcs_file_name,
green_deployment_folder=_gcs_green_deployment_folder,
)

particle_backend = ParticleBackend(
gcs_uploader=gcs_uploader,
http_client=http_client,
manifest_gcs_file_name=_manifest_gcs_file_name,
metrics_client=metrics_client,
particle_url_root=_url_root,
particle_url_path_manifest=_url_path_manifest,
Expand Down
54 changes: 45 additions & 9 deletions merino/providers/games/particle/backends/filemanager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""File manager class for the Particle backend."""

import asyncio
import json
import logging
import sentry_sdk
Expand Down Expand Up @@ -47,25 +48,30 @@ class ParticleRemoteFileManager:
"""Filemanager for processing remote (GCS) Particle data."""

gcs_client: GcsUploader
green_deployment_folder: str
manifest_file_name: str

def __init__(
    self,
    gcs_client: GcsUploader,
    green_deployment_folder: str,
    manifest_file_name: str,
) -> None:
    """Store the client and settings used for Particle file operations in GCS.

    Args:
        gcs_client: Uploader client used to read and write blobs in GCS.
        green_deployment_folder: "Folder" name (blob name prefix) under which
            files are staged for a green deployment.
        manifest_file_name: Name of the manifest blob in GCS.
    """
    self.manifest_file_name = manifest_file_name
    self.green_deployment_folder = green_deployment_folder
    self.gcs_client = gcs_client

def get_manifest_file(self) -> dict[str, Any] | None:
"""Read remote manifest file.
def get_manifest_file(self) -> dict[str, Any]:
"""Read remote manifest file from GCS.

Raises:
ParticleFileManagerError: If the manifest file cannot be accessed.
ParticleFileManagerError: If the manifest file cannot be accessed or cannot be converted to JSON.
Returns:
Dictionary containing manifest.
"""
manifest: dict[str, Any] | None = None

try:
blob = self.gcs_client.get_file_by_name(self.manifest_file_name)

Expand All @@ -74,14 +80,44 @@ def get_manifest_file(self) -> dict[str, Any] | None:
file_contents: dict = json.loads(blob_data)
logger.info("Successfully loaded remote Particle manifest file.")

return file_contents

return None
manifest = file_contents
except Exception as ex:
error_msg = f"Error retrieving remote Particle manifest file. {ex}"

error_msg = f"Error retrieving GCS manifest file. {ex}"
logger.error(error_msg)

sentry_sdk.capture_exception(ex)

raise ParticleFileManagerError(error_msg) from ex

# if manifest is still None, the blob in GCS was empty, so we need to raise
if manifest is None:
raise ParticleFileManagerError("GCS manifest file is empty.")

# if the manifest was retrieved and converted to JSON, return it
return manifest

async def upload_file(self, file_name: str, file_path: str, content_type: str) -> str:
    """Upload a local file into the green deployment folder in GCS.

    An existing blob with the same name is overwritten. Failures are logged
    and reported to Sentry rather than raised, so callers detect a failed
    upload through the empty return value.

    Args:
        file_name: Name of the file; prefixed with the green deployment
            folder to form the destination blob name.
        file_path: Path of the local file to upload.
        content_type: Content type to store with the blob.

    Returns:
        The name of the uploaded blob, or an empty string if the upload
        failed or the client returned no blob.
    """
    try:
        # the GCS client call is synchronous - run it in a worker thread so
        # other processing is not blocked while the upload is in flight
        blob = await asyncio.to_thread(
            self.gcs_client.upload_from_filename,
            file_path=file_path,
            destination_name=f"{self.green_deployment_folder}/{file_name}",
            content_type=content_type,
            forced_upload=True,  # force an overwrite if necessary
        )

        return str(blob.name) if blob else ""
    except Exception as ex:
        # log the failure, matching how manifest retrieval errors are handled
        error_msg = f"Error uploading file {file_name} to GCS. {ex}"
        logger.error(error_msg)

        # chain the original exception so its traceback is not lost when the
        # wrapped error is reported
        error = ParticleFileManagerError(str(ex))
        error.__cause__ = ex
        sentry_sdk.capture_exception(error)

        return ""

async def empty_staging_folder(self) -> bool:
    """Delete all contents of the GCS staging folder.

    Used when a channel staging fails, e.g. due to SHA validation or upload
    failure.

    Returns:
        Always True for now - the deletion itself is not implemented yet.
    """
    # stub - nothing is deleted yet; callers currently always see success
    return True
113 changes: 111 additions & 2 deletions merino/providers/games/particle/backends/particle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@
import logging
import orjson
import sentry_sdk
import tempfile

from httpx import AsyncClient, HTTPError, Response
from pydantic import Json

from merino.configs import settings
from merino.providers.games.particle.backends.protocol import Particle
from merino.providers.games.particle.backends.errors import ParticleRemoteFileProcessError
from merino.providers.games.particle.backends.filemanager import ParticleRemoteFileManager
from merino.providers.games.particle.backends.utils import (
download_remote_file,
GameFile,
get_remote_files_and_shas_for_channel,
RemoteChannelEnum,
remote_manifest_channel_is_updated,
)
from merino.utils.gcs.gcs_uploader import GcsUploader

logger = logging.getLogger(__name__)
Expand All @@ -37,13 +46,12 @@ def __init__(
self,
gcs_uploader: GcsUploader,
http_client: AsyncClient,
manifest_gcs_file_name: str,
metrics_client: aiodogstatsd.Client,
particle_url_root: str,
particle_url_path_manifest: str,
remote_file_manager: ParticleRemoteFileManager,
) -> None:
"""Initialize the Polygon backend."""
"""Initialize the Particle backend."""
self.gcs_uploader = gcs_uploader
self.http_client = http_client
self.metrics_client = metrics_client
Expand Down Expand Up @@ -94,3 +102,104 @@ async def fetch_manifest_json_from_gcs(self) -> Json | None:
# the gcp client library is synchronous - wrap in an async thread so
# we don't block processing
return await asyncio.to_thread(self.remote_file_manager.get_manifest_file)

async def update_channel_files(
    self, manifest_remote: Json, manifest_gcs: Json, channel: RemoteChannelEnum
) -> bool:
    """Stage and deploy the files of a channel when its remote manifest entry changed.

    Args:
        manifest_remote: Manifest JSON retrieved from the remote endpoint.
        manifest_gcs: Manifest JSON last stored in GCS.
        channel: The channel whose files should be updated.

    Returns:
        True only if the channel needed an update and both staging and
        deployment succeeded; False otherwise.
    """
    # nothing to do when the channel does not need to be updated
    if not remote_manifest_channel_is_updated(manifest_remote, manifest_gcs, channel):
        return False

    # the files listed for this channel in the remote manifest
    channel_files: list[GameFile] = get_remote_files_and_shas_for_channel(
        manifest_remote, channel
    )

    if len(channel_files) == 0:
        # an updated channel without files is unexpected - report it and stop
        sentry_sdk.capture_exception(
            ParticleRemoteFileProcessError(
                f"No files found in remote manifest for {channel} channel."
            )
        )
        return False

    # download the remote files, verify their SHAs, and upload to GCS
    staged, channel_files = await self.stage_channel_files(files=channel_files)

    if not staged:
        # deployment is only attempted after a fully successful staging
        return staged

    # the process is a success only if the deployment succeeds as well
    return await self.deploy_channel_files(
        channel_files, manifest_remote=manifest_remote, manifest_gcs=manifest_gcs
    )

async def stage_channel_files(self, files: list[GameFile]) -> tuple[bool, list[GameFile]]:
    """Download, verify and upload the given channel files to the GCS staging area.

    Each file is downloaded into a temporary directory, its SHA is computed
    and compared against the target SHA, and - if they match - the file is
    uploaded to GCS. Processing stops at the first failure, as all files in a
    set must validate and upload successfully. If only part of the set was
    uploaded, the staging folder is emptied again.

    This will prepare the channel to be "deployed".

    Args:
        files: The files of one channel; their status fields are updated in place.

    Returns:
        A tuple of the overall success status and the list of GameFiles with
        their verified/uploaded statuses updated.
    """
    # the temporary directory (and everything downloaded into it) is removed
    # automatically when this context exits
    with tempfile.TemporaryDirectory() as staging_dir:
        for game_file in files:
            # local destination of the remote file inside the temp directory
            game_file.local_path = f"{staging_dir}/{game_file.name}"

            # NOTE(review): download_remote_file is called directly here; if it
            # performs blocking I/O it will stall the event loop - confirm
            # whether it should be wrapped in asyncio.to_thread.
            try:
                download_remote_file(
                    f"{self.particle_url_root}/{game_file.remote_url}", game_file.local_path
                )
            except Exception as ex:
                # a failed download invalidates the whole set - report and stop
                sentry_sdk.capture_exception(ParticleRemoteFileProcessError(str(ex)))
                break

            # compare the SHA of the downloaded file against the expected one
            game_file.sha_computed = GameFile.compute_sha(game_file.local_path)

            if game_file.sha_target != game_file.sha_computed:
                sentry_sdk.capture_exception(
                    ParticleRemoteFileProcessError("File SHA mismatch")
                )
                break

            game_file.sha_verified = True

            # upload_file returns the remote blob name on success and an empty
            # string on failure
            game_file.gcs_staging_name = await self.remote_file_manager.upload_file(
                file_name=game_file.name,
                file_path=game_file.local_path,
                content_type=game_file.content_type,
            )

            if not game_file.gcs_staging_name:
                break

            game_file.uploaded = True

    # a partially uploaded set must not remain staged - clear out the GCS
    # "green" folder in that case
    upload_flags = [f.uploaded for f in files]
    if any(upload_flags) and not all(upload_flags):
        await self.remote_file_manager.empty_staging_folder()

    # staging succeeded only if every file was both verified and uploaded
    return all(f.sha_verified and f.uploaded for f in files), files

async def deploy_channel_files(
    self, files: list[GameFile], manifest_remote: Json, manifest_gcs: Json
) -> bool:
    """Deploy files from the 'green' folder in GCS to the root.

    Args:
        files: The staged files of the channel being deployed.
        manifest_remote: Manifest JSON retrieved from the remote endpoint.
        manifest_gcs: Manifest JSON last stored in GCS.

    Returns:
        Always True for now - the deployment itself is not implemented yet.
    """
    # stub - no files are moved yet; callers currently always see success
    return True
47 changes: 43 additions & 4 deletions merino/providers/games/particle/backends/protocol.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
"""Protocol for Particle provider backend."""

from pydantic import BaseModel, Field, Json
import aiodogstatsd

from httpx import AsyncClient
from pydantic import BaseModel, Field, Json
from typing import Protocol

from merino.providers.games.particle.backends.filemanager import ParticleRemoteFileManager
from merino.providers.games.particle.backends.utils import GameFile, RemoteChannelEnum
from merino.utils.gcs.gcs_uploader import GcsUploader


class Particle(BaseModel):
"""Model for Particle game data"""
Expand All @@ -19,6 +25,16 @@ class ParticleBackend(Protocol):
directly depend on.
"""

gcs_uploader: GcsUploader
http_client: AsyncClient
metrics_client: aiodogstatsd.Client
# remote endpoint
particle_url_root: str
# path to the manifest on the remote endpoint
particle_url_path_manifest: str
# manages files stored in GCS
remote_file_manager: ParticleRemoteFileManager

async def get_game_url(self) -> Particle | None:
"""Fetch the Particle game data.

Expand All @@ -28,9 +44,32 @@ async def get_game_url(self) -> Particle | None:
...

async def fetch_manifest_json_from_remote(self) -> Json | None:
"""Retrieve the latest manifest JSON from Particle"""
"""Retrieve the latest manifest JSON from Particle."""
...

async def fetch_manifest_json_from_gcs(self) -> Json:
    """Retrieve the manifest json last stored in GCS.

    Returns:
        The manifest JSON read from GCS.
    """
    ...

async def update_channel_files(
    self, manifest_remote: Json, manifest_gcs: Json, channel: RemoteChannelEnum
) -> bool:
    """Attempt to update files in GCS for the given channel.

    Args:
        manifest_remote: Manifest JSON retrieved from the remote endpoint.
        manifest_gcs: Manifest JSON last stored in GCS.
        channel: The channel whose files should be updated.

    Returns:
        Whether the channel's files were updated.
    """
    ...

async def stage_channel_files(self, files: list[GameFile]) -> tuple[bool, list[GameFile]]:
    """Download, verify and upload the given channel files to GCS.

    Orchestration function to download remote files for the given channel to
    a temporary directory, verify their SHAs, and, if valid, upload them to
    GCS. If one file fails, we cancel the rest of the checks, as all files in
    a set must validate and upload successfully.

    This will prepare the channel to be "deployed".

    Args:
        files: The files of the channel to stage.

    Returns:
        The overall success status and the list of GameFiles with their
        verified/uploaded statuses updated.
    """
    ...

async def fetch_manifest_json_from_gcs(self) -> Json | None:
"""Retrieve the manifest json last stored in GCS"""
async def deploy_channel_files(
    self, files: list[GameFile], manifest_remote: Json, manifest_gcs: Json
) -> bool:
    """Deploy files from the 'green' folder in GCS to the root.

    Args:
        files: The staged files of the channel being deployed.
        manifest_remote: Manifest JSON retrieved from the remote endpoint.
        manifest_gcs: Manifest JSON last stored in GCS.

    Returns:
        Whether the deployment succeeded.
    """
    ...
Loading
Loading