From 3d84ddb05ad5f25aaea4d6400b8d6502fb98843e Mon Sep 17 00:00:00 2001 From: Brock Date: Thu, 2 Sep 2021 21:48:09 -0700 Subject: [PATCH 1/6] REF: reindex_indexer use np.void to avoid JoinUnit --- pandas/core/internals/concat.py | 44 +++++++++++++++++++++++++++---- pandas/core/internals/managers.py | 31 +++++++++++++++++++--- 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index 6b41d7a26080d..224641a6ea40c 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -198,6 +198,27 @@ def concatenate_managers( if isinstance(mgrs_indexers[0][0], ArrayManager): return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy) + new_mgrs_indexers = [] + for mgr, indexers in mgrs_indexers: + # We only reindex for axis=0 (i.e. columns), as this can be done cheaply + if 0 in indexers: + new_mgr = mgr.reindex_indexer( + axes[0], + indexers[0], + axis=0, + copy=False, + only_slice=True, + allow_dups=True, + use_na_proxy=True, + ) + new_indexers = indexers.copy() + del new_indexers[0] + new_mgrs_indexers.append((new_mgr, new_indexers)) + else: + new_mgrs_indexers.append((mgr, indexers)) + + mgrs_indexers = new_mgrs_indexers + concat_plans = [ _get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers ] @@ -375,6 +396,8 @@ def _is_valid_na_for(self, dtype: DtypeObj) -> bool: return False if self.block is None: return True + if self.block.dtype.kind == "V": + return True if self.dtype == object: values = self.block.values @@ -401,6 +424,8 @@ def is_na(self) -> bool: blk = self.block if blk is None: return True + if blk.dtype.kind == "V": + return True if not blk._can_hold_na: return False @@ -426,7 +451,7 @@ def is_na(self) -> bool: return all(isna_all(row) for row in values) def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike: - if upcasted_na is None: + if upcasted_na is None and self.block.dtype.kind != "V": # No upcasting is necessary fill_value = self.block.fill_value values = self.block.get_values() @@ -435,6 +460,7 @@ def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike: if self._is_valid_na_for(empty_dtype): # note: always holds when self.block is None + # or self.block.dtype.kind == "V" blk_dtype = getattr(self.block, "dtype", None) if blk_dtype == np.dtype("object"): @@ -512,7 +538,9 @@ def _concatenate_join_units( empty_dtype = _get_empty_dtype(join_units) - has_none_blocks = any(unit.block is None for unit in join_units) + has_none_blocks = any( + unit.block is None or unit.block.dtype.kind == "V" for unit in join_units + ) upcasted_na = _dtype_to_na_value(empty_dtype, has_none_blocks) to_concat = [ @@ -597,13 +625,19 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj: empty_dtype = join_units[0].block.dtype return empty_dtype - has_none_blocks = any(unit.block is None for unit in join_units) + has_none_blocks = any( + unit.block is None or unit.block.dtype.kind == "V" for unit in join_units + ) dtypes = [ unit.dtype for unit in join_units if unit.block is not None and not unit.is_na ] if not len(dtypes): - dtypes = [unit.dtype for unit in join_units if unit.block is not None] + dtypes = [ + unit.dtype + for unit in join_units + if unit.block is not None and unit.block.dtype.kind != "V" + ] dtype = find_common_type(dtypes) if has_none_blocks: @@ -619,7 +653,7 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool: """ first = join_units[0].block - if first is None: + if first is None or first.dtype.kind == "V": return False return ( # exclude cases where a) ju.block is None or b) we have e.g. Int64+int64 diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index 874065b50037f..a9894ab5acf23 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -66,6 +66,7 @@ from pandas.core.internals.blocks import ( Block, DatetimeTZBlock, + NumpyBlock, ensure_block_shape, extend_blocks, get_block_type, @@ -613,6 +614,8 @@ def reindex_indexer( copy: bool = True, consolidate: bool = True, only_slice: bool = False, + *, + use_na_proxy: bool = False, ) -> T: """ Parameters @@ -627,6 +630,8 @@ def reindex_indexer( Whether to consolidate inplace before reindexing. only_slice : bool, default False Whether to take views, not copies, along columns. + use_na_proxy : bool, default False + Whether to use a np.void ndarray for newly introduced columns. pandas-indexer with -1's only. """ @@ -651,7 +656,10 @@ def reindex_indexer( if axis == 0: new_blocks = self._slice_take_blocks_ax0( - indexer, fill_value=fill_value, only_slice=only_slice + indexer, + fill_value=fill_value, + only_slice=only_slice, + use_na_proxy=use_na_proxy, ) else: new_blocks = [ @@ -675,6 +683,8 @@ def _slice_take_blocks_ax0( slice_or_indexer: slice | np.ndarray, fill_value=lib.no_default, only_slice: bool = False, + *, + use_na_proxy: bool = False, ) -> list[Block]: """ Slice/take blocks along axis=0. @@ -688,6 +698,8 @@ def _slice_take_blocks_ax0( only_slice : bool, default False If True, we always return views on existing arrays, never copies. This is used when called from ops.blockwise.operate_blockwise. + use_na_proxy : bool, default False + Whether to use a np.void ndarray for newly introduced columns. Returns ------- @@ -756,7 +768,11 @@ def _slice_take_blocks_ax0( # If we've got here, fill_value was not lib.no_default blocks.append( - self._make_na_block(placement=mgr_locs, fill_value=fill_value) + self._make_na_block( + placement=mgr_locs, + fill_value=fill_value, + use_na_proxy=use_na_proxy, + ) ) else: blk = self.blocks[blkno] @@ -798,7 +814,16 @@ def _slice_take_blocks_ax0( return blocks - def _make_na_block(self, placement: BlockPlacement, fill_value=None) -> Block: + def _make_na_block( + self, placement: BlockPlacement, fill_value=None, use_na_proxy: bool = False + ) -> Block: + + if use_na_proxy: + assert fill_value is None + shape = (len(placement), self.shape[1]) + vals = np.empty(shape, dtype=np.void) + nb = NumpyBlock(vals, placement, ndim=2) + return nb if fill_value is None: fill_value = np.nan From e0c3f9c7d71a7de5e3adaf45120b2aa97bf92965 Mon Sep 17 00:00:00 2001 From: Brock Date: Fri, 3 Sep 2021 10:11:17 -0700 Subject: [PATCH 2/6] REF: remove no-longer reachable cases from internals.concat --- pandas/core/internals/concat.py | 136 ++++++++++++-------------------- 1 file changed, 51 insertions(+), 85 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index 224641a6ea40c..e291fc3113e44 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -1,6 +1,5 @@ from __future__ import annotations -import copy import itertools from typing import ( TYPE_CHECKING, @@ -65,6 +64,7 @@ if TYPE_CHECKING: from pandas import Index + from pandas.core.internals.blocks import Block def _concatenate_array_managers( @@ -287,26 +287,20 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra mgr_shape_list[ax] = len(indexer) mgr_shape = tuple(mgr_shape_list) - has_column_indexer = False + assert 0 not in indexers - if 0 in indexers: - has_column_indexer = True - ax0_indexer = indexers.pop(0) - blknos = algos.take_nd(mgr.blknos, ax0_indexer, fill_value=-1) - blklocs = algos.take_nd(mgr.blklocs, ax0_indexer, fill_value=-1) - else: - - if mgr.is_single_block: - blk = mgr.blocks[0] - return [(blk.mgr_locs, JoinUnit(blk, mgr_shape, indexers))] + if mgr.is_single_block: + blk = mgr.blocks[0] + return [(blk.mgr_locs, JoinUnit(blk, mgr_shape, indexers))] - blknos = mgr.blknos - blklocs = mgr.blklocs + blknos = mgr.blknos + blklocs = mgr.blklocs plan = [] for blkno, placements in libinternals.get_blkno_placements(blknos, group=False): assert placements.is_slice_like + assert blkno != -1 join_unit_indexers = indexers.copy() @@ -314,41 +308,33 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra shape_list[0] = len(placements) shape = tuple(shape_list) - if blkno == -1: - # only reachable in the `0 in indexers` case - unit = JoinUnit(None, shape) - else: - blk = mgr.blocks[blkno] - ax0_blk_indexer = blklocs[placements.indexer] - - unit_no_ax0_reindexing = ( - len(placements) == len(blk.mgr_locs) - and - # Fastpath detection of join unit not - # needing to reindex its block: no ax0 - # reindexing took place and block - # placement was sequential before. - ( - ( - not has_column_indexer - and blk.mgr_locs.is_slice_like - and blk.mgr_locs.as_slice.step == 1 - ) - or - # Slow-ish detection: all indexer locs - # are sequential (and length match is - # checked above). - (np.diff(ax0_blk_indexer) == 1).all() - ) + blk = mgr.blocks[blkno] + ax0_blk_indexer = blklocs[placements.indexer] + + unit_no_ax0_reindexing = ( + len(placements) == len(blk.mgr_locs) + and + # Fastpath detection of join unit not + # needing to reindex its block: no ax0 + # reindexing took place and block + # placement was sequential before. + ( + (blk.mgr_locs.is_slice_like and blk.mgr_locs.as_slice.step == 1) + or + # Slow-ish detection: all indexer locs + # are sequential (and length match is + # checked above). + (np.diff(ax0_blk_indexer) == 1).all() ) + ) - # Omit indexer if no item reindexing is required. - if unit_no_ax0_reindexing: - join_unit_indexers.pop(0, None) - else: - join_unit_indexers[0] = ax0_blk_indexer + # Omit indexer if no item reindexing is required. + if unit_no_ax0_reindexing: + join_unit_indexers.pop(0, None) + else: + join_unit_indexers[0] = ax0_blk_indexer - unit = JoinUnit(blk, shape, join_unit_indexers) + unit = JoinUnit(blk, shape, join_unit_indexers) plan.append((placements, unit)) @@ -356,7 +342,7 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra class JoinUnit: - def __init__(self, block, shape: Shape, indexers=None): + def __init__(self, block: Block, shape: Shape, indexers=None): # Passing shape explicitly is required for cases when block is None. # Note: block is None implies indexers is None, but not vice-versa if indexers is None: @@ -380,7 +366,7 @@ def needs_filling(self) -> bool: @cache_readonly def dtype(self): blk = self.block - if blk is None: + if blk.values.dtype.kind == "V": raise AssertionError("Block is None, no dtype") if not self.needs_filling: @@ -394,8 +380,6 @@ def _is_valid_na_for(self, dtype: DtypeObj) -> bool: """ if not self.is_na: return False - if self.block is None: - return True if self.block.dtype.kind == "V": return True @@ -422,8 +406,6 @@ def _is_valid_na_for(self, dtype: DtypeObj) -> bool: @cache_readonly def is_na(self) -> bool: blk = self.block - if blk is None: - return True if blk.dtype.kind == "V": return True @@ -451,6 +433,8 @@ def is_na(self) -> bool: return all(isna_all(row) for row in values) def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike: + values: ArrayLike + if upcasted_na is None and self.block.dtype.kind != "V": # No upcasting is necessary fill_value = self.block.fill_value @@ -459,9 +443,8 @@ def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike: fill_value = upcasted_na if self._is_valid_na_for(empty_dtype): - # note: always holds when self.block is None - # or self.block.dtype.kind == "V" - blk_dtype = getattr(self.block, "dtype", None) + # note: always holds when self.block.dtype.kind == "V" + blk_dtype = self.block.dtype if blk_dtype == np.dtype("object"): # we want to avoid filling with np.nan if we are @@ -538,9 +521,7 @@ def _concatenate_join_units( empty_dtype = _get_empty_dtype(join_units) - has_none_blocks = any( - unit.block is None or unit.block.dtype.kind == "V" for unit in join_units - ) + has_none_blocks = any(unit.block.dtype.kind == "V" for unit in join_units) upcasted_na = _dtype_to_na_value(empty_dtype, has_none_blocks) to_concat = [ @@ -616,8 +597,6 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj: """ if len(join_units) == 1: blk = join_units[0].block - if blk is None: - return np.dtype(np.float64) return blk.dtype if _is_uniform_reindex(join_units): @@ -625,19 +604,11 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj: empty_dtype = join_units[0].block.dtype return empty_dtype - has_none_blocks = any( - unit.block is None or unit.block.dtype.kind == "V" for unit in join_units - ) + has_none_blocks = any(unit.block.dtype.kind == "V" for unit in join_units) - dtypes = [ - unit.dtype for unit in join_units if unit.block is not None and not unit.is_na - ] + dtypes = [unit.dtype for unit in join_units if not unit.is_na] if not len(dtypes): - dtypes = [ - unit.dtype - for unit in join_units - if unit.block is not None and unit.block.dtype.kind != "V" - ] + dtypes = [unit.dtype for unit in join_units if unit.block.dtype.kind != "V"] dtype = find_common_type(dtypes) if has_none_blocks: @@ -653,7 +624,7 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool: """ first = join_units[0].block - if first is None or first.dtype.kind == "V": + if first.dtype.kind == "V": return False return ( # exclude cases where a) ju.block is None or b) we have e.g. Int64+int64 @@ -683,7 +654,7 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool: def _is_uniform_reindex(join_units) -> bool: return ( # TODO: should this be ju.block._can_hold_na? - all(ju.block and ju.block.is_extension for ju in join_units) + all(ju.block.is_extension for ju in join_units) and len({ju.block.dtype.name for ju in join_units}) == 1 ) @@ -694,20 +665,15 @@ def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit: Extra items that didn't fit are returned as a separate block. """ - if 0 not in join_unit.indexers: - extra_indexers = join_unit.indexers + assert 0 not in join_unit.indexers - if join_unit.block is None: - extra_block = None - else: - extra_block = join_unit.block.getitem_block(slice(length, None)) - join_unit.block = join_unit.block.getitem_block(slice(length)) - else: - extra_block = join_unit.block + extra_indexers = join_unit.indexers - extra_indexers = copy.copy(join_unit.indexers) - extra_indexers[0] = extra_indexers[0][length:] - join_unit.indexers[0] = join_unit.indexers[0][:length] + if join_unit.block is None: + extra_block = None + else: + extra_block = join_unit.block.getitem_block(slice(length, None)) + join_unit.block = join_unit.block.getitem_block(slice(length)) extra_shape = (join_unit.shape[0] - length,) + join_unit.shape[1:] join_unit.shape = (length,) + join_unit.shape[1:] From 1b5472fa0b773b79c2d5d8b44f1b8645bbc54f26 Mon Sep 17 00:00:00 2001 From: Brock Date: Fri, 3 Sep 2021 16:25:05 -0700 Subject: [PATCH 3/6] fix incorrect assertion --- pandas/core/internals/concat.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index e291fc3113e44..bb89f378fb287 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -1,5 +1,6 @@ from __future__ import annotations +import copy import itertools from typing import ( TYPE_CHECKING, @@ -665,9 +666,20 @@ def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit: Extra items that didn't fit are returned as a separate block. """ - assert 0 not in join_unit.indexers + if 0 not in join_unit.indexers: + extra_indexers = join_unit.indexers - extra_indexers = join_unit.indexers + if join_unit.block is None: + extra_block = None + else: + extra_block = join_unit.block.getitem_block(slice(length, None)) + join_unit.block = join_unit.block.getitem_block(slice(length)) + else: + extra_block = join_unit.block + + extra_indexers = copy.copy(join_unit.indexers) + extra_indexers[0] = extra_indexers[0][length:] + join_unit.indexers[0] = join_unit.indexers[0][:length] if join_unit.block is None: extra_block = None From 72a8e82d6a6ed7d7048d05a1c9aa6d73697a390a Mon Sep 17 00:00:00 2001 From: Brock Date: Sat, 4 Sep 2021 13:08:56 -0700 Subject: [PATCH 4/6] typo fixup --- pandas/core/internals/concat.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index bb89f378fb287..da9c8f4a5a5ab 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -681,12 +681,6 @@ def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit: extra_indexers[0] = extra_indexers[0][length:] join_unit.indexers[0] = join_unit.indexers[0][:length] - if join_unit.block is None: - extra_block = None - else: - extra_block = join_unit.block.getitem_block(slice(length, None)) - join_unit.block = join_unit.block.getitem_block(slice(length)) - extra_shape = (join_unit.shape[0] - length,) + join_unit.shape[1:] join_unit.shape = (length,) + join_unit.shape[1:] From 7b7002f15f3b0aa1f6a7bb66e8bd4627190b04b2 Mon Sep 17 00:00:00 2001 From: Brock Date: Sat, 4 Sep 2021 17:59:55 -0700 Subject: [PATCH 5/6] refactor out _reindex_columns_void --- pandas/core/internals/concat.py | 53 ++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index 224641a6ea40c..ee6722ae6ee85 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -198,26 +198,7 @@ def concatenate_managers( if isinstance(mgrs_indexers[0][0], ArrayManager): return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy) - new_mgrs_indexers = [] - for mgr, indexers in mgrs_indexers: - # We only reindex for axis=0 (i.e. columns), as this can be done cheaply - if 0 in indexers: - new_mgr = mgr.reindex_indexer( - axes[0], - indexers[0], - axis=0, - copy=False, - only_slice=True, - allow_dups=True, - use_na_proxy=True, - ) - new_indexers = indexers.copy() - del new_indexers[0] - new_mgrs_indexers.append((new_mgr, new_indexers)) - else: - new_mgrs_indexers.append((mgr, indexers)) - - mgrs_indexers = new_mgrs_indexers + mgrs_indexers = _reindex_columns_void(axes, mgrs_indexers) concat_plans = [ _get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers @@ -266,6 +247,38 @@ def concatenate_managers( return BlockManager(tuple(blocks), axes) +def _reindex_columns_void( + axes: list[Index], mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]] +) -> list[tuple[BlockManager, dict[int, np.ndarray]]]: + """ + Reindex along columns so that all of the BlockManagers being concatenated + have matching columns. + + Columns added in this reindexing have dtype=np.void, indicating they + should be ignored when choosing a column's final dtype. + """ + new_mgrs_indexers = [] + for mgr, indexers in mgrs_indexers: + # We only reindex for axis=0 (i.e. columns), as this can be done cheaply + if 0 in indexers: + new_mgr = mgr.reindex_indexer( + axes[0], + indexers[0], + axis=0, + copy=False, + only_slice=True, + allow_dups=True, + use_na_proxy=True, + ) + new_indexers = indexers.copy() + del new_indexers[0] + new_mgrs_indexers.append((new_mgr, new_indexers)) + else: + new_mgrs_indexers.append((mgr, indexers)) + + return new_mgrs_indexers + + def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarray]): """ Construct concatenation plan for given block manager and indexers. From 32d9d86f919fbbabe6c8d907e29aa05682e8cec0 Mon Sep 17 00:00:00 2001 From: Brock Date: Mon, 6 Sep 2021 12:21:30 -0700 Subject: [PATCH 6/6] _reindex_columns_void -> _maybe_reindex_columns_na_proxy --- pandas/core/internals/concat.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index ee6722ae6ee85..25bdc71fe7d12 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -198,7 +198,7 @@ def concatenate_managers( if isinstance(mgrs_indexers[0][0], ArrayManager): return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy) - mgrs_indexers = _reindex_columns_void(axes, mgrs_indexers) + mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers) concat_plans = [ _get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers @@ -247,7 +247,7 @@ def concatenate_managers( return BlockManager(tuple(blocks), axes) -def _reindex_columns_void( +def _maybe_reindex_columns_na_proxy( axes: list[Index], mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]] ) -> list[tuple[BlockManager, dict[int, np.ndarray]]]: """