From fb573f574701bf421aa5bd0fe2e9c0d4deafcfad Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Fri, 18 Jul 2025 11:06:43 -0700
Subject: [PATCH 1/2] Remove off-by-one offset in dataset cache.

The offset is confusing and makes the code harder to maintain. It's
also inefficient because it forces one visit's worth of datasets to
get dropped even when the cache is being hit.
---
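Notes (kept below the --- line, so they stay out of the commit message):
the new _cache_datasets body assumes that DatasetCache.update returns
the refs it evicted to make room, so the corresponding datasets can be
pruned from the local repo immediately. The real cache lives in
activator.caching and is not part of this diff; the sketch below only
illustrates the assumed contract, with hypothetical LRU-style internals.

import collections


class DatasetCache:
    """Sketch of the assumed cache contract (hypothetical internals)."""

    def __init__(self, default_limit):
        self._limit = default_limit
        # Insertion/usage order: oldest entries first, most recent last.
        self._refs = collections.OrderedDict()

    def update(self, refs):
        """Insert ``refs`` and return the set of refs evicted to make room.

        So long as ``refs`` itself fits inside the cache (which callers
        are expected to guarantee), only previously cached refs are
        evicted.
        """
        for ref in refs:
            self._refs[ref] = None
            self._refs.move_to_end(ref)  # new refs become most recent
        evicted = set()
        while len(self._refs) > self._limit:
            oldest, _ = self._refs.popitem(last=False)
            evicted.add(oldest)
        return evicted

    def access(self, refs):
        """Mark ``refs`` as recently used; raise LookupError if any is missing."""
        for ref in refs:
            if ref not in self._refs:
                raise LookupError(f"{ref} is not cached")
            self._refs.move_to_end(ref)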
""" - self.cache.update(refs) + evicted = self.cache.update(refs) + self.butler.pruneDatasets(evicted, disassociate=True, unstore=True, purge=True) try: self.cache.access(refs) except LookupError as e: @@ -734,7 +734,7 @@ def _find_refcats(self, dataset_type, region): bind={"search_region": region}, find_first=True, ), - all_callback=self._mark_dataset_usage, + all_callback=self._cache_datasets, )) if refcats: _log.debug("Found %d new refcat datasets from catalog '%s'.", len(refcats), dataset_type.name) @@ -775,7 +775,7 @@ def _find_templates(self, dataset_type, region, physical_filter): bind={"search_region": region}, find_first=True, ), - all_callback=self._mark_dataset_usage, + all_callback=self._cache_datasets, )) if templates: _log.debug("Found %d new template datasets of type %s.", len(templates), dataset_type.name) @@ -835,7 +835,7 @@ def query_calibs_by_date(butler, label): calibs = set(_filter_datasets( self.read_central_butler, self.butler, query_calibs_by_date, - all_callback=self._mark_dataset_usage, + all_callback=self._cache_datasets, )) if calibs: _log.debug("Found %d new calib datasets of type '%s'.", len(calibs), dataset_type.name) @@ -872,7 +872,7 @@ def _find_generic_datasets(self, dataset_type, detector_id, physical_filter): data_id=data_id, find_first=True, ), - all_callback=self._mark_dataset_usage, + all_callback=self._cache_datasets, )) if datasets: _log.debug("Found %d new datasets of type %s.", len(datasets), dataset_type.name) @@ -912,7 +912,7 @@ def _find_init_outputs(self): run = runs.get_output_run(self.instrument, self._deployment, pipeline_file, self._day_obs) types = self._get_init_output_types(pipeline_file) # Output runs are always cleared after execution, so _filter_datasets would always warn. - # This also means the init-outputs don't need to be cached with _mark_dataset_usage. + # This also means the init-outputs don't need to be cached with _cache_datasets. query = _generic_query(types, collections=run) datasets.update(query(self.read_central_butler, "source datasets")) if not datasets: From f428c9dde55b28bbde82c1d794e32e8e6e7ca489 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 18 Jul 2025 11:42:47 -0700 Subject: [PATCH 2/2] Stop caching templates and refcats. These datasets have little reuse value but take up several GB per image. --- python/activator/middleware_interface.py | 27 +++--------------------- 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 124406dc..899df2f1 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -72,9 +72,6 @@ # The number of calib datasets to keep, including the current run. base_keep_limit = int(os.environ.get("LOCAL_REPO_CACHE_SIZE", 3)) -# Multipliers to base_keep_limit for refcats and templates. -refcat_factor = int(os.environ.get("REFCATS_PER_IMAGE", 4)) -template_factor = int(os.environ.get("PATCHES_PER_IMAGE", 4)) # Whether or not to export to the central repo. do_export = bool(int(os.environ.get("DEBUG_EXPORT_OUTPUTS", '1'))) # The number of arcseconds to pad the region in preloading spatial datasets. @@ -167,25 +164,7 @@ def make_local_cache(): cache : `activator.caching.DatasetCache` An empty cache with configured caching strategy and limits. 
""" - return DatasetCache( - base_keep_limit, - cache_sizes={ - # TODO: find an API that doesn't require explicit enumeration - "goodSeeingCoadd": template_factor * base_keep_limit, - "deepCoadd": template_factor * base_keep_limit, - "template_coadd": template_factor * base_keep_limit, - "uw_stars_20240524": refcat_factor * base_keep_limit, - "uw_stars_20240228": refcat_factor * base_keep_limit, - "uw_stars_20240130": refcat_factor * base_keep_limit, - "cal_ref_cat_2_2": refcat_factor * base_keep_limit, - "ps1_pv3_3pi_20170110": refcat_factor * base_keep_limit, - "gaia_dr3_20230707": refcat_factor * base_keep_limit, - "gaia_dr2_20200414": refcat_factor * base_keep_limit, - "atlas_refcat2_20220201": refcat_factor * base_keep_limit, - "the_monster_20240904": refcat_factor * base_keep_limit, - "the_monster_20250219": refcat_factor * base_keep_limit, - }, - ) + return DatasetCache(base_keep_limit) def _get_sasquatch_dispatcher(): @@ -734,7 +713,7 @@ def _find_refcats(self, dataset_type, region): bind={"search_region": region}, find_first=True, ), - all_callback=self._cache_datasets, + # Don't cache refcats )) if refcats: _log.debug("Found %d new refcat datasets from catalog '%s'.", len(refcats), dataset_type.name) @@ -775,7 +754,7 @@ def _find_templates(self, dataset_type, region, physical_filter): bind={"search_region": region}, find_first=True, ), - all_callback=self._cache_datasets, + # Don't cache templates )) if templates: _log.debug("Found %d new template datasets of type %s.", len(templates), dataset_type.name)