DM-51735: Stop caching spatial datasets in Prompt Processing #335

Open · wants to merge 2 commits into main
47 changes: 13 additions & 34 deletions python/activator/middleware_interface.py
@@ -70,12 +70,8 @@
 _log_trace3 = logging.getLogger("TRACE3.lsst." + __name__)
 _log_trace3.setLevel(logging.CRITICAL)  # Turn off by default.

-# The number of calib datasets to keep from previous runs, or one less than the
-# number of calibs that may be present *during* a run.
-base_keep_limit = int(os.environ.get("LOCAL_REPO_CACHE_SIZE", 3))-1
-# Multipliers to base_keep_limit for refcats and templates.
-refcat_factor = int(os.environ.get("REFCATS_PER_IMAGE", 4))
-template_factor = int(os.environ.get("PATCHES_PER_IMAGE", 4))
+# The number of calib datasets to keep, including the current run.
+base_keep_limit = int(os.environ.get("LOCAL_REPO_CACHE_SIZE", 3))
 # Whether or not to export to the central repo.
 do_export = bool(int(os.environ.get("DEBUG_EXPORT_OUTPUTS", '1')))
 # The number of arcseconds to pad the region in preloading spatial datasets.
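For context on the removed multipliers: with the default environment values, the old settings gave refcats and templates an enlarged cache, while the new code keeps a single limit and stops caching spatial datasets altogether. A quick sketch of the arithmetic, assuming the defaults shown in this hunk:

```python
import os

# Old behaviour (removed above): reserve one slot for the in-flight run,
# then scale the limit up for spatial dataset types.
old_base = int(os.environ.get("LOCAL_REPO_CACHE_SIZE", 3)) - 1                # 2 with defaults
old_refcat_limit = int(os.environ.get("REFCATS_PER_IMAGE", 4)) * old_base     # 8 refcat shards
old_template_limit = int(os.environ.get("PATCHES_PER_IMAGE", 4)) * old_base   # 8 coadd patches

# New behaviour: one limit per dataset type, counting the current run.
new_base = int(os.environ.get("LOCAL_REPO_CACHE_SIZE", 3))                    # 3 with defaults
```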
@@ -168,25 +164,7 @@ def make_local_cache():
     cache : `activator.caching.DatasetCache`
         An empty cache with configured caching strategy and limits.
     """
-    return DatasetCache(
-        base_keep_limit,
-        cache_sizes={
-            # TODO: find an API that doesn't require explicit enumeration
-            "goodSeeingCoadd": template_factor * base_keep_limit,
-            "deepCoadd": template_factor * base_keep_limit,
-            "template_coadd": template_factor * base_keep_limit,
-            "uw_stars_20240524": refcat_factor * base_keep_limit,
-            "uw_stars_20240228": refcat_factor * base_keep_limit,
-            "uw_stars_20240130": refcat_factor * base_keep_limit,
-            "cal_ref_cat_2_2": refcat_factor * base_keep_limit,
-            "ps1_pv3_3pi_20170110": refcat_factor * base_keep_limit,
-            "gaia_dr3_20230707": refcat_factor * base_keep_limit,
-            "gaia_dr2_20200414": refcat_factor * base_keep_limit,
-            "atlas_refcat2_20220201": refcat_factor * base_keep_limit,
-            "the_monster_20240904": refcat_factor * base_keep_limit,
-            "the_monster_20250219": refcat_factor * base_keep_limit,
-        },
-    )
+    return DatasetCache(base_keep_limit)


 def _get_sasquatch_dispatcher():
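make_local_cache now builds the cache with only the default limit. The real implementation lives in activator.caching.DatasetCache; the sketch below is a hypothetical stand-in, not the actual code, showing just the behaviour the rest of this diff relies on: update() returning evicted refs and access() raising LookupError for unknown refs, with per-dataset-type LRU eviction assumed.

```python
from collections import OrderedDict, defaultdict


class DatasetCacheSketch:
    """Illustrative stand-in for activator.caching.DatasetCache (not the real code)."""

    def __init__(self, default_limit):
        self._limit = default_limit
        # One LRU per dataset type name; values are unused placeholders.
        self._lru = defaultdict(OrderedDict)

    def update(self, refs):
        """Insert refs, returning any refs evicted to stay within the limit."""
        evicted = []
        for ref in refs:
            per_type = self._lru[ref.datasetType.name]
            per_type[ref] = None
            per_type.move_to_end(ref)
            while len(per_type) > self._limit:
                old_ref, _ = per_type.popitem(last=False)  # drop least recently used
                evicted.append(old_ref)
        return evicted

    def access(self, refs):
        """Mark refs as recently used; raise LookupError if any is not cached."""
        for ref in refs:
            per_type = self._lru.get(ref.datasetType.name)
            if per_type is None or ref not in per_type:
                raise LookupError(f"{ref} is not in the cache")
            per_type.move_to_end(ref)
```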
@@ -406,15 +384,16 @@ def _define_dimensions(self):
             "instrument": self.instrument.getName(),
         })

-    def _mark_dataset_usage(self, refs: collections.abc.Iterable[lsst.daf.butler.DatasetRef]):
-        """Mark requested datasets in the cache.
+    def _cache_datasets(self, refs: collections.abc.Iterable[lsst.daf.butler.DatasetRef]):
+        """Add or mark requested datasets in the cache.

         Parameters
         ----------
         refs : iterable [`lsst.daf.butler.DatasetRef`]
-            The datasets to mark. Assumed to all fit inside the cache.
+            The datasets to cache. Assumed to all fit inside the cache.
         """
-        self.cache.update(refs)
+        evicted = self.cache.update(refs)
+        self.butler.pruneDatasets(evicted, disassociate=True, unstore=True, purge=True)
         try:
             self.cache.access(refs)
         except LookupError as e:
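The new lines tie cache eviction to local-repository cleanup: whatever update() evicts is immediately deleted from the local butler. The flags passed to Butler.pruneDatasets remove the evicted refs from tagged collections (disassociate), delete their stored files (unstore), and drop their registry entries (purge). A hedged sketch of the resulting call sequence; `mwi` and `new_refs` are illustrative names, not from the source:

```python
# Hypothetical driver code for the method above.
evicted = mwi.cache.update(new_refs)   # admit the new refs, collect any overflow
mwi.butler.pruneDatasets(
    evicted,
    disassociate=True,   # drop the evicted refs from tagged collections
    unstore=True,        # delete their files from the local datastore
    purge=True,          # and remove them from the local registry entirely
)
mwi.cache.access(new_refs)             # refresh recency; LookupError would mean a ref
                                       # unexpectedly failed to enter the cache
```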
@@ -734,7 +713,7 @@ def _find_refcats(self, dataset_type, region):
                 bind={"search_region": region},
                 find_first=True,
             ),
-            all_callback=self._mark_dataset_usage,
+            # Don't cache refcats
         ))
         if refcats:
             _log.debug("Found %d new refcat datasets from catalog '%s'.", len(refcats), dataset_type.name)
@@ -775,7 +754,7 @@ def _find_templates(self, dataset_type, region, physical_filter):
                 bind={"search_region": region},
                 find_first=True,
             ),
-            all_callback=self._mark_dataset_usage,
+            # Don't cache templates
         ))
         if templates:
             _log.debug("Found %d new template datasets of type %s.", len(templates), dataset_type.name)
@@ -835,7 +814,7 @@ def query_calibs_by_date(butler, label):
         calibs = set(_filter_datasets(
             self.read_central_butler, self.butler,
             query_calibs_by_date,
-            all_callback=self._mark_dataset_usage,
+            all_callback=self._cache_datasets,
         ))
         if calibs:
             _log.debug("Found %d new calib datasets of type '%s'.", len(calibs), dataset_type.name)
@@ -872,7 +851,7 @@ def _find_generic_datasets(self, dataset_type, detector_id, physical_filter):
                 data_id=data_id,
                 find_first=True,
             ),
-            all_callback=self._mark_dataset_usage,
+            all_callback=self._cache_datasets,
         ))
         if datasets:
             _log.debug("Found %d new datasets of type %s.", len(datasets), dataset_type.name)
@@ -912,7 +891,7 @@ def _find_init_outputs(self):
             run = runs.get_output_run(self.instrument, self._deployment, pipeline_file, self._day_obs)
             types = self._get_init_output_types(pipeline_file)
             # Output runs are always cleared after execution, so _filter_datasets would always warn.
-            # This also means the init-outputs don't need to be cached with _mark_dataset_usage.
+            # This also means the init-outputs don't need to be cached with _cache_datasets.
             query = _generic_query(types, collections=run)
             datasets.update(query(self.read_central_butler, "source datasets"))
         if not datasets: