Skip to content

Conversation

@pang-wu
Copy link
Collaborator

@pang-wu pang-wu commented Dec 23, 2025

Remove _owner usage in RayDP Spark→Ray conversion and centralize ownership in RayAppMaster (with configurable concurrency). There are two major changes:

spark_dataframe_to_ray_dataset

Python Driver
  |
  |  object_store_writer.save(useBatch, ownerName)
  v
Spark Executors (tasks over df.toArrowBatchRdd)
  |
  |  serialize partition -> Arrow IPC bytes
  |  call RegistryActor.get_or_create_blockstore_actor(actor_name, actor_node_ip)
  |    with node affinity (avoid all data stored in one node)
  |  call BlockStoreActor.put_arrow_ipc(batch_key, ipc_bytes) 
  v
Ray (Python actors own blocks)
  |
  |  Python groups (actor_name, batch_key) -> BlockStoreActor.get_block_refs(batch_keys)
  v
Ray Dataset built from ObjectRefs

Both the registry actor and blokstore actors could be customized, by default, the registry actor is the python RayDPSparkMaster.
Also, we introduce two configs to allow user set the resource of each blockstore actor:

"spark.ray.raydp_blockstore.actor.resource.CPU": 1,  # default is 0
"spark.ray.raydp_blockstore.actor.resource.memory": "100M",  # default is 0

from_spark_recoverable

Python Driver
  |
  | from_spark_recoverable(df, storage_level)
  v
Spark Driver (JVM / ObjectStoreWriter.prepareRecoverableRDD)
  |
  | 1) Build ArrowBatch RDD
  |    rdd = df.toArrowBatchRdd
  |
  | 2) Cache it in Spark
  |    rdd.persist(storage_level)
  |    rdd.count()   # materialize partitions into Spark BlockManager as Arrow IPC bytes
  |
  | 3) Compute partition -> executor mapping
  |    locations[i] = executorId that holds cached rdd_<id>_<i>
  |
  | 4) Return metadata to Python:
  |    - rddId, numPartitions
  |    - schemaJson
  |    - driverAgentUrl (for recache)
  |    - locations[]
  v
Python Driver
  |
  | 5) For each partition i, submit a Ray task (in parallel):
  |    _fetch_arrow_table_from_executor.remote(
  |        "raydp-executor-" + locations[i],
  |        rddId, i, schemaJson, driverAgentUrl)
  v
Ray Task Workers (Python)
  |
  | 6) Cross-language call into JVM actor raydp-executor-<id>:
  |    getRDDPartition(rddId, partitionId, schemaJson, driverAgentUrl)
  v
Spark Executor (JVM / RayDPExecutor.getRDDPartition)
  |
  | 7) Read cached bytes from Spark BlockManager:
  |    blockId = "rdd_<rddId>_<partitionId>"
  |    - if present: return Arrow IPC bytes
  |    - if missing: requestRecacheRDD(rddId, driverAgentUrl) then retry
  v
Ray Task Workers (Python)
  |
  | 8) Decode Arrow IPC bytes -> pyarrow.Table
  | 9) Return pyarrow.Table (task return becomes a Ray ObjectRef, with lineage)
  v
Python Driver
  |
  | 10) ray.data.from_arrow_refs(table_refs)
  |     (Ray Data does ray.get(get_table_block_metadata.remote(ref)) internally)
  v
Ray Dataset (blocks are table ObjectRefs)
  |
  | Node loss:
  | - if a block is lost, Ray reconstructs by rerunning the task (max_retries=-1),
  |   which refetches bytes again from Spark cache.

There are two new configs to control the resource of _fetch_arrow_table_from_executor :

  1. spark.ray.raydp_recoverable_fetch.task.resource.CPU control the CPU allocation, default is 0.
  2. spark.ray.raydp_recoverable_fetch.task.resource.memory specifies memory allocation, default is "0", valid value could be human readable string like 100m

NOTE This change require ray >= 2.37.0 because of this fix, so Python can call in JVM with load_code_from_local enabled without crashing the worker.

Motivation

  • RayDP previously relied on explicit owner manipulation to keep Spark‑converted datasets alive after Spark executors die, while preserving true Arrow batching.
  • This PR preserves those semantics while:
  • Eliminating _owner usage.
  • Keeping original memory / batching behavior.
  • Allowing controlled parallelism for ownership‑transfer operations.

@pang-wu pang-wu force-pushed the pang/mod branch 4 times, most recently from bc4a0d1 to 7ccd0f2 Compare December 24, 2025 04:13
@pang-wu pang-wu force-pushed the pang/mod branch 2 times, most recently from aace09a to e89d47a Compare December 25, 2025 23:55
@pang-wu pang-wu changed the title remove usage of ray.put(data, owner) remove usage of ray.put(data, _owner) and private ray object ownership manipulation API Dec 27, 2025
@pang-wu pang-wu requested a review from carsonwang December 27, 2025 05:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant