Update visualizations & docs #189

Merged
merged 15 commits into from
Nov 27, 2022
1,185 changes: 1,185 additions & 0 deletions docs/diagrams/new-blockwise-annotated.svg
1,563 changes: 1,563 additions & 0 deletions docs/diagrams/new-blockwise.svg
1,845 changes: 1,845 additions & 0 deletions docs/diagrams/new-cohorts-annotated.svg
2,261 changes: 2,261 additions & 0 deletions docs/diagrams/new-cohorts.svg
1,887 changes: 1,887 additions & 0 deletions docs/diagrams/new-map-reduce-reindex-False-annotated.svg
2,382 changes: 2,382 additions & 0 deletions docs/diagrams/new-map-reduce-reindex-False.svg
1,794 changes: 1,794 additions & 0 deletions docs/diagrams/new-map-reduce-reindex-True-annotated.svg
2,373 changes: 2,373 additions & 0 deletions docs/diagrams/new-map-reduce-reindex-True.svg
2,370 changes: 2,370 additions & 0 deletions docs/diagrams/new-split-apply-combine-annotated.svg
2,342 changes: 2,342 additions & 0 deletions docs/diagrams/split-apply-combine.svg
26 changes: 23 additions & 3 deletions docs/source/custom.md → docs/source/aggregations.md
@@ -1,7 +1,27 @@
# Custom reductions
# Aggregations

`flox` implements all common reductions provided by `numpy_groupies` in `aggregations.py`.
It also allows you to specify a custom Aggregation (again inspired by dask.dataframe),
`flox` implements all common reductions provided by `numpy_groupies` in `aggregations.py`. Select the reduction by passing
the `func` kwarg:

- `"sum"`, `"nansum"`
- `"prod"`, `"nanprod"`
- `"count"` - number of non-NaN elements by group
- `"mean"`, `"nanmean"`
- `"var"`, `"nanvar"`
- `"std"`, `"nanstd"`
- `"argmin"`
- `"argmax"`
- `"first"`
- `"last"`
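
For example, a minimal call might look like the following sketch (made-up data; `groupby_reduce` returns the reduced values together with the unique group labels):

```python
import numpy as np

import flox

array = np.array([1.0, 2.0, 3.0, np.nan, 5.0])
labels = np.array([0, 0, 1, 1, 1])

# reduce `array` over `labels`, ignoring NaNs
result, groups = flox.groupby_reduce(array, labels, func="nansum")
```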


```{tip}
We would like to add support for `cumsum`, `cumprod` ([issue](https://github.com/xarray-contrib/flox/issues/91)). Contributions are welcome!
```

## Custom Aggregations

`flox` also allows you to specify a custom Aggregation (again inspired by dask.dataframe),
though this might not be fully functional at the moment. See `aggregations.py` for examples.

See the ["Custom Aggregations"](user-stories/custom-aggregations.ipynb) user story for a more user-friendly example.
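
As a rough illustration only, a "blockwise-only" median might be sketched as below. The constructor arguments shown follow the user story above and are assumptions, not a guaranteed API; check `aggregations.py` for the current signature.

```python
import numpy as np

from flox.aggregations import Aggregation

# hypothetical sketch of a custom aggregation built from np.median
agg_median = Aggregation(
    name="median",
    numpy=np.median,  # function applied to each group's values
    fill_value=-1,    # value used for groups with no members
    chunk=None,       # no blockwise/combine steps are defined,
    combine=None,     # so this is only usable with method="blockwise"
)
```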
8 changes: 4 additions & 4 deletions docs/source/api.rst
@@ -9,7 +9,7 @@ Functions
.. autosummary::
:toctree: generated/

~core.groupby_reduce
groupby_reduce
xarray.xarray_reduce

Rechunking
@@ -18,8 +18,8 @@ Rechunking
.. autosummary::
:toctree: generated/

~core.rechunk_for_blockwise
~core.rechunk_for_cohorts
rechunk_for_blockwise
rechunk_for_cohorts
xarray.rechunk_for_blockwise
xarray.rechunk_for_cohorts

@@ -30,7 +30,7 @@ Visualization
:toctree: generated/

visualize.draw_mesh
visualize.visualize_groups
visualize.visualize_groups_1d
visualize.visualize_cohorts_2d

Aggregation Objects
15 changes: 15 additions & 0 deletions docs/source/arrays.md
@@ -0,0 +1,15 @@
# Duck Array Support

Aggregating over other array types will work if the array type supports either of the following methods: [ufunc.reduceat](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.reduceat.html) or [ufunc.at](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.at.html).


| Reduction | `engine="numpy"` | `engine="flox"` |
|--------------------------------|------------------|-------------------|
| sum, nansum | bincount | add.reduceat |
| mean, nanmean | bincount | add.reduceat |
| var, nanvar | bincount | add.reduceat |
| std, nanstd | bincount | add.reduceat |
| count | bincount | add.reduceat |
| prod | multiply.at | multiply.reduceat |
| max, nanmax, argmax, nanargmax | maximum.at | maximum.reduceat |
| min, nanmin, argmin, nanargmin | minimum.at | minimum.reduceat |
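
For reference, the two numpy idioms the table refers to look like this toy grouped sum (made-up data): `engine="numpy"` relies on `np.bincount` or `ufunc.at`-style scatter operations, while `engine="flox"` relies on `ufunc.reduceat` over contiguous group segments.

```python
import numpy as np

values = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
labels = np.array([0, 0, 1, 1, 2])  # already sorted by group

# ufunc.at: scatter-add each value into the slot for its group
out = np.zeros(3)
np.add.at(out, labels, values)           # -> [3., 7., 5.]

# ufunc.reduceat: reduce each contiguous segment of values in one call
offsets = np.array([0, 2, 4])            # start index of each group's segment
out2 = np.add.reduceat(values, offsets)  # -> [3., 7., 5.]
```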
6 changes: 3 additions & 3 deletions docs/source/conf.py
@@ -43,8 +43,8 @@
]

extlinks = {
"issue": ("https://github.com/xarray-contrib/flox/issues/%s", "GH#"),
"pr": ("https://github.com/xarray-contrib/flox/pull/%s", "GH#"),
"issue": ("https://github.com/xarray-contrib/flox/issues/%s", "GH#%s"),
"pr": ("https://github.com/xarray-contrib/flox/pull/%s", "PR#%s"),
}

templates_path = ["_templates"]
@@ -174,7 +174,7 @@
"numpy": ("https://numpy.org/doc/stable", None),
# "numba": ("https://numba.pydata.org/numba-doc/latest", None),
"dask": ("https://docs.dask.org/en/latest", None),
"xarray": ("http://xarray.pydata.org/en/stable/", None),
"xarray": ("https://docs.xarray.dev/en/stable/", None),
}

autosummary_generate = True
24 changes: 24 additions & 0 deletions docs/source/engines.md
@@ -0,0 +1,24 @@
(engines)=
# Engines

`flox` provides multiple options, selected with the `engine` kwarg, for computing the core GroupBy reduction on numpy and other non-dask array types.

1. `engine="numpy"` wraps `numpy_groupies.aggregate_numpy`. This uses indexing tricks and functions like `np.bincount`, or the ufunc `.at` methods
(e.g. `np.maximum.at`) to provide reasonably performant aggregations.
1. `engine="numba"` wraps `numpy_groupies.aggregate_numba`. This uses `numba` kernels for the core aggregation.
1. `engine="flox"` uses the `ufunc.reduceat` method after first argsorting the array so that all group members occur sequentially. This was copied from
a [gist by Stephan Hoyer](https://gist.github.com/shoyer/f538ac78ae904c936844).

See [](arrays) for more details.
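
As a rough illustration of the `engine="flox"` idea, here is a self-contained numpy sketch of the argsort + `reduceat` trick (not flox's actual internals):

```python
import numpy as np

values = np.array([10.0, 1.0, 20.0, 2.0, 30.0])
labels = np.array([1, 0, 1, 0, 1])

# sort so that all members of each group are contiguous
order = np.argsort(labels, kind="stable")
sorted_values = values[order]
sorted_labels = labels[order]

# start index of each group's segment, then reduce every segment in one call
offsets = np.concatenate([[0], np.flatnonzero(np.diff(sorted_labels)) + 1])
group_sums = np.add.reduceat(sorted_values, offsets)  # -> [3., 60.]
```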

## Tradeoffs

For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="flox"` *can* be faster.
The reason is that `numpy_groupies` converts all groupby problems to a 1D problem; this can involve [some overhead](https://github.com/ml31415/numpy-groupies/pull/46).
It is possible to optimize this a bit in `flox` or `numpy_groupies`, but the work has not been done yet.
The advantage of `engine="numpy"` is that it tends to work for more array types, since it appears to be more common for array types to implement `np.bincount` than `np.add.reduceat`.

```{tip}
Other potential engines we could add are [`numbagg`](https://github.com/numbagg/numbagg) ([stalled PR here](https://github.com/xarray-contrib/flox/pull/72)) and [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
Both use numba for high-performance aggregations. Contributions or discussion is very welcome!
```
178 changes: 134 additions & 44 deletions docs/source/implementation.md
@@ -1,18 +1,29 @@
# Algorithms
(algorithms)=
# Parallel Algorithms

`flox` outsources the core GroupBy operation to the vectorized implementations in
[numpy_groupies](https://github.com/ml31415/numpy-groupies). Constructing
an efficient groupby reduction with dask is hard, and depends on how the
groups are distributed amongst the blocks of an array. `flox` implements 4 strategies for
grouped reductions, each is appropriate for a particular distribution of groups
among the blocks of a dask array.
`flox` outsources the core GroupBy operation to the vectorized implementations controlled by the
[`engine` kwarg](engines.md). Applying these implementations on a parallel array type like dask
can be hard. Performance strongly depends on how the groups are distributed amongst the blocks of an array.

Switch between the various strategies by passing `method` to either {py:func}`flox.core.groupby_reduce`
or `xarray_reduce`.
`flox` implements 4 strategies for grouped reductions, each appropriate for a particular distribution of groups
among the blocks of a dask array. Switch between the various strategies by passing `method`
and/or `reindex` to either {py:func}`flox.groupby_reduce` or {py:func}`flox.xarray.xarray_reduce`.

Your options are:
1. [`method="map-reduce"` with `reindex=False`](map-reindex-False)
1. [`method="map-reduce"` with `reindex=True`](map-reindex-True)
1. [`method="blockwise"`](method-blockwise)
1. [`method="cohorts"`](method-cohorts)

First we describe xarray's current strategy
The most appropriate strategy for your problem will depend on the chunking of your dataset,
and the distribution of group labels across those chunks.

```{tip}
Currently these strategies are implemented for dask. We would like to generalize to other parallel array types
as appropriate (e.g. Ramba, cubed, arkouda). Please open an issue to discuss if you are interested.
```

(xarray-split)=
## Background: Xarray's current GroupBy strategy

Xarray's current strategy is to find all unique group labels, index out each group,
@@ -21,7 +32,10 @@ labels (i.e. you cannot use this strategy to group by a dask array).

Schematically, this looks like (colors indicate group labels; separated groups of colors
indicate different blocks of an array):
![xarray-current-strategy](../diagrams/split-reduce.png)
```{image} ../diagrams/new-split-apply-combine-annotated.svg
:alt: xarray-current-strategy
:width: 100%
```

The first step is to extract all members of a group, which involves a *lot* of
communication and is quite expensive (in dataframe terminology, this is a "shuffle").
@@ -30,89 +44,165 @@ big datasets.

## `method="map-reduce"`

The first idea is to use the "map-reduce" strategy (inspired by `dask.dataframe`).

![map-reduce-strategy-schematic](/../diagrams/map-reduce.png)

The "map-reduce" strategy is inspired by `dask.dataframe.groupby`.
The GroupBy reduction is first applied blockwise. Those intermediate results are
combined by concatenating to form a new array which is then reduced
again. The combining of intermediate results uses dask's `_tree_reduce`
till all group results are in one block. At that point the result is
"finalized" and returned to the user.

*Tradeoffs*:
1. Allows grouping by a dask array so group labels need not be known at graph construction
time.
1. Works well when either the initial blockwise reduction is effective, or if the
reduction at the first combine step is effective. "effective" means we actually
reduce values and release some memory.
### General Tradeoffs
1. This approach works well when either the initial blockwise reduction is effective, or if the
reduction at the first combine step is effective. Here "effective" means we have multiple members of a single
group in a block so the blockwise application of groupby-reduce actually reduces values and releases some memory.
2. One downside is that the final result will only have one chunk along the new group axis.
3. We have two choices for how to construct the intermediate arrays. See below.

(map-reindex-True)=
### `reindex=True`

If we know all the group labels, we can reindex to the full set of groups right at the blockwise step (`reindex=True`). This matches `dask.array.histogram` and
`xhistogram`, where the bin edges, or group labels of the output, are known. The downside is the potential for large memory use
if the number of output groups is much larger than the number of groups in a block.

```{image} ../diagrams/new-map-reduce-reindex-True-annotated.svg
:alt: map-reduce-reindex-True-strategy-schematic
:width: 100%
```

(map-reindex-False)=
### `reindex=False`

Alternatively, we can `reindex` at the combine stage to only the groups present in the blocks being combined (`reindex=False`). This can limit memory use at the cost
of a performance reduction due to extra copies of the intermediate data during reindexing.

```{image} ../diagrams/new-map-reduce-reindex-False-annotated.svg
:alt: map-reduce-reindex-False-strategy-schematic
:width: 100%
```

The `"map-reduce"` strategy allows grouping by a dask array, so group labels can be discovered at compute time, similar to `dask.dataframe.groupby`.

### Example

For example, consider `groupby("time.month")` with monthly frequency data and chunksize of 4 along `time`.
![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
With `reindex=True`, each block will become 3x its original size at the blockwise step: input blocks have 4 timesteps while the output block
has a value for all 12 months. One could use `reindex=False` to control memory usage, but also see [`method="cohorts"`](method-cohorts) below.
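
In code, this example might look like the following sketch (synthetic data: two years of monthly values chunked in blocks of 4 along `time`):

```python
import dask.array as da
import numpy as np

import flox

array = da.random.random((24,), chunks=4)  # 24 monthly timesteps
month = np.tile(np.arange(1, 13), 2)       # group labels 1..12, repeated

result, groups = flox.groupby_reduce(
    array, month, func="mean", method="map-reduce", reindex=True
)
```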


(method-blockwise)=
## `method="blockwise"`

One case where `"map-reduce"` doesn't work well is the case of "resampling" reductions. An
example here is resampling from daily frequency to monthly frequency data: `da.resample(time="M").mean()`
One case where `method="map-reduce"` doesn't work well is the case of "resampling" reductions. An
example here is resampling from daily frequency to monthly frequency data: `da.resample(time="M").mean()`.
For resampling type reductions,
1. Group members occur sequentially (all days in January 2001 occur one after the other)
2. All groups are roughly equal length (31 days in January but 28 in most Februaries)
2. Groups need not be of exactly equal length (31 days in January but 28 in most Februaries)
3. All members in a group are next to each other (if the time series is sorted, which it
usually is).
4. Because there can be a large number of groups, concatenating results for all groups in a single chunk could be catastrophic.

In this case, it makes sense to use `dask.dataframe` resample strategy which is to rechunk
In this case, it makes sense to use the `dask.dataframe` resample strategy, which is to rechunk using {py:func}`flox.rechunk_for_blockwise`
so that all members of a group are in a single block. Then, the groupby operation can be applied blockwise.

![blockwise-strategy-schematic](/../diagrams/blockwise.png)
```{image} ../diagrams/new-blockwise-annotated.svg
:alt: blockwise-strategy-schematic
:width: 100%
```
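
A sketch of this workflow with synthetic daily data follows; the `(array, axis, labels)` argument order for {py:func}`flox.rechunk_for_blockwise` is an assumption here, so check the API reference.

```python
import dask.array as da
import numpy as np

import flox

# one year of daily values, chunked without regard to month boundaries
array = da.random.random((365,), chunks=60)
month = np.repeat(np.arange(1, 13), [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31])

# rechunk so that every month lies entirely within one block, then the
# reduction runs blockwise with no further communication
rechunked = flox.rechunk_for_blockwise(array, -1, month)
result, groups = flox.groupby_reduce(rechunked, month, func="mean", method="blockwise")
```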

*Tradeoffs*
1. Only works for certain groupings.
1. Group labels must be known at graph construction time, so this only works for numpy arrays
1. Currently the rechunking is only implemented for 1D arrays (being motivated by time resampling),
but a nD generalization seems possible.
1. Can only use the `blockwise` strategy for grouping by `nD` arrays.
1. Works better when multiple groups are already in a single block, so that the initial
rechunking only involves a small amount of communication.

(method-cohorts)=
## `method="cohorts"`

We can combine all of the above ideas for cases where members from different groups tend to occur close to each other.
The `map-reduce` strategy is quite effective but can involve some unnecessary communication. It is often possible to exploit
patterns in how group labels are distributed across chunks (similar to `method="blockwise"` above). Two cases are illustrative:
1. Group labels can be *approximately periodic*: e.g. `time.dayofyear` (period 365 or 366) or `time.month` (period 12).
Consider our earlier example, `groupby("time.month")` with monthly frequency data and chunksize of 4 along `time`.
![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
Because a chunksize of 4 evenly divides the number of groups (12), all we need to do is index out blocks
0, 3, 7 and then apply the `"map-reduce"` strategy to form the final result for months Jan-Apr. Repeat for the
remaining groups of months (May-Aug; Sep-Dec) and then concatenate.

2. Groups can be *spatially localized* like the blockwise case above, for example grouping by country administrative boundaries like
counties or districts. In this case, concatenating the result for the northwesternmost county or district and the southeasternmost
district can involve a lot of wasteful communication (again depending on chunking).

For such cases, we can adapt xarray's shuffling or subsetting strategy by indexing out "cohorts" or group labels
that tend to occur next to each other.

### A motivating example : time grouping

One example is the construction of "climatologies" which is a climate science term for something like `groupby("time.month")`
("monthly climatology") or `groupby("time.dayofyear")` ("daily climatology"). In these cases,
1. Groups occur sequentially (day 2 is always after day 1; and February is always after January)
2. Groups are approximately periodic (some years have 365 days and others have 366)

The idea here is to copy xarray's subsetting strategy but instead index out "cohorts" or group labels
that tend to occur next to each other.

Consider this example of monthly average data; where 4 months are present in a single block (i.e. chunksize=4)
Consider our earlier example, `groupby("time.month")` with monthly frequency data and chunksize of 4 along `time`.
![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)

Because a chunksize of 4 evenly divides the number of groups (12) all we need to do is index out blocks
With `method="map-reduce", reindex=True`, each block will become 3x its original size at the blockwise step: input blocks have 4 timesteps while the output block
has a value for all 12 months. Note that the blockwise groupby-reduction *does not reduce* the data since there is only one element in each
group. In addition, since `map-reduce` will make the final result have only one chunk of size 12 along the new `month`
dimension, the final result has chunk sizes 3x that of the input, which may not be ideal.

However, because a chunksize of 4 evenly divides the number of groups (12), all we need to do is index out blocks
0, 3, 7 and then apply the `"map-reduce"` strategy to form the final result for months Jan-Apr. Repeat for the
remaining groups of months (May-Aug; Sep-Dec) and then concatenate.
remaining groups of months (May-Aug; Sep-Dec) and then concatenate. This is the essence of `method="cohorts"`.


### Summary

We can generalize this idea for more complicated problems (inspired by the `split_out` kwarg in `dask.dataframe.groupby`).
We first apply the groupby-reduction blockwise, then split and reindex blocks to create a new array with which we complete the reduction
using `map-reduce`. Because the split or shuffle step occurs after the blockwise reduction, we *sometimes* communicate a significantly smaller
amount of data than if we split or shuffled the input array.

```{image} /../diagrams/new-cohorts-annotated.svg
:alt: cohorts-strategy-schematic
:width: 100%
```
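
Requesting this strategy is a one-line change relative to the map-reduce sketch above (synthetic monthly example again):

```python
import dask.array as da
import numpy as np

import flox

array = da.random.random((24,), chunks=4)
month = np.tile(np.arange(1, 13), 2)

# flox detects cohorts from `month` and the chunk structure, then runs a
# separate map-reduce per cohort before concatenating the results
result, groups = flox.groupby_reduce(array, month, func="mean", method="cohorts")
```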

### Tradeoffs
1. Group labels must be known at graph construction time, so this only works for numpy arrays.
1. This does require more tasks and a more complicated graph, but the communication overhead can be significantly lower.
1. The detection of "cohorts" is currently slow but could be improved.
1. The extra effort of detecting cohorts and the multiple copies of intermediate blocks may be worthwhile only if the chunk sizes are small
relative to the approximate period of group labels, or small relative to the size of spatially localized groups.


### Example : sensitivity to chunking

One annoyance is that if the chunksize doesn't evenly divide the number of groups, we still end up splitting a number of chunks.
Consider our earlier example, `groupby("time.month")` with monthly frequency data and chunksize of 4 along `time`.
![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)

`flox` can find these cohorts; below, it identifies the cohorts with labels `1,2,3,4`; `5,6,7,8`; and `9,10,11,12`.
``` python
>>> flox.core.find_group_cohorts(labels, array.chunks[-1]))
>>> flox.find_group_cohorts(labels, array.chunks[-1])
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]  # 3 cohorts
```
For each cohort, it counts the number of blocks that need to be reduced. If `1`, it applies the reduction blockwise.
If `> 1`, it uses `"map-reduce"`.

One annoyance is that if the chunksize doesn't evenly divide the number of groups, we still end up splitting a number of chunks.
For example, when `chunksize=5`
Now consider `chunksize=5`.
![cohorts-schematic](/../diagrams/cohorts-month-chunk5.png)

``` python
>>> flox.core.find_group_cohorts(labels, array.chunks[-1])
[[1], [2, 3], [4, 5], [6], [7, 8], [9, 10], [11], [12]] # 8 cohorts
```
We find 8 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).

It's possible that some initial rechunking makes the situation better (just rechunk from 5-4), but it isn't an obvious improvement.
We find 8 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).
In this case, it seems better to rechunk to a size of `4` along `time`.
If you have ideas for improving this case, please open an issue.

*Tradeoffs*
1. Generalizes well; when there's exactly one group per chunk, this replicates Xarray's
strategy which is optimal. For resampling type reductions, as long as the array
is chunked appropriately ({py:func}`flox.core.rechunk_for_blockwise`, {py:func}`flox.xarray.rechunk_for_blockwise`), `method="cohorts"` is equivalent to `method="blockwise"`!
1. Group labels must be known at graph construction time, so this only works for numpy arrays
1. Currently implemented for grouping by 1D arrays. An nD generalization seems possible,
but hard?
### Example : spatial grouping