Skip to content

Commit 3636b3b

Browse files
authored
Randomize index in tests and fix some window-like functions. (#1151)
1 parent 490d339 commit 3636b3b

File tree

9 files changed

+346
-277
lines changed

9 files changed

+346
-277
lines changed

databricks/koalas/base.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131

3232
from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
3333
from databricks.koalas import numpy_compat
34-
from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT
34+
from databricks.koalas.internal import (_InternalFrame, NATURAL_ORDER_COLUMN_NAME,
35+
SPARK_INDEX_NAME_FORMAT)
3536
from databricks.koalas.typedef import pandas_wraps, spark_type_to_pandas_dtype
3637
from databricks.koalas.utils import align_diff_series, scol_for, validate_axis
3738
from databricks.koalas.frame import DataFrame
@@ -790,7 +791,7 @@ def _shift(self, periods, fill_value, part_cols=()):
790791
raise ValueError('periods should be an int; however, got [%s]' % type(periods))
791792

792793
col = self._scol
793-
window = Window.partitionBy(*part_cols).orderBy(self._internal.index_scols)\
794+
window = Window.partitionBy(*part_cols).orderBy(NATURAL_ORDER_COLUMN_NAME) \
794795
.rowsBetween(-periods, -periods)
795796
lag_col = F.lag(col, periods).over(window)
796797
col = F.when(lag_col.isNull() | F.isnan(lag_col), fill_value).otherwise(lag_col)

databricks/koalas/frame.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3185,13 +3185,14 @@ def duplicated(self, subset=None, keep='first'):
31853185
ord_func = spark.functions.asc
31863186
else:
31873187
ord_func = spark.functions.desc
3188-
window = Window.partitionBy(group_cols).orderBy(ord_func(index_column)).rowsBetween(
3189-
Window.unboundedPreceding, Window.currentRow)
3188+
window = Window.partitionBy(group_cols) \
3189+
.orderBy(ord_func(NATURAL_ORDER_COLUMN_NAME)) \
3190+
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
31903191
sdf = sdf.withColumn(column, F.row_number().over(window) > 1)
31913192
elif not keep:
3192-
window = Window.partitionBy(group_cols).orderBy(scol_for(sdf, index_column).desc())\
3193+
window = Window.partitionBy(group_cols) \
31933194
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
3194-
sdf = sdf.withColumn(column, F.count(scol_for(sdf, index_column)).over(window) > 1)
3195+
sdf = sdf.withColumn(column, F.count('*').over(window) > 1)
31953196
else:
31963197
raise ValueError("'keep' only support 'first', 'last' and False")
31973198
sdf = sdf.select(scol_for(sdf, index_column), scol_for(sdf, column))
@@ -7853,8 +7854,8 @@ def pct_change(self, periods=1):
78537854
1980-02-01 NaN NaN NaN
78547855
1980-03-01 0.067912 0.073814 0.06883
78557856
"""
7856-
sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME)
7857-
window = Window.orderBy(self._internal.index_columns).rowsBetween(-periods, -periods)
7857+
sdf = self._sdf
7858+
window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-periods, -periods)
78587859

78597860
for column_name in self._internal.data_columns:
78607861
prev_row = F.lag(F.col(column_name), periods).over(window)

databricks/koalas/groupby.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,9 +2055,10 @@ def nsmallest(self, n=5):
20552055
groupkeys = self._groupkeys
20562056
sdf = self._kdf._sdf
20572057
name = self._agg_columns[0]._internal.data_columns[0]
2058-
window = Window.partitionBy([s._scol for s in groupkeys]).orderBy(F.col(name))
2058+
window = Window.partitionBy([s._scol for s in groupkeys]) \
2059+
.orderBy(scol_for(sdf, name), NATURAL_ORDER_COLUMN_NAME)
20592060
sdf = sdf.withColumn('rank', F.row_number().over(window)).filter(F.col('rank') <= n)
2060-
internal = _InternalFrame(sdf=sdf,
2061+
internal = _InternalFrame(sdf=sdf.drop(NATURAL_ORDER_COLUMN_NAME),
20612062
index_map=([(s._internal.data_columns[0],
20622063
s._internal.column_index[0])
20632064
for s in self._groupkeys]
@@ -2101,9 +2102,10 @@ def nlargest(self, n=5):
21012102
groupkeys = self._groupkeys
21022103
sdf = self._kdf._sdf
21032104
name = self._agg_columns[0]._internal.data_columns[0]
2104-
window = Window.partitionBy([s._scol for s in groupkeys]).orderBy(F.col(name).desc())
2105+
window = Window.partitionBy([s._scol for s in groupkeys]) \
2106+
.orderBy(F.col(name).desc(), NATURAL_ORDER_COLUMN_NAME)
21052107
sdf = sdf.withColumn('rank', F.row_number().over(window)).filter(F.col('rank') <= n)
2106-
internal = _InternalFrame(sdf=sdf,
2108+
internal = _InternalFrame(sdf=sdf.drop(NATURAL_ORDER_COLUMN_NAME),
21072109
index_map=([(s._internal.data_columns[0],
21082110
s._internal.column_index[0])
21092111
for s in self._groupkeys]

databricks/koalas/series.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,7 +1460,7 @@ def _fillna(self, value=None, method=None, axis=None, inplace=False, limit=None,
14601460
else:
14611461
end = Window.unboundedFollowing
14621462

1463-
window = Window.partitionBy(*part_cols).orderBy(self._internal.index_scols)\
1463+
window = Window.partitionBy(*part_cols).orderBy(NATURAL_ORDER_COLUMN_NAME) \
14641464
.rowsBetween(begin, end)
14651465
scol = F.when(scol.isNull(), func(scol, True).over(window)).otherwise(scol)
14661466
kseries = self._with_new_scol(scol).rename(column_name)
@@ -2850,20 +2850,17 @@ def _rank(self, method='average', ascending=True, part_cols=()):
28502850
raise ValueError('rank do not support index now')
28512851

28522852
if ascending:
2853-
asc_func = spark.functions.asc
2853+
asc_func = lambda scol: scol.asc()
28542854
else:
2855-
asc_func = spark.functions.desc
2856-
2857-
index_column = self._internal.index_columns[0]
2858-
column_name = self._internal.data_columns[0]
2855+
asc_func = lambda scol: scol.desc()
28592856

28602857
if method == 'first':
28612858
window = Window.orderBy(
2862-
asc_func(column_name), asc_func(index_column)
2859+
asc_func(self._internal.scol), asc_func(F.col(NATURAL_ORDER_COLUMN_NAME))
28632860
).partitionBy(*part_cols).rowsBetween(Window.unboundedPreceding, Window.currentRow)
28642861
scol = F.row_number().over(window)
28652862
elif method == 'dense':
2866-
window = Window.orderBy(asc_func(column_name)).partitionBy(*part_cols) \
2863+
window = Window.orderBy(asc_func(self._internal.scol)).partitionBy(*part_cols) \
28672864
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
28682865
scol = F.dense_rank().over(window)
28692866
else:
@@ -2874,10 +2871,10 @@ def _rank(self, method='average', ascending=True, part_cols=()):
28742871
elif method == 'max':
28752872
stat_func = F.max
28762873
window1 = Window.orderBy(
2877-
asc_func(column_name)
2874+
asc_func(self._internal.scol)
28782875
).partitionBy(*part_cols).rowsBetween(Window.unboundedPreceding, Window.currentRow)
28792876
window2 = Window.partitionBy(
2880-
*[column_name] + list(part_cols)
2877+
[self._internal.scol] + list(part_cols)
28812878
).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
28822879
scol = stat_func(F.row_number().over(window1)).over(window2)
28832880
kser = self._with_new_scol(scol).rename(self.name)
@@ -2959,7 +2956,7 @@ def diff(self, periods=1):
29592956
def _diff(self, periods, part_cols=()):
29602957
if not isinstance(periods, int):
29612958
raise ValueError('periods should be an int; however, got [%s]' % type(periods))
2962-
window = Window.partitionBy(*part_cols).orderBy(self._internal.index_scols)\
2959+
window = Window.partitionBy(*part_cols).orderBy(NATURAL_ORDER_COLUMN_NAME) \
29632960
.rowsBetween(-periods, -periods)
29642961
scol = self._scol - F.lag(self._scol, periods).over(window)
29652962
return self._with_new_scol(scol).rename(self.name)

0 commit comments

Comments
 (0)