Optimize dictionary groupby aggregation for pandas backend #2491

Closed
@dchigarev

Description

Currently, all functions from a dictionary aggregation in groupby are applied via the groupby_agg method, which combines all partitions into column partitions and applies the aggregation functions to them. The disadvantage of this approach is that we don't get the full power of parallelism. For example, if we have a DataFrame with shape (1,000,000, 10), then groupby_agg will create only a single column partition and the function will be applied to only one partition (so only one core will be used).

In contrast to groupby_agg, there is the GroupbyReduce approach, which first applies the aggregation function by mapping it over all partitions and then runs a reduce phase over the partial results. In theory that approach gives us high utilization and good scaling, but it can't be applied to functions that require information about the whole column. However, many of the built-in groupby functions are implemented with this approach and show good performance (sum, any, mean?, min, max, prod, ...).

So groupby_agg scales poorly on narrow frames compared with GroupbyReduce, but it's the best we can do for now, because we have to provide the whole column to the aggregation function.
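To make the map/reduce decomposition concrete, here is a minimal sketch in plain pandas (the two-partition split is simulated with iloc; it is not how Modin actually partitions frames): a reduce-friendly function like sum can be applied to each partition independently, and the partial results can then be aggregated again, giving the same answer as a single groupby over the whole frame.

import pandas as pd

# Simulate two row partitions of one frame (hypothetical split).
df = pd.DataFrame({"key": list("abab" * 3), "val": range(12)})
part1, part2 = df.iloc[:6], df.iloc[6:]

# Map phase: aggregate each partition independently.
mapped = [part.groupby("key").sum() for part in (part1, part2)]

# Reduce phase: combine the partial results and aggregate again.
reduced = pd.concat(mapped).groupby(level=0).sum()

# Matches a single-pass groupby over the whole frame.
assert reduced.equals(df.groupby("key").sum())

A function like median doesn't decompose this way, which is why GroupbyReduce can't cover arbitrary aggregations.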

But it's a common case that a dictionary aggregation looks like this (most of the H2O groupby queries do):

grp.aggregate(func={"col1": ["sum", "prod"], "col2": ["sum"], "col3": ["prod"]})

As we can see, all of those functions can be performed via the GroupbyReduce approach; however, because it's a dict aggregation, they will be applied via groupby_agg. So the proposal is to use GroupbyReduce for dictionary aggregation where possible (when every aggregation function in the dict supports the GroupbyReduce approach). For example:

>>> agg_dict = {"col1": "sum", "col2": "prod"}
>>> prepare_dict_for_map_reduce(agg_dict)
({"col1": sum_map_fn, "col2": prod_map_fn}, {"col1": sum_reduce_fn, "col2": prod_reduce_fn})

Draft implementation and time measurements

Here is a draft implementation of the query compiler method that does dictionary groupby-reduce aggregation:

def groupby_dict_reduce(self, by, agg_dict, axis=0):
    # Dummy func: every aggregation is mapped to a per-partition sum for now
    def get_map_fn(fn):
        return lambda df: df.sum()

    # Dummy func: partial results are also reduced with a sum for now
    def get_reduce_fn(fn):
        return lambda df: df.sum()

    # Build per-column dicts of map and reduce callables from the agg dict
    def prepare_dict(agg_dict):
        return (
            {col: get_map_fn(fn) for col, fn in agg_dict.items()},
            {col: get_reduce_fn(fn) for col, fn in agg_dict.items()},
        )

    map_dict, reduce_dict = prepare_dict(agg_dict)
    return GroupbyReduceFunction.register(
        lambda df: df.agg(map_dict), lambda df: df.agg(reduce_dict)
    )(self, by, axis, groupby_args={}, map_args={})

And here are time measurements for the first two groupby queries of the H2O benchmark with 5GB of data (pd.DEFAULT_NPARTITIONS = 112):

Query N | Modin | Pandas
1       | 3.67s | 5.08s
2       | 3.97s | 11.42s
Script to measure
import modin.pandas as pd
import numpy as np
from timeit import default_timer as timer

def measure_aggregation(df, by, agg_dict, implementation=None):
    t1 = timer()
    repr(df.groupby(by).agg(agg_dict))
    return timer() - t1

path = "/localdisk/benchmark_datasets/h2o/G1_1e8_1e1_0_0.csv"

by_cols = [
    ["id1"],
    ["id1", "id2"],
]

df = pd.read_csv(path)
print("file readed")
for by in by_cols:
    times = [measure_aggregation(df, by, {"v1": "sum"}) for _ in range(5)]
    print(f"len(by) == {len(by)}: {np.min(times)}s")
