
Commit 30de99c

remove usage of ray.put(data, owner)
1 parent f79c935 commit 30de99c

5 files changed: +46 additions, -13 deletions

core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java

Lines changed: 10 additions & 0 deletions
@@ -10,6 +10,16 @@ public class SparkOnRayConfigs {
     public static final String SPARK_MASTER_ACTOR_RESOURCE_PREFIX =
         "spark.ray.raydp_spark_master.actor.resource";
 
+    /**
+     * Concurrency (max parallelism) for data owner transfer operations, i.e.
+     * how many concurrent putDatasetBlock calls RayAppMaster can handle.
+     *
+     * Example usage:
+     * spark.ray.raydp_spark_master.actor.owner_transfer_concurrency=4
+     */
+    public static final String SPARK_MASTER_OWNER_TRANSFER_CONCURRENCY =
+        "spark.ray.raydp_spark_master.actor.owner_transfer_concurrency";
+
     /**
      * Extra JVM options for the RayDP AppMaster actor and gateway process.
      * This is useful for passing JDK 17+ --add-opens flags.
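For reference, a minimal sketch of setting the new key from the Python side, assuming the usual raydp.init_spark entry point and its configs dict; the surrounding values are illustrative and not part of this commit:

    import ray
    import raydp

    ray.init()

    # Assumed usage: cap RayAppMaster's concurrent putDatasetBlock calls at 4
    # via the new Spark conf key added in SparkOnRayConfigs.
    spark = raydp.init_spark(
        app_name="owner_transfer_example",
        num_executors=2,
        executor_cores=2,
        executor_memory="2GB",
        configs={
            "spark.ray.raydp_spark_master.actor.owner_transfer_concurrency": "4",
        },
    )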

core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala

Lines changed: 9 additions & 9 deletions
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.raydp
 
 import com.intel.raydp.shims.SparkShimLoader
-import io.ray.api.{ActorHandle, ObjectRef, PyActorHandle, Ray}
+import io.ray.api.{ActorHandle, ObjectRef, Ray}
 import io.ray.runtime.AbstractRayRuntime
 import java.io.ByteArrayOutputStream
-import java.util.{List, UUID}
+import java.util.{List, Optional, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 import java.util.function.{Function => JFunction}
 import org.apache.arrow.vector.VectorSchemaRoot
@@ -64,13 +64,13 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
       queue: ObjectRefHolder.Queue,
       ownerName: String): RecordBatch = {
 
-    var objectRef: ObjectRef[Array[Byte]] = null
-    if (ownerName == "") {
-      objectRef = Ray.put(data)
-    } else {
-      var dataOwner: PyActorHandle = Ray.getActor(ownerName).get()
-      objectRef = Ray.put(data, dataOwner)
-    }
+    // NOTE: We intentionally do NOT pass an owner argument to Ray.put anymore.
+    // The default JVM path puts the serialized Arrow batch into Ray's object store
+    // from the Spark executor JVM process.
+    //
+    // Ownership transfer to a long-lived Python actor is implemented on the Python side
+    // by "adopting" (re-putting) these ObjectRefs inside the target actor.
+    val objectRef: ObjectRef[Array[Byte]] = Ray.put(data)
 
     // add the objectRef to the objectRefHolder to avoid reference GC
     queue.add(objectRef)
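The ownership semantics this comment relies on can be illustrated with a small, self-contained Ray sketch (hypothetical names, not part of this commit): whichever process calls ray.put owns the resulting ObjectRef, so re-putting the data inside a long-lived (here detached) actor keeps the blocks alive after the original creator exits.

    import ray

    ray.init()

    @ray.remote
    class Holder:
        def __init__(self):
            self.refs = []

        def adopt(self, refs):
            # ray.get pulls each block locally; ray.put re-creates it with this
            # actor as the owner, so the new refs outlive the original creator.
            self.refs = [ray.put(ray.get(r)) for r in refs]
            return self.refs

    holder = Holder.options(name="holder", lifetime="detached").remote()
    originals = [ray.put(f"block-{i}".encode()) for i in range(4)]
    adopted = ray.get(holder.adopt.remote(originals))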

python/raydp/spark/dataset.py

Lines changed: 8 additions & 3 deletions
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import uuid
-from typing import Callable, Dict, List, NoReturn, Optional, Iterable, Union
+from typing import Callable, List, Optional, Union
 from dataclasses import dataclass
 
 import pandas as pd
@@ -103,7 +103,9 @@ def get_raydp_master_owner(spark: Optional[SparkSession] = None) -> PartitionObjectsOwner:
     def raydp_master_set_reference_as_state(
             raydp_master_actor: ray.actor.ActorHandle,
             objects: List[ObjectRef]) -> ObjectRef:
-        return raydp_master_actor.add_objects.remote(uuid.uuid4(), objects)
+        # Adopt objects in the Python master actor so it becomes the owner of the
+        # dataset blocks without using Ray.put `_owner`.
+        return raydp_master_actor.adopt_objects.remote(uuid.uuid4(), objects)
 
     return PartitionObjectsOwner(
         obj_holder_name,
@@ -141,7 +143,10 @@ def _save_spark_df_to_object_store(df: sql.DataFrame, use_batch: bool = True,
 
     if owner is not None:
         actor_owner = ray.get_actor(actor_owner_name)
-        ray.get(owner.set_reference_as_state(actor_owner, blocks))
+        adopted = ray.get(owner.set_reference_as_state(actor_owner, blocks))
+        # If the owner callback returns a new list of refs (adoption), use it.
+        if adopted is not None:
+            blocks = adopted
 
     return blocks, block_sizes
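End to end, the change means a caller hands the blocks to the RayDP master actor, the actor adopts them, and the adopted refs flow back into the returned dataset. A sketch of that flow, assuming the helpers are importable from raydp.spark.dataset as laid out in this repo; names of the Spark session and dataframe are illustrative:

    import ray
    import raydp
    # Import path assumed from this repo's layout (python/raydp/spark/dataset.py).
    from raydp.spark.dataset import (
        get_raydp_master_owner,
        spark_dataframe_to_ray_dataset,
    )

    ray.init()
    spark = raydp.init_spark("adopt_example", num_executors=2,
                             executor_cores=2, executor_memory="2GB")
    df = spark.range(0, 1000)

    # The RayDP master actor adopts the blocks via adopt_objects, so the
    # resulting Ray dataset stays readable after the Spark executors go away.
    ds = spark_dataframe_to_ray_dataset(df, parallelism=4,
                                        owner=get_raydp_master_owner(spark))
    ds.show(5)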

python/raydp/spark/ray_cluster_master.py

Lines changed: 12 additions & 0 deletions
@@ -224,6 +224,18 @@ def get_spark_home(self) -> str:
     def add_objects(self, timestamp, objects):
         self._objects[timestamp] = objects
 
+    def adopt_objects(self, timestamp, objects):
+        """Adopt objects by re-putting them inside this actor.
+
+        This makes this actor the owner of the newly created objects without
+        using the Ray.put `_owner` argument.
+
+        Returns the new ObjectRefs.
+        """
+        new_objects = [ray.put(ray.get(obj)) for obj in objects]
+        self._objects[timestamp] = new_objects
+        return new_objects
+
     def get_object(self, timestamp, idx):
         return self._objects[timestamp][idx]
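The same entry point can also be driven directly against the master actor. A hedged sketch; the actor name and payload below are placeholders, not values defined by this commit:

    import uuid
    import ray

    # Placeholder name: look up the RayDP master actor however it is named
    # in your deployment.
    master = ray.get_actor("RAYDP_SPARK_MASTER")
    blocks = [ray.put(b"serialized-arrow-batch")]

    # adopt_objects re-puts each block inside the master actor and returns the
    # new refs; callers should switch to these so the originals can be released.
    adopted = ray.get(master.adopt_objects.remote(uuid.uuid4(), blocks))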

python/raydp/tests/test_data_owner_transfer.py

Lines changed: 7 additions & 1 deletion
@@ -154,6 +154,11 @@ def wake(self):
         def set_objects(self, objects):
             self.objects = objects
 
+        def adopt_objects(self, objects):
+            # Re-put inside this actor so this actor becomes the owner of the new objects.
+            self.objects = [ray.put(ray.get(o)) for o in objects]
+            return self.objects
+
     if ray_client.ray.is_connected():
         pytest.skip("Skip this test if using ray client")
@@ -185,7 +190,7 @@ def set_objects(self, objects):
     # and transfer data ownership to dedicated Object Holder (Singleton)
     ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4, owner=PartitionObjectsOwner(
         owner_actor_name,
-        lambda actor, objects: actor.set_objects.remote(objects)))
+        lambda actor, objects: actor.adopt_objects.remote(objects)))
 
     # display data
     ds.show(5)
@@ -226,6 +231,7 @@ def test_api_compatibility(ray_cluster, jdk17_extra_spark_configs):
 
     # check compatibility of ray 1.9.0 API: no data onwership transfer
     ds = ray.data.from_spark(df_train)
+    ds.show(1)
     ray_gc()  # ensure GC kicked in
     time.sleep(3)
