|
| 1 | +"""PLINK 1.9 reader implementation""" |
| 2 | +from pathlib import Path |
| 3 | +from typing import Any, List, Mapping, Optional, Tuple, Union |
| 4 | + |
| 5 | +import dask.array as da |
| 6 | +import dask.dataframe as dd |
| 7 | +import numpy as np |
| 8 | +from bed_reader import open_bed |
| 9 | +from dask.array import Array |
| 10 | +from dask.dataframe import DataFrame |
| 11 | +from xarray import Dataset |
| 12 | + |
| 13 | +from sgkit import create_genotype_call_dataset |
| 14 | +from sgkit.model import DIM_SAMPLE |
| 15 | +from sgkit.utils import encode_array |
| 16 | + |
PathType = Union[str, Path]

# (name, dataframe dtype, array dtype) per field of a PLINK .fam file,
# see: https://www.cog-genomics.org/plink/1.9/formats#fam
FAM_FIELDS = [
    ("family_id", str, "U"),
    ("member_id", str, "U"),
    ("paternal_id", str, "U"),
    ("maternal_id", str, "U"),
    ("sex", str, "int8"),
    ("phenotype", str, "int8"),
]
# Column name -> dtype used when parsing the text file / building arrays
FAM_DF_DTYPE = {name: df_dtype for name, df_dtype, _ in FAM_FIELDS}
FAM_ARRAY_DTYPE = {name: arr_dtype for name, _, arr_dtype in FAM_FIELDS}

# (name, dataframe dtype, array dtype) per field of a PLINK .bim file,
# see: https://www.cog-genomics.org/plink/1.9/formats#bim
BIM_FIELDS = [
    ("contig", str, "U"),
    ("variant_id", str, "U"),
    ("cm_pos", "float32", "float32"),
    ("pos", "int32", "int32"),
    ("a1", str, "U"),
    ("a2", str, "U"),
]
BIM_DF_DTYPE = {name: df_dtype for name, df_dtype, _ in BIM_FIELDS}
BIM_ARRAY_DTYPE = {name: arr_dtype for name, _, arr_dtype in BIM_FIELDS}
| 40 | + |
| 41 | + |
class BedReader(object):
    """Minimal dask-compatible array wrapper around a PLINK `.bed` file.

    Exposes genotype data as a 3D array of shape
    ``(variants, samples, ploidy=2)`` suitable for `dask.array.from_array`.
    Missing calls are represented as -1 in both ploidy slots.
    """

    def __init__(
        self,
        path: PathType,
        shape: Tuple[int, int],
        dtype: Any = np.int8,
        count_A1: bool = True,
    ) -> None:
        # n variants (sid = SNP id), n samples (iid = Individual id)
        n_sid, n_iid = shape
        # Initialize Bed with explicit counts for axis data, otherwise it will
        # load the bim/map/fam files entirely into memory (it does not do
        # out-of-core for those)
        self.bed = open_bed(
            path,
            count_A1=count_A1,
            iid_count=n_iid,
            sid_count=n_sid,
            num_threads=None,  # NOTE: Default: Use 'em all!
        )
        self.shape = (n_sid, n_iid, 2)
        self.dtype = dtype
        self.ndim = 3

    def __getitem__(self, idx: Tuple[Any, ...]) -> np.ndarray:
        if not isinstance(idx, tuple):
            raise IndexError(  # pragma: no cover
                f"Indexer must be tuple (received {type(idx)})"
            )
        if len(idx) != self.ndim:
            # BUGFIX: message used to say "two-item tuple" although a
            # 3-item indexer (variant, sample, ploidy) is required.
            raise IndexError(  # pragma: no cover
                f"Indexer must be {self.ndim}-item tuple (received {len(idx)} slices)"
            )
        # Slice using reversal of first two slices since
        # bed-reader uses sample x variant orientation.
        # Missing values are represented as -127 with int8 dtype,
        # see: https://fastlmm.github.io/bed-reader
        arr = self.bed.read(index=(idx[1], idx[0]), dtype=np.int8, order="F").T
        # NOTE: bed-reader can return float32 and float64, too, so this copy
        # could be avoided (missing would then be NaN)
        arr = arr.astype(self.dtype)
        # Add a ploidy dimension, so allele counts of 0, 1, 2 correspond to
        # calls 00, 10, 11; negative (missing) counts become -1/-1
        call0 = np.where(arr < 0, -1, np.where(arr == 0, 0, 1))
        call1 = np.where(arr < 0, -1, np.where(arr == 2, 1, 0))
        arr = np.stack([call0, call1], axis=-1)
        # Apply final slice to 3D result
        return arr[:, :, idx[-1]]

    def close(self) -> None:
        # This is not actually crucial since a Bed instance with no
        # in-memory bim/map/fam data is essentially just a file pointer
        # but this will still be problematic if an array is created
        # from the same PLINK dataset many times
        self.bed._close_bed()  # pragma: no cover
| 95 | + |
| 96 | + |
def _max_str_len(arr: Array) -> Array:
    """Return the length of the longest string across all elements of `arr`."""

    def _block_lengths(block: np.ndarray) -> np.ndarray:
        # Per-element string lengths for one block
        return np.char.str_len(block.astype(str))

    return arr.map_blocks(_block_lengths, dtype=np.int8).max()
| 99 | + |
| 100 | + |
def _to_dict(
    df: DataFrame, dtype: Optional[Mapping[str, Any]] = None
) -> Mapping[str, Any]:
    """Convert each column of a dask DataFrame to a dask array.

    Parameters
    ----------
    df : DataFrame
        Source dask DataFrame.
    dtype : Optional[Mapping[str, Any]]
        Optional column name -> dtype override; when omitted, each
        column's own dtype is used.

    Returns
    -------
    Mapping[str, Any]
        Column name -> dask array, with string columns converted to
        fixed-length string dtypes.
    """
    arrs = {}
    for c in df:
        a = df[c].to_dask_array(lengths=True)
        dt = df[c].dtype
        if dtype:
            dt = dtype[c]
        kind = np.dtype(dt).kind
        if kind in ["U", "S"]:
            # Compute fixed-length string dtype for array, e.g. "U12".
            # BUGFIX: the computed max_len was previously unused and the
            # dtype string was a corrupted literal ("{kind}27,740").
            max_len = _max_str_len(a).compute()
            dt = f"{kind}{max_len}"
        arrs[c] = a.astype(dt)
    return arrs
| 117 | + |
| 118 | + |
def read_fam(path: PathType, sep: str = " ") -> DataFrame:
    """Read a PLINK `.fam` pedigree file into a dask DataFrame.

    Parent identifiers equal to "0" become missing (None); sex and
    phenotype codes outside {1, 2} become -1.
    """
    # See: https://www.cog-genomics.org/plink/1.9/formats#fam
    columns = [name for name, _, _ in FAM_FIELDS]
    df = dd.read_csv(str(path), sep=sep, names=columns, dtype=FAM_DF_DTYPE)

    def coerce_code(v: dd.Series, codes: List[int]) -> dd.Series:
        # Set non-ints and unexpected codes to missing (-1)
        v = dd.to_numeric(v, errors="coerce")
        v = v.where(v.isin(codes), np.nan)
        return v.fillna(-1).astype("int8")

    # PLINK uses "0" for an unknown parent; normalize to missing
    for parent_col in ("paternal_id", "maternal_id"):
        df[parent_col] = df[parent_col].where(df[parent_col] != "0", None)
    # Valid sex and phenotype codes are 1 and 2
    for code_col in ("sex", "phenotype"):
        df[code_col] = coerce_code(df[code_col], [1, 2])

    return df
| 136 | + |
| 137 | + |
def read_bim(path: PathType, sep: str = "\t") -> DataFrame:
    """Read a PLINK `.bim` variant file into a dask DataFrame.

    Contig names equal to "0" (PLINK's unknown-chromosome code)
    become missing (None).
    """
    # See: https://www.cog-genomics.org/plink/1.9/formats#bim
    columns = [field[0] for field in BIM_FIELDS]
    frame = dd.read_csv(str(path), sep=sep, names=columns, dtype=BIM_DF_DTYPE)
    frame["contig"] = frame["contig"].where(frame["contig"] != "0", None)
    return frame
| 144 | + |
| 145 | + |
def read_plink(
    *,
    path: Optional[PathType] = None,
    bed_path: Optional[PathType] = None,
    bim_path: Optional[PathType] = None,
    fam_path: Optional[PathType] = None,
    chunks: Union[str, int, tuple] = "auto",  # type: ignore[type-arg]
    fam_sep: str = " ",
    bim_sep: str = "\t",
    bim_int_contig: bool = False,
    count_a1: bool = True,
    lock: bool = False,
    persist: bool = True,
) -> Dataset:
    """Read PLINK dataset.

    Loads a single PLINK dataset as dask arrays within a Dataset
    from bed, bim, and fam files.

    Parameters
    ----------
    path : Optional[PathType]
        Path to PLINK file set.
        This should not include a suffix, i.e. if the files are
        at `data.{bed,fam,bim}` then only 'data' should be
        provided (suffixes are added internally).
        Either this path must be provided or all 3 of
        `bed_path`, `bim_path` and `fam_path`.
    bed_path: Optional[PathType]
        Path to PLINK bed file.
        This should be a full path including the `.bed` extension
        and cannot be specified in conjunction with `path`.
    bim_path: Optional[PathType]
        Path to PLINK bim file.
        This should be a full path including the `.bim` extension
        and cannot be specified in conjunction with `path`.
    fam_path: Optional[PathType]
        Path to PLINK fam file.
        This should be a full path including the `.fam` extension
        and cannot be specified in conjunction with `path`.
    chunks : Union[str, int, tuple], optional
        Chunk size for genotype (i.e. `.bed`) data, by default "auto"
    fam_sep : str, optional
        Delimiter for `.fam` file, by default " "
    bim_sep : str, optional
        Delimiter for `.bim` file, by default "\t"
    bim_int_contig : bool, optional
        Whether or not the contig/chromosome name in the `.bim`
        file should be interpreted as an integer, by default False.
        If False, then the `variant/contig` field in the resulting
        dataset will contain the indexes of corresponding strings
        encountered in the first `.bim` field.
    count_a1 : bool, optional
        Whether or not allele counts should be for A1 or A2,
        by default True. Typically A1 is the minor allele
        and should be counted instead of A2. This is not enforced
        by PLINK though and it is up to the data generating process
        to ensure that A1 is in fact an alternate/minor/effect
        allele. See https://www.cog-genomics.org/plink/1.9/formats
        for more details.
    lock : bool, optional
        Whether or not to synchronize concurrent reads of `.bed`
        file blocks, by default False. This is passed through to
        [dask.array.from_array](https://docs.dask.org/en/latest/array-api.html#dask.array.from_array).
    persist : bool, optional
        Whether or not to persist `.fam` and `.bim` information in
        memory, by default True. This is an important performance
        consideration as the plain text files for this data will
        be read multiple times when False. This can lead to load
        times that are upwards of 10x slower.

    Returns
    -------
    Dataset
        A dataset containing genotypes as 3 dimensional calls along with
        all accompanying pedigree and variant information. The content
        of this dataset matches that of `sgkit.create_genotype_call_dataset`
        with all pedigree-specific fields defined as:
        - sample_family_id: Family identifier commonly referred to as FID
        - sample_id: Within-family identifier for sample
        - sample_paternal_id: Within-family identifier for father of sample
        - sample_maternal_id: Within-family identifier for mother of sample
        - sample_sex: Sex code equal to 1 for male, 2 for female, and -1
          for missing
        - sample_phenotype: Phenotype code equal to 1 for control, 2 for case,
          and -1 for missing

    See https://www.cog-genomics.org/plink/1.9/formats#fam for more details.

    Raises
    ------
    ValueError
        If `path` and one of `bed_path`, `bim_path` or `fam_path` are provided,
        or if `path` is omitted and any of the 3 individual paths is missing.
    """
    if path and (bed_path or bim_path or fam_path):
        raise ValueError(
            "Either `path` or all 3 of `{bed,bim,fam}_path` must be specified but not both"
        )
    if path:
        bed_path, bim_path, fam_path = [
            f"{path}.{ext}" for ext in ["bed", "bim", "fam"]
        ]
    elif not (bed_path and bim_path and fam_path):
        # Fail fast with a clear message rather than passing a None path
        # to the readers below, which would raise an obscure error.
        raise ValueError(
            "All 3 of `bed_path`, `bim_path` and `fam_path` must be specified when `path` is not"
        )

    # Load axis data first to determine dimension sizes
    df_fam = read_fam(fam_path, sep=fam_sep)  # type: ignore[arg-type]
    df_bim = read_bim(bim_path, sep=bim_sep)  # type: ignore[arg-type]

    if persist:
        df_fam = df_fam.persist()
        df_bim = df_bim.persist()

    arr_fam = _to_dict(df_fam, dtype=FAM_ARRAY_DTYPE)
    arr_bim = _to_dict(df_bim, dtype=BIM_ARRAY_DTYPE)

    # Load genotyping data
    call_genotype = da.from_array(
        # Make sure to use asarray=False in order for masked arrays to propagate
        BedReader(bed_path, (len(df_bim), len(df_fam)), count_A1=count_a1),  # type: ignore[arg-type]
        chunks=chunks,
        # Lock must be true with multiprocessing dask scheduler
        # to not get bed-reader errors (it works w/ threading backend though)
        lock=lock,
        asarray=False,
        name=f"bed_reader:read_plink:{bed_path}",
    )

    # If contigs are already integers, use them as-is
    if bim_int_contig:
        variant_contig = arr_bim["contig"].astype("int16")
        variant_contig_names = da.unique(variant_contig).astype(str)
        variant_contig_names = list(variant_contig_names.compute())
    # Otherwise create index for contig names based
    # on order of appearance in underlying .bim file
    else:
        variant_contig, variant_contig_names = encode_array(arr_bim["contig"].compute())
        variant_contig = variant_contig.astype("int16")
        variant_contig_names = list(variant_contig_names)

    variant_position = arr_bim["pos"]
    a1 = arr_bim["a1"].astype("str")
    a2 = arr_bim["a2"].astype("str")

    # Note: column_stack not implemented in Dask, must use [v|h]stack
    variant_alleles = da.hstack((a1[:, np.newaxis], a2[:, np.newaxis]))
    variant_alleles = variant_alleles.astype("S")
    variant_id = arr_bim["variant_id"]

    sample_id = arr_fam["member_id"]

    ds = create_genotype_call_dataset(
        variant_contig_names=variant_contig_names,
        variant_contig=variant_contig,
        variant_position=variant_position,
        variant_alleles=variant_alleles,
        sample_id=sample_id,
        call_genotype=call_genotype,
        variant_id=variant_id,
    )

    # Assign PLINK-specific pedigree fields
    return ds.assign(
        {f"sample_{f}": (DIM_SAMPLE, arr_fam[f]) for f in arr_fam if f != "member_id"}
    )
0 commit comments