8 changes: 6 additions & 2 deletions bigframes/core/blocks.py
@@ -1780,15 +1780,19 @@ def pivot(
else:
return result_block.with_column_labels(columns_values)

def stack(self, how="left", levels: int = 1):
def stack(
self, how="left", levels: int = 1, *, override_labels: Optional[pd.Index] = None
):
"""Unpivot last column axis level into row axis"""
if levels == 0:
return self

# These are the values that will be turned into rows

col_labels, row_labels = utils.split_index(self.column_labels, levels=levels)
row_labels = row_labels.drop_duplicates()
row_labels = (
row_labels.drop_duplicates() if override_labels is None else override_labels
)

if col_labels is None:
result_index: pd.Index = pd.Index([None])
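The new override_labels keyword lets a caller such as describe() pin the row order of the stacked statistic labels instead of inheriting drop_duplicates() encounter order. A pandas-only sketch of the ordering concern (labels and data are illustrative; only Index.intersection's default sort=False ordering is relied on):

import pandas as pd

# Statistic labels as encountered when a numeric column precedes a string column.
encountered = pd.Index(["count", "mean", "std", "min", "max", "count", "nunique"])

# drop_duplicates() keeps first-encounter order, so "nunique" lands last...
print(list(encountered.drop_duplicates()))
# ['count', 'mean', 'std', 'min', 'max', 'nunique']

# ...whereas intersecting a fixed canonical ordering with the labels actually
# present (what describe() passes as override_labels) keeps "nunique" right
# after "count" regardless of column order.
canonical = pd.Index(
    ["count", "nunique", "top", "freq", "mean", "std",
     "min", "25%", "50%", "75%", "max"]
)
print(list(canonical.intersection(encountered)))
# ['count', 'nunique', 'mean', 'std', 'min', 'max']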
7 changes: 3 additions & 4 deletions bigframes/core/compile/api.py
@@ -15,19 +15,18 @@

from typing import TYPE_CHECKING

from bigframes.core import rewrite
from bigframes.core.compile.ibis_compiler import ibis_compiler

if TYPE_CHECKING:
import bigframes.core.nodes


def test_only_ibis_inferred_schema(node: bigframes.core.nodes.BigFrameNode):
"""Use only for testing paths to ensure ibis inferred schema does not diverge from bigframes inferred schema."""
from bigframes.core.compile.ibis_compiler import ibis_compiler
import bigframes.core.rewrite
import bigframes.core.schema

node = ibis_compiler._replace_unsupported_ops(node)
node = rewrite.bake_order(node)
node = bigframes.core.rewrite.bake_order(node)
ir = ibis_compiler.compile_node(node)
items = tuple(
bigframes.core.schema.SchemaItem(name, ir.get_column_type(ibis_id))
14 changes: 14 additions & 0 deletions bigframes/core/groupby/dataframe_group_by.py
@@ -149,6 +149,20 @@ def head(self, n: int = 5) -> df.DataFrame:
)
)

def describe(self, include: None | Literal["all"] = None):
from bigframes.pandas.core.methods import describe

return df.DataFrame(
describe._describe(
self._block,
self._selected_cols,
include,
as_index=self._as_index,
by_col_ids=self._by_col_ids,
dropna=self._dropna,
)
)

def size(self) -> typing.Union[df.DataFrame, series.Series]:
agg_block, _ = self._block.aggregate_size(
by_column_ids=self._by_col_ids,
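A minimal usage sketch of the new DataFrameGroupBy.describe, not an example from this PR: it assumes an authenticated BigQuery session, and the table contents ("team", "score") are made up for illustration.

import bigframes.pandas as bpd

df = bpd.DataFrame({"team": ["a", "a", "b", "b"], "score": [1, 2, 3, 4]})

# One row per group key; column labels become (original column, statistic)
# pairs, intended to mirror pandas' DataFrameGroupBy.describe.
print(df.groupby("team").describe())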
14 changes: 14 additions & 0 deletions bigframes/core/groupby/series_group_by.py
@@ -75,6 +75,20 @@ def head(self, n: int = 5) -> series.Series:
)
)

def describe(self, include: None | Literal["all"] = None):
from bigframes.pandas.core.methods import describe

return df.DataFrame(
describe._describe(
self._block,
columns=[self._value_column],
include=include,
as_index=True,
by_col_ids=self._by_col_ids,
dropna=self._dropna,
)
).droplevel(level=0, axis=1)

def all(self) -> series.Series:
return self._aggregate(agg_ops.all_op)

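The Series variant routes through the same _describe helper with columns=[self._value_column] and then drops the leading column level, so statistics come back as flat labels. A sketch under the same assumptions as above (configured session, illustrative data):

import bigframes.pandas as bpd

df = bpd.DataFrame({"team": ["a", "a", "b", "b"], "score": [1, 2, 3, 4]})

# Selecting a single column yields a SeriesGroupBy; describe() returns a
# DataFrame whose columns are just the statistic names ("count", "mean", ...)
# because the level that held the original column name is dropped.
print(df.groupby("team")["score"].describe())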
4 changes: 0 additions & 4 deletions bigframes/core/rewrite/implicit_align.py
@@ -18,12 +18,8 @@
from typing import cast, Optional, Sequence, Set, Tuple

import bigframes.core.expression
import bigframes.core.guid
import bigframes.core.identifiers
import bigframes.core.join_def
import bigframes.core.nodes
import bigframes.core.window_spec
import bigframes.operations.aggregations

# Combination of selects and additive nodes can be merged as an explicit keyless "row join"
ALIGNABLE_NODES = (
7 changes: 1 addition & 6 deletions bigframes/operations/aggregations.py
@@ -251,12 +251,7 @@ def name(self):
def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
if not dtypes.is_orderable(input_types[0]):
raise TypeError(f"Type {input_types[0]} is not orderable")
if pd.api.types.is_bool_dtype(input_types[0]) or pd.api.types.is_integer_dtype(
input_types[0]
):
return dtypes.FLOAT_DTYPE
else:
return input_types[0]
return input_types[0]


@dataclasses.dataclass(frozen=True)
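This hunk drops the promotion of boolean and integer inputs to FLOAT_DTYPE; the op now reports the input dtype unchanged for any orderable type. A self-contained before/after illustration using plain pandas dtypes (the helper functions below are hypothetical, not library code):

import pandas as pd

def old_output_type(input_dtype):
    # Previous behavior: bool/int results were widened to Float64.
    if pd.api.types.is_bool_dtype(input_dtype) or pd.api.types.is_integer_dtype(input_dtype):
        return pd.Float64Dtype()
    return input_dtype

def new_output_type(input_dtype):
    # New behavior: any orderable input dtype is passed through as-is.
    return input_dtype

print(old_output_type(pd.Int64Dtype()))  # Float64
print(new_output_type(pd.Int64Dtype()))  # Int64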
185 changes: 90 additions & 95 deletions bigframes/pandas/core/methods/describe.py
@@ -16,8 +16,15 @@

import typing

import pandas as pd

from bigframes import dataframe, dtypes, series
from bigframes.core.reshape import api as rs
from bigframes.core import agg_expressions, blocks
from bigframes.operations import aggregations

_DEFAULT_DTYPES = (
dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES
)


def describe(
@@ -30,100 +37,88 @@ def describe(
elif not isinstance(input, dataframe.DataFrame):
raise TypeError(f"Unsupported type: {type(input)}")

block = input._block

describe_block = _describe(block, columns=block.value_columns, include=include)
# we override default stack behavior, because we want very specific ordering
stack_cols = pd.Index(
[
"count",
"nunique",
"top",
"freq",
"mean",
"std",
"min",
"25%",
"50%",
"75%",
"max",
]
).intersection(describe_block.column_labels.get_level_values(-1))
describe_block = describe_block.stack(override_labels=stack_cols)

return dataframe.DataFrame(describe_block).droplevel(level=0)


def _describe(
block: blocks.Block,
columns: typing.Sequence[str],
include: None | typing.Literal["all"] = None,
*,
as_index: bool = True,
by_col_ids: typing.Sequence[str] = [],
dropna: bool = False,
) -> blocks.Block:
stats: list[agg_expressions.Aggregation] = []
column_labels: list[typing.Hashable] = []

# include=None behaves like include='all' if no numeric columns present
if include is None:
numeric_df = _select_dtypes(
input,
dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE
+ dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES,
)
if len(numeric_df.columns) == 0:
# Describe eligible non-numeric columns
return _describe_non_numeric(input)

# Otherwise, only describe numeric columns
return _describe_numeric(input)

elif include == "all":
numeric_result = _describe_numeric(input)
non_numeric_result = _describe_non_numeric(input)

if len(numeric_result.columns) == 0:
return non_numeric_result
elif len(non_numeric_result.columns) == 0:
return numeric_result
else:
# Use reindex after join to preserve the original column order.
return rs.concat(
[non_numeric_result, numeric_result], axis=1
)._reindex_columns(input.columns)

else:
raise ValueError(f"Unsupported include type: {include}")


def _describe_numeric(df: dataframe.DataFrame) -> dataframe.DataFrame:
number_df_result = typing.cast(
dataframe.DataFrame,
_select_dtypes(df, dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE).agg(
[
"count",
"mean",
"std",
"min",
"25%",
"50%",
"75%",
"max",
]
),
)
temporal_df_result = typing.cast(
dataframe.DataFrame,
_select_dtypes(df, dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES).agg(["count"]),
if not any(
block.expr.get_column_type(col) in _DEFAULT_DTYPES for col in columns
):
include = "all"

for col_id in columns:
label = block.col_id_to_label[col_id]
dtype = block.expr.get_column_type(col_id)
if include != "all" and dtype not in _DEFAULT_DTYPES:
continue
agg_ops = _get_aggs_for_dtype(dtype)
stats.extend(op.as_expr(col_id) for op in agg_ops)
label_tuple = (label,) if block.column_labels.nlevels == 1 else label
column_labels.extend((*label_tuple, op.name) for op in agg_ops) # type: ignore

agg_block, _ = block.aggregate(
by_column_ids=by_col_ids,
aggregations=stats,
dropna=dropna,
column_labels=pd.Index(column_labels, name=(*block.column_labels.names, None)),
)

if len(number_df_result.columns) == 0:
return temporal_df_result
elif len(temporal_df_result.columns) == 0:
return number_df_result
return agg_block if as_index else agg_block.reset_index(drop=False)


def _get_aggs_for_dtype(dtype) -> list[aggregations.UnaryAggregateOp]:
if dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE:
return [
aggregations.count_op,
aggregations.mean_op,
aggregations.std_op,
aggregations.min_op,
aggregations.ApproxQuartilesOp(1),
aggregations.ApproxQuartilesOp(2),
aggregations.ApproxQuartilesOp(3),
aggregations.max_op,
]
elif dtype in dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES:
return [aggregations.count_op]
elif dtype in [
dtypes.STRING_DTYPE,
dtypes.BOOL_DTYPE,
dtypes.BYTES_DTYPE,
dtypes.TIME_DTYPE,
]:
return [aggregations.count_op, aggregations.nunique_op]
else:
import bigframes.core.reshape.api as rs

original_columns = _select_dtypes(
df,
dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE
+ dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES,
).columns

# Use reindex after join to preserve the original column order.
return rs.concat(
[number_df_result, temporal_df_result],
axis=1,
)._reindex_columns(original_columns)


def _describe_non_numeric(df: dataframe.DataFrame) -> dataframe.DataFrame:
return typing.cast(
dataframe.DataFrame,
_select_dtypes(
df,
[
dtypes.STRING_DTYPE,
dtypes.BOOL_DTYPE,
dtypes.BYTES_DTYPE,
dtypes.TIME_DTYPE,
],
).agg(["count", "nunique"]),
)


def _select_dtypes(
df: dataframe.DataFrame, dtypes: typing.Sequence[dtypes.Dtype]
) -> dataframe.DataFrame:
"""Selects columns without considering inheritance relationships."""
columns = [
col_id
for col_id, dtype in zip(df._block.value_columns, df._block.dtypes)
if dtype in dtypes
]
return dataframe.DataFrame(df._block.select_columns(columns))
return []
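Taken together, describe now builds one block-level aggregation covering every column's statistics rather than concatenating per-dtype sub-frames. A usage sketch, assuming an authenticated BigQuery session and illustrative data:

import bigframes.pandas as bpd

df = bpd.DataFrame({"x": [1.0, 2.0, 3.0], "label": ["a", "b", "a"]})

# include=None describes only the numeric column
# (count/mean/std/min/25%/50%/75%/max); include="all" adds the string column,
# which reports count and nunique and leaves the remaining statistics NULL.
print(df.describe())
print(df.describe(include="all"))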