70 changes: 70 additions & 0 deletions bigframes/core/groupby/dataframe_group_by.py
@@ -278,6 +278,76 @@ def var(
        self._raise_on_non_numeric("var")
        return self._aggregate_all(agg_ops.var_op, numeric_only=True)

    def corr(
        self,
        *,
        numeric_only: bool = False,
    ) -> df.DataFrame:
        if not numeric_only:
            self._raise_on_non_numeric("corr")
        if len(self._selected_cols) > 30:
            raise ValueError(
                f"Cannot calculate corr on >30 columns; DataFrame has {len(self._selected_cols)} selected columns."
            )

        labels = self._block._get_labels_for_columns(self._selected_cols)
        block = self._block
        # One aggregation per ordered pair of selected columns (n**2 total).
        aggregations = [
            agg_expressions.BinaryAggregation(
                agg_ops.CorrOp(), ex.deref(left_col), ex.deref(right_col)
            )
            for left_col in self._selected_cols
            for right_col in self._selected_cols
        ]
        # Pair each label with its position so that duplicate column labels
        # stay unique through the cross product below.
Collaborator comment: Nit: I don't fully understand this comment, but I can infer from the context below that we're doing something to guarantee uniqueness here?

        uniq_orig_columns = utils.combine_indices(labels, pd.Index(range(len(labels))))
        result_labels = utils.cross_indices(uniq_orig_columns, uniq_orig_columns)

        block, _ = block.aggregate(
            by_column_ids=self._by_col_ids,
            aggregations=aggregations,
            column_labels=result_labels,
        )

        block = block.stack(levels=labels.nlevels + 1)
        # Drop the last level of each index, which was created to guarantee uniqueness
        return df.DataFrame(block).droplevel(-1, axis=0).droplevel(-1, axis=1)
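For intuition, here is a minimal sketch of the label bookkeeping above, written with plain pandas rather than the bigframes-internal utils.combine_indices / utils.cross_indices helpers: attaching each label's position as an extra index level keeps duplicate column labels distinct through the n**2 cross product, and those positional levels are exactly what droplevel(-1) strips at the end.

# A minimal sketch, assuming plain pandas; `labels` is hypothetical data.
import pandas as pd

labels = pd.Index(["a", "b", "a"])  # note the duplicate label "a"

# combine_indices-style step: pair each label with its position.
uniq = pd.MultiIndex.from_arrays([labels, list(range(len(labels)))])

# cross_indices-style step: one label per ordered (left, right) column pair.
pairs = pd.MultiIndex.from_tuples([(*left, *right) for left in uniq for right in uniq])

assert pairs.is_unique  # no collisions despite the duplicate "a"
assert len(pairs) == len(labels) ** 2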

    def cov(
        self,
        *,
        numeric_only: bool = False,
    ) -> df.DataFrame:
        if not numeric_only:
            self._raise_on_non_numeric("cov")
        if len(self._selected_cols) > 30:
            raise ValueError(
                f"Cannot calculate cov on >30 columns; DataFrame has {len(self._selected_cols)} selected columns."
            )

        labels = self._block._get_labels_for_columns(self._selected_cols)
        block = self._block
        # One aggregation per ordered pair of selected columns (n**2 total).
        aggregations = [
            agg_expressions.BinaryAggregation(
                agg_ops.CovOp(), ex.deref(left_col), ex.deref(right_col)
            )
            for left_col in self._selected_cols
            for right_col in self._selected_cols
        ]
        # Pair each label with its position so that duplicate column labels
        # stay unique through the cross product below.
        uniq_orig_columns = utils.combine_indices(labels, pd.Index(range(len(labels))))
        result_labels = utils.cross_indices(uniq_orig_columns, uniq_orig_columns)

        block, _ = block.aggregate(
            by_column_ids=self._by_col_ids,
            aggregations=aggregations,
            column_labels=result_labels,
        )

        block = block.stack(levels=labels.nlevels + 1)
        # Drop the last level of each index, which was created to guarantee uniqueness
        return df.DataFrame(block).droplevel(-1, axis=0).droplevel(-1, axis=1)
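As a usage sketch of the shape these methods produce (pandas semantics shown; the system tests below assert that the BigQuery DataFrames result matches pandas): the grouped covariance stacks one matrix per group, so the row index gains one level for the group key plus one level for the column label. The data here is hypothetical, not from the PR.

# Pandas equivalent of the grouped covariance these methods implement.
import pandas as pd

pdf = pd.DataFrame(
    {"key": ["x", "x", "y", "y"], "a": [1.0, 2.0, 3.0, 5.0], "b": [2.0, 4.0, 6.0, 10.0]}
)
print(pdf.groupby("key").cov())
# Roughly:
#          a    b
# key
# x   a  0.5  1.0
#     b  1.0  2.0
# y   a  2.0  4.0
#     b  4.0  8.0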

    def skew(
        self,
        *,
20 changes: 20 additions & 0 deletions tests/system/small/test_groupby.py
@@ -170,6 +170,26 @@ def test_dataframe_groupby_aggregate(
    pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)


def test_dataframe_groupby_corr(scalars_df_index, scalars_pandas_df_index):
    col_names = ["int64_too", "float64_col", "int64_col", "bool_col"]
    bf_result = scalars_df_index[col_names].groupby("bool_col").corr().to_pandas()
    pd_result = scalars_pandas_df_index[col_names].groupby("bool_col").corr()

    pd.testing.assert_frame_equal(
        pd_result, bf_result, check_dtype=False, check_index_type=False
    )


def test_dataframe_groupby_cov(scalars_df_index, scalars_pandas_df_index):
    col_names = ["int64_too", "float64_col", "int64_col", "bool_col"]
    bf_result = scalars_df_index[col_names].groupby("bool_col").cov().to_pandas()
    pd_result = scalars_pandas_df_index[col_names].groupby("bool_col").cov()

    pd.testing.assert_frame_equal(
        pd_result, bf_result, check_dtype=False, check_index_type=False
    )


@pytest.mark.parametrize(
    ("ordered"),
    [
62 changes: 62 additions & 0 deletions third_party/bigframes_vendored/pandas/core/groupby/__init__.py
@@ -1516,6 +1516,68 @@ def aggregate(self, func, **kwargs):
        """
        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

    def corr(
        self,
        *,
        numeric_only: bool = False,
    ):
        """
        Compute pairwise correlation of columns, excluding NA/null values.

        **Examples:**

            >>> import bigframes.pandas as bpd
            >>> bpd.options.display.progress_bar = None
            >>> df = bpd.DataFrame({'A': [1, 2, 3],
Collaborator comment: Very cool to see that this works with the Polars session. :-)

            ... 'B': [400, 500, 600],
            ... 'C': [0.8, 0.4, 0.9]})
            >>> df.corr(numeric_only=True)
                      A         B         C
            A       1.0       1.0  0.188982
            B       1.0       1.0  0.188982
            C  0.188982  0.188982       1.0
            <BLANKLINE>
            [3 rows x 3 columns]

        Args:
            numeric_only (bool, default False):
                Include only float, int, boolean, decimal data.

        Returns:
            bigframes.pandas.DataFrame: Correlation matrix.
        """
        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
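The doctest above shows the ungrouped DataFrame.corr. For the grouped form this method documents, a hedged pandas sketch (example values mine, not from the PR):

# With two rows per group, every pairwise correlation is exactly +/-1.0.
import pandas as pd

pdf = pd.DataFrame(
    {"g": [0, 0, 1, 1], "A": [1.0, 2.0, 3.0, 4.0], "B": [4.0, 3.0, 2.0, 1.0]}
)
print(pdf.groupby("g").corr())
# Roughly:
#         A    B
# g
# 0 A   1.0 -1.0
#   B  -1.0  1.0
# 1 A   1.0 -1.0
#   B  -1.0  1.0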

    def cov(
        self,
        *,
        numeric_only: bool = False,
    ):
        """
        Compute pairwise covariance of columns, excluding NA/null values.

        **Examples:**

            >>> import bigframes.pandas as bpd
            >>> bpd.options.display.progress_bar = None
            >>> df = bpd.DataFrame({'A': [1, 2, 3],
            ... 'B': [400, 500, 600],
            ... 'C': [0.8, 0.4, 0.9]})
            >>> df.cov(numeric_only=True)
                   A        B     C
            A    1.0    100.0  0.05
            B  100.0  10000.0   5.0
            C   0.05      5.0  0.07
            <BLANKLINE>
            [3 rows x 3 columns]

        Args:
            numeric_only (bool, default False):
                Include only float, int, boolean, decimal data.

        Returns:
            bigframes.pandas.DataFrame: The covariance matrix of the columns of the DataFrame.
        """
        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
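The two docstring examples are consistent with each other: corr is cov normalized by the column standard deviations, corr(X, Y) = cov(X, Y) / (std(X) * std(Y)). A quick pandas check of that identity on the example data (a sketch, not part of the PR):

import numpy as np
import pandas as pd

pdf = pd.DataFrame({'A': [1, 2, 3], 'B': [400, 500, 600], 'C': [0.8, 0.4, 0.9]})
stds = pdf.std()
# e.g. cov(A, C) = 0.05 and std(A) * std(C) = 1.0 * 0.2646, so corr(A, C) = 0.188982
assert np.allclose(pdf.cov() / np.outer(stds, stds), pdf.corr())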

    def nunique(self):
        """
        Return DataFrame with counts of unique elements in each position.