Commit 7ccd0f2: fix calls

1 parent 88ffe20 commit 7ccd0f2

8 files changed, +51 -25 lines changed

core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/RayAppMasterUtils.java
Lines changed: 5 additions & 3 deletions

@@ -38,9 +38,11 @@ public static ActorHandle<RayAppMaster> createAppMaster(
     jvmOptions.add("-cp");
     jvmOptions.add(cp);
     creator.setJvmOptions(jvmOptions);
-    for(Map.Entry<String, Double> resource : appMasterResource.entrySet()) {
-      String resourceName = resource.getKey()
-          .substring(SparkOnRayConfigs.SPARK_MASTER_ACTOR_RESOURCE_PREFIX.length() + 1);
+
+    for (Map.Entry<String, Double> resource : appMasterResource.entrySet()) {
+      String key = resource.getKey();
+      String resourceName = key.substring(
+          SparkOnRayConfigs.SPARK_MASTER_ACTOR_RESOURCE_PREFIX.length() + 1);
       creator.setResource(resourceName, resource.getValue());
     }

core/raydp-main/src/main/java/org/apache/spark/raydp/RayDPUtils.java
Lines changed: 11 additions & 0 deletions

@@ -17,11 +17,13 @@

 package org.apache.spark.raydp;

+import io.ray.api.ActorHandle;
 import io.ray.api.ObjectRef;
 import io.ray.api.Ray;
 import io.ray.api.id.ObjectId;
 import io.ray.runtime.AbstractRayRuntime;
 import io.ray.runtime.object.ObjectRefImpl;
+import org.apache.spark.deploy.raydp.RayAppMaster;

 public class RayDPUtils {

@@ -51,4 +53,13 @@ public static <T> ObjectRef<T> readBinary(byte[] obj, Class<T> clazz, byte[] own
     );
     return ref;
   }
+
+  /**
+   * Helper to invoke putDatasetBlock on RayAppMaster using a Java method
+   * reference, so Ray can serialize the function as a proper Java lambda.
+   */
+  public static ObjectRef<ObjectRef<byte[]>> putDatasetBlockAsync(
+      ActorHandle<RayAppMaster> handle, byte[] data) {
+    return handle.task(RayAppMaster::putDatasetBlock, data).remote();
+  }
 }
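
On the Python side the same call shape is just an actor task whose return value is itself an ObjectRef, so the caller unwraps one level with ray.get. A minimal sketch of that pattern, using a hypothetical PutHolder actor in place of RayAppMaster (the names here are illustrative, not part of this commit):

import ray

ray.init()

@ray.remote
class PutHolder:
    # Hypothetical stand-in for RayAppMaster.putDatasetBlock.
    def put_block(self, data: bytes) -> ray.ObjectRef:
        # Re-put inside the actor; the returned ref is owned by this actor.
        return ray.put(data)

holder = PutHolder.remote()
# The actor task returns an ObjectRef, so the first ray.get unwraps
# the task result down to the inner ref...
inner_ref = ray.get(holder.put_block.remote(b"arrow-batch-bytes"))
# ...and a second get fetches the actual bytes.
assert ray.get(inner_ref) == b"arrow-batch-bytes"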

core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala
Lines changed: 1 addition & 8 deletions

@@ -62,17 +62,10 @@ class AppMasterJavaBridge {
       case (k, v) => k.startsWith(SparkOnRayConfigs.SPARK_MASTER_ACTOR_RESOURCE_PREFIX)
     }.map{ case (k, v) => k->double2Double(v.toString.toDouble) }.asJava

-    // Owner transfer concurrency is configured via a dedicated key and should
-    // not be bundled with the generic AppMaster resources.
-    val ownerTransferConcurrency = Option(
-      sparkProps.get(SparkOnRayConfigs.SPARK_MASTER_OWNER_TRANSFER_CONCURRENCY))
-      .map(_.toString.toInt).getOrElse(0)
-
     handle = RayAppMasterUtils.createAppMaster(
       extra_cp, name,
       (sparkJvmOptions ++ Seq(SparkOnRayConfigs.RAYDP_LOGFILE_PREFIX_CFG)).asJava,
-      appMasterResources,
-      ownerTransferConcurrency)
+      appMasterResources)
   }
 }

core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala
Lines changed: 1 addition & 6 deletions

@@ -91,13 +91,8 @@ class RayCoarseGrainedSchedulerBackend(
       case (k, v) => k.startsWith(SparkOnRayConfigs.SPARK_MASTER_ACTOR_RESOURCE_PREFIX)
     }.map{ case (k, v) => k->double2Double(v.toDouble) }

-    // Owner transfer concurrency is configured via a dedicated key and should
-    // not be bundled with the generic AppMaster resources.
-    val ownerTransferConcurrency =
-      conf.get(SparkOnRayConfigs.SPARK_MASTER_OWNER_TRANSFER_CONCURRENCY, "0").toInt
-
     masterHandle = RayAppMasterUtils.createAppMaster(cp, null, options.toBuffer.asJava,
-      appMasterResources.toMap.asJava, ownerTransferConcurrency)
+      appMasterResources.toMap.asJava)
     uri = new URI(RayAppMasterUtils.getMasterUrl(masterHandle))
   } else {
     uri = new URI(sparkUrl)

core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala
Lines changed: 6 additions & 4 deletions

@@ -65,10 +65,12 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
       ownerName: String): RecordBatch = {

     // NOTE: We intentionally do NOT pass an owner argument to Ray.put anymore.
-    // Instead, we route all puts through the long-lived RayAppMaster actor,
-    // so that object ownership is decoupled from individual Spark executors.
-    val objectRef: ObjectRef[Array[Byte]] =
-      ObjectStoreWriter.putViaOwner(data)
+    // The default JVM path puts the serialized Arrow batch into Ray's object store
+    // from the Spark executor JVM process.
+    //
+    // Ownership transfer to a long-lived Python actor is implemented on the Python side
+    // by "adopting" (re-putting) these ObjectRefs inside the target actor.
+    val objectRef: ObjectRef[Array[Byte]] = Ray.put(data)

     // add the objectRef to the objectRefHolder to avoid reference GC
     queue.add(objectRef)
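
The "adopting" step the comment above refers to reduces to a ray.get followed by a ray.put inside the target actor, which makes that actor the owner of the new copies. A minimal sketch of the idea, independent of RayDP's own classes (actor and variable names are illustrative):

import ray

ray.init()

@ray.remote
class Adopter:
    # Long-lived actor that takes over ownership of object-store blocks.
    def __init__(self):
        self._adopted = []

    def adopt(self, refs):
        # Fetch each block and re-put it from inside this actor's process;
        # the new refs are owned by this actor, not by the original worker.
        new_refs = [ray.put(ray.get(r)) for r in refs]
        self._adopted.extend(new_refs)
        return new_refs

# Blocks produced elsewhere (here the driver stands in for a Spark executor).
blocks = [ray.put(b"block-0"), ray.put(b"block-1")]
adopter = Adopter.remote()
adopted = ray.get(adopter.adopt.remote(blocks))
print([ray.get(r) for r in adopted])  # [b'block-0', b'block-1']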

python/raydp/spark/dataset.py
Lines changed: 8 additions & 3 deletions

@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import uuid
-from typing import Callable, Dict, List, NoReturn, Optional, Iterable, Union
+from typing import Callable, List, Optional, Union
 from dataclasses import dataclass

 import pandas as pd

@@ -103,7 +103,9 @@ def get_raydp_master_owner(spark: Optional[SparkSession] = None) -> PartitionObj
     def raydp_master_set_reference_as_state(
             raydp_master_actor: ray.actor.ActorHandle,
             objects: List[ObjectRef]) -> ObjectRef:
-        return raydp_master_actor.add_objects.remote(uuid.uuid4(), objects)
+        # Adopt objects in the Python master actor so it becomes the owner of the
+        # dataset blocks without using Ray.put `_owner`.
+        return raydp_master_actor.adopt_objects.remote(uuid.uuid4(), objects)

     return PartitionObjectsOwner(
         obj_holder_name,

@@ -141,7 +143,10 @@ def _save_spark_df_to_object_store(df: sql.DataFrame, use_batch: bool = True,

     if owner is not None:
         actor_owner = ray.get_actor(actor_owner_name)
-        ray.get(owner.set_reference_as_state(actor_owner, blocks))
+        adopted = ray.get(owner.set_reference_as_state(actor_owner, blocks))
+        # If the owner callback returns a new list of refs (adoption), use it.
+        if adopted is not None:
+            blocks = adopted

     return blocks, block_sizes
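
Put together, a caller wires this up through PartitionObjectsOwner: the callback triggers adoption on a named actor, and the adopted refs replace the original blocks. A sketch of the intended call pattern, assuming the module path of the file above and an existing Spark DataFrame df (actor and names are illustrative):

import ray
from raydp.spark.dataset import PartitionObjectsOwner, spark_dataframe_to_ray_dataset

@ray.remote
class BlockHolder:
    def __init__(self):
        self.objects = []

    def adopt_objects(self, objects):
        # Re-put so this actor becomes the owner of the blocks.
        self.objects = [ray.put(ray.get(o)) for o in objects]
        return self.objects

# A named, detached actor so the adopted blocks can outlive the Spark job.
BlockHolder.options(name="block_holder", lifetime="detached").remote()

ds = spark_dataframe_to_ray_dataset(
    df, parallelism=4,
    owner=PartitionObjectsOwner(
        "block_holder",
        lambda actor, objects: actor.adopt_objects.remote(objects)))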

python/raydp/spark/ray_cluster_master.py
Lines changed: 12 additions & 0 deletions

@@ -224,6 +224,18 @@ def get_spark_home(self) -> str:
     def add_objects(self, timestamp, objects):
         self._objects[timestamp] = objects

+    def adopt_objects(self, timestamp, objects):
+        """Adopt objects by re-putting them inside this actor.
+
+        This makes this actor the owner of the newly created objects without
+        using the Ray.put `_owner` argument.
+
+        Returns the new ObjectRefs.
+        """
+        new_objects = [ray.put(ray.get(obj)) for obj in objects]
+        self._objects[timestamp] = new_objects
+        return new_objects
+
     def get_object(self, timestamp, idx):
         return self._objects[timestamp][idx]
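
Driving the two methods above end to end: a client adopts blocks under a timestamp key and can later fetch individual blocks back by index. A small self-contained sketch with a plain actor standing in for the master (the class name is illustrative):

import uuid

import ray

ray.init()

@ray.remote
class Master:
    # Illustrative stand-in for the master actor's object-holding methods.
    def __init__(self):
        self._objects = {}

    def adopt_objects(self, timestamp, objects):
        new_objects = [ray.put(ray.get(obj)) for obj in objects]
        self._objects[timestamp] = new_objects
        return new_objects

    def get_object(self, timestamp, idx):
        return self._objects[timestamp][idx]

master = Master.remote()
key = uuid.uuid4()
adopted = ray.get(master.adopt_objects.remote(key, [ray.put(b"a"), ray.put(b"b")]))

# Adopted refs are new objects with the same contents...
assert [ray.get(r) for r in adopted] == [b"a", b"b"]
# ...and each block can also be fetched back by (timestamp, index).
assert ray.get(ray.get(master.get_object.remote(key, 1))) == b"b"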

python/raydp/tests/test_data_owner_transfer.py
Lines changed: 7 additions & 1 deletion

@@ -154,6 +154,11 @@ def wake(self):
        def set_objects(self, objects):
            self.objects = objects

+       def adopt_objects(self, objects):
+           # Re-put inside this actor so this actor becomes the owner of the new objects.
+           self.objects = [ray.put(ray.get(o)) for o in objects]
+           return self.objects
+
    if ray_client.ray.is_connected():
        pytest.skip("Skip this test if using ray client")

@@ -185,7 +190,7 @@ def set_objects(self, objects):
     # and transfer data ownership to dedicated Object Holder (Singleton)
     ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4, owner=PartitionObjectsOwner(
         owner_actor_name,
-        lambda actor, objects: actor.set_objects.remote(objects)))
+        lambda actor, objects: actor.adopt_objects.remote(objects)))

     # display data
     ds.show(5)

@@ -226,6 +231,7 @@ def test_api_compatibility(ray_cluster, jdk17_extra_spark_configs):

     # check compatibility of ray 1.9.0 API: no data ownership transfer
     ds = ray.data.from_spark(df_train)
+    ds.show(1)
     ray_gc() # ensure GC kicked in
     time.sleep(3)