Merged (changes from 5 commits)
7 changes: 3 additions & 4 deletions bigframes/core/compile/api.py
@@ -15,19 +15,18 @@

from typing import TYPE_CHECKING

from bigframes.core import rewrite
from bigframes.core.compile.ibis_compiler import ibis_compiler

if TYPE_CHECKING:
import bigframes.core.nodes


def test_only_ibis_inferred_schema(node: bigframes.core.nodes.BigFrameNode):
"""Use only for testing paths to ensure ibis inferred schema does not diverge from bigframes inferred schema."""
from bigframes.core.compile.ibis_compiler import ibis_compiler
import bigframes.core.rewrite
import bigframes.core.schema

node = ibis_compiler._replace_unsupported_ops(node)
node = rewrite.bake_order(node)
node = bigframes.core.rewrite.bake_order(node)
ir = ibis_compiler.compile_node(node)
items = tuple(
bigframes.core.schema.SchemaItem(name, ir.get_column_type(ibis_id))
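A likely motivation for moving these imports into the test-only helper is breaking a module-level import cycle between the compile and rewrite modules; the PR does not say so, this is an assumption. A minimal two-module sketch of the deferred-import pattern, with illustrative module names:

# a.py (illustrative)
def f():
    # Deferring the import to call time breaks a module-level cycle:
    # importing a no longer triggers importing b, and vice versa.
    import b
    return b.g()

# b.py (illustrative)
import a  # safe: a.py no longer imports b at module scope

def g():
    return 42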
14 changes: 14 additions & 0 deletions bigframes/core/groupby/dataframe_group_by.py
@@ -149,6 +149,20 @@ def head(self, n: int = 5) -> df.DataFrame:
)
)

def describe(self, include: None | Literal["all"] = None):
from bigframes.pandas.core.methods import describe

return df.DataFrame(
describe._describe(
self._block,
self._selected_cols,
include,
as_index=self._as_index,
by_col_ids=self._by_col_ids,
dropna=self._dropna,
)
)

def size(self) -> typing.Union[df.DataFrame, series.Series]:
agg_block, _ = self._block.aggregate_size(
by_column_ids=self._by_col_ids,
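A minimal usage sketch of the new DataFrameGroupBy.describe; the data and column names are illustrative, and an active BigQuery DataFrames session is assumed:

import bigframes.pandas as bpd

df = bpd.DataFrame({"team": ["a", "a", "b"], "score": [1.0, 2.0, 3.0]})
# One row per group; columns form a (column, statistic) MultiIndex,
# e.g. ("score", "mean"), ("score", "25%"), ("score", "max").
stats = df.groupby("team").describe()
print(stats.to_pandas())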
14 changes: 14 additions & 0 deletions bigframes/core/groupby/series_group_by.py
@@ -75,6 +75,20 @@ def head(self, n: int = 5) -> series.Series:
)
)

def describe(self, include: None | Literal["all"] = None):
from bigframes.pandas.core.methods import describe

return df.DataFrame(
describe._describe(
self._block,
columns=[self._value_column],
include=include,
as_index=True,
by_col_ids=self._by_col_ids,
dropna=self._dropna,
)
).droplevel(level=0, axis=1)

def all(self) -> series.Series:
return self._aggregate(agg_ops.all_op)

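The trailing droplevel(level=0, axis=1) strips the redundant outer column level: _describe labels its output with (column, statistic) pairs, and for a single-column series only the statistic level is meaningful. A small pandas illustration of that reshaping, with illustrative values:

import pandas as pd

cols = pd.MultiIndex.from_tuples([("score", "count"), ("score", "mean")])
wide = pd.DataFrame([[3.0, 2.0]], columns=cols, index=["a"])
# Dropping the outer level leaves bare statistic names: count, mean.
flat = wide.droplevel(level=0, axis=1)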
4 changes: 0 additions & 4 deletions bigframes/core/rewrite/implicit_align.py
@@ -18,12 +18,8 @@
from typing import cast, Optional, Sequence, Set, Tuple

import bigframes.core.expression
import bigframes.core.guid
import bigframes.core.identifiers
import bigframes.core.join_def
import bigframes.core.nodes
import bigframes.core.window_spec
import bigframes.operations.aggregations

# Combination of selects and additive nodes can be merged as an explicit keyless "row join"
ALIGNABLE_NODES = (
166 changes: 72 additions & 94 deletions bigframes/pandas/core/methods/describe.py
@@ -16,8 +16,15 @@

import typing

import pandas as pd

from bigframes import dataframe, dtypes, series
from bigframes.core.reshape import api as rs
from bigframes.core import agg_expressions, blocks
from bigframes.operations import aggregations

_DEFAULT_DTYPES = (
dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES
)


def describe(
@@ -30,100 +37,71 @@ def describe(
elif not isinstance(input, dataframe.DataFrame):
raise TypeError(f"Unsupported type: {type(input)}")

if include is None:
numeric_df = _select_dtypes(
input,
dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE
+ dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES,
)
if len(numeric_df.columns) == 0:
# Describe eligible non-numeric columns
return _describe_non_numeric(input)

# Otherwise, only describe numeric columns
return _describe_numeric(input)

elif include == "all":
numeric_result = _describe_numeric(input)
non_numeric_result = _describe_non_numeric(input)

if len(numeric_result.columns) == 0:
return non_numeric_result
elif len(non_numeric_result.columns) == 0:
return numeric_result
else:
# Use reindex after join to preserve the original column order.
return rs.concat(
[non_numeric_result, numeric_result], axis=1
)._reindex_columns(input.columns)
block = input._block

else:
raise ValueError(f"Unsupported include type: {include}")


def _describe_numeric(df: dataframe.DataFrame) -> dataframe.DataFrame:
number_df_result = typing.cast(
dataframe.DataFrame,
_select_dtypes(df, dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE).agg(
[
"count",
"mean",
"std",
"min",
"25%",
"50%",
"75%",
"max",
]
),
)
temporal_df_result = typing.cast(
dataframe.DataFrame,
_select_dtypes(df, dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES).agg(["count"]),
)
describe_block = _describe(block, columns=block.value_columns, include=include)

return dataframe.DataFrame(describe_block).stack().droplevel(level=0)

if len(number_df_result.columns) == 0:
return temporal_df_result
elif len(temporal_df_result.columns) == 0:
return number_df_result
else:
import bigframes.core.reshape.api as rs

original_columns = _select_dtypes(
df,
dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE
+ dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES,
).columns

# Use reindex after join to preserve the original column order.
return rs.concat(
[number_df_result, temporal_df_result],
axis=1,
)._reindex_columns(original_columns)


def _describe_non_numeric(df: dataframe.DataFrame) -> dataframe.DataFrame:
return typing.cast(
dataframe.DataFrame,
_select_dtypes(
df,
[
dtypes.STRING_DTYPE,
dtypes.BOOL_DTYPE,
dtypes.BYTES_DTYPE,
dtypes.TIME_DTYPE,
],
).agg(["count", "nunique"]),
)

def _describe(
block: blocks.Block,
columns: typing.Sequence[str],
include: None | typing.Literal["all"] = None,
*,
as_index: bool = True,
by_col_ids: typing.Sequence[str] = [],
dropna: bool = False,
) -> blocks.Block:
stats: list[agg_expressions.Aggregation] = []
column_labels: list[typing.Hashable] = []

def _select_dtypes(
df: dataframe.DataFrame, dtypes: typing.Sequence[dtypes.Dtype]
) -> dataframe.DataFrame:
"""Selects columns without considering inheritance relationships."""
columns = [
col_id
for col_id, dtype in zip(df._block.value_columns, df._block.dtypes)
if dtype in dtypes
]
return dataframe.DataFrame(df._block.select_columns(columns))
# include=None behaves like include='all' if no numeric columns are present
if include is None:
if not any(
block.expr.get_column_type(col) in _DEFAULT_DTYPES for col in columns
):
include = "all"

for col_id in columns:
label = block.col_id_to_label[col_id]
dtype = block.expr.get_column_type(col_id)
if include != "all" and dtype not in _DEFAULT_DTYPES:
continue
agg_ops = _get_aggs_for_dtype(dtype)
stats.extend(op.as_expr(col_id) for op in agg_ops)
label_tuple = (label,) if block.column_labels.nlevels == 1 else label
column_labels.extend((*label_tuple, op.name) for op in agg_ops) # type: ignore

agg_block, _ = block.aggregate(
by_column_ids=by_col_ids,
aggregations=stats,
dropna=dropna,
column_labels=pd.Index(column_labels, name=(*block.column_labels.names, None)),
)
return agg_block if as_index else agg_block.reset_index(drop=False)


def _get_aggs_for_dtype(dtype) -> list[aggregations.UnaryAggregateOp]:
if dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE:
return [
aggregations.count_op,
aggregations.mean_op,
aggregations.std_op,
aggregations.min_op,
aggregations.ApproxQuartilesOp(1),
aggregations.ApproxQuartilesOp(2),
aggregations.ApproxQuartilesOp(3),
aggregations.max_op,
]
elif dtype in dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES:
return [aggregations.count_op]
elif dtype in [
dtypes.STRING_DTYPE,
dtypes.BOOL_DTYPE,
dtypes.BYTES_DTYPE,
dtypes.TIME_DTYPE,
]:
return [aggregations.count_op, aggregations.nunique_op]
else:
return []
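To make the dtype dispatch concrete, a small sketch; FLOAT_DTYPE is assumed to be exported by bigframes.dtypes alongside the STRING_DTYPE used above:

from bigframes import dtypes
from bigframes.pandas.core.methods import describe

# Numeric columns get all eight statistics; the three quartiles come from
# ApproxQuartilesOp, so they are approximate (hence the test caveats below).
numeric_ops = describe._get_aggs_for_dtype(dtypes.FLOAT_DTYPE)  # 8 ops
# String, bool, bytes, and time columns get only count and nunique.
string_ops = describe._get_aggs_for_dtype(dtypes.STRING_DTYPE)  # 2 ops
# Any other dtype is skipped entirely (empty list).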
122 changes: 122 additions & 0 deletions tests/system/small/pandas/test_describe.py
@@ -230,3 +230,125 @@ def test_series_describe_temporal(scalars_dfs):
check_dtype=False,
check_index_type=False,
)


def test_df_groupby_describe(scalars_dfs):
# TODO: supply a reason why this isn't compatible with pandas 1.x
pytest.importorskip("pandas", minversion="2.0.0")
scalars_df, scalars_pandas_df = scalars_dfs

numeric_columns = [
"int64_col",
"float64_col",
]
non_numeric_columns = ["string_col"]
supported_columns = numeric_columns + non_numeric_columns

bf_full_result = (
scalars_df.groupby("bool_col")[supported_columns]
.describe(include="all")
.to_pandas()
)

pd_full_result = scalars_pandas_df.groupby("bool_col")[supported_columns].describe(
include="all"
)

for col in supported_columns:
pd_result = pd_full_result[col]
bf_result = bf_full_result[col]

if col in numeric_columns:
# Drop quartiles, as they are approximate
bf_min = bf_result["min"]
bf_p25 = bf_result["25%"]
bf_p50 = bf_result["50%"]
bf_p75 = bf_result["75%"]
bf_max = bf_result["max"]

# Reindex both results to the same fixed set of columns, because
# their relative column order is not important for this comparison.
bf_result = bf_result.reindex(
columns=["count", "mean", "std", "min", "max"]
)
pd_result = pd_result.reindex(
columns=["count", "mean", "std", "min", "max"]
)

# Double-check that quantiles are at least plausible.
assert (
(bf_min <= bf_p25)
& (bf_p25 <= bf_p50)
& (bf_p50 <= bf_p75)
& (bf_p75 <= bf_max)
).all()
else:
# Reindex both results to the same fixed set of columns, because
# their relative column order is not important for this comparison.
bf_result = bf_result.reindex(columns=["count", "nunique"])
pd_result = pd_result.reindex(columns=["count", "unique"])
pandas.testing.assert_frame_equal(
# The BigQuery DataFrames counterpart of "unique" is called "nunique"
pd_result.astype("Float64").rename(columns={"unique": "nunique"}),
bf_result,
check_dtype=False,
check_index_type=False,
)


def test_series_groupby_describe(scalars_dfs):
# TODO: supply a reason why this isn't compatible with pandas 1.x
pytest.importorskip("pandas", minversion="2.0.0")
scalars_df, scalars_pandas_df = scalars_dfs

numeric_columns = [
"int64_col",
"float64_col",
]
non_numeric_columns = ["string_col"]
supported_columns = numeric_columns + non_numeric_columns

bf_df = scalars_df.groupby("bool_col")

pd_df = scalars_pandas_df.groupby("bool_col")

for col in supported_columns:
pd_result = pd_df[col].describe(include="all")
bf_result = bf_df[col].describe(include="all").to_pandas()

if col in numeric_columns:
# Drop quartiles, as they are approximate
bf_min = bf_result["min"]
bf_p25 = bf_result["25%"]
bf_p50 = bf_result["50%"]
bf_p75 = bf_result["75%"]
bf_max = bf_result["max"]

# Reindex both results to the same fixed set of columns, because
# their relative column order is not important for this comparison.
bf_result = bf_result.reindex(
columns=["count", "mean", "std", "min", "max"]
)
pd_result = pd_result.reindex(
columns=["count", "mean", "std", "min", "max"]
)

# Double-check that quantiles are at least plausible.
assert (
(bf_min <= bf_p25)
& (bf_p25 <= bf_p50)
& (bf_p50 <= bf_p75)
& (bf_p75 <= bf_max)
).all()
else:
# Reindex both results to the same fixed set of columns, because
# their relative column order is not important for this comparison.
bf_result = bf_result.reindex(columns=["count", "nunique"])
pd_result = pd_result.reindex(columns=["count", "unique"])
pandas.testing.assert_frame_equal(
# The BigQuery DataFrames counterpart of "unique" is called "nunique"
pd_result.astype("Float64").rename(columns={"unique": "nunique"}),
bf_result,
check_dtype=False,
check_index_type=False,
)