Skip to content

Strictly define operator Fusion contract for Reactor subscribers (internal and user-made) #2636

@OlegDokuka

Description

@OlegDokuka

At the moment we have unconsolidated behaviors in the operators' fusion, which exposed how downstream (a.k.a Queue Drainer) notifies upstream (a.k.a Queue Source) about completion on doing any operators with a queue. This consolidation is even more important in the light when all the elements from the queue MUST be discarded.

This issue stands for providing a clear contract between upstream and downstream in case of fusion between them:

If Fusion established then:

  1. The Queue source must never call methods poll or clear on its queue until it is notified that downstream is not working with that queue.
  2. The Queue source upon reception of the clear call, must discard all the elements from the queue if the queue is known as finite
  3. The Queue drainer must notify the queue source via the queueSubscription.clear method that it is done with all operation on that queue and will never call the poll method anymore
  4. The Queue drainer must call the queueSubscription.clear only once and ensure it is never called concurrently.
  5. The Queue drainer must call the queueSubscription.clear after reception of any of the terminal signals which are (ON_ERROR, ON_COMPLETE, and CANCEL)

if Fusion not established then:

  1. The Queue source must manage the queue on its own and is responsible for terminating all interaction with that queue upon reception of any of the terminal signals which are (ON_ERROR, ON_COMPLETE, and CANCEL) and discarding all the elements from it if the queue is known as finite

By consolidating the behaviors, we will be able to resolve a list of issues related to cases when downstream try to blindly discard elements from the upstream's queue without full context:

  1. FluxIterable fused with any downstream led to infinity looping if the source iterable is infinite (Spring Boot RSocket high CPU usage after client disconnects rsocket/rsocket-java#992)
  2. FluxStream fused with any downstream let to infinity looping if the source is an infinite stream.
  3. FluxGenerate fused with any downstream let to infinity looping if the generate is infinite.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions