Skip to content

Commit 26404f8

Browse files
committed
single output buffer
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
1 parent ab87601 commit 26404f8

File tree

1 file changed

+20
-18
lines changed

1 file changed

+20
-18
lines changed

python/ray/data/_internal/execution/operators/hash_shuffle.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,14 +1560,13 @@ def __init__(
15601560
aggregator_id, target_partition_ids
15611561
)
15621562
# One buffer per partition to enable concurrent finalization
1563-
self._output_buffers: Dict[int, BlockOutputBuffer] = {
1564-
partition_id: BlockOutputBuffer(
1565-
output_block_size_option=OutputBlockSizeOption(
1566-
target_max_block_size=data_context.target_max_block_size
1567-
)
1563+
self._output_buffer_lock = threading.Lock()
1564+
self._output_buffer = BlockOutputBuffer(
1565+
output_block_size_option=OutputBlockSizeOption(
1566+
target_max_block_size=data_context.target_max_block_size
15681567
)
1569-
for partition_id in target_partition_ids
1570-
}
1568+
)
1569+
self._partition_ids_to_finalize: Set[int] = set(target_partition_ids)
15711570

15721571
def submit(self, input_seq_id: int, partition_id: int, partition_shard: Block):
15731572
with self._lock:
@@ -1581,19 +1580,22 @@ def finalize(
15811580
exec_stats_builder = BlockExecStats.builder()
15821581
# Finalize given partition id
15831582
block = self._agg.finalize(partition_id)
1584-
exec_stats = exec_stats_builder.build()
1583+
15851584
# Clear any remaining state (to release resources)
15861585
self._agg.clear(partition_id)
1587-
1588-
# No lock needed - each partition has its own buffer
1589-
output_buffer = self._output_buffers[partition_id]
1590-
1591-
output_buffer.add_block(block)
1592-
output_buffer.finalize()
1593-
while output_buffer.has_next():
1594-
block = output_buffer.next()
1595-
yield block
1596-
yield BlockMetadataWithSchema.from_block(block, stats=exec_stats)
1586+
self._partition_ids_to_finalize.remove(partition_id)
1587+
1588+
with self._output_buffer_lock:
1589+
self._output_buffer.add_block(block)
1590+
if len(self._partition_ids_to_finalize) == 0:
1591+
# this is the last finalized partition_id
1592+
self._output_buffer.finalize()
1593+
while self._output_buffer.has_next():
1594+
block = self._output_buffer.next()
1595+
yield block
1596+
yield BlockMetadataWithSchema.from_block(
1597+
block, stats=exec_stats_builder.build()
1598+
)
15971599

15981600

15991601
def _get_total_cluster_resources() -> ExecutionResources:

0 commit comments

Comments
 (0)