Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conftest.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ ignored-parents=
max-args=15

# Maximum number of attributes for a class (see R0902).
max-attributes=15
max-attributes=25
Comment thread
delucchi-cmu marked this conversation as resolved.

# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
Expand Down
3 changes: 3 additions & 0 deletions src/hipscat/catalog/association_catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .association_catalog import AssociationCatalog
from .association_catalog_info import AssociationCatalogInfo
from .partition_join_info import PartitionJoinInfo
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pandas as pd
from hipscat.io import FilePointer, file_io
from typing_extensions import Self

from hipscat.io import FilePointer, file_io


class PartitionJoinInfo:
"""Association catalog metadata with which partitions matches occur in the join"""
Expand Down
76 changes: 76 additions & 0 deletions src/hipscat/catalog/dataset/catalog_info_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import dataclasses
from typing import Optional

from hipscat.catalog.association_catalog.association_catalog_info import (
AssociationCatalogInfo,
)
from hipscat.catalog.catalog_info import CatalogInfo
from hipscat.catalog.catalog_type import CatalogType
from hipscat.catalog.dataset.base_catalog_info import BaseCatalogInfo
from hipscat.catalog.index.index_catalog_info import IndexCatalogInfo
from hipscat.catalog.margin_cache.margin_cache_catalog_info import (
MarginCacheCatalogInfo,
)
from hipscat.catalog.source_catalog.source_catalog_info import SourceCatalogInfo
from hipscat.io import FilePointer, file_io, paths

CATALOG_TYPE_TO_INFO_CLASS = {
CatalogType.OBJECT: CatalogInfo,
CatalogType.SOURCE: SourceCatalogInfo,
CatalogType.ASSOCIATION: AssociationCatalogInfo,
CatalogType.INDEX: IndexCatalogInfo,
CatalogType.MARGIN: MarginCacheCatalogInfo,
}
"""Map of catalog types to their expected subclass of BaseCatalogInfo."""


def create_catalog_info(
keywords: dict, catalog_type: Optional[CatalogType] = None
) -> BaseCatalogInfo:
"""Generate a typed catalog info object from the type specified explicitly or
using ``catalog_type`` keyword.

Args:
keywords: dictionary of catalog info keywords (e.g. from reading a
``catalog_info.json`` file).
catalog_type: explicit request for a specific catalog type. if not
provided, we will look for a key ``catalog_type`` in the keywords.
Returns:
populated BaseCatalogInfo of appropriate type.
"""

if not catalog_type:
if "catalog_type" not in keywords.keys():
raise ValueError("catalog type is required to create catalog info object")
catalog_type = keywords["catalog_type"]

if catalog_type not in CatalogType.all_types():
raise ValueError(f"Unknown catalog type: {catalog_type}")

if catalog_type not in CATALOG_TYPE_TO_INFO_CLASS: # pragma: no cover
raise NotImplementedError(f"Unhandled catalog type: {catalog_type}")
ci_class = CATALOG_TYPE_TO_INFO_CLASS[catalog_type]
catalog_info_keywords = {}
for field in dataclasses.fields(ci_class):
if field.name in keywords:
catalog_info_keywords[field.name] = keywords[field.name]
return ci_class(**catalog_info_keywords)


def from_catalog_dir(catalog_base_dir: FilePointer):
"""Generate a typed catalog info object from the type specified in the
Comment thread
delucchi-cmu marked this conversation as resolved.
catalog info file.

Args:
catalog_base_dir: a path pointing to the base directory of a catalog,
or may point to a ``catalog_info.json`` file directly.
Returns:
populated BaseCatalogInfo of appropriate type.
"""
if file_io.is_regular_file(catalog_base_dir):
## This might be the catalog_info.json file - try anyway
metadata_keywords = file_io.load_json_file(catalog_base_dir)
else:
catalog_info_file = paths.get_catalog_info_pointer(catalog_base_dir)
metadata_keywords = file_io.load_json_file(catalog_info_file)
return create_catalog_info(metadata_keywords)
3 changes: 3 additions & 0 deletions src/hipscat/catalog/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class Dataset:
"""A base HiPSCat dataset

A base dataset contains a catalog_info metadata file and the data contained in parquet files

TODO - create factory methods to get appropriately-typed datasets for
some catalog info or catalog directory
"""

CatalogInfoClass: Type[BaseCatalogInfo] = BaseCatalogInfo
Expand Down
1 change: 1 addition & 0 deletions src/hipscat/catalog/index/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .index_catalog_info import IndexCatalogInfo
1 change: 1 addition & 0 deletions src/hipscat/catalog/margin_cache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .margin_cache_catalog_info import MarginCacheCatalogInfo
1 change: 1 addition & 0 deletions src/hipscat/catalog/source_catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .source_catalog_info import SourceCatalogInfo
261 changes: 261 additions & 0 deletions src/hipscat/inspection/almanac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import glob
import os
import warnings
from typing import List

import pandas as pd

from hipscat.catalog.catalog import CatalogType
from hipscat.catalog.dataset.dataset import Dataset
from hipscat.inspection.almanac_info import AlmanacInfo


class Almanac:
Comment thread
delucchi-cmu marked this conversation as resolved.
"""Single instance of an almanac, and available catalogs within namespaces

Params:
include_default_dir:
include directory indicated in HIPSCAT_ALMANAC_DIR
environment variable. see AlmanacInfo.get_default_dir
dirs:
additional directories to look for almanac files in. we support a
few types of input, with different behaviors:

- ``str`` - a single directory
- ``list[str]`` - multiple directories
- ``dict[str:str]`` / ``dict[str:list[str]]`` - namespace
dictionary. for each key in the dictionary, we put all almanac
entries under a namespace. this is useful if you have name
collisions e.g. between multiple surveys or user-provided
catalogs.
"""

def __init__(self, include_default_dir=True, dirs=None):
"""Create new almanac."""
self.files = {}
self.entries = {}
self.dir_to_catalog_name = {}
self._init_files(include_default_dir=include_default_dir, dirs=dirs)
self._init_catalog_objects()
self._init_catalog_links()

def _init_files(self, include_default_dir=True, dirs=None):
"""Create a list of all the almanac files we want to add to this instance.

Each almanac file corresponds to a single catalog.

Args:
include_default_dir: include directory indicated in HIPSCAT_ALMANAC_DIR
environment variable. see AlmanacInfo.get_default_dir
dirs: additional directories to look for almanac files in
"""
if include_default_dir:
default_dir = AlmanacInfo.get_default_dir()
if default_dir:
self._add_files_to_namespace(default_dir)
if pd.api.types.is_dict_like(dirs):
for key, value in dirs.items():
self._add_files_to_namespace(value, key)
elif pd.api.types.is_list_like(dirs):
self._add_files_to_namespace(dirs)
elif dirs is not None:
self._add_files_to_namespace(dirs)

def _add_files_to_namespace(self, directory, namespace=""):
"""Get almanac files within a directory or list of directories.

Args:
directory: directory to scan
namespace: if provided, files in this directory will be in their
own namespace in the almanac
"""
if not pd.api.types.is_list_like(directory):
directory = [directory]

files = []
for input_path in directory:
if os.path.isfile(input_path):
files.append(input_path)
continue

input_paths = glob.glob(f"{input_path}/**.yml")
input_paths.sort()
files.extend(input_paths)

if namespace in self.files:
self.files[namespace].extend(files)
else:
self.files[namespace] = files

def _init_catalog_objects(self):
"""Create (unlinked) almanac info objects for all the files found
in the previous steps."""
for namespace, files in self.files.items():
for file in files:
catalog_info = AlmanacInfo.from_file(file)
catalog_info.namespace = namespace
if namespace:
full_name = f"{namespace}:{catalog_info.catalog_name}"
else:
full_name = catalog_info.catalog_name
if full_name in self.entries:
warnings.warn(
f"Duplicate catalog name ({full_name}). Try using namespaces."
)
else:
self.entries[full_name] = catalog_info
self.dir_to_catalog_name[catalog_info.catalog_path] = full_name

def _init_catalog_links(self):
"""Initialize the links between almanac catalogs.

For each type of link (e.g. primary or join), look for the catalog in
the almanac, using whatever text we have. If found, add the object
to the almanac info as a pointer. Additionally, add the reference to
the linked object, so catalogs know about each other from either side.
"""
for catalog_entry in self.entries.values():
if catalog_entry.catalog_type == CatalogType.OBJECT:
## Object currently has no links that start in the catalog.
pass
elif catalog_entry.catalog_type == CatalogType.SOURCE:
## Source catalogs MAY indicate their primary object catalog.
if catalog_entry.primary:
object_catalog = self._get_linked_catalog(
catalog_entry.primary, catalog_entry.namespace
)
if not object_catalog:
warnings.warn(
f"source catalog {catalog_entry.catalog_name} missing "
f"object catalog {catalog_entry.primary}"
)
else:
catalog_entry.primary_link = object_catalog
catalog_entry.objects.append(object_catalog)
object_catalog.sources.append(catalog_entry)
elif catalog_entry.catalog_type == CatalogType.ASSOCIATION:
## Association table MUST have a primary and join catalog
primary_catalog = self._get_linked_catalog(
catalog_entry.primary, catalog_entry.namespace
)
if not primary_catalog:
warnings.warn(
f"association table {catalog_entry.catalog_name} missing "
f"primary catalog {catalog_entry.primary}"
)
else:
catalog_entry.primary_link = primary_catalog
primary_catalog.associations.append(catalog_entry)

join_catalog = self._get_linked_catalog(
catalog_entry.join,
catalog_entry.namespace,
)
if not join_catalog:
warnings.warn(
f"association table {catalog_entry.catalog_name} missing "
f"join catalog {catalog_entry.join}"
)
else:
catalog_entry.join_link = join_catalog
join_catalog.associations_right.append(catalog_entry)
elif catalog_entry.catalog_type == CatalogType.MARGIN:
## Margin catalogs MUST have a primary catalog
primary_catalog = self._get_linked_catalog(
catalog_entry.primary, catalog_entry.namespace
)
if not primary_catalog:
warnings.warn(
f"margin table {catalog_entry.catalog_name} missing "
f"primary catalog {catalog_entry.primary}"
)
else:
catalog_entry.primary_link = primary_catalog
primary_catalog.margins.append(catalog_entry)
elif catalog_entry.catalog_type == CatalogType.INDEX:
## Index tables MUST have a primary catalog
primary_catalog = self._get_linked_catalog(
catalog_entry.primary, catalog_entry.namespace
)
if not primary_catalog:
warnings.warn(
f"index table {catalog_entry.catalog_name} missing "
f"primary catalog {catalog_entry.primary}"
)
else:
catalog_entry.primary_link = primary_catalog
primary_catalog.indexes.append(catalog_entry)
else: # pragma: no cover
warnings.warn(f"Unknown catalog type {catalog_entry.catalog_type}")

def _get_linked_catalog(self, linked_text, namespace) -> AlmanacInfo:
"""Find a catalog to be used for linking catalogs within the almanac.

e.g. for an association table, we will have a primary and join catalog.
the association catalog is "receiving" the link of primary catalog info,
and a link of join catalog info.

Comment thread
delucchi-cmu marked this conversation as resolved.
Args:
linked_text: text provided for the linked catalog. this could take
a few different forms:

- empty or None (returns None)
- short name of a catalog
- namespaced name of a catalog
- full path to a catalog base directory
- path to a catalog base directory, with environment variables
namespace: the namespace in the catalog **receiving** the link.
this is used to resolve the linked_text argument, so if you're
relying on namespaces, the receiving and linking catalog should
be in the same namespace
Returns:
almanac info for the linked catalog, if found
"""
resolved_path = os.path.expandvars(linked_text)
if linked_text in self.dir_to_catalog_name: # pragma: no cover
linked_text = self.dir_to_catalog_name[linked_text]
elif resolved_path in self.dir_to_catalog_name:
linked_text = self.dir_to_catalog_name[resolved_path]

resolved_name = linked_text
if not resolved_name in self.entries:
resolved_name = f"{namespace}:{linked_text}"
Comment thread
delucchi-cmu marked this conversation as resolved.
if not resolved_name in self.entries:
return None
return self.entries[resolved_name]

def catalogs(self, include_deprecated=False, types: List[str] = None):
"""Get names of catalogs in the almanac, matching the provided conditions.

Catalogs must meet all criteria provided in order to be returned (e.g.
the criteria are ANDED together).

Args:
include_deprecated: include catalogs which contain some text in their
``deprecated`` field.
types: include ONLY catalogs within the list of provided types.
"""
selected = []
for full_name, catalog_info in self.entries.items():
include = True
if not include_deprecated and catalog_info.deprecated:
include = False
if types and catalog_info.catalog_type not in types:
include = False

if include:
selected.append(full_name)
return selected

def get_almanac_info(self, catalog_name: str) -> AlmanacInfo:
"""Fetch the almanac info for a single catalog."""
return self.entries[catalog_name]

def get_catalog(self, catalog_name: str) -> Dataset:
"""Fetch the fully-populated hipscat metadata for the catalog name.

This will load the ``catalog_info.join`` and other relevant metadata files
from disk."""
return Dataset.read_from_hipscat(
self.get_almanac_info(catalog_name=catalog_name).catalog_path
)
Comment thread
delucchi-cmu marked this conversation as resolved.
Loading