Commit 6f3c98e

Exclude Index columns for exposed Spark DataFrame and disallow Koalas DataFrame with no index
1 parent c87a849 commit 6f3c98e
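
The net effect: `DataFrame.to_spark()` and the writer paths (`to_table`, `to_parquet`, `to_spark_io`) now expose only the data columns, and every Koalas DataFrame must carry an index; converting a plain Spark DataFrame back via `to_koalas()` attaches a default one. A doctest-style sketch of the resulting round trip (illustrative data, assuming a running Spark session):

    >>> import databricks.koalas as ks
    >>> kdf = ks.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
    >>> kdf.to_spark()                # internal index column no longer exposed
    DataFrame[col1: bigint, col2: bigint]
    >>> kdf.to_spark().to_koalas()    # a fresh default index is attached
       col1  col2
    0     1     3
    1     2     4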

9 files changed: +106 -70 lines changed

databricks/koalas/frame.py

Lines changed: 27 additions & 22 deletions
@@ -1598,9 +1598,7 @@ def index(self):
         Index
         """
         from databricks.koalas.indexes import Index, MultiIndex
-        if len(self._internal.index_map) == 0:
-            return None
-        elif len(self._internal.index_map) == 1:
+        if len(self._internal.index_map) == 1:
             return Index(self)
         else:
             return MultiIndex(self)
@@ -1860,9 +1858,6 @@ class max type
         lion    mammal   80.5     run
         monkey  mammal    NaN    jump
         """
-        if len(self._internal.index_map) == 0:
-            raise NotImplementedError('Can\'t reset index because there is no index.')
-
         multi_index = len(self._internal.index_map) > 1

         def rename(index):
@@ -1877,7 +1872,10 @@ def rename(index):
         if level is None:
             new_index_map = [(column, name if name is not None else rename(i))
                              for i, (column, name) in enumerate(self._internal.index_map)]
-            index_map = []
+            # TODO: this will end up with multiple columns that point the same index column.
+            #   For instance, if we do df.reset_index(), the new column `index` points the
+            #   same '__index_level_0__' internally. We should have new column.
+            index_map = [('__index_level_0__', None)]
         else:
             if isinstance(level, (int, str)):
                 level = [level]
@@ -2382,13 +2380,13 @@ def to_koalas(self):

         >>> spark_df = df.to_spark()
         >>> spark_df
-        DataFrame[__index_level_0__: bigint, col1: bigint, col2: bigint]
+        DataFrame[col1: bigint, col2: bigint]

         >>> kdf = spark_df.to_koalas()
         >>> kdf
-           __index_level_0__  col1  col2
-        0                  0     1     3
-        1                  1     2     4
+           col1  col2
+        0     1     3
+        1     2     4

         Calling to_koalas on a Koalas DataFrame simply returns itself.

@@ -2493,8 +2491,8 @@ def to_table(self, name: str, format: Optional[str] = None, mode: str = 'error',

         >>> df.to_table('%s.my_table' % db, partition_cols='date')
         """
-        self._sdf.write.saveAsTable(name=name, format=format, mode=mode,
-                                    partitionBy=partition_cols, options=options)
+        self.to_spark().write.saveAsTable(name=name, format=format, mode=mode,
+                                          partitionBy=partition_cols, options=options)

     def to_delta(self, path: str, mode: str = 'error',
                  partition_cols: Union[str, List[str], None] = None, **options):
@@ -2604,8 +2602,8 @@ def to_parquet(self, path: str, mode: str = 'error',
         ...     mode = 'overwrite',
         ...     partition_cols=['date', 'country'])
         """
-        self._sdf.write.parquet(path=path, mode=mode, partitionBy=partition_cols,
-                                compression=compression)
+        self.to_spark().write.parquet(
+            path=path, mode=mode, partitionBy=partition_cols, compression=compression)

     def to_spark_io(self, path: Optional[str] = None, format: Optional[str] = None,
                     mode: str = 'error', partition_cols: Union[str, List[str], None] = None,
@@ -2657,13 +2655,16 @@ def to_spark_io(self, path: Optional[str] = None, format: Optional[str] = None,

         >>> df.to_spark_io(path='%s/to_spark_io/foo.json' % path, format='json')
         """
-        self._sdf.write.save(path=path, format=format, mode=mode, partitionBy=partition_cols,
-                             options=options)
+        self.to_spark().write.save(
+            path=path, format=format, mode=mode, partitionBy=partition_cols, options=options)

     def to_spark(self):
         """
         Return the current DataFrame as a Spark DataFrame.

+        .. note:: Index information is lost. So, if the index columns are not present in
+            actual columns, they are lost.
+
         See Also
         --------
         DataFrame.to_koalas
@@ -3653,14 +3654,21 @@ def pivot_table(self, values=None, index=None, columns=None,
             sdf = sdf.fillna(fill_value)

         if index is not None:
-            return DataFrame(sdf).set_index(index)
+            data_columns = [column for column in sdf.columns if column not in index]
+            index_map = [(column, column) for column in index]
+            internal = _InternalFrame(sdf=sdf, data_columns=data_columns, index_map=index_map)
+            return DataFrame(internal)
         else:
             if isinstance(values, list):
                 index_values = values[-1]
             else:
                 index_values = values

-            return DataFrame(sdf.withColumn(columns, F.lit(index_values))).set_index(columns)
+            sdf = sdf.withColumn(columns, F.lit(index_values))
+            data_columns = [column for column in sdf.columns if column not in columns]
+            index_map = [(column, column) for column in columns]
+            internal = _InternalFrame(sdf=sdf, data_columns=data_columns, index_map=index_map)
+            return DataFrame(internal)

     def pivot(self, index=None, columns=None, values=None):
         """
@@ -4364,9 +4372,6 @@ def sort_index(self, axis: int = 0,
         a 1  2  1
         b 1  0  3
         """
-        if len(self._internal.index_map) == 0:
-            raise ValueError("Index should be set.")
-
         if axis != 0:
             raise ValueError("No other axes than 0 are supported at the moment")
         if kind is not None:
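
Because the writers above now route through `self.to_spark()` rather than the raw `self._sdf`, files and tables written from Koalas no longer carry the internal index column. A hedged usage sketch (the path and data are illustrative, not from the commit):

    >>> import databricks.koalas as ks
    >>> kdf = ks.DataFrame({'date': ['2019-06-01', '2019-06-02'], 'amount': [10, 20]})
    >>> kdf.to_parquet('/tmp/koalas_demo', mode='overwrite')
    >>> list(ks.read_parquet('/tmp/koalas_demo').columns)   # no '__index_level_0__' on disk
    ['date', 'amount']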

databricks/koalas/indexing.py

Lines changed: 15 additions & 10 deletions
@@ -26,6 +26,7 @@
 from pyspark.sql.types import BooleanType
 from pyspark.sql.utils import AnalysisException

+from databricks.koalas.internal import _InternalFrame
 from databricks.koalas.exceptions import SparkPandasIndexingError, SparkPandasNotImplementedError
 from databricks.koalas.utils import column_index_level
@@ -450,15 +451,17 @@ def raiseNotImplemented(description):
                 raise KeyError("['{}'] not in index".format(key))

         try:
-            kdf = DataFrame(sdf.select(self._kdf._internal.index_scols + columns))
+            sdf = sdf.select(self._kdf._internal.index_scols + columns)
+            index_columns = self._kdf._internal.index_columns
+            data_columns = [column for column in sdf.columns if column not in index_columns]
+            internal = _InternalFrame(
+                sdf=sdf, data_columns=data_columns,
+                index_map=self._kdf._internal.index_map, column_index=column_index)
+            kdf = DataFrame(internal)
         except AnalysisException:
             raise KeyError('[{}] don\'t exist in columns'
                            .format([col._jc.toString() for col in columns]))

-        kdf._internal = kdf._internal.copy(
-            data_columns=kdf._internal.data_columns[-len(columns):],
-            index_map=self._kdf._internal.index_map,
-            column_index=column_index)
         if cols_sel is not None and isinstance(cols_sel, spark.Column):
             from databricks.koalas.series import _col
             return _col(kdf)
@@ -686,7 +689,12 @@ def raiseNotImplemented(description):
                 "listlike of integers, boolean array] types, got {}".format(cols_sel))

         try:
-            kdf = DataFrame(sdf.select(self._kdf._internal.index_scols + columns))
+            sdf = sdf.select(self._kdf._internal.index_scols + columns)
+            index_columns = self._kdf._internal.index_columns
+            data_columns = [column for column in sdf.columns if column not in index_columns]
+            internal = _InternalFrame(
+                sdf=sdf, data_columns=data_columns, index_map=self._kdf._internal.index_map)
+            kdf = DataFrame(internal)
         except AnalysisException:
             raise KeyError('[{}] don\'t exist in columns'
                            .format([col._jc.toString() for col in columns]))
@@ -699,10 +707,7 @@ def raiseNotImplemented(description):
             column_index = \
                 pd.MultiIndex.from_tuples(self._kdf._internal.column_index)[cols_sel].tolist()

-        kdf._internal = kdf._internal.copy(
-            data_columns=kdf._internal.data_columns[-len(columns):],
-            index_map=self._kdf._internal.index_map,
-            column_index=column_index)
+        kdf = DataFrame(kdf._internal.copy(column_index=column_index))
         if cols_sel is not None and isinstance(cols_sel, (Series, int)):
             from databricks.koalas.series import _col
             return _col(kdf)
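
Both hunks replace the old pattern, `DataFrame(sdf.select(...))` followed by patching `kdf._internal`, with constructing the `_InternalFrame` up front. This appears necessary because `_InternalFrame.__init__` now asserts that a raw Spark DataFrame does not already contain '__index_level_0__' and always attaches a default index, so a selection that keeps the original index columns must be wrapped explicitly. A condensed sketch of the shared pattern (names follow the diff):

    sdf = sdf.select(self._kdf._internal.index_scols + columns)
    index_columns = self._kdf._internal.index_columns
    data_columns = [column for column in sdf.columns if column not in index_columns]
    kdf = DataFrame(_InternalFrame(sdf=sdf, data_columns=data_columns,
                                   index_map=self._kdf._internal.index_map))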

databricks/koalas/internal.py

Lines changed: 34 additions & 17 deletions
@@ -73,7 +73,7 @@ class _InternalFrame(object):
     However, all columns including index column are also stored in Spark DataFrame internally
     as below.

-    >>> kdf.to_spark().show()  # doctest: +NORMALIZE_WHITESPACE
+    >>> kdf._internal.spark_internal_df.show()  # doctest: +NORMALIZE_WHITESPACE
     +-----------------+---+---+---+---+---+
     |__index_level_0__|  A|  B|  C|  D|  E|
     +-----------------+---+---+---+---+---+
@@ -122,7 +122,7 @@ class _InternalFrame(object):
     [None]
     >>> internal.index_map
     [('__index_level_0__', None)]
-    >>> internal.spark_df.show()  # doctest: +NORMALIZE_WHITESPACE
+    >>> internal.spark_internal_df.show()  # doctest: +NORMALIZE_WHITESPACE
     +-----------------+---+---+---+---+---+
     |__index_level_0__|  A|  B|  C|  D|  E|
     +-----------------+---+---+---+---+---+
@@ -149,7 +149,7 @@ class _InternalFrame(object):
     3  7  11  15  19
     4  8  12  16  20

-    >>> kdf1.to_spark().show()  # doctest: +NORMALIZE_WHITESPACE
+    >>> kdf1._internal.spark_internal_df.show()  # doctest: +NORMALIZE_WHITESPACE
     +---+---+---+---+---+
     |  A|  B|  C|  D|  E|
     +---+---+---+---+---+
@@ -179,7 +179,7 @@ class _InternalFrame(object):
     ['A']
     >>> internal.index_map
     [('A', 'A')]
-    >>> internal.spark_df.show()  # doctest: +NORMALIZE_WHITESPACE
+    >>> internal.spark_internal_df.show()  # doctest: +NORMALIZE_WHITESPACE
     +---+---+---+---+---+
     |  A|  B|  C|  D|  E|
     +---+---+---+---+---+
@@ -207,7 +207,7 @@ class _InternalFrame(object):
     2  3  7  11  15  19
     3  4  8  12  16  20

-    >>> kdf2.to_spark().show()  # doctest: +NORMALIZE_WHITESPACE
+    >>> kdf2._internal.spark_internal_df.show()  # doctest: +NORMALIZE_WHITESPACE
     +-----------------+---+---+---+---+---+
     |__index_level_0__|  A|  B|  C|  D|  E|
     +-----------------+---+---+---+---+---+
@@ -237,7 +237,7 @@ class _InternalFrame(object):
     [None, 'A']
     >>> internal.index_map
     [('__index_level_0__', None), ('A', 'A')]
-    >>> internal.spark_df.show()  # doctest: +NORMALIZE_WHITESPACE
+    >>> internal.spark_internal_df.show()  # doctest: +NORMALIZE_WHITESPACE
     +-----------------+---+---+---+---+---+
     |__index_level_0__|  A|  B|  C|  D|  E|
     +-----------------+---+---+---+---+---+
@@ -322,7 +322,7 @@ class _InternalFrame(object):
     ['A']
     >>> internal.index_map
     [('A', 'A')]
-    >>> internal.spark_df.show()  # doctest: +NORMALIZE_WHITESPACE
+    >>> internal.spark_internal_df.show()  # doctest: +NORMALIZE_WHITESPACE
     +---+---+
     |  A|  B|
     +---+---+
@@ -367,21 +367,20 @@ def __init__(self, sdf: spark.DataFrame,
             # Here is when Koalas DataFrame is created directly from Spark DataFrame.
             assert column_index is None
             assert column_index_names is None
+            assert "__index_level_0__" not in sdf.schema.names
+            # Create default index.
+            index_map = [('__index_level_0__', None)]
+            sdf = _InternalFrame.attach_default_index(sdf)

-            if "__index_level_0__" not in sdf.schema.names:
-                # Create default index.
-                index_map = [('__index_level_0__', None)]
-                sdf = _InternalFrame.attach_default_index(sdf)
-
-        assert index_map is None \
-            or all(isinstance(index_field, str)
+        assert index_map is not None
+        assert all(isinstance(index_field, str)
                    and (index_name is None or isinstance(index_name, str))
                    for index_field, index_name in index_map)
         assert scol is None or isinstance(scol, spark.Column)
         assert data_columns is None or all(isinstance(col, str) for col in data_columns)

         self._sdf = sdf  # type: spark.DataFrame
-        self._index_map = (index_map if index_map is not None else [])  # type: List[IndexMap]
+        self._index_map = index_map  # type: List[IndexMap]
         self._scol = scol  # type: Optional[spark.Column]
         if scol is not None:
             self._data_columns = sdf.select(scol).columns
@@ -541,6 +540,7 @@ def scols(self) -> List[spark.Column]:
     @property
     def index_map(self) -> List[IndexMap]:
         """ Return the managed index information. """
+        assert len(self._index_map) > 0
         return self._index_map

     @lazy_property
@@ -568,6 +568,23 @@ def column_index_names(self) -> Optional[List[str]]:
         """ Return names of the index levels. """
         return self._column_index_names

+    @lazy_property
+    def spark_internal_df(self) -> spark.DataFrame:
+        """
+        Return as Spark DataFrame. This contains index columns as well
+        and should be only used for internal purposes.
+        """
+        index_columns = set(self.index_columns)
+        data_columns = []
+        for column, idx in zip(self._data_columns, self.column_index):
+            if column not in index_columns:
+                scol = self.scol_for(column)
+                name = str(idx) if len(idx) > 1 else idx[0]
+                if column != name:
+                    scol = scol.alias(name)
+                data_columns.append(scol)
+        return self._sdf.select(self.index_scols + data_columns)
+
     @lazy_property
     def spark_df(self) -> spark.DataFrame:
         """ Return as Spark DataFrame. """
@@ -580,12 +597,12 @@ def spark_df(self) -> spark.DataFrame:
             if column != name:
                 scol = scol.alias(name)
             data_columns.append(scol)
-        return self._sdf.select(self.index_scols + data_columns)
+        return self._sdf.select(data_columns)

     @lazy_property
     def pandas_df(self):
         """ Return as pandas DataFrame. """
-        sdf = self.spark_df
+        sdf = self.spark_internal_df
         pdf = sdf.toPandas()
         if len(pdf) == 0 and len(sdf.schema) > 0:
             pdf = pdf.astype({field.name: to_arrow_type(field.dataType).to_pandas_dtype()
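
After this change, `_InternalFrame` exposes two Spark views: `spark_df`, which carries only the data columns and backs the public `to_spark()`, and the new `spark_internal_df`, which prepends the index columns and backs internal consumers such as `pandas_df`. A doctest-style sketch of the distinction (illustrative data):

    >>> import databricks.koalas as ks
    >>> kdf = ks.DataFrame({'A': [1, 2], 'B': [3, 4]})
    >>> kdf._internal.spark_df.columns             # what users see
    ['A', 'B']
    >>> kdf._internal.spark_internal_df.columns    # index kept for internal use
    ['__index_level_0__', 'A', 'B']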

databricks/koalas/namespace.py

Lines changed: 2 additions & 2 deletions
@@ -273,7 +273,7 @@ def read_delta(path: str, version: Optional[str] = None, timestamp: Optional[str
     Examples
     --------
     >>> ks.range(1).to_delta('%s/read_delta/foo' % path)
-    >>> ks.read_delta('%s/read_delta/foo' % path)  # doctest: +SKIP
+    >>> ks.read_delta('%s/read_delta/foo' % path)
        id
     0   0
     """
@@ -307,7 +307,7 @@ def read_table(name: str) -> DataFrame:
     Examples
     --------
     >>> ks.range(1).to_table('%s.my_table' % db)
-    >>> ks.read_table('%s.my_table' % db)  # doctest: +SKIP
+    >>> ks.read_table('%s.my_table' % db)
        id
     0   0
     """

databricks/koalas/series.py

Lines changed: 1 addition & 1 deletion
@@ -1000,7 +1000,7 @@ def to_frame(self, name=None) -> spark.DataFrame:
         2  c
         """
         renamed = self.rename(name)
-        sdf = renamed._internal.spark_df
+        sdf = renamed._internal.spark_internal_df
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=[sdf.schema[-1].name],
                                   index_map=renamed._internal.index_map)

databricks/koalas/sql.py

Lines changed: 4 additions & 4 deletions
@@ -92,7 +92,7 @@ def sql(query: str, globals=None, locals=None, **kwargs) -> DataFrame:

     >>> mydf = ks.range(10)
     >>> x = range(4)
-    >>> ks.sql("SELECT * from {mydf} WHERE id IN {x}")  # doctest: +SKIP
+    >>> ks.sql("SELECT * from {mydf} WHERE id IN {x}")
        id
     0   0
     1   1
@@ -105,9 +105,9 @@ def sql(query: str, globals=None, locals=None, **kwargs) -> DataFrame:
     ...     mydf2 = ks.DataFrame({"x": range(2)})
     ...     return ks.sql("SELECT * from {mydf2}")
     >>> statement()
-       __index_level_0__  x
-    0                  0  0
-    1                  1  1
+       x
+    0  0
+    1  1

     Mixing Koalas and pandas DataFrames in a join operation. Note that the index is dropped.
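
With the internal index column excluded from the exposed result, these `ks.sql` doctests now produce the documented output and no longer need `# doctest: +SKIP`. A short sketch of the now-stable behavior (illustrative):

    >>> import databricks.koalas as ks
    >>> mydf2 = ks.DataFrame({"x": range(2)})
    >>> list(ks.sql("SELECT * from {mydf2}").columns)   # '__index_level_0__' no longer leaks
    ['x']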
