diff --git a/requirements-dev.txt b/requirements-dev.txt index 7a42da0ec..7a9a43181 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -13,5 +13,7 @@ scikit-learn partd fsspec bed-reader +git+https://github.com/pangeo-data/rechunker.git +cbgen cyvcf2; platform_system != "Windows" yarl diff --git a/setup.cfg b/setup.cfg index d307de56a..02aa3bc96 100644 --- a/setup.cfg +++ b/setup.cfg @@ -57,6 +57,9 @@ vcf = cyvcf2 fsspec yarl +bgen = + rechunker + cbgen [coverage:report] fail_under = 100 @@ -115,10 +118,12 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-setuptools] ignore_missing_imports = True -[mypy-sgkit_plink.*] -ignore_missing_imports = True [mypy-sklearn.*] ignore_missing_imports = True +[mypy-cbgen.*] +ignore_missing_imports = True +[mypy-rechunker.*] +ignore_missing_imports = True [mypy-bed_reader.*] ignore_missing_imports = True [mypy-sphinx.*] diff --git a/sgkit/io/bgen/__init__.py b/sgkit/io/bgen/__init__.py new file mode 100644 index 000000000..16e6db44b --- /dev/null +++ b/sgkit/io/bgen/__init__.py @@ -0,0 +1,11 @@ +try: + from .bgen_reader import bgen_to_zarr, read_bgen, rechunk_bgen # noqa: F401 + + __all__ = ["read_bgen", "bgen_to_zarr", "rechunk_bgen"] +except ImportError as e: + msg = ( + "sgkit bgen requirements are not installed.\n\n" + "Please install them via pip :\n\n" + " pip install 'git+https://github.com/pystatgen/sgkit#egg=sgkit[bgen]'" + ) + raise ImportError(str(e) + "\n\n" + msg) from e diff --git a/sgkit/io/bgen/bgen_reader.py b/sgkit/io/bgen/bgen_reader.py new file mode 100644 index 000000000..d1961a975 --- /dev/null +++ b/sgkit/io/bgen/bgen_reader.py @@ -0,0 +1,623 @@ +"""BGEN reader implementation (using bgen_reader)""" +import logging +import tempfile +import time +from pathlib import Path +from typing import ( + Any, + Dict, + Hashable, + List, + Mapping, + MutableMapping, + Optional, + Tuple, + Union, +) + +import dask +import dask.array as da +import dask.dataframe as dd +import numpy as np +import pandas as pd +import xarray as xr +import zarr +from cbgen import bgen_file, bgen_metafile +from rechunker import api as rechunker_api +from xarray import Dataset + +from sgkit import create_genotype_dosage_dataset +from sgkit.io.utils import dataframe_to_dict, encode_contigs +from sgkit.typing import ArrayLike, DType, PathType + +logger = logging.getLogger(__name__) + +GT_DATA_VARS = [ + "call_genotype_probability", + "call_genotype_probability_mask", + "call_dosage", + "call_dosage_mask", +] + +METAFILE_DTYPE = dict( + [ + ("id", "S"), + ("rsid", "S"), + ("chrom", "S"), + ("pos", "int32"), + ("a1", "S"), + ("a2", "S"), + ("offset", "int64"), + ] +) + + +class BgenReader: + + name = "bgen_reader" + + def __init__( + self, + path: PathType, + metafile_path: Optional[PathType] = None, + dtype: DType = "float32", + ) -> None: + self.path = Path(path) + self.metafile_path = ( + Path(metafile_path) if metafile_path else self.path.with_suffix(".metafile") + ) + + with bgen_file(self.path) as bgen: + self.n_variants = bgen.nvariants + self.n_samples = bgen.nsamples + + if not self.metafile_path.exists(): + start = time.time() + logger.info( + f"Generating BGEN metafile for '{self.path}' (this may take a while)" + ) + bgen.create_metafile(self.metafile_path, verbose=False) + stop = time.time() + logger.info( + f"BGEN metafile generation complete ({stop - start:.0f} seconds)" + ) + + with bgen_metafile(self.metafile_path) as mf: + assert self.n_variants == mf.nvariants + self.npartitions = 
mf.npartitions + self.partition_size = mf.partition_size + + self.shape = (self.n_variants, self.n_samples, 3) + self.dtype = np.dtype(dtype) + self.precision = 64 if self.dtype.itemsize >= 8 else 32 + self.ndim = 3 + + def __getitem__(self, idx: Any) -> np.ndarray: + if not isinstance(idx, tuple): + raise IndexError(f"Indexer must be tuple (received {type(idx)})") + if len(idx) != self.ndim: + raise IndexError( + f"Indexer must have {self.ndim} items (received {len(idx)} slices)" + ) + if not all(isinstance(i, slice) or isinstance(i, int) for i in idx): + raise IndexError( + f"Indexer must contain only slices or ints (received types {[type(i) for i in idx]})" + ) + # Determine which dims should have unit size in result + squeeze_dims = tuple(i for i in range(len(idx)) if isinstance(idx[i], int)) + # Convert all indexers to slices + idx = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in idx) + + if idx[0].start == idx[0].stop: + return np.empty((0,) * self.ndim, dtype=self.dtype) + + # Determine start and end partitions that correspond to the + # given variant dimension indexer + start_partition = idx[0].start // self.partition_size + start_partition_offset = idx[0].start % self.partition_size + end_partition = (idx[0].stop - 1) // self.partition_size + end_partition_offset = (idx[0].stop - 1) % self.partition_size + + # Create a list of all offsets into the underlying file at which + # data for each variant begins + all_vaddr = [] + with bgen_metafile(self.metafile_path) as mf: + for i in range(start_partition, end_partition + 1): + partition = mf.read_partition(i) + start_offset = start_partition_offset if i == start_partition else 0 + end_offset = ( + end_partition_offset + 1 + if i == end_partition + else self.partition_size + ) + vaddr = partition.variants.offset + all_vaddr.extend(vaddr[start_offset:end_offset].tolist()) + + # Read the probabilities for each variant, apply indexer for + # samples dimension to give probabilities for all genotypes, + # and then apply final genotype dimension indexer + with bgen_file(self.path) as bgen: + res = None + for i, vaddr in enumerate(all_vaddr): + probs = bgen.read_probability(vaddr, precision=self.precision)[idx[1]] + assert len(probs.shape) == 2 and probs.shape[1] == 3 + if res is None: + res = np.zeros((len(all_vaddr), len(probs), 3), dtype=self.dtype) + res[i] = probs + res = res[..., idx[2]] # type: ignore[index] + return np.squeeze(res, axis=squeeze_dims) + + +def _split_alleles(allele_ids: bytes) -> List[bytes]: + alleles = allele_ids.split(b",") + if len(alleles) != 2: + raise NotImplementedError( + f"Bgen reads only supported for biallelic variants (found non-biallelic variant '{str(allele_ids)}')" + ) + return alleles + + +def _read_metafile_partition(path: Path, partition: int) -> pd.DataFrame: + with bgen_metafile(path) as mf: + part = mf.read_partition(partition) + v = part.variants + allele_ids = np.array([_split_alleles(aid) for aid in v.allele_ids]) + data = { + "id": v.id, + "rsid": v.rsid, + "chrom": v.chromosome, + "pos": v.position, + "a1": allele_ids[:, 0], + "a2": allele_ids[:, 1], + "offset": v.offset, + } + return pd.DataFrame(data).astype(METAFILE_DTYPE) + + +def read_metafile(path: PathType) -> dd.DataFrame: + """ Read cbgen metafile containing partitioned variant info """ + with bgen_metafile(path) as mf: + nvariants = mf.nvariants + npartitions = mf.npartitions + partition_size = mf.partition_size + dfs = [] + index_base = 0 + divisions = [] + for i in range(npartitions): + divisions.append(index_base) + 
d = dask.delayed(_read_metafile_partition)(path, i) + dfs.append(d) + index_base += partition_size + divisions.append(nvariants - 1) + meta = dd.utils.make_meta(METAFILE_DTYPE) + return dd.from_delayed(dfs, meta=meta, divisions=divisions) + + +def read_samples(path: PathType) -> pd.DataFrame: + """ Read BGEN .sample file """ + df = pd.read_csv(path, sep=" ", skiprows=[1], usecols=[0]) + df.columns = ["sample_id"] + return df + + +def read_bgen( + path: PathType, + metafile_path: Optional[PathType] = None, + sample_path: Optional[PathType] = None, + chunks: Union[str, int, Tuple[int, int, int]] = "auto", + lock: bool = False, + persist: bool = True, + contig_dtype: DType = "str", + gp_dtype: DType = "float32", +) -> Dataset: + """Read BGEN dataset. + + Loads a single BGEN dataset as dask arrays within a Dataset + from a ``.bgen`` file. + + Parameters + ---------- + path + Path to BGEN file. + metafile_path + Path to companion index file used to determine BGEN byte offsets. + Defaults to ``path`` + ".metafile" if not provided. + This file is necessary for reading BGEN genotype probabilities and it will be + generated the first time the file is read if it does not already exist. + If it needs to be created, it can make the first call to this function + much slower than subsequent calls. + sample_path + Path to ``.sample`` file, by default None. This is used to fetch sample identifiers + and when provided it is preferred over sample identifiers embedded in the ``.bgen`` file. + chunks + Chunk size for genotype probability data (3 dimensions), + by default "auto". + lock + Whether or not to synchronize concurrent reads of + file blocks, by default False. This is passed through to + [dask.array.from_array](https://docs.dask.org/en/latest/array-api.html#dask.array.from_array). + persist + Whether or not to persist variant information in memory, by default True. + This is an important performance consideration as the metadata file for this data will + be read multiple times when False. + contig_dtype + Data type for contig names, by default "str". + This may also be an integer type (e.g. "int"), but will fail if any of the contig names + cannot be converted to integers. + gp_dtype + Data type for genotype probabilities, by default "float32". + + Warnings + -------- + Only bi-allelic, diploid BGEN files are currently supported. 
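+
+    Examples
+    --------
+    A minimal sketch, assuming a local ``example.bgen`` file is available
+    (the companion ``.metafile`` index is generated on the first read if it
+    does not already exist):
+
+    >>> from sgkit.io.bgen import read_bgen  # doctest: +SKIP
+    >>> ds = read_bgen("example.bgen", chunks=(100, -1, -1))  # doctest: +SKIP
+    >>> ds["call_genotype_probability"].shape  # doctest: +SKIP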
+ + Returns + ------- + A dataset containing the following variables: + + - :data:`sgkit.variables.variant_id` (variants) + - :data:`sgkit.variables.variant_contig` (variants) + - :data:`sgkit.variables.variant_position` (variants) + - :data:`sgkit.variables.variant_allele` (variants) + - :data:`sgkit.variables.sample_id` (samples) + - :data:`sgkit.variables.call_dosage` (variants, samples) + - :data:`sgkit.variables.call_dosage_mask` (variants, samples) + - :data:`sgkit.variables.call_genotype_probability` (variants, samples, genotypes) + - :data:`sgkit.variables.call_genotype_probability_mask` (variants, samples, genotypes) + + """ + if isinstance(chunks, tuple) and len(chunks) != 3: + raise ValueError(f"`chunks` must be tuple with 3 items, not {chunks}") + if not np.issubdtype(gp_dtype, np.floating): + raise ValueError( + f"`gp_dtype` must be a floating point data type, not {gp_dtype}" + ) + if not np.issubdtype(contig_dtype, np.integer) and np.dtype( + contig_dtype + ).kind not in {"U", "S"}: + raise ValueError( + f"`contig_dtype` must be of string or int type, not {contig_dtype}" + ) + + path = Path(path) + sample_path = Path(sample_path) if sample_path else path.with_suffix(".sample") + + if sample_path.exists(): + sample_id = read_samples(sample_path).sample_id.values.astype("U") + else: + sample_id = _default_sample_ids(path) + + bgen_reader = BgenReader(path, metafile_path=metafile_path, dtype=gp_dtype) + + df = read_metafile(bgen_reader.metafile_path) + if persist: + df = df.persist() + arrs = dataframe_to_dict(df, METAFILE_DTYPE) + + variant_id = arrs["id"] + variant_contig = arrs["chrom"].astype(contig_dtype) + variant_contig, variant_contig_names = encode_contigs(variant_contig) + variant_contig_names = list(variant_contig_names) + variant_position = arrs["pos"] + variant_alleles = da.hstack((arrs["a1"][:, np.newaxis], arrs["a2"][:, np.newaxis])) + + call_genotype_probability = da.from_array( + bgen_reader, + chunks=chunks, + lock=lock, + fancy=False, + asarray=False, + name=f"{bgen_reader.name}:read_bgen:{path}", + ) + call_dosage = _to_dosage(call_genotype_probability) + + ds: Dataset = create_genotype_dosage_dataset( + variant_contig_names=variant_contig_names, + variant_contig=variant_contig, + variant_position=variant_position, + variant_alleles=variant_alleles, + sample_id=sample_id, + call_dosage=call_dosage, + call_genotype_probability=call_genotype_probability, + variant_id=variant_id, + ) + + return ds + + +def _default_sample_ids(path: PathType) -> ArrayLike: + """Fetch or generate sample ids""" + with bgen_file(path) as bgen: + if bgen.contain_samples: + return bgen.read_samples() + else: + return np.char.add(b"sample_", np.arange(bgen.nsamples).astype("S")) + + +def _to_dosage(probs: ArrayLike) -> ArrayLike: + """Calculate the dosage from genotype likelihoods (probabilities)""" + assert ( + probs.shape[-1] == 3 + ), f"Expecting genotype (trailing) dimension of size 3, got array of shape {probs.shape}" + return probs[..., 1] + 2 * probs[..., 2] + + +######################## +# Rechunking Functions # +######################## + + +def encode_variables( + ds: Dataset, + chunk_length: int, + chunk_width: int, + compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2), + probability_dtype: Optional[Any] = "uint8", +) -> Dict[Hashable, Dict[str, Any]]: + encoding = {} + for v in ds: + e = {} + if compressor is not None: + e.update({"compressor": compressor}) + if v in GT_DATA_VARS: + e.update({"chunks": (chunk_length, chunk_width) + ds[v].shape[2:]}) + 
if probability_dtype is not None and v == "call_genotype_probability": + dtype = np.dtype(probability_dtype) + # Xarray will decode into float32 so any int greater than + # 16 bits will cause overflow/underflow + # See https://en.wikipedia.org/wiki/Floating-point_arithmetic#Internal_representation + # *bits precision column for single precision floats + if dtype not in [np.uint8, np.uint16]: + raise ValueError( + "Probability integer dtype invalid, must " + f"be uint8 or uint16 not {probability_dtype}" + ) + divisor = np.iinfo(dtype).max - 1 + e.update( + { + "dtype": probability_dtype, + "add_offset": -1.0 / divisor, + "scale_factor": 1.0 / divisor, + "_FillValue": 0, + } + ) + if e: + encoding[v] = e + return encoding + + +def pack_variables(ds: Dataset) -> Dataset: + # Remove dosage as it is unnecessary and should be redefined + # based on encoded probabilities later (w/ reduced precision) + ds = ds.drop_vars(["call_dosage", "call_dosage_mask"], errors="ignore") + + # Remove homozygous reference GP and redefine mask + gp = ds["call_genotype_probability"][..., 1:] + gp_mask = ds["call_genotype_probability_mask"].any(dim="genotypes") + ds = ds.drop_vars(["call_genotype_probability", "call_genotype_probability_mask"]) + ds = ds.assign(call_genotype_probability=gp, call_genotype_probability_mask=gp_mask) + return ds + + +def unpack_variables(ds: Dataset, dtype: DType = "float32") -> Dataset: + # Restore homozygous reference GP + gp = ds["call_genotype_probability"].astype(dtype) # type: ignore[no-untyped-call] + if gp.sizes["genotypes"] != 2: + raise ValueError( + "Expecting variable 'call_genotype_probability' to have genotypes " + f"dimension of size 2 (received sizes = {dict(gp.sizes)})" + ) + ds = ds.drop_vars("call_genotype_probability") + ds["call_genotype_probability"] = xr.concat( + [1 - gp.sum(dim="genotypes", skipna=False), gp], dim="genotypes" + ) + + # Restore dosage + ds["call_dosage"] = gp[..., 0] + 2 * gp[..., 1] + ds["call_dosage_mask"] = ds["call_genotype_probability_mask"] + ds["call_genotype_probability_mask"] = ds[ + "call_genotype_probability_mask" + ].broadcast_like(ds["call_genotype_probability"]) + return ds + + +def rechunk_bgen( + ds: Dataset, + output: Union[PathType, MutableMapping[str, bytes]], + *, + chunk_length: int = 10_000, + chunk_width: int = 1_000, + compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2), + probability_dtype: Optional[DType] = "uint8", + max_mem: str = "4GB", + pack: bool = True, + tempdir: Optional[PathType] = None, +) -> Dataset: + """Rechunk BGEN dataset as Zarr. + + This function will use the algorithm https://rechunker.readthedocs.io/en/latest/ + to rechunk certain fields in a provided Dataset for better downstream performance. + Depending on the system memory available (and the `max_mem` setting) this + rechunking may occur without the need of any intermediate data store. Otherwise, + approximately as much disk space is required as was needed to store the original + BGEN data. Experiments show that this Zarr representation is ~20% larger even + with all available optimizations and fairly aggressive compression (i.e. the + default `clevel` 7). + + Note that this function is not evaluated lazily. The rechunking algorithm + will run inline so calls to it may be slow. The resulting Dataset is + generated based on the final, serialized Zarr data. + + Parameters + ---------- + ds : Dataset + Dataset to rechunk, typically the result from `read_bgen`. 
+ output : Union[PathType, MutableMapping[str, bytes]] + Zarr store or path to directory in file system. + chunk_length : int + Length (number of variants) of chunks in which data are stored, by default 10_000. + chunk_width : int + Width (number of samples) to use when storing chunks in output, by default 1_000. + compressor : Optional[Any] + Zarr compressor, no compression is used when set as None. + probability_dtype : DType + Data type used to encode genotype probabilities, must be either uint8 or uint16. + Setting this parameter results in a loss of precision. If None, probabilities + will not be altered when stored. + max_mem : str + The amount of memory (in bytes) that workers are allowed to use. A string + (e.g. 100MB) can also be used. + pack : bool + Whether or not to optimize variable representations by removing unnecessary + dimensions and elements. This includes storing 2 genotypes instead of 3, omitting + dosage and collapsing the genotype probability mask to 2 dimensions. All of + the above are restored in the resulting Dataset at the expense of extra + computations on read. + tempdir : Optional[PathType] + Temporary directory where intermediate files are stored. The default None means + use the system default temporary directory. + + Warnings + -------- + This functional is only applicable to diploid, bi-allelic BGEN datasets. + + Returns + ------- + Dataset + The rechunked dataset. + """ + if isinstance(output, Path): + output = str(output) + + chunk_length = min(chunk_length, ds.dims["variants"]) + chunk_width = min(chunk_width, ds.dims["samples"]) + + if pack: + ds = pack_variables(ds) + + encoding = encode_variables( + ds, + chunk_length=chunk_length, + chunk_width=chunk_width, + compressor=compressor, + probability_dtype=probability_dtype, + ) + target_chunks = { + var: encoding[var]["chunks"] for var in encoding if "chunks" in encoding[var] + } + target_options = { + var: {k: v for k, v in encoding[var].items() if k != "chunks"} + for var in encoding + } + with tempfile.TemporaryDirectory( + prefix="bgen_to_zarr_", suffix=".zarr", dir=tempdir + ) as tmpdir: + rechunked = rechunker_api.rechunk( + ds, + max_mem=max_mem, + target_chunks=target_chunks, + target_store=output, + target_options=target_options, + temp_store=tmpdir, + executor="dask", + ) + rechunked.execute() + + ds: Dataset = xr.open_zarr(output, concat_characters=False) # type: ignore[no-untyped-call] + if pack: + ds = unpack_variables(ds) + + return ds + + +def bgen_to_zarr( + input: PathType, + output: Union[PathType, MutableMapping[str, bytes]], + region: Optional[Mapping[Hashable, Any]] = None, + chunk_length: int = 10_000, + chunk_width: int = 1_000, + temp_chunk_length: int = 100, + compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2), + probability_dtype: Optional[DType] = "uint8", + max_mem: str = "4GB", + pack: bool = True, + tempdir: Optional[PathType] = None, +) -> Dataset: + """Rechunk BGEN dataset as Zarr. + + This function will use the algorithm https://rechunker.readthedocs.io/en/latest/ + to rechunk certain fields in a provided Dataset for better downstream performance. + Depending on the system memory available (and the `max_mem` setting) this + rechunking may occur without the need of any intermediate data store. Otherwise, + approximately as much disk space is required as was needed to store the original + BGEN data. Experiments show that this Zarr representation is ~20% larger even + with all available optimizations and fairly aggressive compression (i.e. 
the + default `clevel` 7). + + Note that this function is not evaluated lazily. The rechunking algorithm + will run inline so calls to it may be slow. The resulting Dataset is + generated based on the final, serialized Zarr data. + + Parameters + ---------- + input : PathType + Path to local BGEN dataset. + output : Union[PathType, MutableMapping[str, bytes]] + Zarr store or path to directory in file system. + region : Optional[Mapping[Hashable, Any]] + Indexers on dataset dimensions used to define a subset of data to convert. + Must be None or a dict with keys matching dimension names and values + equal to integers or slice objects. This is passed directly to `Dataset.isel` + so it has the same semantics. + chunk_length : int + Length (number of variants) of chunks in which data are stored, by default 10_000. + chunk_width : int + Width (number of samples) to use when storing chunks in output, by default 1_000. + temp_chunk_length : int + Length of chunks used in raw BGEN read, by default 100. This defines the vertical + chunking (i.e. in the variants dimension) used when reading the raw data and because + there is no horizontal chunking at this phase (i.e. in the samples dimension), this + value should be much smaller than the target `chunk_length`. + compressor : Optional[Any] + Zarr compressor, by default Blosc + zstd with compression level 7. No compression + is used when set as None. + probability_dtype : DType + Data type used to encode genotype probabilities, must be either uint8 or uint16. + Setting this parameter results in a loss of precision. If None, probabilities + will not be altered when stored. + max_mem : str + The amount of memory (in bytes) that workers are allowed to use. A string + (e.g. 100MB) can also be used. + pack : bool + Whether or not to optimize variable representations by removing unnecessary + dimensions and elements. This includes storing 2 genotypes instead of 3, omitting + dosage and collapsing the genotype probability mask to 2 dimensions. All of + the above are restored in the resulting Dataset at the expense of extra + computations on read. + tempdir : Optional[PathType] + Temporary directory where intermediate files are stored. The default None means + use the system default temporary directory. + + Warnings + -------- + This functional is only applicable to diploid, bi-allelic BGEN datasets. + + Returns + ------- + Dataset + The rechunked dataset. 
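+
+    Examples
+    --------
+    A minimal sketch, assuming a local ``example.bgen`` file; the Zarr store
+    path used here is illustrative only:
+
+    >>> from sgkit.io.bgen import bgen_to_zarr  # doctest: +SKIP
+    >>> ds = bgen_to_zarr(  # doctest: +SKIP
+    ...     "example.bgen",
+    ...     "example.zarr",
+    ...     chunk_length=10_000,
+    ...     chunk_width=1_000,
+    ...     temp_chunk_length=100,
+    ... )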
+ """ + ds = read_bgen(input, chunks=(temp_chunk_length, -1, -1)) + if region is not None: + ds = ds.isel(indexers=region) + return rechunk_bgen( + ds, + output, + chunk_length=chunk_length, + chunk_width=chunk_width, + compressor=compressor, + probability_dtype=probability_dtype, + max_mem=max_mem, + pack=pack, + tempdir=tempdir, + ) diff --git a/sgkit/io/plink/__init__.py b/sgkit/io/plink/__init__.py index 480f56dbd..a8d54b4e5 100644 --- a/sgkit/io/plink/__init__.py +++ b/sgkit/io/plink/__init__.py @@ -4,7 +4,7 @@ __all__ = ["read_plink"] except ImportError as e: msg = ( - "sgkit-plink requirements are not installed.\n\n" + "sgkit plink requirements are not installed.\n\n" "Please install them via pip :\n\n" " pip install 'git+https://github.com/pystatgen/sgkit#egg=sgkit[plink]'" ) diff --git a/sgkit/io/plink/plink_reader.py b/sgkit/io/plink/plink_reader.py index d49cb5530..481e82ef4 100644 --- a/sgkit/io/plink/plink_reader.py +++ b/sgkit/io/plink/plink_reader.py @@ -1,16 +1,16 @@ """PLINK 1.9 reader implementation""" from pathlib import Path -from typing import Any, List, Mapping, Optional, Tuple, Union +from typing import Any, List, Optional, Tuple, Union import dask.array as da import dask.dataframe as dd import numpy as np from bed_reader import open_bed -from dask.array import Array from dask.dataframe import DataFrame from xarray import Dataset from sgkit import create_genotype_call_dataset +from sgkit.io.utils import dataframe_to_dict from sgkit.model import DIM_SAMPLE from sgkit.utils import encode_array @@ -32,8 +32,8 @@ ("variant_id", str, "U"), ("cm_pos", "float32", "float32"), ("pos", "int32", "int32"), - ("a1", str, "U"), - ("a2", str, "U"), + ("a1", str, "S"), + ("a2", str, "S"), ] BIM_DF_DTYPE = dict([(f[0], f[1]) for f in BIM_FIELDS]) BIM_ARRAY_DTYPE = dict([(f[0], f[2]) for f in BIM_FIELDS]) @@ -94,28 +94,6 @@ def close(self) -> None: self.bed._close_bed() # pragma: no cover -def _max_str_len(arr: Array) -> Array: - return arr.map_blocks(lambda s: np.char.str_len(s.astype(str)), dtype=np.int8).max() - - -def _to_dict( - df: DataFrame, dtype: Optional[Mapping[str, Any]] = None -) -> Mapping[str, Any]: - arrs = {} - for c in df: - a = df[c].to_dask_array(lengths=True) - dt = df[c].dtype - if dtype: - dt = dtype[c] - kind = np.dtype(dt).kind - if kind in ["U", "S"]: - # Compute fixed-length string dtype for array - max_len = _max_str_len(a).compute() - dt = f"{kind}{max_len}" - arrs[c] = a.astype(dt) - return arrs - - def read_fam(path: PathType, sep: str = " ") -> DataFrame: # See: https://www.cog-genomics.org/plink/1.9/formats#fam names = [f[0] for f in FAM_FIELDS] @@ -256,8 +234,8 @@ def read_plink( df_fam = df_fam.persist() df_bim = df_bim.persist() - arr_fam = _to_dict(df_fam, dtype=FAM_ARRAY_DTYPE) - arr_bim = _to_dict(df_bim, dtype=BIM_ARRAY_DTYPE) + arr_fam = dataframe_to_dict(df_fam, dtype=FAM_ARRAY_DTYPE) + arr_bim = dataframe_to_dict(df_bim, dtype=BIM_ARRAY_DTYPE) # Load genotyping data call_genotype = da.from_array( diff --git a/sgkit/io/utils.py b/sgkit/io/utils.py new file mode 100644 index 000000000..367b53cb0 --- /dev/null +++ b/sgkit/io/utils.py @@ -0,0 +1,39 @@ +from typing import Mapping, Optional, Tuple + +import dask.dataframe as dd +import numpy as np + +from ..typing import ArrayLike, DType +from ..utils import encode_array, max_str_len + + +def dataframe_to_dict( + df: dd.DataFrame, dtype: Optional[Mapping[str, DType]] = None +) -> Mapping[str, ArrayLike]: + """ Convert dask dataframe to dictionary of arrays """ + arrs = {} + for c 
in df: + a = df[c].to_dask_array(lengths=True) + dt = df[c].dtype + if dtype: + dt = dtype[c] + kind = np.dtype(dt).kind + if kind in ["U", "S"]: + # Compute fixed-length string dtype for array + max_len = int(max_str_len(a)) + dt = f"{kind}{max_len}" + arrs[c] = a.astype(dt) + return arrs + + +def encode_contigs(contig: ArrayLike) -> Tuple[np.ndarray, np.ndarray]: + # TODO: test preservation of int16 + # If contigs are already integers, use them as-is + if np.issubdtype(contig.dtype, np.integer): + ids = contig + names = np.unique(np.asarray(ids)).astype(str) + # Otherwise create index for contig names based + # on order of appearance in underlying file + else: + ids, names = encode_array(np.asarray(contig, dtype=str)) + return ids, names diff --git a/sgkit/tests/io/bgen/__init__.py b/sgkit/tests/io/bgen/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sgkit/tests/io/bgen/data/.gitignore b/sgkit/tests/io/bgen/data/.gitignore new file mode 100644 index 000000000..4685de1c2 --- /dev/null +++ b/sgkit/tests/io/bgen/data/.gitignore @@ -0,0 +1,2 @@ +*.metadata2.mmm +*.metafile diff --git a/sgkit/tests/io/bgen/data/example-no-samples.bgen b/sgkit/tests/io/bgen/data/example-no-samples.bgen new file mode 100644 index 000000000..e8b88ed70 Binary files /dev/null and b/sgkit/tests/io/bgen/data/example-no-samples.bgen differ diff --git a/sgkit/tests/io/bgen/data/example-separate-samples.bgen b/sgkit/tests/io/bgen/data/example-separate-samples.bgen new file mode 100644 index 000000000..77a3ac705 Binary files /dev/null and b/sgkit/tests/io/bgen/data/example-separate-samples.bgen differ diff --git a/sgkit/tests/io/bgen/data/example-separate-samples.sample b/sgkit/tests/io/bgen/data/example-separate-samples.sample new file mode 100644 index 000000000..c6e7b1b26 --- /dev/null +++ b/sgkit/tests/io/bgen/data/example-separate-samples.sample @@ -0,0 +1,7 @@ +ID +0 +s1 +s2 +s3 +s4 +s5 diff --git a/sgkit/tests/io/bgen/data/example.bgen b/sgkit/tests/io/bgen/data/example.bgen new file mode 100644 index 000000000..a7d0e1c9a Binary files /dev/null and b/sgkit/tests/io/bgen/data/example.bgen differ diff --git a/sgkit/tests/io/bgen/data/samples b/sgkit/tests/io/bgen/data/samples new file mode 100644 index 000000000..44dbc30ea --- /dev/null +++ b/sgkit/tests/io/bgen/data/samples @@ -0,0 +1,5 @@ +sample_001 +sample_002 +sample_003 +sample_004 +sample_005 diff --git a/sgkit/tests/io/bgen/test_bgen_reader.py b/sgkit/tests/io/bgen/test_bgen_reader.py new file mode 100644 index 000000000..631109a27 --- /dev/null +++ b/sgkit/tests/io/bgen/test_bgen_reader.py @@ -0,0 +1,294 @@ +from pathlib import Path +from typing import Any, Tuple + +import numpy as np +import numpy.testing as npt +import pytest +import xarray as xr + +from sgkit.io.bgen.bgen_reader import ( + GT_DATA_VARS, + BgenReader, + _split_alleles, + bgen_to_zarr, + read_bgen, + rechunk_bgen, + unpack_variables, +) + +CHUNKS = [ + (100, 200, 3), + (100, 200, 1), + (100, 500, 3), + (199, 500, 3), + ((100, 99), 500, 2), + "auto", +] +INDEXES = [0, 10, 20, 100, -1] + +# Expectations below generated using bgen-reader directly, ex: +# > from bgen_reader import open_bgen +# > bgen = open_bgen('sgkit_bgen/tests/data/example.bgen', verbose=False) +# > bgen.read(-1)[0] # Probabilities for last variant, first sample +# array([[0.0133972 , 0.98135378, 0.00524902]] +# > bgen.allele_expectation(-1)[0, 0, -1] # Dosage for last variant, first sample +# 0.9918518217727197 +EXPECTED_PROBABILITIES = np.array( + [ # Generated using bgen-reader directly + 
[np.nan, np.nan, np.nan], + [0.007, 0.966, 0.0259], + [0.993, 0.002, 0.003], + [0.916, 0.007, 0.0765], + [0.013, 0.981, 0.0052], + ] +) +EXPECTED_DOSAGES = np.array( + [np.nan, 1.018, 0.010, 0.160, 0.991] # Generated using bgen-reader directly +) + +EXPECTED_DIMS = dict(variants=199, samples=500, genotypes=3, alleles=2) + + +def _shape(*dims: str) -> Tuple[int, ...]: + return tuple(EXPECTED_DIMS[d] for d in dims) + + +@pytest.mark.parametrize("chunks", CHUNKS) +def test_read_bgen(shared_datadir, chunks): + path = shared_datadir / "example.bgen" + ds = read_bgen(path, chunks=chunks) + + # check some of the data (in different chunks) + assert ds["call_dosage"].shape == _shape("variants", "samples") + npt.assert_almost_equal(ds["call_dosage"].values[1][0], 1.987, decimal=3) + npt.assert_almost_equal(ds["call_dosage"].values[100][0], 0.160, decimal=3) + npt.assert_array_equal(ds["call_dosage_mask"].values[0, 0], [True]) + npt.assert_array_equal(ds["call_dosage_mask"].values[0, 1], [False]) + assert ds["call_genotype_probability"].shape == _shape( + "variants", "samples", "genotypes" + ) + npt.assert_almost_equal( + ds["call_genotype_probability"].values[1][0], [0.005, 0.002, 0.992], decimal=3 + ) + npt.assert_almost_equal( + ds["call_genotype_probability"].values[100][0], [0.916, 0.007, 0.076], decimal=3 + ) + npt.assert_array_equal( + ds["call_genotype_probability_mask"].values[0, 0], [True] * 3 + ) + npt.assert_array_equal( + ds["call_genotype_probability_mask"].values[0, 1], [False] * 3 + ) + + +def test_read_bgen__with_sample_file(shared_datadir): + # The example file was generated using + # qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-separate-samples.bgen -os sgkit_bgen/tests/data/example-separate-samples.sample -incl-samples sgkit_bgen/tests/data/samples + # Then editing example-separate-samples.sample to change the sample IDs + path = shared_datadir / "example-separate-samples.bgen" + ds = read_bgen(path) + # Check the sample IDs are the ones from the .sample file + assert ds["sample_id"].values.tolist() == ["s1", "s2", "s3", "s4", "s5"] + + +def test_read_bgen__with_no_samples(shared_datadir): + # The example file was generated using + # qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-no-samples.bgen -os sgkit_bgen/tests/data/example-no-samples.sample -bgen-omit-sample-identifier-block -incl-samples sgkit_bgen/tests/data/samples + # Then deleting example-no-samples.sample + path = shared_datadir / "example-no-samples.bgen" + ds = read_bgen(path) + # Check the sample IDs are generated + assert ds["sample_id"].values.tolist() == [ + b"sample_0", + b"sample_1", + b"sample_2", + b"sample_3", + b"sample_4", + ] + + +@pytest.mark.parametrize("dtype", ["f2", "f4", "f8"]) +def test_read_bgen__gp_dtype(shared_datadir, dtype): + path = shared_datadir / "example.bgen" + ds = read_bgen(path, gp_dtype=dtype) + dtype = np.dtype(dtype) + assert ds["call_genotype_probability"].dtype == dtype + assert ds["call_dosage"].dtype == dtype + + +@pytest.mark.parametrize("dtype", ["c8", "i8", "str"]) +def test_read_bgen__invalid_gp_dtype(shared_datadir, dtype): + path = shared_datadir / "example.bgen" + with pytest.raises( + ValueError, match="`gp_dtype` must be a floating point data type" + ): + read_bgen(path, gp_dtype=dtype) + + +@pytest.mark.parametrize("dtype", ["U", "S", "u1", "u2", "i8", "int"]) +def test_read_bgen__contig_dtype(shared_datadir, dtype): + path = shared_datadir / "example.bgen" + ds = read_bgen(path, contig_dtype=dtype) + 
dtype = np.dtype(dtype) + if dtype.kind in {"U", "S"}: + assert ds["variant_contig"].dtype == np.int64 + else: + assert ds["variant_contig"].dtype == dtype + + +@pytest.mark.parametrize("dtype", ["c8", "M", "f4"]) +def test_read_bgen__invalid_contig_dtype(shared_datadir, dtype): + path = shared_datadir / "example.bgen" + with pytest.raises( + ValueError, match="`contig_dtype` must be of string or int type" + ): + read_bgen(path, contig_dtype=dtype) + + +@pytest.mark.parametrize("chunks", CHUNKS) +def test_read_bgen__fancy_index(shared_datadir, chunks): + path = shared_datadir / "example.bgen" + ds = read_bgen(path, chunks=chunks) + npt.assert_almost_equal( + ds["call_genotype_probability"][INDEXES, 0], EXPECTED_PROBABILITIES, decimal=3 + ) + npt.assert_almost_equal(ds["call_dosage"][INDEXES, 0], EXPECTED_DOSAGES, decimal=3) + + +@pytest.mark.parametrize("chunks", CHUNKS) +def test_read_bgen__scalar_index(shared_datadir, chunks): + path = shared_datadir / "example.bgen" + ds = read_bgen(path, chunks=chunks) + for i, ix in enumerate(INDEXES): + npt.assert_almost_equal( + ds["call_genotype_probability"][ix, 0], EXPECTED_PROBABILITIES[i], decimal=3 + ) + npt.assert_almost_equal( + ds["call_dosage"][ix, 0], EXPECTED_DOSAGES[i], decimal=3 + ) + for j in range(3): + npt.assert_almost_equal( + ds["call_genotype_probability"][ix, 0, j], + EXPECTED_PROBABILITIES[i, j], + decimal=3, + ) + + +def test_read_bgen__raise_on_invalid_indexers(shared_datadir): + path = shared_datadir / "example.bgen" + reader = BgenReader(path) + with pytest.raises(IndexError, match="Indexer must be tuple"): + reader[[0]] + with pytest.raises(IndexError, match="Indexer must have 3 items"): + reader[(slice(None),)] + with pytest.raises(IndexError, match="Indexer must contain only slices or ints"): + reader[([0], [0], [0])] + + +def test_split_alleles__raise_on_multiallelic(): + with pytest.raises( + NotImplementedError, match="Bgen reads only supported for biallelic variants" + ): + _split_alleles(b"A,B,C") + + +def test_read_bgen__invalid_chunks(shared_datadir): + path = shared_datadir / "example.bgen" + with pytest.raises(ValueError, match="`chunks` must be tuple with 3 items"): + read_bgen(path, chunks=(100, -1)) # type: ignore[arg-type] + + +def _rechunk_bgen( + shared_datadir: Path, tmp_path: Path, **kwargs: Any +) -> Tuple[xr.Dataset, xr.Dataset, str]: + path = shared_datadir / "example.bgen" + ds = read_bgen(path, chunks=(10, -1, -1)) + store = tmp_path / "example.zarr" + dsr = rechunk_bgen(ds, store, **kwargs) + return ds, dsr, str(store) + + +def _open_zarr(store: str, **kwargs: Any) -> xr.Dataset: + # Force concat_characters False to avoid to avoid https://github.com/pydata/xarray/issues/4405 + return xr.open_zarr(store, concat_characters=False, **kwargs) # type: ignore[no-any-return,no-untyped-call] + + +@pytest.mark.parametrize("target_chunks", [(10, 10), (50, 50), (100, 50), (50, 100)]) +def test_rechunk_bgen__target_chunks(shared_datadir, tmp_path, target_chunks): + _, dsr, store = _rechunk_bgen( + shared_datadir, + tmp_path, + chunk_length=target_chunks[0], + chunk_width=target_chunks[1], + pack=False, + ) + for v in GT_DATA_VARS: + assert dsr[v].data.chunksize[:2] == target_chunks + + +def test_rechunk_from_zarr__self_consistent(shared_datadir, tmp_path): + # With no probability dtype or packing, rechunk_{to,from}_zarr is a noop + ds, dsr, store = _rechunk_bgen( + shared_datadir, tmp_path, probability_dtype=None, pack=False + ) + xr.testing.assert_allclose(ds.compute(), dsr.compute()) # 
type: ignore[no-untyped-call] + + +@pytest.mark.parametrize("dtype", ["uint8", "uint16"]) +def test_rechunk_bgen__probability_encoding(shared_datadir, tmp_path, dtype): + ds, _, store = _rechunk_bgen( + shared_datadir, tmp_path, probability_dtype=dtype, pack=False + ) + dsr = _open_zarr(store, mask_and_scale=False) + v = "call_genotype_probability" + assert dsr[v].shape == ds[v].shape + assert dsr[v].dtype == dtype + dsr = _open_zarr(store, mask_and_scale=True) + # There are two missing calls which equates to + # 6 total nan values across 3 possible genotypes + assert np.isnan(dsr[v].values).sum() == 6 + tolerance = 1.0 / (np.iinfo(dtype).max - 1) + np.testing.assert_allclose(ds[v], dsr[v], atol=tolerance) + + +def test_rechunk_bgen__variable_packing(shared_datadir, tmp_path): + ds, dsr, store = _rechunk_bgen( + shared_datadir, tmp_path, probability_dtype=None, pack=True + ) + # A minor tolerance is necessary here when packing is enabled + # because one of the genotype probabilities is constructed from the others + xr.testing.assert_allclose(ds.compute(), dsr.compute(), atol=1e-6) # type: ignore[no-untyped-call] + + +@pytest.mark.parametrize("dtype", ["uint32", "int8", "float32"]) +def test_rechunk_bgen__invalid_probability_type(shared_datadir, tmp_path, dtype): + with pytest.raises(ValueError, match="Probability integer dtype invalid"): + _rechunk_bgen(shared_datadir, tmp_path, probability_dtype=dtype) + + +def test_unpack_variables__invalid_gp_dims(shared_datadir, tmp_path): + # Validate that an error is thrown when variables are + # unpacked without being packed in the first place + _, dsr, store = _rechunk_bgen(shared_datadir, tmp_path, pack=False) + with pytest.raises( + ValueError, + match="Expecting variable 'call_genotype_probability' to have genotypes dimension of size 2", + ): + unpack_variables(dsr) + + +@pytest.mark.parametrize( + "region", [None, dict(variants=slice(0, 100)), dict(samples=slice(0, 50))] +) +def test_bgen_to_zarr(shared_datadir, tmp_path, region): + input = shared_datadir / "example.bgen" + output = tmp_path / "example.zarr" + ds = bgen_to_zarr(input, output, region=region) + expected_dims = { + k: EXPECTED_DIMS[k] + if region is None or k not in region + else region[k].stop - region[k].start + for k in EXPECTED_DIMS + } + actual_dims = {k: v for k, v in ds.dims.items() if k in expected_dims} + assert actual_dims == expected_dims diff --git a/sgkit/tests/io/plink/test_plink_reader.py b/sgkit/tests/io/plink/test_plink_reader.py index a8e03c147..835fd5b29 100644 --- a/sgkit/tests/io/plink/test_plink_reader.py +++ b/sgkit/tests/io/plink/test_plink_reader.py @@ -30,6 +30,33 @@ def test_read_multi_path(shared_datadir, ds1): xr.testing.assert_equal(ds1, ds2) # type: ignore[no-untyped-call] +def test_read_ids(ds1): + assert ds1["sample_id"].values.tolist() == [ + "000", + "001", + "002", + "003", + "004", + "005", + "006", + "007", + "008", + "009", + ] + assert ds1["variant_id"][:10].values.tolist() == [ + "1:1:G:CGCGCG", + "1:2:ACT:G", + "1:3:ACT:G", + "1:4:G:CGCGCG", + "1:5:G:CGCGCG", + "1:6:ACT:G", + "1:7:G:CGCGCG", + "1:8:T:GTGG", + "1:9:T:GTGG", + "1:10:A:C", + ] + + def test_raise_on_both_path_types(): with pytest.raises( ValueError, diff --git a/sgkit/tests/test_utils.py b/sgkit/tests/test_utils.py index 02c54cf38..60ff09acc 100644 --- a/sgkit/tests/test_utils.py +++ b/sgkit/tests/test_utils.py @@ -6,11 +6,13 @@ import xarray as xr from hypothesis import given, settings from hypothesis import strategies as st +from hypothesis.extra.numpy import arrays from 
sgkit.utils import ( MergeWarning, check_array_like, encode_array, + max_str_len, merge_datasets, split_array_chunks, ) @@ -116,6 +118,38 @@ def test_split_array_chunks__size(a: int, b: int) -> None: assert len(res) == a +@pytest.mark.parametrize("dtype", ["U", "S", "O"]) +@pytest.mark.parametrize("chunks", [None, -1, 5]) +@pytest.mark.parametrize("backend", [xr.DataArray, np.array, da.array]) +@given(st.data()) +@settings(max_examples=25) +def test_max_str_len(dtype, chunks, backend, data): + shape = data.draw(st.lists(st.integers(0, 15), min_size=0, max_size=3)) + ndim = len(shape) + x = data.draw(arrays(dtype=dtype if dtype != "O" else "U", shape=shape)) + x = backend(x) + if dtype == "O": + x = x.astype(object) + if chunks is not None and backend is xr.DataArray: + x = x.chunk(chunks=(chunks,) * ndim) + if chunks is not None and backend is da.array: + x = x.rechunk((chunks,) * ndim) + if x.size == 0: + with pytest.raises( + ValueError, match="Max string length cannot be calculated for empty array" + ): + max_str_len(x) + else: + expected = max(map(len, np.asarray(x).ravel())) + actual = int(max_str_len(x)) + assert expected == actual + + +def test_max_str_len__invalid_dtype(): + with pytest.raises(ValueError, match="Array must have string dtype"): + max_str_len(np.array([1])) + + def test_split_array_chunks__raise_on_blocks_gt_n(): with pytest.raises( ValueError, diff --git a/sgkit/utils.py b/sgkit/utils.py index eef56be56..d3e9637db 100644 --- a/sgkit/utils.py +++ b/sgkit/utils.py @@ -194,3 +194,29 @@ def split_array_chunks(n: int, blocks: int) -> Tuple[int, ...]: n_div, n_mod = np.divmod(n, blocks) chunks = n_mod * (n_div + 1,) + (blocks - n_mod) * (n_div,) return chunks # type: ignore[no-any-return] + + +def max_str_len(a: ArrayLike) -> ArrayLike: + """Compute maximum string length for elements of an array + + Parameters + ---------- + a + Array of any shape, must have string or object dtype + + Returns + ------- + max_length + Scalar array with same type as provided array + """ + if a.size == 0: + raise ValueError("Max string length cannot be calculated for empty array") + if a.dtype.kind == "O": + a = a.astype(str) + if a.dtype.kind not in {"U", "S"}: + raise ValueError(f"Array must have string dtype (got dtype {a.dtype})") + + lens = np.frompyfunc(len, 1, 1)(a) + if isinstance(a, np.ndarray): + lens = np.asarray(lens) + return lens.max() diff --git a/sgkit/variables.py b/sgkit/variables.py index b671e853a..b73e4c47c 100644 --- a/sgkit/variables.py +++ b/sgkit/variables.py @@ -240,7 +240,7 @@ def _check_field( ) """PC Relate kinship coefficient matrix.""" sample_id, sample_id_spec = SgkitVariables.register_variable( - ArrayLikeSpec("sample_id", kind={"U", "O"}, ndim=1) + ArrayLikeSpec("sample_id", kind={"S", "U", "O"}, ndim=1) ) """The unique identifier of the sample.""" sample_pcs, sample_pcs_spec = SgkitVariables.register_variable( @@ -303,15 +303,19 @@ def _check_field( ) """The number of samples with heterozygous calls.""" variant_contig, variant_contig_spec = SgkitVariables.register_variable( - ArrayLikeSpec("variant_contig", kind="i", ndim=1) + ArrayLikeSpec("variant_contig", kind={"i", "u"}, ndim=1) ) -"""The (index of the) contig for each variant.""" +""" +Index corresponding to contig name for each variant. In some less common scenarios, +this may also be equivalent to the contig names if the data generating process used +contig names that were also integers. 
+""" variant_hwe_p_value, variant_hwe_p_value_spec = SgkitVariables.register_variable( ArrayLikeSpec("variant_hwe_p_value", kind="f") ) """P values from HWE test for each variant as float in [0, 1].""" variant_id, variant_id_spec = SgkitVariables.register_variable( - ArrayLikeSpec("variant_id", kind={"U", "O"}, ndim=1) + ArrayLikeSpec("variant_id", kind={"S", "U", "O"}, ndim=1) ) """The unique identifier of the variant.""" variant_n_called, variant_n_called_spec = SgkitVariables.register_variable(