Skip to content

REF: reindex_indexer up-front to simplify JoinUnit #43384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 8, 2021
116 changes: 44 additions & 72 deletions pandas/core/internals/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

if TYPE_CHECKING:
from pandas import Index
from pandas.core.internals.blocks import Block


def _concatenate_array_managers(
Expand Down Expand Up @@ -300,76 +301,62 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra
mgr_shape_list[ax] = len(indexer)
mgr_shape = tuple(mgr_shape_list)

has_column_indexer = False
assert 0 not in indexers

if 0 in indexers:
has_column_indexer = True
ax0_indexer = indexers.pop(0)
blknos = algos.take_nd(mgr.blknos, ax0_indexer, fill_value=-1)
blklocs = algos.take_nd(mgr.blklocs, ax0_indexer, fill_value=-1)
else:

if mgr.is_single_block:
blk = mgr.blocks[0]
return [(blk.mgr_locs, JoinUnit(blk, mgr_shape, indexers))]
if mgr.is_single_block:
blk = mgr.blocks[0]
return [(blk.mgr_locs, JoinUnit(blk, mgr_shape, indexers))]

blknos = mgr.blknos
blklocs = mgr.blklocs
blknos = mgr.blknos
blklocs = mgr.blklocs

plan = []
for blkno, placements in libinternals.get_blkno_placements(blknos, group=False):

assert placements.is_slice_like
assert blkno != -1

join_unit_indexers = indexers.copy()

shape_list = list(mgr_shape)
shape_list[0] = len(placements)
shape = tuple(shape_list)

if blkno == -1:
# only reachable in the `0 in indexers` case
unit = JoinUnit(None, shape)
else:
blk = mgr.blocks[blkno]
ax0_blk_indexer = blklocs[placements.indexer]

unit_no_ax0_reindexing = (
len(placements) == len(blk.mgr_locs)
and
# Fastpath detection of join unit not
# needing to reindex its block: no ax0
# reindexing took place and block
# placement was sequential before.
(
(
not has_column_indexer
and blk.mgr_locs.is_slice_like
and blk.mgr_locs.as_slice.step == 1
)
or
# Slow-ish detection: all indexer locs
# are sequential (and length match is
# checked above).
(np.diff(ax0_blk_indexer) == 1).all()
)
blk = mgr.blocks[blkno]
ax0_blk_indexer = blklocs[placements.indexer]

unit_no_ax0_reindexing = (
len(placements) == len(blk.mgr_locs)
and
# Fastpath detection of join unit not
# needing to reindex its block: no ax0
# reindexing took place and block
# placement was sequential before.
(
(blk.mgr_locs.is_slice_like and blk.mgr_locs.as_slice.step == 1)
or
# Slow-ish detection: all indexer locs
# are sequential (and length match is
# checked above).
(np.diff(ax0_blk_indexer) == 1).all()
)
)

# Omit indexer if no item reindexing is required.
if unit_no_ax0_reindexing:
join_unit_indexers.pop(0, None)
else:
join_unit_indexers[0] = ax0_blk_indexer
# Omit indexer if no item reindexing is required.
if unit_no_ax0_reindexing:
join_unit_indexers.pop(0, None)
else:
join_unit_indexers[0] = ax0_blk_indexer

unit = JoinUnit(blk, shape, join_unit_indexers)
unit = JoinUnit(blk, shape, join_unit_indexers)

plan.append((placements, unit))

return plan


class JoinUnit:
def __init__(self, block, shape: Shape, indexers=None):
def __init__(self, block: Block, shape: Shape, indexers=None):
# Passing shape explicitly is required for cases when block is None.
# Note: block is None implies indexers is None, but not vice-versa
if indexers is None:
Expand All @@ -393,7 +380,7 @@ def needs_filling(self) -> bool:
@cache_readonly
def dtype(self):
blk = self.block
if blk is None:
if blk.values.dtype.kind == "V":
raise AssertionError("Block is None, no dtype")

if not self.needs_filling:
Expand All @@ -407,8 +394,6 @@ def _is_valid_na_for(self, dtype: DtypeObj) -> bool:
"""
if not self.is_na:
return False
if self.block is None:
return True
if self.block.dtype.kind == "V":
return True

Expand All @@ -435,8 +420,6 @@ def _is_valid_na_for(self, dtype: DtypeObj) -> bool:
@cache_readonly
def is_na(self) -> bool:
blk = self.block
if blk is None:
return True
if blk.dtype.kind == "V":
return True

Expand Down Expand Up @@ -464,6 +447,8 @@ def is_na(self) -> bool:
return all(isna_all(row) for row in values)

def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike:
values: ArrayLike

if upcasted_na is None and self.block.dtype.kind != "V":
# No upcasting is necessary
fill_value = self.block.fill_value
Expand All @@ -472,9 +457,8 @@ def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike:
fill_value = upcasted_na

if self._is_valid_na_for(empty_dtype):
# note: always holds when self.block is None
# or self.block.dtype.kind == "V"
blk_dtype = getattr(self.block, "dtype", None)
# note: always holds when self.block.dtype.kind == "V"
blk_dtype = self.block.dtype

if blk_dtype == np.dtype("object"):
# we want to avoid filling with np.nan if we are
Expand Down Expand Up @@ -551,9 +535,7 @@ def _concatenate_join_units(

empty_dtype = _get_empty_dtype(join_units)

has_none_blocks = any(
unit.block is None or unit.block.dtype.kind == "V" for unit in join_units
)
has_none_blocks = any(unit.block.dtype.kind == "V" for unit in join_units)
upcasted_na = _dtype_to_na_value(empty_dtype, has_none_blocks)

to_concat = [
Expand Down Expand Up @@ -629,28 +611,18 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj:
"""
if len(join_units) == 1:
blk = join_units[0].block
if blk is None:
return np.dtype(np.float64)
return blk.dtype

if _is_uniform_reindex(join_units):
# FIXME: integrate property
empty_dtype = join_units[0].block.dtype
return empty_dtype

has_none_blocks = any(
unit.block is None or unit.block.dtype.kind == "V" for unit in join_units
)
has_none_blocks = any(unit.block.dtype.kind == "V" for unit in join_units)

dtypes = [
unit.dtype for unit in join_units if unit.block is not None and not unit.is_na
]
dtypes = [unit.dtype for unit in join_units if not unit.is_na]
if not len(dtypes):
dtypes = [
unit.dtype
for unit in join_units
if unit.block is not None and unit.block.dtype.kind != "V"
]
dtypes = [unit.dtype for unit in join_units if unit.block.dtype.kind != "V"]

dtype = find_common_type(dtypes)
if has_none_blocks:
Expand All @@ -666,7 +638,7 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool:

"""
first = join_units[0].block
if first is None or first.dtype.kind == "V":
if first.dtype.kind == "V":
return False
return (
# exclude cases where a) ju.block is None or b) we have e.g. Int64+int64
Expand Down Expand Up @@ -696,7 +668,7 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool:
def _is_uniform_reindex(join_units) -> bool:
return (
# TODO: should this be ju.block._can_hold_na?
all(ju.block and ju.block.is_extension for ju in join_units)
all(ju.block.is_extension for ju in join_units)
and len({ju.block.dtype.name for ju in join_units}) == 1
)

Expand Down