Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 219 additions & 2 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from pandas.core.dtypes.inference import is_sequence
from pyspark import sql as spark
from pyspark.sql import functions as F, Column
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import (BooleanType, ByteType, DecimalType, DoubleType, FloatType,
IntegerType, LongType, NumericType, ShortType)
from pyspark.sql.utils import AnalysisException
Expand All @@ -52,7 +52,7 @@
from databricks.koalas.missing.frame import _MissingPandasLikeDataFrame
from databricks.koalas.ml import corr
from databricks.koalas.utils import column_index_level, scol_for
from databricks.koalas.typedef import as_spark_type, as_python_type
from databricks.koalas.typedef import _infer_return_type, as_spark_type, as_python_type
from databricks.koalas.plot import KoalasFramePlotMethods
from databricks.koalas.config import get_option

Expand Down Expand Up @@ -6810,6 +6810,223 @@ def filter(self, items=None, like=None, regex=None, axis=None):
else:
raise TypeError("Must pass either `items`, `like`, or `regex`")

def rename(self,
           mapper=None,
           index=None,
           columns=None,
           axis='index',
           inplace=False,
           level=None,
           errors='ignore'):
    """
    Alter axes labels.

    Function / dict values must be unique (1-to-1). Labels not contained in a dict /
    Series will be left as-is. Extra labels listed don't throw an error.

    Parameters
    ----------
    mapper : dict-like or function
        Dict-like or functions transformations to apply to that axis' values.
        Use either `mapper` and `axis` to specify the axis to target with `mapper`, or `index`
        and `columns`.
    index : dict-like or function
        Alternative to specifying axis ("mapper, axis=0" is equivalent to "index=mapper").
    columns : dict-like or function
        Alternative to specifying axis ("mapper, axis=1" is equivalent to "columns=mapper").
    axis : int or str, default 'index'
        Axis to target with mapper. Can be either the axis name ('index', 'columns') or
        number (0, 1).
    inplace : bool, default False
        Whether to return a new DataFrame. If True then value of copy is ignored.
    level : int or level name, default None
        In case of a MultiIndex, only rename labels in the specified level.
    errors : {'ignore', 'raise'}, default 'ignore'
        If 'raise', raise a `KeyError` when a dict-like `mapper`, `index`, or `columns`
        contains labels that are not present in the Index being transformed. If 'ignore',
        existing keys will be renamed and extra keys will be ignored.

    Returns
    -------
    DataFrame with the renamed axis labels.

    Raises
    ------
    `KeyError`
        If any of the labels is not found in the selected axis and "errors='raise'".

    Examples
    --------
    >>> kdf1 = ks.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    >>> kdf1.rename(columns={"A": "a", "B": "c"})  # doctest: +NORMALIZE_WHITESPACE
       a  c
    0  1  4
    1  2  5
    2  3  6

    >>> kdf1.rename(index={1: 10, 2: 20})  # doctest: +NORMALIZE_WHITESPACE
        A  B
    0   1  4
    10  2  5
    20  3  6

    >>> def str_lower(s) -> str:
    ...     return str.lower(s)
    >>> kdf1.rename(str_lower, axis='columns')  # doctest: +NORMALIZE_WHITESPACE
       a  b
    0  1  4
    1  2  5
    2  3  6

    >>> kdf1.rename(lambda x: x*10, axis='index')  # doctest: +NORMALIZE_WHITESPACE
        A  B
    0   1  4
    10  2  5
    20  3  6

    >>> idx = pd.MultiIndex.from_tuples([('X', 'A'), ('X', 'B'), ('Y', 'C'), ('Y', 'D')])
    >>> kdf2 = ks.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=idx)
    >>> kdf2.rename(columns=str_lower, level=0)  # doctest: +NORMALIZE_WHITESPACE
       x     y
       A  B  C  D
    0  1  2  3  4
    1  5  6  7  8

    >>> kdf3 = ks.DataFrame([[1, 2], [3, 4], [5, 6], [7, 8]], index=idx, columns=list('ab'))
    >>> kdf3.rename(index=str_lower)  # doctest: +NORMALIZE_WHITESPACE
         a  b
    x a  1  2
      b  3  4
    y c  5  6
      d  7  8
    """

    def gen_mapper_fn(mapper):
        # Normalize a dict-like or callable `mapper` into a pair
        # (mapper_fn, spark_return_type). Returns (None, None) when there is
        # nothing to rename (empty dict with errors='ignore'), which callers
        # treat as a no-op.
        if isinstance(mapper, dict):
            if len(mapper) == 0:
                if errors == 'raise':
                    raise KeyError('Index include label which is not in the `mapper`.')
                # Empty mapper with errors='ignore': nothing to rename.
                return None, None

            # All replacement values must share a single Python type so that
            # one Spark return type can be chosen for the pandas UDF below.
            type_set = set(map(lambda x: type(x), mapper.values()))
            if len(type_set) > 1:
                raise ValueError("Mapper dict should have the same value type.")
            spark_return_type = as_spark_type(list(type_set)[0])

            def mapper_fn(x):
                if x in mapper:
                    return mapper[x]
                else:
                    if errors == 'raise':
                        raise KeyError('Index include value which is not in the `mapper`')
                    return x
        elif callable(mapper):
            # The callable's return-type annotation determines the Spark type.
            spark_return_type = _infer_return_type(mapper).tpe

            def mapper_fn(x):
                return mapper(x)
        else:
            raise ValueError("`mapper` or `index` or `columns` should be "
                             "either dict-like or function type.")
        return mapper_fn, spark_return_type

    index_mapper_fn = None
    index_mapper_ret_stype = None
    columns_mapper_fn = None

    if mapper:
        if axis == 'index' or axis == 0:
            index_mapper_fn, index_mapper_ret_stype = gen_mapper_fn(mapper)
        elif axis == 'columns' or axis == 1:
            # Only the function is needed for columns: the rename happens on
            # the driver, not through a Spark UDF, so the type is unused.
            columns_mapper_fn, _ = gen_mapper_fn(mapper)
        else:
            raise ValueError("argument axis should be either the axis name "
                             "(‘index’, ‘columns’) or number (0, 1)")
    else:
        if index:
            index_mapper_fn, index_mapper_ret_stype = gen_mapper_fn(index)
        if columns:
            columns_mapper_fn, _ = gen_mapper_fn(columns)

        if not index and not columns:
            raise ValueError("Either `index` or `columns` should be provided.")

    if inplace:
        raise RuntimeError("Koalas dataframe rename method do not support in-place operation.")

    internal = self._internal
    if index_mapper_fn:
        # Rename index labels. If `level` is None, rename every index column;
        # otherwise rename only the index column for that level. Implemented by
        # transforming the underlying Spark dataframe.
        # Example: with index columns "index_0", "index_1":
        #   level 0 only:  sdf.withColumn("index_0", mapper_fn_udf(col("index_0")))
        #   all levels:    sdf.withColumn("index_0", mapper_fn_udf(col("index_0")))
        #                     .withColumn("index_1", mapper_fn_udf(col("index_1")))
        index_columns = internal.index_columns
        num_indices = len(index_columns)
        # `is not None` (not plain truthiness) so level=0 is also validated.
        if level is not None:
            if level < 0 or level >= num_indices:
                raise ValueError("level should be an integer between [0, num_indices)")

        def gen_new_index_column(level):
            # Build the transformed Spark column for one index level.
            index_col_name = index_columns[level]

            index_mapper_udf = pandas_udf(lambda s: s.map(index_mapper_fn),
                                          returnType=index_mapper_ret_stype)
            return index_mapper_udf(scol_for(internal.sdf, index_col_name))

        sdf = internal.sdf
        if level is None:
            for i in range(num_indices):
                sdf = sdf.withColumn(index_columns[i], gen_new_index_column(i))
        else:
            sdf = sdf.withColumn(index_columns[level], gen_new_index_column(level))
        internal = internal.copy(sdf=sdf)
    if columns_mapper_fn:
        # Rename column labels: rewrite `_internal._column_index` and realign
        # the underlying Spark dataframe's column names with it.
        # `is not None` (not plain truthiness) so level=0 is also validated.
        if level is not None:
            if level < 0 or level >= internal.column_index_level:
                raise ValueError("level should be an integer between [0, column_index_level)")

        def gen_new_column_index_entry(column_index_entry):
            if isinstance(column_index_entry, tuple):
                if level is None:
                    # rename all level columns
                    return tuple(map(columns_mapper_fn, column_index_entry))
                else:
                    # only rename specified level column
                    entry_list = list(column_index_entry)
                    entry_list[level] = columns_mapper_fn(entry_list[level])
                    return tuple(entry_list)
            else:
                return columns_mapper_fn(column_index_entry)

        new_column_index = list(map(gen_new_column_index_entry, internal.column_index))

        # Use `idx` (not `col`) to avoid shadowing pyspark.sql.functions.col.
        if internal.column_index_level == 1:
            new_data_columns = [idx[0] for idx in new_column_index]
        else:
            new_data_columns = [str(idx) for idx in new_column_index]
        new_data_scols = [scol_for(internal.sdf, old_col_name).alias(new_col_name)
                          for old_col_name, new_col_name
                          in zip(internal.data_columns, new_data_columns)]
        sdf = internal.sdf.select(*(internal.index_scols + new_data_scols))
        # TODO(review): should `column_index_names` be carried over to the new
        # internal frame? Left unset, matching the original behavior.
        internal = internal.copy(sdf=sdf, column_index=new_column_index,
                                 data_columns=new_data_columns)
    return DataFrame(internal)

def _get_from_multiindex_column(self, key):
""" Select columns from multi-index columns.

Expand Down
1 change: 0 additions & 1 deletion databricks/koalas/missing/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class _MissingPandasLikeDataFrame(object):
quantile = unsupported_function('quantile')
query = unsupported_function('query')
reindex_like = unsupported_function('reindex_like')
rename = unsupported_function('rename')
rename_axis = unsupported_function('rename_axis')
reorder_levels = unsupported_function('reorder_levels')
resample = unsupported_function('resample')
Expand Down
65 changes: 65 additions & 0 deletions databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,71 @@ def test_rename_columns(self):
self.assert_eq(kdf._internal.data_columns, ["('A', '0')", "('B', 1)"])
self.assert_eq(kdf._internal.spark_df.columns, ["('A', '0')", "('B', 1)"])

def test_rename_dataframe(self):
    """Exercise DataFrame.rename with dict and callable mappers on both axes,
    including MultiIndex columns/rows and per-level renames."""
    kdf = ks.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

    # Dict-based renames, one axis at a time.
    self.assert_eq(kdf.rename(columns={"A": "a", "B": "b"}).columns,
                   pd.Index(['a', 'b']))
    self.assert_eq(kdf.rename(index={1: 10, 2: 20}).index,
                   pd.Index([0, 10, 20]))

    # Callable mappers; the return annotations drive the UDF return type.
    def str_lower(s) -> str:
        return str.lower(s)

    def mul10(x) -> int:
        return x * 10

    self.assert_eq(kdf.rename(str_lower, axis='columns').columns,
                   pd.Index(['a', 'b']))
    self.assert_eq(kdf.rename(mul10, axis='index').index,
                   pd.Index([0, 10, 20]))

    # Column function combined with an index dict in a single call.
    both_renamed = kdf.rename(columns=str_lower, index={1: 10, 2: 20})
    self.assert_eq(both_renamed.columns, pd.Index(['a', 'b']))
    self.assert_eq(both_renamed.index, pd.Index([0, 10, 20]))

    idx = pd.MultiIndex.from_tuples([('X', 'A'), ('X', 'B'), ('Y', 'C'), ('Y', 'D')])

    # MultiIndex columns: rename every level, then each level individually.
    mcol_kdf = ks.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=idx)
    self.assert_eq(mcol_kdf.rename(columns=str_lower).columns,
                   pd.MultiIndex.from_tuples([('x', 'a'), ('x', 'b'), ('y', 'c'), ('y', 'd')]))
    self.assert_eq(mcol_kdf.rename(columns=str_lower, level=0).columns,
                   pd.MultiIndex.from_tuples([('x', 'A'), ('x', 'B'), ('y', 'C'), ('y', 'D')]))
    self.assert_eq(mcol_kdf.rename(columns=str_lower, level=1).columns,
                   pd.MultiIndex.from_tuples([('X', 'a'), ('X', 'b'), ('Y', 'c'), ('Y', 'd')]))

    # MultiIndex rows: the index materializes as dicts keyed by the internal
    # index column names.
    midx_kdf = ks.DataFrame([[1, 2], [3, 4], [5, 6], [7, 8]], index=idx, columns=list('ab'))
    self.assert_eq(midx_kdf.rename(index=str_lower).index,
                   pd.Index([{'__index_level_0__': 'x', '__index_level_1__': 'a'},
                             {'__index_level_0__': 'x', '__index_level_1__': 'b'},
                             {'__index_level_0__': 'y', '__index_level_1__': 'c'},
                             {'__index_level_0__': 'y', '__index_level_1__': 'd'}],
                            dtype='object'))
    self.assert_eq(midx_kdf.rename(index=str_lower, level=0).index,
                   pd.Index([{'__index_level_0__': 'x', '__index_level_1__': 'A'},
                             {'__index_level_0__': 'x', '__index_level_1__': 'B'},
                             {'__index_level_0__': 'y', '__index_level_1__': 'C'},
                             {'__index_level_0__': 'y', '__index_level_1__': 'D'}],
                            dtype='object'))
    self.assert_eq(midx_kdf.rename(index=str_lower, level=1).index,
                   pd.Index([{'__index_level_0__': 'X', '__index_level_1__': 'a'},
                             {'__index_level_0__': 'X', '__index_level_1__': 'b'},
                             {'__index_level_0__': 'Y', '__index_level_1__': 'c'},
                             {'__index_level_0__': 'Y', '__index_level_1__': 'd'}],
                            dtype='object'))

def test_dot_in_column_name(self):
self.assert_eq(
ks.DataFrame(ks.range(1)._sdf.selectExpr("1 as `a.b`"))['a.b'],
Expand Down