
[Data] Fixing ActorPoolMapOperator to guarantee dispatch of all given inputs#60763

Merged
alexeykudinkin merged 8 commits into master from ak/apmo-lvns-fix
Feb 5, 2026

Conversation

@alexeykudinkin

@alexeykudinkin alexeykudinkin commented Feb 5, 2026

Description

This change revisits the ActorPoolMapOperator (APMO) input handling and scheduling sequence to align it with the input-handling protocol established in the StreamingExecutor: inputs are submitted only when the operator is believed to be ready to handle them, i.e.

  • It has resource budget available
  • A task could be launched*

*While an operator might not launch a task immediately because it "bundles" multiple inputs together, one of the op.add_input(...) calls is still expected to eventually trigger task scheduling that handles all of the previously provided inputs.

This, however, is not the case for APMO:

  1. APMO can refuse scheduling, e.g. when actors are fully utilized or when actors are restarting.
  2. When APMO refuses scheduling, it enqueues the provided input bundle into its own internal queue. However, the current execution model cannot guarantee that this queue is ever drained.

Changes

To work around these issues and guarantee liveness for ActorPoolMapOperator, the following changes are implemented:

APMO is aligned with task submission protocol

New inputs are submitted to the operator only when APMO is able to schedule a new task immediately (verified through op.can_add_input()).

If it is not able to schedule the task immediately, the input is rejected and kept in the external input queue.
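
As a rough illustration of the submission protocol described above, here is a minimal sketch. It assumes the readiness check is exposed under the name `can_add_input` (the name used in the review discussion); `ToyActorPoolOperator`, `on_task_done` and `dispatch` are hypothetical names invented for this example and are not Ray's actual classes or functions.

```python
from collections import deque


class ToyActorPoolOperator:
    """Hypothetical stand-in for an actor-pool operator (not Ray's class)."""

    def __init__(self, num_actors: int, max_tasks_per_actor: int = 1):
        # Total number of task slots across all actors in the toy pool.
        self._free_slots = num_actors * max_tasks_per_actor
        self.launched = []

    def can_add_input(self) -> bool:
        # True only if a task could be launched right now.
        return self._free_slots > 0

    def add_input(self, bundle) -> None:
        # The input is turned into a task immediately; nothing is parked
        # in an operator-internal queue.
        self._free_slots -= 1
        self.launched.append(bundle)

    def on_task_done(self) -> None:
        # A finished task frees one slot.
        self._free_slots += 1


def dispatch(op: ToyActorPoolOperator, external_queue: deque) -> int:
    """Move inputs from the external queue into `op` while it is ready.

    Inputs the operator cannot take stay in `external_queue`.
    """
    dispatched = 0
    while external_queue and op.can_add_input():
        op.add_input(external_queue.popleft())
        dispatched += 1
    return dispatched


if __name__ == "__main__":
    op = ToyActorPoolOperator(num_actors=2)
    queue = deque(["b0", "b1", "b2"])
    dispatch(op, queue)   # "b2" stays in the external queue
    op.on_task_done()     # one slot frees up
    dispatch(op, queue)   # now "b2" is dispatched
    print(op.launched, list(queue))
```

The point of the sketch is that rejected inputs never leave the external queue, so the dispatch loop that owns that queue remains responsible for retrying them.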

Revisited _ActorTaskSelector to keep it in sync with the Operator

Currently, _ActorTaskSelector might depend on external state. This is problematic for the following reasons:

  • This state is used to determine which actors we can safely route to
  • It is also used to determine whether APMO can schedule a task to run (see above)
  • However, if the state changes between the check and the moment op.add_input(...) is invoked, the handling protocol is violated.

To work around this problem, we snapshot all external state inside the _ActorTaskSelector.refresh_state(...) method:

  • State is snapshotted (refreshed periodically)
  • This way the state is synchronized with the rest of the Operator's state
  • This makes it impossible for can_schedule_task and select_actors to get out of sync
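
The snapshotting idea can be sketched as follows. This is only an illustration under stated assumptions: `ToyActorTaskSelector`, `ActorSnapshot`, and the single-actor `select_actor` are hypothetical simplifications, and the real `refresh_state(...)`, `can_schedule_task` and `select_actors` signatures may differ.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ActorSnapshot:
    """Hypothetical view of one actor's externally owned state."""
    actor_id: str
    running: bool
    in_flight: int


class ToyActorTaskSelector:
    """Hypothetical selector that only ever reads its own snapshot."""

    def __init__(self, max_tasks_per_actor: int):
        self._max = max_tasks_per_actor
        self._free: Dict[str, int] = {}

    def refresh_state(self, live_actors: List[ActorSnapshot]) -> None:
        # Copy the external state once; later external changes are not
        # visible until the next refresh.
        self._free = {
            a.actor_id: self._max - a.in_flight
            for a in live_actors
            if a.running and a.in_flight < self._max
        }

    def can_schedule_task(self) -> bool:
        return bool(self._free)

    def select_actor(self) -> Optional[str]:
        if not self._free:
            return None
        # Pick the actor with the most free slots in the snapshot.
        actor_id = max(self._free, key=lambda k: self._free[k])
        self._free[actor_id] -= 1
        if self._free[actor_id] == 0:
            del self._free[actor_id]
        return actor_id


if __name__ == "__main__":
    selector = ToyActorTaskSelector(max_tasks_per_actor=1)
    live = [ActorSnapshot("a", True, 0), ActorSnapshot("b", False, 0)]
    selector.refresh_state(live)
    # Even if the external list changes now, the check and the selection
    # below both consult the same snapshot.
    live[0] = ActorSnapshot("a", False, 0)
    if selector.can_schedule_task():
        print("selected:", selector.select_actor())
```

Because both the check and the selection read the same copied dictionary, a positive check is always followed by a successful selection until the next refresh.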


Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin requested a review from a team as a code owner February 5, 2026 02:47

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request provides a significant and well-executed refactoring of ActorPoolMapOperator to guarantee liveness by aligning its input handling with the StreamingExecutor's protocol. The core change, ensuring input is only accepted when a task can be scheduled immediately via can_add_input(), is sound and addresses a key correctness issue. The refactoring of scheduling logic into _ActorTaskSelector and _ActorPool improves modularity. The test suite has been commendably updated to reflect these changes, including a new comprehensive test for the fixed liveness issue. My review identified a minor bug in a warning condition and an opportunity to clarify an assertion message for better debuggability. Overall, this is a high-quality contribution that enhances the robustness of Ray Data.

@alexeykudinkin alexeykudinkin added the "go" label (add ONLY when ready to merge, run all tests) Feb 5, 2026

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.


- This method should only return `True` when operator is guaranteed
to be able to launch a task, meaning that subsequent `op.add_input(...)`
should be able to launch a task.
This can be handled later. The contract is kind of fragile, because there are no constraints on WHEN the next add_input will be called.
We should

  1. either make can_add_input and add_input atomic
  2. or introduce some boundaries at which the op's state can change

So the contract is that:

  • Before add_input, can_add_input must be called (which is done when we're selecting operator to dispatch to)
  • This is enforced through assertions inside add_input calling can_add_input
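
A minimal sketch of how such an assertion-based contract might look. `GuardedOperator` and its slot counter are hypothetical and stand in for the real operator; only the `can_add_input` / `add_input` names come from the discussion above.

```python
class GuardedOperator:
    """Hypothetical operator whose add_input re-checks the contract."""

    def __init__(self, free_slots: int):
        self._free_slots = free_slots
        self.accepted = []

    def can_add_input(self) -> bool:
        # True only if a task could be launched right now.
        return self._free_slots > 0

    def add_input(self, bundle) -> None:
        # Fail loudly if the caller skipped the readiness check or the
        # state changed since the check was made.
        assert self.can_add_input(), (
            "add_input() called while the operator cannot launch a task; "
            "callers must check can_add_input() first"
        )
        self._free_slots -= 1
        self.accepted.append(bundle)


if __name__ == "__main__":
    op = GuardedOperator(free_slots=1)
    op.add_input("b0")
    try:
        op.add_input("b1")  # violates the contract: no free slot left
    except AssertionError as exc:
        print("contract violated:", exc)
```

Note that Python strips `assert` statements when run with `-O`, so a check written this way surfaces violations in tests and regular runs rather than acting as a hard runtime guarantee.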

@alexeykudinkin alexeykudinkin enabled auto-merge (squash) February 5, 2026 06:14
@alexeykudinkin alexeykudinkin merged commit 50c715e into master Feb 5, 2026
7 checks passed
@alexeykudinkin alexeykudinkin deleted the ak/apmo-lvns-fix branch February 5, 2026 07:16
tiennguyentony pushed a commit to tiennguyentony/ray that referenced this pull request Feb 7, 2026
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
Kunchd pushed a commit to Kunchd/ray that referenced this pull request Feb 17, 2026
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
Aydin-ab pushed a commit to kunling-anyscale/ray that referenced this pull request Feb 20, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Labels

go add ONLY when ready to merge, run all tests

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

2 participants