diff --git a/docs/diagrams/new-blockwise-annotated.svg b/docs/diagrams/new-blockwise-annotated.svg
new file mode 100644
index 000000000..15f3f2417
--- /dev/null
+++ b/docs/diagrams/new-blockwise-annotated.svg
@@ -0,0 +1,1185 @@
+[1185 lines of SVG markup elided: Matplotlib v3.6.0 figure, generated 2022-11-14]
diff --git a/docs/diagrams/new-blockwise.svg b/docs/diagrams/new-blockwise.svg
new file mode 100644
index 000000000..3f310ad10
--- /dev/null
+++ b/docs/diagrams/new-blockwise.svg
@@ -0,0 +1,1563 @@
+[1563 lines of SVG markup elided: Matplotlib v3.6.0 figure, generated 2022-11-15]
diff --git a/docs/diagrams/new-cohorts-annotated.svg b/docs/diagrams/new-cohorts-annotated.svg
new file mode 100644
index 000000000..e6396328f
--- /dev/null
+++ b/docs/diagrams/new-cohorts-annotated.svg
@@ -0,0 +1,1845 @@
+[1845 lines of SVG markup elided]
diff --git a/docs/diagrams/new-cohorts.svg b/docs/diagrams/new-cohorts.svg
new file mode 100644
index 000000000..fdbcf2050
--- /dev/null
+++ b/docs/diagrams/new-cohorts.svg
@@ -0,0 +1,2261 @@
+[2261 lines of SVG markup elided: Matplotlib v3.6.0 figure, generated 2022-11-14]
diff --git a/docs/diagrams/new-map-reduce-reindex-False-annotated.svg b/docs/diagrams/new-map-reduce-reindex-False-annotated.svg
new file mode 100644
index 000000000..73a45dc82
--- /dev/null
+++ b/docs/diagrams/new-map-reduce-reindex-False-annotated.svg
@@ -0,0 +1,1887 @@
+[1887 lines of SVG markup elided]
diff --git a/docs/diagrams/new-map-reduce-reindex-False.svg b/docs/diagrams/new-map-reduce-reindex-False.svg
new file mode 100644
index 000000000..5e03ccee8
--- /dev/null
+++ b/docs/diagrams/new-map-reduce-reindex-False.svg
@@ -0,0 +1,2382 @@
+[2382 lines of SVG markup elided: Matplotlib v3.6.0 figure, generated 2022-11-15]
diff --git a/docs/diagrams/new-map-reduce-reindex-True-annotated.svg b/docs/diagrams/new-map-reduce-reindex-True-annotated.svg
new file mode 100644
index 000000000..412eaa301
--- /dev/null
+++ b/docs/diagrams/new-map-reduce-reindex-True-annotated.svg
@@ -0,0 +1,1794 @@
+[1794 lines of SVG markup elided]
diff --git a/docs/diagrams/new-map-reduce-reindex-True.svg b/docs/diagrams/new-map-reduce-reindex-True.svg
new file mode 100644
index 000000000..97b615190
--- /dev/null
+++ b/docs/diagrams/new-map-reduce-reindex-True.svg
@@ -0,0 +1,2373 @@
+[2373 lines of SVG markup elided: Matplotlib v3.6.0 figure, generated 2022-11-15]
diff --git a/docs/diagrams/new-split-apply-combine-annotated.svg b/docs/diagrams/new-split-apply-combine-annotated.svg
new file mode 100644
index 000000000..fd38c9fef
--- /dev/null
+++ b/docs/diagrams/new-split-apply-combine-annotated.svg
@@ -0,0 +1,2370 @@
+[2370 lines of SVG markup elided]
diff --git a/docs/diagrams/split-apply-combine.svg b/docs/diagrams/split-apply-combine.svg
new file mode 100644
index 000000000..a118d8fb3
--- /dev/null
+++ b/docs/diagrams/split-apply-combine.svg
@@ -0,0 +1,2342 @@
+[2342 lines of SVG markup elided: Matplotlib v3.6.2 figure, generated 2022-11-26]
diff --git a/docs/source/custom.md b/docs/source/aggregations.md
similarity index 60%
rename from docs/source/custom.md
rename to docs/source/aggregations.md
index dca975529..69494f45c 100644
--- a/docs/source/custom.md
+++ b/docs/source/aggregations.md
@@ -1,7 +1,27 @@
-# Custom reductions
+# Aggregations
 
-`flox` implements all common reductions provided by `numpy_groupies` in `aggregations.py`.
-It also allows you to specify a custom Aggregation (again inspired by dask.dataframe),
+`flox` implements all common reductions provided by `numpy_groupies` in `aggregations.py`. Control this by passing
+the `func` kwarg:
+
+- `"sum"`, `"nansum"`
+- `"prod"`, `"nanprod"`
+- `"count"` - number of non-NaN elements by group
+- `"mean"`, `"nanmean"`
+- `"var"`, `"nanvar"`
+- `"std"`, `"nanstd"`
+- `"argmin"`
+- `"argmax"`
+- `"first"`
+- `"last"`
+
+```{tip}
+We would like to add support for `cumsum`, `cumprod` ([issue](https://github.com/xarray-contrib/flox/issues/91)). Contributions are welcome!
+```
+
+## Custom Aggregations
+
+`flox` also allows you to specify a custom Aggregation (again inspired by dask.dataframe),
 though this might not be fully functional at the moment. See `aggregations.py` for examples.
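+
+As a rough sketch, loosely modeled on the built-in mean aggregation (hedged: the exact `Aggregation` constructor arguments shown here are illustrative and may not match every version; `aggregations.py` is authoritative), a custom mean-like reduction could look like:
+
+```python
+from flox.aggregations import Aggregation
+
+# blockwise: compute a ("sum", "nanlen") pair of intermediates per group;
+# combine: sum each intermediate across blocks;
+# finalize: divide the total sum by the total count
+custom_mean = Aggregation(
+    name="mean",
+    chunk=("sum", "nanlen"),
+    combine=("sum", "sum"),
+    finalize=lambda sum_, count: sum_ / count,
+    fill_value=(0, 0),
+)
+```
+
+An `Aggregation` instance is then passed as `func`, e.g. `flox.groupby_reduce(array, labels, func=custom_mean)`; the `func` kwarg accepts either a string from the list above or an `Aggregation`.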
See the ["Custom Aggregations"](user-stories/custom-aggregations.ipynb) user story for a more user-friendly example. diff --git a/docs/source/api.rst b/docs/source/api.rst index 5c0d21e63..b0d5e0aa7 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -9,7 +9,7 @@ Functions .. autosummary:: :toctree: generated/ - ~core.groupby_reduce + groupby_reduce xarray.xarray_reduce Rechunking @@ -18,8 +18,8 @@ Rechunking .. autosummary:: :toctree: generated/ - ~core.rechunk_for_blockwise - ~core.rechunk_for_cohorts + rechunk_for_blockwise + rechunk_for_cohorts xarray.rechunk_for_blockwise xarray.rechunk_for_cohorts @@ -30,7 +30,7 @@ Visualization :toctree: generated/ visualize.draw_mesh - visualize.visualize_groups + visualize.visualize_groups_1d visualize.visualize_cohorts_2d Aggregation Objects diff --git a/docs/source/arrays.md b/docs/source/arrays.md new file mode 100644 index 000000000..c7cb63f79 --- /dev/null +++ b/docs/source/arrays.md @@ -0,0 +1,15 @@ +# Duck Array Support + +Aggregating over other array types will work if the array types supports the following methods, [ufunc.reduceat](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.reduceat.html) or [ufunc.at](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.at.html) + + +| Reduction | `method="numpy"` | `method="flox"` | +|--------------------------------|------------------|-------------------| +| sum, nansum | bincount | add.reduceat | +| mean, nanmean | bincount | add.reduceat | +| var, nanvar | bincount | add.reduceat | +| std, nanstd | bincount | add.reduceat | +| count | bincount | add.reduceat | +| prod | multiply.at | multiply.reduceat | +| max, nanmax, argmax, nanargmax | maximum.at | maximum.reduceat | +| min, nanmin, argmin, nanargmin | minimum.at | minimum.reduceat | diff --git a/docs/source/conf.py b/docs/source/conf.py index 8790c9502..dacea42c7 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -43,8 +43,8 @@ ] extlinks = { - "issue": ("https://github.com/xarray-contrib/flox/issues/%s", "GH#"), - "pr": ("https://github.com/xarray-contrib/flox/pull/%s", "GH#"), + "issue": ("https://github.com/xarray-contrib/flox/issues/%s", "GH#%s"), + "pr": ("https://github.com/xarray-contrib/flox/pull/%s", "PR#%s"), } templates_path = ["_templates"] @@ -174,7 +174,7 @@ "numpy": ("https://numpy.org/doc/stable", None), # "numba": ("https://numba.pydata.org/numba-doc/latest", None), "dask": ("https://docs.dask.org/en/latest", None), - "xarray": ("http://xarray.pydata.org/en/stable/", None), + "xarray": ("https://docs.xarray.dev/en/stable/", None), } autosummary_generate = True diff --git a/docs/source/engines.md b/docs/source/engines.md new file mode 100644 index 000000000..14e8c4d7f --- /dev/null +++ b/docs/source/engines.md @@ -0,0 +1,24 @@ +(engines)= +# Engines + +`flox` provides multiple options, using the `engine` kwarg, for computing the core GroupBy reduction on numpy or other array types other than dask. + +1. `engine="numpy"` wraps `numpy_groupies.aggregate_numpy`. This uses indexing tricks and functions like `np.bincount`, or the ufunc `.at` methods + (.e.g `np.maximum.at`) to provided reasonably performant aggregations. +1. `engine="numba"` wraps `numpy_groupies.aggregate_numba`. This uses `numba` kernels for the core aggregation. +1. `engine="flox"` uses the `ufunc.reduceat` method after first argsorting the array so that all group members occur sequentially. 
This was copied from + a [gist by Stephan Hoyer](https://gist.github.com/shoyer/f538ac78ae904c936844) + +See [](arrays) for more details. + +## Tradeoffs + + For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="flox"` *can* be faster. +The reason is that `numpy_groupies` converts all groupby problems to a 1D problem, this can involve [some overhead](https://github.com/ml31415/numpy-groupies/pull/46). +It is possible to optimize this a bit in `flox` or `numpy_groupies`, but the work has not been done yet. +The advantage of `engine="numpy"` is that it tends to work for more array types, since it appears to be more common to implement `np.bincount`, and not `np.add.reduceat`. + +```{tip} +Other potential engines we could add are [`numbagg`](https://github.com/numbagg/numbagg) ([stalled PR here](https://github.com/xarray-contrib/flox/pull/72)) and [`datashader`](https://github.com/xarray-contrib/flox/issues/142). +Both use numba for high-performance aggregations. Contributions or discussion is very welcome! +``` diff --git a/docs/source/implementation.md b/docs/source/implementation.md index ae0db3539..705fca429 100644 --- a/docs/source/implementation.md +++ b/docs/source/implementation.md @@ -1,18 +1,29 @@ -# Algorithms +(algorithms)= +# Parallel Algorithms -`flox` outsources the core GroupBy operation to the vectorized implementations in -[numpy_groupies](https://github.com/ml31415/numpy-groupies). Constructing -an efficient groupby reduction with dask is hard, and depends on how the -groups are distributed amongst the blocks of an array. `flox` implements 4 strategies for -grouped reductions, each is appropriate for a particular distribution of groups -among the blocks of a dask array. +`flox` outsources the core GroupBy operation to the vectorized implementations controlled by the +[`engine` kwarg](engines.md). Applying these implementations on a parallel array type like dask +can be hard. Performance strongly depends on how the groups are distributed amongst the blocks of an array. -Switch between the various strategies by passing `method` to either {py:func}`flox.core.groupby_reduce` -or `xarray_reduce`. +`flox` implements 4 strategies for grouped reductions, each is appropriate for a particular distribution of groups +among the blocks of a dask array. Switch between the various strategies by passing `method` +and/or `reindex` to either {py:func}`flox.groupby_reduce` or {py:func}`flox.xarray.xarray_reduce`. +Your options are: +1. [`method="map-reduce"` with `reindex=False`](map-reindex-false) +1. [`method="map-reduce"` with `reindex=True`](map-reindex-True) +1. [`method="blockwise"`](method-blockwise) +1. [`method="cohorts"`](method-cohorts) -First we describe xarray's current strategy +The most appropriate strategy for your problem will depend on the chunking of your dataset, +and the distribution of group labels across those chunks. +```{tip} +Currently these strategies are implemented for dask. We would like to generalize to other parallel array types +as appropriate (e.g. Ramba, cubed, arkouda). Please open an issue to discuss if you are interested. +``` + +(xarray-split)= ## Background: Xarray's current GroupBy strategy Xarray's current strategy is to find all unique group labels, index out each group, @@ -21,7 +32,10 @@ labels (i.e. you cannot use this strategy to group by a dask array). 
 Schematically, this looks like (colors indicate group labels; separated groups of colors indicate different blocks of an array):
-![xarray-current-strategy](../diagrams/split-reduce.png)
+```{image} ../diagrams/new-split-apply-combine-annotated.svg
+:alt: xarray-current-strategy
+:width: 100%
+```
 
 The first step is to extract all members of a group, which involves a *lot* of communication
 and is quite expensive (in dataframe terminology, this is a "shuffle").
@@ -30,89 +44,165 @@
 big datasets.
 
 ## `method="map-reduce"`
 
-The first idea is to use the "map-reduce" strategy (inspired by `dask.dataframe`).
-
-![map-reduce-strategy-schematic](/../diagrams/map-reduce.png)
+The "map-reduce" strategy is inspired by `dask.dataframe.groupby`.
 
 The GroupBy reduction is first applied blockwise. Those intermediate results are
 combined by concatenating to form a new array which is then reduced again. The
 combining of intermediate results uses dask's `_tree_reduce` until all group results
 are in one block. At that point the result is "finalized" and returned to the user.
 
-*Tradeoffs*:
-1. Allows grouping by a dask array so group labels need not be known at graph construction
-   time.
-1. Works well when either the initial blockwise reduction is effective, or if the
-   reduction at the first combine step is effective. "effective" means we actually
-   reduce values and release some memory.
+### General Tradeoffs
+
+1. This approach works well when either the initial blockwise reduction is effective, or if the
+reduction at the first combine step is effective. Here "effective" means we have multiple members of a single
+group in a block so the blockwise application of groupby-reduce actually reduces values and releases some memory.
+2. One downside is that the final result will only have one chunk along the new group axis.
+3. We have two choices for how to construct the intermediate arrays. See below.
+
+(map-reindex-True)=
+### `reindex=True`
+
+If we know all the group labels, we can reindex to the full set of group labels right at the blockwise step (`reindex=True`). This matches `dask.array.histogram` and
+`xhistogram`, where the bin edges, or group labels of the output, are known. The downside is the potential for large memory use
+if the number of output groups is much larger than the number of groups in a block.
+
+```{image} ../diagrams/new-map-reduce-reindex-True-annotated.svg
+:alt: map-reduce-reindex-True-strategy-schematic
+:width: 100%
+```
+
+(map-reindex-False)=
+### `reindex=False`
+
+Alternatively, we can `reindex` at the combine stage to the groups present in the blocks being combined (`reindex=False`). This can limit memory use at the cost
+of a performance reduction due to extra copies of the intermediate data during reindexing.
+
+```{image} ../diagrams/new-map-reduce-reindex-False-annotated.svg
+:alt: map-reduce-reindex-False-strategy-schematic
+:width: 100%
+```
+
+This approach allows grouping by a dask array so group labels can be discovered at compute time, similar to `dask.dataframe.groupby`.
+
+### Example
+
+For example, consider `groupby("time.month")` with monthly frequency data and a chunksize of 4 along `time`.
+![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
+With `reindex=True`, each block will become 3x its original size at the blockwise step: input blocks have 4 timesteps while each output block
+has a value for all 12 months. One could use `reindex=False` to control memory usage but also see [`method="cohorts"`](method-cohorts) below.
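+
+In code, the two variants differ only in kwargs. As a rough sketch (`dask_array` and `labels` are placeholder names):
+
+```python
+import flox
+
+# reindex intermediates to all expected groups at the blockwise step
+result, groups = flox.groupby_reduce(
+    dask_array, labels, func="mean", method="map-reduce", reindex=True
+)
+
+# reindex only at each combine step, to the groups present in the blocks being combined
+result, groups = flox.groupby_reduce(
+    dask_array, labels, func="mean", method="map-reduce", reindex=False
+)
+```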
+
+(method-blockwise)=
 ## `method="blockwise"`
 
-One case where `"map-reduce"` doesn't work well is the case of "resampling" reductions. An
-example here is resampling from daily frequency to monthly frequency data: `da.resample(time="M").mean()`
+One case where `method="map-reduce"` doesn't work well is the case of "resampling" reductions. An
+example here is resampling from daily frequency to monthly frequency data: `da.resample(time="M").mean()`
 
 For resampling type reductions,
 
 1. Group members occur sequentially (all days in January 2001 occur one after the other)
-2. All groups are roughly equal length (31 days in January but 28 in most Februaries)
+2. All groups are not of exactly equal length (31 days in January but 28 in most Februaries)
 3. All members in a group are next to each other (if the time series is sorted, which it usually is).
+4. Because there can be a large number of groups, concatenating results for all groups in a single chunk could be catastrophic.
 
-In this case, it makes sense to use `dask.dataframe` resample strategy which is to rechunk
+In this case, it makes sense to use the `dask.dataframe` resampling strategy, which is to rechunk using {py:func}`flox.rechunk_for_blockwise`
 so that all members of a group are in a single block. Then, the groupby operation can be applied blockwise.
 
-![blockwise-strategy-schematic](/../diagrams/blockwise.png)
+```{image} ../diagrams/new-blockwise-annotated.svg
+:alt: blockwise-strategy-schematic
+:width: 100%
+```
 
 *Tradeoffs*
 
 1. Only works for certain groupings.
 1. Group labels must be known at graph construction time, so this only works for numpy arrays.
 1. Currently the rechunking is only implemented for 1D arrays (being motivated by time resampling), but an nD generalization seems possible.
+1. Grouping by an `nD` array of labels can only use the `blockwise` strategy.
 1. Works better when multiple groups are already in a single block, so that the initial rechunking only involves a small amount of communication.
 
+(method-cohorts)=
 ## `method="cohorts"`
 
-We can combine all of the above ideas for cases where members from different groups tend to occur close to each other.
+The `map-reduce` strategy is quite effective but can involve some unnecessary communication. It is sometimes possible to exploit
+patterns in how group labels are distributed across chunks (similar to `method="blockwise"` above). Two cases are illustrative:
+
+1. Group labels can be *approximately periodic*: e.g. `time.dayofyear` (period 365 or 366) or `time.month` (period 12).
+   Consider our earlier example, `groupby("time.month")` with monthly frequency data and a chunksize of 4 along `time`.
+   ![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
+   Because a chunksize of 4 evenly divides the number of groups (12) all we need to do is index out blocks
+   0, 3, 7 and then apply the `"map-reduce"` strategy to form the final result for months Jan-Apr. Repeat for the
+   remaining groups of months (May-Aug; Sep-Dec) and then concatenate.
+
+2. Groups can be *spatially localized* like the blockwise case above, for example grouping by administrative boundaries like
+   counties or districts. In this case, concatenating the result for the northwesternmost county or district and the southeasternmost
+   district can involve a lot of wasteful communication (again depending on chunking).
+
+For such cases, we can adapt xarray's shuffling or subsetting strategy by indexing out "cohorts" or group labels
+that tend to occur next to each other.
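+
+As a rough sketch (`array` and `labels` are placeholder names), the two chunking-aware strategies described above are selected like so:
+
+```python
+import flox
+
+# blockwise: first rechunk so every member of each group lands in one chunk,
+# then reduce each block independently
+rechunked = flox.rechunk_for_blockwise(array, axis=-1, labels=labels)
+result, groups = flox.groupby_reduce(rechunked, labels, func="mean", method="blockwise")
+
+# cohorts: flox detects cohorts of group labels automatically and
+# runs a map-reduce within each cohort
+result, groups = flox.groupby_reduce(array, labels, func="mean", method="cohorts")
+```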
+ +### A motivating example : time grouping + One example is the construction of "climatologies" which is a climate science term for something like `groupby("time.month")` ("monthly climatology") or `groupby("time.dayofyear")` ("daily climatology"). In these cases, 1. Groups occur sequentially (day 2 is always after day 1; and February is always after January) 2. Groups are approximately periodic (some years have 365 days and others have 366) -The idea here is to copy xarray's subsetting strategy but instead index out "cohorts" or group labels -that tend to occur next to each other. - -Consider this example of monthly average data; where 4 months are present in a single block (i.e. chunksize=4) +Consider our earlier example, `groupby("time.month")` with monthly frequency data and chunksize of 4 along `time`. ![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png) -Because a chunksize of 4 evenly divides the number of groups (12) all we need to do is index out blocks +With `method="map-reduce", reindex=True`, each block will become 3x its original size at the blockwise step: input blocks have 4 timesteps while output block +has a value for all 12 months. Note that the blockwise groupby-reduction *does not reduce* the data since there is only one element in each +group. In addition, since `map-reduce` will make the final result have only one chunk of size 12 along the new `month` +dimension, the final result has chunk sizes 3x that of the input, which may not be ideal. + +However, because a chunksize of 4 evenly divides the number of groups (12) all we need to do is index out blocks 0, 3, 7 and then apply the `"map-reduce"` strategy to form the final result for months Jan-Apr. Repeat for the -remaining groups of months (May-Aug; Sep-Dec) and then concatenate. +remaining groups of months (May-Aug; Sep-Dec) and then concatenate. This is the essence of `method="cohorts"` + + +### Summary + +We can generalize this idea for more complicated problems (inspired by the ``split_out``kwarg in `dask.dataframe.groupby`) +We first apply the groupby-reduction blockwise, then split and reindex blocks to create a new array with which we complete the reduction +using `map-reduce`. Because the split or shuffle step occurs after the blockwise reduction, we *sometimes* communicate a significantly smaller +amount of data than if we split or shuffled the input array. + +```{image} /../diagrams/new-cohorts-annotated.svg +:alt: cohorts-strategy-schematic +:width: 100% +``` + +### Tradeoffs +1. Group labels must be known at graph construction time, so this only works for numpy arrays. +1. This does require more tasks and a more complicated graph, but the communication overhead can be significantly lower. +1. The detection of "cohorts" is currrently slow but could be improved. +1. The extra effort of detecting cohorts and mutiple copying of intermediate blocks may be worthwhile only if the chunk sizes are small + relative to the approximate period of group labels, or small relative to the size of spatially localized groups. + + +### Example : sensitivity to chunking + +One annoyance is that if the chunksize doesn't evenly divide the number of groups, we still end up splitting a number of chunks. +Consider our earlier example, `groupby("time.month")` with monthly frequency data and chunksize of 4 along `time`. +![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png) `flox` can find these cohorts, below it identifies the cohorts with labels `1,2,3,4`; `5,6,7,8`, and `9,10,11,12`. 
 ``` python
->>> flox.core.find_group_cohorts(labels, array.chunks[-1]))
+>>> flox.find_group_cohorts(labels, array.chunks[-1])
 [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]  # 3 cohorts
 ```
 
-For each cohort, it counts the number of blocks that need to be reduced. If `1` then it applies the reduction blockwise.
-If > 1; then it uses `"map-reduce"`.
-
-One annoyance is that if the chunksize doesn't evenly divide the number of groups, we still end up splitting a number of chunks.
-For example, when `chunksize=5`
+Now consider `chunksize=5`.
 
 ![cohorts-schematic](/../diagrams/cohorts-month-chunk5.png)
 
 ``` python
 >>> flox.core.find_group_cohorts(labels, array.chunks[-1])
 [[1], [2, 3], [4, 5], [6], [7, 8], [9, 10], [11], [12]]  # 8 cohorts
 ```
 
-We find 8 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).
-It's possible that some initial rechunking makes the situation better (just rechunk from 5-4), but it isn't an obvious improvement.
+We find 8 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).
+In this case, it seems better to rechunk to a size of `4` along `time`. If you have ideas for improving this case, please open an issue.
 
-*Tradeoffs*
-1. Generalizes well; when there's exactly one groups per chunk, this replicates Xarray's
-   strategy which is optimal. For resampling type reductions, as long as the array
-   is chunked appropriately ({py:func}`flox.core.rechunk_for_blockwise`, {py:func}`flox.xarray.rechunk_for_blockwise`), `method="cohorts"` is equivalent to `method="blockwise"`!
-1. Group labels must be known at graph construction time, so this only works for numpy arrays
-1. Currenltly implemented for grouping by 1D arrays. An nD generalization seems possible,
-   but hard?
+### Example: spatial grouping
diff --git a/docs/source/index.md b/docs/source/index.md
index cf4c5c3ef..2e4f75a08 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -1,14 +1,19 @@
 # flox: fast & furious GroupBy reductions for `dask.array`
 
-
-## Overview
-
 [![GitHub Workflow CI Status](https://img.shields.io/github/workflow/status/xarray-contrib/flox/CI?logo=github&style=flat)](https://github.com/xarray-contrib/flox/actions)
-[![GitHub Workflow Code Style Status](https://img.shields.io/github/workflow/status/xarray-contrib/flox/code-style?label=Code%20Style&style=flat)](https://github.com/xarray-contrib/flox/actions)
+[![pre-commit.ci status](https://results.pre-commit.ci/badge/github/xarray-contrib/flox/main.svg)](https://results.pre-commit.ci/latest/github/xarray-contrib/flox/main)
 [![image](https://img.shields.io/codecov/c/github/xarray-contrib/flox.svg?style=flat)](https://codecov.io/gh/xarray-contrib/flox)
+[![Documentation Status](https://readthedocs.org/projects/flox/badge/?version=latest)](https://flox.readthedocs.io/en/latest/?badge=latest)
+
 [![PyPI](https://img.shields.io/pypi/v/flox.svg?style=flat)](https://pypi.org/project/flox/)
 [![Conda-forge](https://img.shields.io/conda/vn/conda-forge/flox.svg?style=flat)](https://anaconda.org/conda-forge/flox)
-
-This project explores strategies for fast GroupBy reductions with dask.array. It used to be called `dask_groupby`. It was motivated by
+[![NASA-80NSSC18M0156](https://img.shields.io/badge/NASA-80NSSC18M0156-blue)](https://earthdata.nasa.gov/esds/competitive-programs/access/pangeo-ml)
+[![NASA-80NSSC22K0345](https://img.shields.io/badge/NASA-80NSSC22K0345-blue)](https://science.nasa.gov/open-science-overview)
+
+## Overview
+
+`flox` mainly provides strategies for fast GroupBy reductions with dask.array. `flox` uses the MapReduce paradigm (or a "tree reduction")
+to run the GroupBy operation in a parallel-native way, totally avoiding a sort or shuffle operation. It was motivated by
 
 1. Dask Dataframe GroupBy
    [blogpost](https://blog.dask.org/2019/10/08/df-groupby)
@@ -17,6 +22,17 @@
 See a presentation ([video](https://discourse.pangeo.io/t/november-17-2021-flox-fast-furious-groupby-reductions-with-dask-at-pangeo-scale/2016), [slides](https://docs.google.com/presentation/d/1YubKrwu9zPHC_CzVBhvORuQBW-z148BvX3Ne8XcvWsQ/edit?usp=sharing))
 about this package, from the Pangeo Showcase.
 
+## Why flox?
+
+1. {py:func}`flox.groupby_reduce` [wraps](engines.md) the `numpy-groupies` package for performant Groupby reductions on nD arrays.
+1. {py:func}`flox.groupby_reduce` provides [parallel-friendly strategies](implementation.md) for GroupBy reductions by wrapping `numpy-groupies` for dask arrays.
+1. `flox` [integrates with xarray](xarray.md) to provide more performant Groupby and Resampling operations.
+1. {py:func}`flox.xarray.xarray_reduce` [extends](xarray.md) Xarray's GroupBy operations, allowing lazy grouping by dask arrays, grouping by multiple arrays,
+   as well as combining categorical grouping and histogram-style binning operations using multiple variables.
+1. `flox` also provides utility functions for rechunking both dask arrays and Xarray objects along a single dimension using the group labels as a guide:
+   1. To rechunk for blockwise operations: {py:func}`flox.rechunk_for_blockwise`, {py:func}`flox.xarray.rechunk_for_blockwise`.
+   1. To rechunk so that "cohorts", or groups of labels, tend to occur in the same chunks: {py:func}`flox.rechunk_for_cohorts`, {py:func}`flox.xarray.rechunk_for_cohorts`.
+
 ## Installing
 
 ``` shell
 $ pip install flox
@@ -27,19 +43,15 @@ $ pip install flox
 $ conda install -c conda-forge flox
 ```
 
-## API
-
-There are two main functions
-1. {py:func}`flox.core.groupby_reduce`
-   "pure" dask array interface
-1. {py:func}`flox.xarray.xarray_reduce`
-   "pure" xarray interface; though [work is ongoing](https://github.com/pydata/xarray/pull/5734) to integrate this
-   package in xarray.
-
 ## Acknowledgements
 
-This work was funded in part by NASA-ACCESS 80NSSC18M0156 "Community tools for analysis of NASA Earth Observing System
-Data in the Cloud" (PI J. Hamman), and [NCAR's Earth System Data Science Initiative](https://ncar.github.io/esds/).
+This work was funded in part by
+1. NASA-ACCESS 80NSSC18M0156 "Community tools for analysis of NASA Earth Observing System
+   Data in the Cloud" (PI J. Hamman),
+2. NASA-OSTFL 80NSSC22K0345 "Enhancing analysis of NASA data with the open-source Python Xarray Library" (PIs Scott Henderson, University of Washington;
+   Deepak Cherian, NCAR; Jessica Scheick, University of New Hampshire), and
+3. [NCAR's Earth System Data Science Initiative](https://ncar.github.io/esds/).
+
 It was motivated by many discussions in the [Pangeo](https://pangeo.io) community.
 
 ## Contents
 
 ```{eval-rst}
 .. toctree::
    :maxdepth: 1
 
+   aggregations.md
+   engines.md
+   arrays.md
    implementation.md
-   custom.md
-   api.rst
+   xarray.md
    user-stories.md
+   api.rst
 ```
diff --git a/docs/source/user-stories.md b/docs/source/user-stories.md
index 2f190c63b..22b37939e 100644
--- a/docs/source/user-stories.md
+++ b/docs/source/user-stories.md
@@ -1,4 +1,4 @@
-# User Stories
+# Tricks & Stories
 
 ```{eval-rst}
 .. toctree::
diff --git a/docs/source/xarray.md b/docs/source/xarray.md
new file mode 100644
index 000000000..d566423b3
--- /dev/null
+++ b/docs/source/xarray.md
@@ -0,0 +1,28 @@
+(xarray)=
+# Xarray
+
+Xarray will use flox by default (if installed) for DataArrays containing numpy and dask arrays. The default choice is `method="cohorts"`, which generalizes
+the best. Pass flox-specific kwargs to the specific reduction method:
+
+```python
+ds.groupby("time.month").mean(method="map-reduce", engine="flox")
+ds.groupby_bins("lon", bins=[0, 10, 20]).mean(method="map-reduce")
+ds.resample(time="M").mean(method="blockwise")
+```
+
+Xarray's GroupBy operations are currently limited:
+
+1. One can only group by a single variable.
+1. When grouping by a dask array, that array will be computed to discover the unique group labels and their locations.
+
+These limitations can be avoided by using {py:func}`flox.xarray.xarray_reduce` which allows grouping by multiple variables, lazy grouping by dask variables,
+as well as an arbitrary combination of categorical grouping and binning. For example,
+
+```python
+flox.xarray.xarray_reduce(
+    ds,
+    ds.time.dt.month,
+    ds.lon,
+    func="mean",
+    expected_groups=[None, [0, 10, 20]],
+    isbin=[False, True],
+    method="map-reduce",
+)
+```
diff --git a/flox/core.py b/flox/core.py
index 689f58f89..30db58012 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -1571,7 +1571,7 @@ def groupby_reduce(
     ----------
     array : ndarray or DaskArray
         Array to be reduced, possibly nD
-    by : ndarray or DaskArray
+    *by : ndarray or DaskArray
         Array of labels to group over. Must be aligned with ``array`` so that
         ``array.shape[-by.ndim :] == by.shape``
     func : str or Aggregation
@@ -1590,7 +1590,7 @@ def groupby_reduce(
         Negative integers are normalized using array.ndim
     fill_value : Any
         Value to assign when a label in ``expected_groups`` is not present.
-    dtype: data-type , optional
+    dtype : data-type, optional
         DType for the output. Can be anything that is accepted by ``np.dtype``.
     min_count : int, default: None
         The required number of valid values to perform the operation. If
@@ -1636,11 +1636,11 @@ def groupby_reduce(
        * ``"numba"``:
          Use the implementations in ``numpy_groupies.aggregate_numba``.
     reindex : bool, optional
-        Whether to "reindex" the blockwise results to `expected_groups` (possibly automatically detected).
+        Whether to "reindex" the blockwise results to ``expected_groups`` (possibly automatically detected).
         If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups,
         and the final result is a simple reduction of those intermediates. In nearly all cases, this is a significant
         boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the
-        original block size. Avoid that by using method="cohorts". By default, it is turned off for argreductions.
+        original block size. Avoid that by using ``method="cohorts"``. By default, it is turned off for argreductions.
     finalize_kwargs : dict, optional
         Kwargs passed to finalize the reduction such as ``ddof`` for var, std.
diff --git a/flox/visualize.py b/flox/visualize.py
index 0a8b84ea3..a97215298 100644
--- a/flox/visualize.py
+++ b/flox/visualize.py
@@ -21,12 +21,13 @@ def draw_mesh(
     colors=None,
     randomize=True,
     x0=0,
+    y0=0,
     append=False,
 ):
     dx = 2
     xpts = x0 + np.arange(0, (ncol + nspaces) * dx, dx)
-    ypts = np.arange(0, nrow * dx, dx)
+    ypts = y0 + np.arange(0, nrow * dx, dx)
 
     if colors is None:
         colors = mpl.cm.Set2.colors[:4]
@@ -39,6 +40,7 @@ def draw_mesh(
     ax.set_aspect(1)
     ax.set_axis_off()
 
+    # ncolors = len(colors)
     if not randomize:
         colors = iter(colors)
 
@@ -55,7 +57,7 @@ def draw_mesh(
             counter[fcolor] += 1
         ax.add_patch(
             mpl.patches.Rectangle(
-                (x, y - 0.5 * dx),
+                (x, y),
                 dx,
                 dx,
                 edgecolor="w",
@@ -66,14 +68,15 @@ def draw_mesh(
         if draw_line_at is not None and icolor > 0 and icolor % draw_line_at == 0:
             plt.plot([x, x], [y - 0.75 * dx, y + 0.75 * dx], color="k", lw=2)
 
-    ax.set_xlim((0, max(xpts) + dx))
-    ax.set_ylim((-0.75 * dx, max(ypts) + 0.75 * dx))
+    # assert n + 1 == ncolors, (n, ncolors)
+    ax.set_xlim((0, max(xpts) + 2 * dx))
+    ax.set_ylim((-0.75 * dx + min(ypts), max(ypts) + 0.75 * dx))
 
     if not append:
         plt.gcf().set_size_inches((ncol * pxin, (nrow + 2) * pxin))
 
 
-def visualize_groups_1d(array, labels, axis=-1, colors=None, cmap=None):
+def visualize_groups_1d(array, labels, axis=-1, colors=None, cmap=None, append=True, x0=0):
     """
     Visualize group distribution for a 1D array of group labels.
     """
@@ -93,7 +96,8 @@ def visualize_groups_1d(array, labels, axis=-1, colors=None, cmap=None):
     if len(unique_labels) > len(colors):
         raise ValueError("Not enough unique colors")
 
-    plt.figure()
+    if not append:
+        fig = plt.figure()
 
     i0 = 0
     for i in chunks:
         lab = labels[i0 : i0 + i]
@@ -103,13 +107,14 @@ def visualize_groups_1d(array, labels, axis=-1, colors=None, cmap=None):
             len(lab) + 1,
             colors=col,
             randomize=False,
-            append=True,
-            x0=i0 * 2.3,  # + (i0 - 1) * 0.025,
+            append=append,
+            x0=x0 + i0 * 2.3,  # + (i0 - 1) * 0.025,
         )
         i0 += i
 
-    pxin = 0.8
-    plt.gcf().set_size_inches((len(labels) * pxin, 1 * pxin))
+    if not append:
+        pxin = 0.8
+        fig.set_size_inches((len(labels) * pxin, 1 * pxin))
 
 
 def get_colormap(N):
@@ -173,3 +178,19 @@ def _visualize_cohorts(by, cohorts, ax=None):
         _, ax = plt.subplots(1, 1)
 
     ax.imshow(factorize_cohorts(by, cohorts), vmin=0, cmap=get_colormap(len(cohorts)))
+
+
+def visualize_groups_2d(labels, y0=0, **kwargs):
+    colors = mpl.cm.tab10_r
+    for i, chunk in enumerate(labels):
+        chunk = np.atleast_2d(chunk)
+        draw_mesh(
+            *chunk.shape,
+            colors=tuple(colors(label) for label in np.flipud(chunk).ravel()),
+            randomize=False,
+            append=True,
+            y0=y0,
+            **kwargs,
+        )
+        y0 = y0 + 2 * chunk.shape[0] + 2
+    plt.ylim([-1, y0])
diff --git a/flox/xarray.py b/flox/xarray.py
index 682dccf4d..35d99b6fa 100644
--- a/flox/xarray.py
+++ b/flox/xarray.py
@@ -103,7 +103,7 @@ def xarray_reduce(
     fill_value
         Value used for missing groups in the output i.e. when one of the labels
         in ``expected_groups`` is not actually present in ``by``.
-    dtype: data-type, optional
+    dtype : data-type, optional
         DType for the output. Can be anything accepted by ``np.dtype``.
     method : {"map-reduce", "blockwise", "cohorts", "split-reduce"}, optional
         Strategy for reduction of dask arrays only:
@@ -161,7 +161,7 @@ def xarray_reduce(
         and the final result is a simple reduction of those intermediates. In nearly all cases, this is a significant
         boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the
         original block size. Avoid that by using method="cohorts".
         By default, it is turned off for arg reductions.
-    **finalize_kwargs :
+    **finalize_kwargs
         kwargs passed to the finalize function, like ``ddof`` for var, std.
 
     Returns