Skip to content

Commit cf533d2

Browse files
committed
Exclude Index columns for exposed Spark DataFrame and disallow Koalas DataFrame with no index
1 parent f0f1859 commit cf533d2

File tree

7 files changed

+75
-61
lines changed

7 files changed

+75
-61
lines changed

databricks/koalas/frame.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,9 +1598,7 @@ def index(self):
15981598
Index
15991599
"""
16001600
from databricks.koalas.indexes import Index, MultiIndex
1601-
if len(self._internal.index_map) == 0:
1602-
return None
1603-
elif len(self._internal.index_map) == 1:
1601+
if len(self._internal.index_map) == 1:
16041602
return Index(self)
16051603
else:
16061604
return MultiIndex(self)
@@ -1860,9 +1858,6 @@ class max type
18601858
lion mammal 80.5 run
18611859
monkey mammal NaN jump
18621860
"""
1863-
if len(self._internal.index_map) == 0:
1864-
raise NotImplementedError('Can\'t reset index because there is no index.')
1865-
18661861
multi_index = len(self._internal.index_map) > 1
18671862

18681863
def rename(index):
@@ -2396,13 +2391,13 @@ def to_koalas(self):
23962391
23972392
>>> spark_df = df.to_spark()
23982393
>>> spark_df
2399-
DataFrame[__index_level_0__: bigint, col1: bigint, col2: bigint]
2394+
DataFrame[col1: bigint, col2: bigint]
24002395
24012396
>>> kdf = spark_df.to_koalas()
24022397
>>> kdf
2403-
__index_level_0__ col1 col2
2404-
0 0 1 3
2405-
1 1 2 4
2398+
col1 col2
2399+
0 1 3
2400+
1 2 4
24062401
24072402
Calling to_koalas on a Koalas DataFrame simply returns itself.
24082403
@@ -2507,8 +2502,8 @@ def to_table(self, name: str, format: Optional[str] = None, mode: str = 'error',
25072502
25082503
>>> df.to_table('%s.my_table' % db, partition_cols='date')
25092504
"""
2510-
self._sdf.write.saveAsTable(name=name, format=format, mode=mode,
2511-
partitionBy=partition_cols, options=options)
2505+
self.to_spark().write.saveAsTable(name=name, format=format, mode=mode,
2506+
partitionBy=partition_cols, options=options)
25122507

25132508
def to_delta(self, path: str, mode: str = 'error',
25142509
partition_cols: Union[str, List[str], None] = None, **options):
@@ -2618,8 +2613,8 @@ def to_parquet(self, path: str, mode: str = 'error',
26182613
... mode = 'overwrite',
26192614
... partition_cols=['date', 'country'])
26202615
"""
2621-
self._sdf.write.parquet(path=path, mode=mode, partitionBy=partition_cols,
2622-
compression=compression)
2616+
self.to_spark().write.parquet(
2617+
path=path, mode=mode, partitionBy=partition_cols, compression=compression)
26232618

26242619
def to_spark_io(self, path: Optional[str] = None, format: Optional[str] = None,
26252620
mode: str = 'error', partition_cols: Union[str, List[str], None] = None,
@@ -2671,13 +2666,16 @@ def to_spark_io(self, path: Optional[str] = None, format: Optional[str] = None,
26712666
26722667
>>> df.to_spark_io(path='%s/to_spark_io/foo.json' % path, format='json')
26732668
"""
2674-
self._sdf.write.save(path=path, format=format, mode=mode, partitionBy=partition_cols,
2675-
options=options)
2669+
self.to_spark().write.save(
2670+
path=path, format=format, mode=mode, partitionBy=partition_cols, options=options)
26762671

26772672
def to_spark(self):
26782673
"""
26792674
Return the current DataFrame as a Spark DataFrame.
26802675
2676+
.. note:: Index information is lost. So, if the index columns are not present in
2677+
actual columns, they are lost.
2678+
26812679
See Also
26822680
--------
26832681
DataFrame.to_koalas
@@ -3667,14 +3665,21 @@ def pivot_table(self, values=None, index=None, columns=None,
36673665
sdf = sdf.fillna(fill_value)
36683666

36693667
if index is not None:
3670-
return DataFrame(sdf).set_index(index)
3668+
data_columns = [column for column in sdf.columns if column not in index]
3669+
index_map = [(column, column) for column in index]
3670+
internal = _InternalFrame(sdf=sdf, data_columns=data_columns, index_map=index_map)
3671+
return DataFrame(internal)
36713672
else:
36723673
if isinstance(values, list):
36733674
index_values = values[-1]
36743675
else:
36753676
index_values = values
36763677

3677-
return DataFrame(sdf.withColumn(columns, F.lit(index_values))).set_index(columns)
3678+
sdf = sdf.withColumn(columns, F.lit(index_values))
3679+
data_columns = [column for column in sdf.columns if column not in columns]
3680+
index_map = [(column, column) for column in columns]
3681+
internal = _InternalFrame(sdf=sdf, data_columns=data_columns, index_map=index_map)
3682+
return DataFrame(internal)
36783683

36793684
def pivot(self, index=None, columns=None, values=None):
36803685
"""
@@ -4378,9 +4383,6 @@ def sort_index(self, axis: int = 0,
43784383
a 1 2 1
43794384
b 1 0 3
43804385
"""
4381-
if len(self._internal.index_map) == 0:
4382-
raise ValueError("Index should be set.")
4383-
43844386
if axis != 0:
43854387
raise ValueError("No other axes than 0 are supported at the moment")
43864388
if kind is not None:

databricks/koalas/indexing.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import pandas as pd
2323
from pandas.api.types import is_list_like
24+
25+
from databricks.koalas.internal import _InternalFrame
2426
from pyspark import sql as spark
2527
from pyspark.sql import functions as F
2628
from pyspark.sql.types import BooleanType
@@ -444,15 +446,17 @@ def raiseNotImplemented(description):
444446
columns = [_make_col(c) for c in cols_sel]
445447

446448
try:
447-
kdf = DataFrame(sdf.select(self._kdf._internal.index_scols + columns))
449+
sdf = sdf.select(self._kdf._internal.index_scols + columns)
450+
index_columns = self._kdf._internal.index_columns
451+
data_columns = [column for column in sdf.columns if column not in index_columns]
452+
internal = _InternalFrame(
453+
sdf=sdf, data_columns=data_columns,
454+
index_map=self._kdf._internal.index_map, column_index=column_index)
455+
kdf = DataFrame(internal)
448456
except AnalysisException:
449457
raise KeyError('[{}] don\'t exist in columns'
450458
.format([col._jc.toString() for col in columns]))
451459

452-
kdf._internal = kdf._internal.copy(
453-
data_columns=kdf._internal.data_columns[-len(columns):],
454-
index_map=self._kdf._internal.index_map,
455-
column_index=column_index)
456460
if cols_sel is not None and isinstance(cols_sel, spark.Column):
457461
from databricks.koalas.series import _col
458462
return _col(kdf)
@@ -680,7 +684,12 @@ def raiseNotImplemented(description):
680684
"listlike of integers, boolean array] types, got {}".format(cols_sel))
681685

682686
try:
683-
kdf = DataFrame(sdf.select(self._kdf._internal.index_scols + columns))
687+
sdf = sdf.select(self._kdf._internal.index_scols + columns)
688+
index_columns = self._kdf._internal.index_columns
689+
data_columns = [column for column in sdf.columns if column not in index_columns]
690+
internal = _InternalFrame(
691+
sdf=sdf, data_columns=data_columns, index_map=self._kdf._internal.index_map)
692+
kdf = DataFrame(internal)
684693
except AnalysisException:
685694
raise KeyError('[{}] don\'t exist in columns'
686695
.format([col._jc.toString() for col in columns]))
@@ -692,10 +701,7 @@ def raiseNotImplemented(description):
692701
else:
693702
column_index = pd.MultiIndex.from_tuples(column_index)[cols_sel].tolist()
694703

695-
kdf._internal = kdf._internal.copy(
696-
data_columns=kdf._internal.data_columns[-len(columns):],
697-
index_map=self._kdf._internal.index_map,
698-
column_index=column_index)
704+
kdf = DataFrame(kdf._internal.copy(column_index=column_index))
699705
if cols_sel is not None and isinstance(cols_sel, (Series, int)):
700706
from databricks.koalas.series import _col
701707
return _col(kdf)

databricks/koalas/internal.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class _InternalFrame(object):
7373
However, all columns including index column are also stored in Spark DataFrame internally
7474
as below.
7575
76-
>>> kdf.to_spark().show() # doctest: +NORMALIZE_WHITESPACE
76+
>>> kdf._internal.spark_internal_df.show() # doctest: +NORMALIZE_WHITESPACE
7777
+-----------------+---+---+---+---+---+
7878
|__index_level_0__| A| B| C| D| E|
7979
+-----------------+---+---+---+---+---+
@@ -122,7 +122,7 @@ class _InternalFrame(object):
122122
[None]
123123
>>> internal.index_map
124124
[('__index_level_0__', None)]
125-
>>> internal.spark_df.show() # doctest: +NORMALIZE_WHITESPACE
125+
>>> internal.spark_internal_df.show() # doctest: +NORMALIZE_WHITESPACE
126126
+-----------------+---+---+---+---+---+
127127
|__index_level_0__| A| B| C| D| E|
128128
+-----------------+---+---+---+---+---+
@@ -149,7 +149,7 @@ class _InternalFrame(object):
149149
3 7 11 15 19
150150
4 8 12 16 20
151151
152-
>>> kdf1.to_spark().show() # doctest: +NORMALIZE_WHITESPACE
152+
>>> kdf1._internal.spark_internal_df.show() # doctest: +NORMALIZE_WHITESPACE
153153
+---+---+---+---+---+
154154
| A| B| C| D| E|
155155
+---+---+---+---+---+
@@ -179,7 +179,7 @@ class _InternalFrame(object):
179179
['A']
180180
>>> internal.index_map
181181
[('A', 'A')]
182-
>>> internal.spark_df.show() # doctest: +NORMALIZE_WHITESPACE
182+
>>> internal.spark_internal_df.show() # doctest: +NORMALIZE_WHITESPACE
183183
+---+---+---+---+---+
184184
| A| B| C| D| E|
185185
+---+---+---+---+---+
@@ -207,7 +207,7 @@ class _InternalFrame(object):
207207
2 3 7 11 15 19
208208
3 4 8 12 16 20
209209
210-
>>> kdf2.to_spark().show() # doctest: +NORMALIZE_WHITESPACE
210+
>>> kdf2._internal.spark_internal_df.show() # doctest: +NORMALIZE_WHITESPACE
211211
+-----------------+---+---+---+---+---+
212212
|__index_level_0__| A| B| C| D| E|
213213
+-----------------+---+---+---+---+---+
@@ -237,7 +237,7 @@ class _InternalFrame(object):
237237
[None, 'A']
238238
>>> internal.index_map
239239
[('__index_level_0__', None), ('A', 'A')]
240-
>>> internal.spark_df.show() # doctest: +NORMALIZE_WHITESPACE
240+
>>> internal.spark_internal_df.show() # doctest: +NORMALIZE_WHITESPACE
241241
+-----------------+---+---+---+---+---+
242242
|__index_level_0__| A| B| C| D| E|
243243
+-----------------+---+---+---+---+---+
@@ -322,7 +322,7 @@ class _InternalFrame(object):
322322
['A']
323323
>>> internal.index_map
324324
[('A', 'A')]
325-
>>> internal.spark_df.show() # doctest: +NORMALIZE_WHITESPACE
325+
>>> internal.spark_internal_df.show() # doctest: +NORMALIZE_WHITESPACE
326326
+---+---+
327327
| A| B|
328328
+---+---+
@@ -367,21 +367,20 @@ def __init__(self, sdf: spark.DataFrame,
367367
# Here is when Koalas DataFrame is created directly from Spark DataFrame.
368368
assert column_index is None
369369
assert column_index_names is None
370+
assert "__index_level_0__" not in sdf.schema.names
371+
# Create default index.
372+
index_map = [('__index_level_0__', None)]
373+
sdf = _InternalFrame.attach_default_index(sdf)
370374

371-
if "__index_level_0__" not in sdf.schema.names:
372-
# Create default index.
373-
index_map = [('__index_level_0__', None)]
374-
sdf = _InternalFrame.attach_default_index(sdf)
375-
376-
assert index_map is None \
377-
or all(isinstance(index_field, str)
375+
assert index_map is not None
376+
assert all(isinstance(index_field, str)
378377
and (index_name is None or isinstance(index_name, str))
379378
for index_field, index_name in index_map)
380379
assert scol is None or isinstance(scol, spark.Column)
381380
assert data_columns is None or all(isinstance(col, str) for col in data_columns)
382381

383382
self._sdf = sdf # type: spark.DataFrame
384-
self._index_map = (index_map if index_map is not None else []) # type: List[IndexMap]
383+
self._index_map = index_map # type: List[IndexMap]
385384
self._scol = scol # type: Optional[spark.Column]
386385
if scol is not None:
387386
self._data_columns = sdf.select(scol).columns
@@ -559,15 +558,23 @@ def column_index_names(self) -> Optional[List[str]]:
559558
""" Return names of the index levels. """
560559
return self._column_index_names
561560

561+
@lazy_property
562+
def spark_internal_df(self) -> spark.DataFrame:
563+
"""
564+
Return as Spark DataFrame. This contains index columns as well
565+
and should be only used for internal purposes.
566+
"""
567+
return self._sdf.select(self.scols)
568+
562569
@lazy_property
563570
def spark_df(self) -> spark.DataFrame:
564571
""" Return as Spark DataFrame. """
565-
return self._sdf.select(self.scols)
572+
return self._sdf.select(self.data_scols)
566573

567574
@lazy_property
568575
def pandas_df(self):
569576
""" Return as pandas DataFrame. """
570-
sdf = self.spark_df
577+
sdf = self.spark_internal_df
571578
pdf = sdf.toPandas()
572579
if len(pdf) == 0 and len(sdf.schema) > 0:
573580
pdf = pdf.astype({field.name: to_arrow_type(field.dataType).to_pandas_dtype()

databricks/koalas/namespace.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ def read_delta(path: str, version: Optional[str] = None, timestamp: Optional[str
273273
Examples
274274
--------
275275
>>> ks.range(1).to_delta('%s/read_delta/foo' % path)
276-
>>> ks.read_delta('%s/read_delta/foo' % path) # doctest: +SKIP
276+
>>> ks.read_delta('%s/read_delta/foo' % path)
277277
id
278278
0 0
279279
"""
@@ -307,7 +307,7 @@ def read_table(name: str) -> DataFrame:
307307
Examples
308308
--------
309309
>>> ks.range(1).to_table('%s.my_table' % db)
310-
>>> ks.read_table('%s.my_table' % db) # doctest: +SKIP
310+
>>> ks.read_table('%s.my_table' % db)
311311
id
312312
0 0
313313
"""

databricks/koalas/series.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,7 @@ def to_frame(self, name=None) -> spark.DataFrame:
10001000
2 c
10011001
"""
10021002
renamed = self.rename(name)
1003-
sdf = renamed._internal.spark_df
1003+
sdf = renamed._internal.spark_internal_df
10041004
internal = _InternalFrame(sdf=sdf,
10051005
data_columns=[sdf.schema[-1].name],
10061006
index_map=renamed._internal.index_map)

databricks/koalas/sql.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def sql(query: str, globals=None, locals=None, **kwargs) -> DataFrame:
9292
9393
>>> mydf = ks.range(10)
9494
>>> x = range(4)
95-
>>> ks.sql("SELECT * from {mydf} WHERE id IN {x}") # doctest: +SKIP
95+
>>> ks.sql("SELECT * from {mydf} WHERE id IN {x}")
9696
id
9797
0 0
9898
1 1
@@ -105,9 +105,9 @@ def sql(query: str, globals=None, locals=None, **kwargs) -> DataFrame:
105105
... mydf2 = ks.DataFrame({"x": range(2)})
106106
... return ks.sql("SELECT * from {mydf2}")
107107
>>> statement()
108-
__index_level_0__ x
109-
0 0 0
110-
1 1 1
108+
x
109+
0 0
110+
1 1
111111
112112
Mixing Koalas and pandas DataFrames in a join operation. Note that the index is dropped.
113113

databricks/koalas/tests/test_ops_on_diff_frames.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
# limitations under the License.
1515
#
1616
import os
17-
import unittest
1817

1918
import pandas as pd
2019

@@ -104,10 +103,10 @@ def kdf5(self):
104103
def kdf6(self):
105104
return ks.from_pandas(self.pdf6)
106105

107-
@unittest.skip('FIXME: Now that we can handle this case?')
108106
def test_no_index(self):
109-
with self.assertRaisesRegex(AssertionError, "cannot join with no overlapping index name"):
110-
ks.range(10) + ks.range(10)
107+
self.assertEqual(
108+
ks.range(10) + ks.range(10),
109+
ks.DataFrame({'id': list(range(10))}) + ks.DataFrame({'id': list(range(10))}))
111110

112111
def test_no_matched_index(self):
113112
with self.assertRaisesRegex(ValueError, "Index names must be exactly matched"):
@@ -257,8 +256,8 @@ def test_multi_index_assignment_series(self):
257256

258257
kdf = ks.from_pandas(self.pdf5)
259258
pdf = self.pdf5.copy()
260-
kdf['x'] = self.kdf6.e
261-
pdf['x'] = self.pdf6.e
259+
kdf['e'] = self.kdf6.e
260+
pdf['e'] = self.pdf6.e
262261

263262
self.assert_eq(kdf.sort_index(), pdf.sort_index())
264263

0 commit comments

Comments
 (0)