@@ -1559,15 +1559,7 @@ def __init__(
15591559 self ._agg : StatefulShuffleAggregation = agg_factory (
15601560 aggregator_id , target_partition_ids
15611561 )
1562- # One buffer per partition to enable concurrent finalization
1563- self ._output_buffers : Dict [int , BlockOutputBuffer ] = {
1564- partition_id : BlockOutputBuffer (
1565- output_block_size_option = OutputBlockSizeOption (
1566- target_max_block_size = data_context .target_max_block_size
1567- )
1568- )
1569- for partition_id in target_partition_ids
1570- }
1562+ self ._data_context = data_context
15711563
15721564 def submit (self , input_seq_id : int , partition_id : int , partition_shard : Block ):
15731565 with self ._lock :
@@ -1585,13 +1577,29 @@ def finalize(
15851577 # Clear any remaining state (to release resources)
15861578 self ._agg .clear (partition_id )
15871579
1588- # No lock needed - each partition has its own buffer
1589- output_buffer = self ._output_buffers [partition_id ]
1580+ target_max_block_size = self ._data_context .target_max_block_size
1581+ # None means the user wants to preserve the block distribution,
1582+ # so we do not break the block down further.
1583+ if target_max_block_size is not None :
1584+ # Creating the block output buffer because retry finalize tasks
1585+ # would mean I would have to keep internal bookkeepping
1586+ # (like which tasks have been finalized) idempotent.
1587+ # Also, calling output_buffer.finalize() down below would fail
1588+ # because it asserts it hasn't already been finalized.
1589+ output_buffer = BlockOutputBuffer (
1590+ output_block_size_option = OutputBlockSizeOption (
1591+ target_max_block_size = target_max_block_size
1592+ or DEFAULT_TARGET_MAX_BLOCK_SIZE
1593+ )
1594+ )
15901595
1591- output_buffer .add_block (block )
1592- output_buffer .finalize ()
1593- while output_buffer .has_next ():
1594- block = output_buffer .next ()
1596+ output_buffer .add_block (block )
1597+ output_buffer .finalize ()
1598+ while output_buffer .has_next ():
1599+ block = output_buffer .next ()
1600+ yield block
1601+ yield BlockMetadataWithSchema .from_block (block , stats = exec_stats )
1602+ else :
15951603 yield block
15961604 yield BlockMetadataWithSchema .from_block (block , stats = exec_stats )
15971605
0 commit comments