diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 973198fc..899df2f1 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -70,12 +70,8 @@ _log_trace3 = logging.getLogger("TRACE3.lsst." + __name__)
 _log_trace3.setLevel(logging.CRITICAL)  # Turn off by default.

-# The number of calib datasets to keep from previous runs, or one less than the
-# number of calibs that may be present *during* a run.
-base_keep_limit = int(os.environ.get("LOCAL_REPO_CACHE_SIZE", 3))-1
-# Multipliers to base_keep_limit for refcats and templates.
-refcat_factor = int(os.environ.get("REFCATS_PER_IMAGE", 4))
-template_factor = int(os.environ.get("PATCHES_PER_IMAGE", 4))
+# The number of calib datasets to keep, including the current run.
+base_keep_limit = int(os.environ.get("LOCAL_REPO_CACHE_SIZE", 3))
 # Whether or not to export to the central repo.
 do_export = bool(int(os.environ.get("DEBUG_EXPORT_OUTPUTS", '1')))
 # The number of arcseconds to pad the region in preloading spatial datasets.
@@ -168,25 +164,7 @@ def make_local_cache():
     cache : `activator.caching.DatasetCache`
         An empty cache with configured caching strategy and limits.
     """
-    return DatasetCache(
-        base_keep_limit,
-        cache_sizes={
-            # TODO: find an API that doesn't require explicit enumeration
-            "goodSeeingCoadd": template_factor * base_keep_limit,
-            "deepCoadd": template_factor * base_keep_limit,
-            "template_coadd": template_factor * base_keep_limit,
-            "uw_stars_20240524": refcat_factor * base_keep_limit,
-            "uw_stars_20240228": refcat_factor * base_keep_limit,
-            "uw_stars_20240130": refcat_factor * base_keep_limit,
-            "cal_ref_cat_2_2": refcat_factor * base_keep_limit,
-            "ps1_pv3_3pi_20170110": refcat_factor * base_keep_limit,
-            "gaia_dr3_20230707": refcat_factor * base_keep_limit,
-            "gaia_dr2_20200414": refcat_factor * base_keep_limit,
-            "atlas_refcat2_20220201": refcat_factor * base_keep_limit,
-            "the_monster_20240904": refcat_factor * base_keep_limit,
-            "the_monster_20250219": refcat_factor * base_keep_limit,
-        },
-    )
+    return DatasetCache(base_keep_limit)


 def _get_sasquatch_dispatcher():
@@ -406,15 +384,16 @@ def _define_dimensions(self):
             "instrument": self.instrument.getName(),
         })

-    def _mark_dataset_usage(self, refs: collections.abc.Iterable[lsst.daf.butler.DatasetRef]):
-        """Mark requested datasets in the cache.
+    def _cache_datasets(self, refs: collections.abc.Iterable[lsst.daf.butler.DatasetRef]):
+        """Add or mark requested datasets in the cache.

         Parameters
         ----------
         refs : iterable [`lsst.daf.butler.DatasetRef`]
-            The datasets to mark. Assumed to all fit inside the cache.
+            The datasets to cache. Assumed to all fit inside the cache.
         """
-        self.cache.update(refs)
+        evicted = self.cache.update(refs)
+        self.butler.pruneDatasets(evicted, disassociate=True, unstore=True, purge=True)
         try:
             self.cache.access(refs)
         except LookupError as e:
@@ -734,7 +713,7 @@ def _find_refcats(self, dataset_type, region):
                            bind={"search_region": region},
                            find_first=True,
                            ),
-            all_callback=self._mark_dataset_usage,
+            # Don't cache refcats
         ))
         if refcats:
             _log.debug("Found %d new refcat datasets from catalog '%s'.", len(refcats), dataset_type.name)
@@ -775,7 +754,7 @@ def _find_templates(self, dataset_type, region, physical_filter):
                            bind={"search_region": region},
                            find_first=True,
                            ),
-            all_callback=self._mark_dataset_usage,
+            # Don't cache templates
         ))
         if templates:
             _log.debug("Found %d new template datasets of type %s.", len(templates), dataset_type.name)
@@ -835,7 +814,7 @@ query_calibs_by_date(butler, label):
         calibs = set(_filter_datasets(
             self.read_central_butler, self.butler,
             query_calibs_by_date,
-            all_callback=self._mark_dataset_usage,
+            all_callback=self._cache_datasets,
         ))
         if calibs:
             _log.debug("Found %d new calib datasets of type '%s'.", len(calibs), dataset_type.name)
@@ -872,7 +851,7 @@ def _find_generic_datasets(self, dataset_type, detector_id, physical_filter):
                            data_id=data_id,
                            find_first=True,
                            ),
-            all_callback=self._mark_dataset_usage,
+            all_callback=self._cache_datasets,
         ))
         if datasets:
             _log.debug("Found %d new datasets of type %s.", len(datasets), dataset_type.name)
@@ -912,7 +891,7 @@ def _find_init_outputs(self):
         run = runs.get_output_run(self.instrument, self._deployment, pipeline_file, self._day_obs)
         types = self._get_init_output_types(pipeline_file)
         # Output runs are always cleared after execution, so _filter_datasets would always warn.
-        # This also means the init-outputs don't need to be cached with _mark_dataset_usage.
+        # This also means the init-outputs don't need to be cached with _cache_datasets.
         query = _generic_query(types, collections=run)
         datasets.update(query(self.read_central_butler, "source datasets"))
         if not datasets: