Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4085,28 +4085,34 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
raise NotImplementedError("fillna currently only works for axis=0 or axis='index'")
if not isinstance(value, (float, int, str, bool, dict, pd.Series)):
raise TypeError("Unsupported type %s" % type(value))
if limit is not None:
raise ValueError('limit parameter for value is not support now')
if isinstance(value, pd.Series):
value = value.to_dict()
if isinstance(value, dict):
for v in value.values():
if not isinstance(v, (float, int, str, bool)):
raise TypeError("Unsupported type %s" % type(v))
value = {k if isinstance(k, tuple) else (k,): v for k, v in value.items()}
value = {self._internal.column_name_for(k): v for k, v in value.items()
if k in self._internal.column_index}
if limit is not None:
raise ValueError('limit parameter for value is not support now')
sdf = self._sdf.fillna(value, subset=self._internal.data_columns)
kdf = DataFrame(self._internal.copy(
sdf=sdf,
column_scols=[scol_for(sdf, col) for col in self._internal.data_columns]))

def op(kser):
    """Fill a single column with the value of the first matching dict entry.

    A dict key matches when it equals the leading levels of the column's
    label, so a shorter key applies to every column underneath it.
    Entries are tried in the dict's iteration order and the first match
    wins; a column with no matching key is returned untouched.
    """
    # presumably a tuple of level labels for this column -- confirm
    label = kser._internal.column_index[0]
    for prefix, fill in value.items():
        # Compare only as many leading levels as the key provides.
        if label[:len(prefix)] == prefix:
            return kser.fillna(value=fill, method=method, axis=axis,
                               inplace=False, limit=limit)
    return kser
else:
op = lambda kser: kser.fillna(value=value, method=method, axis=axis,
inplace=False, limit=limit)
elif method is not None:
op = lambda kser: kser.fillna(value=value, method=method, axis=axis,
inplace=False, limit=limit)
else:
if method is None:
raise ValueError("Must specify a fillna 'value' or 'method' parameter.")
raise ValueError("Must specify a fillna 'value' or 'method' parameter.")

kdf = self._apply_series_op(
lambda kser: kser.fillna(value=value, method=method, axis=axis,
inplace=False, limit=limit))
kdf = self._apply_series_op(op)
if inplace:
self._internal = kdf._internal
else:
Expand Down
12 changes: 11 additions & 1 deletion databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
from datetime import datetime
from distutils.version import LooseVersion
import inspect
import sys

import numpy as np
import pandas as pd

from databricks import koalas as ks
from databricks.koalas.config import set_option, reset_option, option_context
from databricks.koalas.config import option_context
from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils
from databricks.koalas.exceptions import PandasNotImplementedError
from databricks.koalas.missing.frame import _MissingPandasLikeDataFrame
Expand Down Expand Up @@ -655,6 +656,15 @@ def test_fillna(self):
self.assert_eq(pdf.fillna(method='bfill'), kdf.fillna(method='bfill'))
self.assert_eq(pdf.fillna(method='bfill', limit=2), kdf.fillna(method='bfill', limit=2))

self.assert_eq(kdf.fillna({'x': -1}), pdf.fillna({'x': -1}))

if sys.version_info >= (3, 6):
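# Overlapping keys are applied in dict iteration order, which is arbitrary in Python 3.5, so these checks are flaky there.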
self.assert_eq(kdf.fillna({'x': -1, ('x', 'b'): -2}),
pdf.fillna({'x': -1, ('x', 'b'): -2}))
self.assert_eq(kdf.fillna({('x', 'b'): -2, 'x': -1}),
pdf.fillna({('x', 'b'): -2, 'x': -1}))

# check multi index
pdf = pdf.set_index([('x', 'a'), ('x', 'b')])
kdf = ks.from_pandas(pdf)
Expand Down