Skip to content

Wrong behavior of async* function after pause-resume-pause #49465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
sgrekhov opened this issue Jul 18, 2022 · 3 comments
Closed

Wrong behavior of async* function after pause-resume-pause #49465

sgrekhov opened this issue Jul 18, 2022 · 3 comments
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. closed-as-intended Closed as the reported issue is expected behavior type-bug Incorrect behavior (everything from a crash to more subtle misbehavior)

Comments

@sgrekhov
Copy link
Contributor

Generator function marked as async* performs excessive generation cicle after resume()

Example:

import "dart:async";

List<int> readyToSent = [];
List<int> sent = [];

Stream<int> generator() async* {
  for (int i = 1; i <= 5; i++) {
    readyToSent.add(i);
    yield i;
    sent.add(i);
  }
}

main() async {
  List received = [];
  Stream<int> s = generator();
  late StreamSubscription<int> ss;
  ss = s.listen((int i) async {
    received.add(i);
    if (i == 2) {
      print(sent); // [1]
      print(readyToSent); // [1, 2]
      ss.pause();
      await Future.delayed(Duration(milliseconds: 100));
      print(sent); // [1]
      print(readyToSent); // [1,2]
      ss.resume();
      print(sent); // [1]
      print(readyToSent); // [1, 2]
      ss.pause();
      await Future.delayed(Duration(milliseconds: 100));
      print(sent); // [1, 2] Why?
      print(readyToSent); // [1, 2, 3]
      await ss.cancel();
    }
  });
  await Future.delayed(Duration(seconds: 1));
  print("F1: $received"); // [1, 2]
  print("F2: $sent");     // [1, 2]
  print("F3: $readyToSent");  // [1, 2, 3]
}

Tested on the edge version of SDK on Linux

Cc @lrhn

@sgrekhov sgrekhov changed the title Wrong behavior of asyn* function after pause-resume-pause Wrong behavior of async* function after pause-resume-pause Jul 18, 2022
@mkustermann mkustermann added the area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. label Jul 21, 2022
@mkustermann
Copy link
Member

/cc @alexmarkov

@mkustermann mkustermann added the type-bug Incorrect behavior (everything from a crash to more subtle misbehavior) label Jul 21, 2022
@alexmarkov
Copy link
Contributor

Calling ss.resume() schedules a micro-task to resume execution of the async* generator, and the subsequent ss.pause() doesn't have any effect until the next yield.The implementation follows this statement from the spec:

If paused, the function is blocked at the yield statement until the subscription is resumed or canceled.

(https://github.com/dart-lang/language/blob/master/accepted/future-releases/async-star-behavior/feature-specification.md)

The spec doesn't clarify what happens if subscription is resumed but paused again before async* generator has a chance to run.

Maybe we should update the spec to explicitly mention that yield/yield* should check if the subscription is paused and canceled both when entering yield/yield* and right before resuming the execution of async* function.
Currently VM checks for cancellation twice, but it checks for paused subscription only once (when deciding if we need to schedule a micro-task or not), and resuming subscription schedules a micro-task which may run when the subscription is paused again.

@lrhn Could you confirm that we need the 2nd check for the paused subscription immediately before resuming the async* body?

@lrhn
Copy link
Member

lrhn commented Jul 21, 2022

The spec doesn't clarify what happens if subscription is resumed but paused again before async* generator has a chance to run.

That was deliberate.
The moment the subscription is resumed, it's fine to resume the source stream whatever that means. It could mean synchronously starting some slow machinery in the background and moving on, which means that the time to check for pause at the yield is past.

So the current behavior is valid and within specification.

The spec should say what is required:

Execution of yield e; proceeds as follows:

  • e is evaluated to a value v. (If this evaluation throws, so does yield e, as usual.)
  • The value v is delivered as an event to the stream subscription corresponding to the current invocation.
    • If that subscription is paused, the event cannot be delivered until the subscription is resumed.
    • If that subscription is already cancelled, the event is considered delivered immediately.
  • After the event has been delivered (this can happen at any later time, but checking immediately after delivering the event synchronously is allowed - and encouraged for speed):
    • If the subscription is cancelled, execution completes by returning without a value.
    • If the subscription is paused, execution suspends.
      • While suspended, if the subscription is cancelled, execution completes by returning without a value.
      • While suspended, if the subscription is resumed, execution is un-suspended and completes normally at a later time (can be any later time, but must not be immediately/synchronously during the resume call.)
    • Otherwise execution completes normally.

Technically, the VM only needs to check for cancellation once: After checking for/resuming from a pause.

The minimal implementation will be:

  • Use a synchronous stream controller
  • Deliver event (controller.add(value);). If paused, event is buffered. If cancelled, event is discarded.
  • If paused (controller.isPaused): Wait for resume or cancel (resume won't be signaled until buffered events have been delivered).
  • (Get here if not paused or paused and resumed/cancelled)
  • If cancelled: return;.

I usually write it as:

StreamController<T> $c = ...
Completer<void>? _suspendedAtYield;
$c.onCancel = $c.onResume = () {
  _suspendedAtYield?.complete(null);
  _suspendedAtYield = null;
};

....
  // `yield e` becomes:
  $c.add(e);
  if ($c.isPaused) {
    await (_suspendedAtYield = Completer()).future;
  }
  if (!$c.hasListener) return;

With the current synchronous StreamController implementation, I think that should be a valid desugaring of yield.

@alexmarkov alexmarkov closed this as not planned Won't fix, can't repro, duplicate, stale Jul 25, 2022
@alexmarkov alexmarkov added the closed-as-intended Closed as the reported issue is expected behavior label Jul 25, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. closed-as-intended Closed as the reported issue is expected behavior type-bug Incorrect behavior (everything from a crash to more subtle misbehavior)
Projects
None yet
Development

No branches or pull requests

4 participants