Skip to content

Add support for synchronous subscription to Streams #33024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
alanrussian opened this issue May 2, 2018 · 5 comments
Closed

Add support for synchronous subscription to Streams #33024

alanrussian opened this issue May 2, 2018 · 5 comments
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. area-language Dart language related items (some items might be better tracked at github.com/dart-lang/language). closed-duplicate Closed in favor of an existing report library-async

Comments

@alanrussian
Copy link

alanrussian commented May 2, 2018

It would be very beneficial for Flutter users if there were a way to synchronously subscribe to streams. The reason for this is discussed in flutter/flutter#16465. I'll briefly summarize it here:

  • Flutter has a StreamBuilder widget which takes in a Stream<T> and a (BuildContext,AsyncSnapshot<T>) -> Widget. The widget allows users to render a widget with the latest value from the stream.
  • The StreamBuilder widget renders the first frame of the widget in the same run loop that subscribes to the stream. This means that if the Stream has a value upon subscription, an unnecessary intermediary frame will be drawn resulting in an unacceptable jumpy UI. The only possible fix here appears to be to add support for synchronous subscription to streams.
  • Discussion within the Flutter issue talks about possible workarounds and why I don't find them to be a good solution.

Is it possible to add support for synchronous stream subscription at some point in Dart? From my internal discussion with @lrhn, it would not be possible to change the existing behavior without breaking clients, but could it be made a parameter to listen or a separate method?

cc @floitschG

@anders-sandholm anders-sandholm added area-language Dart language related items (some items might be better tracked at github.com/dart-lang/language). area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-async labels May 7, 2018
@lrhn
Copy link
Member

lrhn commented May 7, 2018

It's definitely possible to do something in the direction, but it feels like sub-optimal API-design to have a way to "subscribe synchronously" (meaning that the subscription becomes active before the onListen call instead of after) because it gives two subtly different ways to do the same thing, and no guidance to users about which one to choose. And it is defined in terms of onListen, which is not a Stream term, but a controller term, so it breaks the Stream abstraction - not all streams come from controllers, in fact, most don't (they just wrap something that might).

I also don't want to just generally allow sending events during onListen. It's too dangerous because events might be lost if listeners haven't been set up yet. Errors will become uncaught if error handlers haven't been set up. Any code in onData handlers that try to access the subscription object will find null on the first event. The experience we got when initially adding streams to the language was that it was simply too hard for users to reason about if callbacks were called before you were done setting up.

That means, really, that what you are asking for is a different kind of stream. One that has a behavior that you opt in to by picking that other kind of stream, and which is not interchangeable with a normal stream.

Maybe just make a stream transformation like:

import "dart:async";
import "package:async/async.dart";

class InitialStateStream<T> extends Stream<T> {
  StreamSubscription<T> _source;
  bool _isDone;
  Result _initialState;
  InitialStateStream(Stream<T> source, {bool cancelOnEarlyError = false}) {
    _source = source.listen((v) {
      _initialState = Result.value(v);
    }, onError: (e, s) {
      _initialState = Result.error(e, s);
      if (cancelOnEarlyError) {
        _source.cancel();
        _isDone = true;
      }
    }, onDone: () {
      _initialState = null;
      _isDone = true;
    });
  }
  StreamSubscription<T> listen(void onData(T data),
      {Function onError, void onDone(), bool cancelOnError = false}) {
    _source.onData(onData);
    _source.onError(onError); // Actually need to do something about cancelOnError too.
    _source.onDone(onDone);
    if (_initialState != null) {
      if (_initialState.isValue) {
        if (onData != null) onData(_initialState.asValue.value);
      } else if (onError != null) {
        ErrorResult error = _initialState;
        if (onError is void Function(Null, Null)) {
          onError(error.error, error.stackTrace);
        } else {
          onError(error.error);
        }
      }
    }
    if (_isDone) scheduleMicrotask(onDone);
    return _source;
  }
}

(NOT TESTED)

@alanrussian
Copy link
Author

Thanks for the explanation, Lasse. If it makes more sense to create a separate Stream to serve this purpose, that's fine. Are there any chances that such a Stream type could be added to the standard libraries?

cc @lestathc

@alanrussian
Copy link
Author

I suppose this is a duplicate of #30532?

@alanrussian
Copy link
Author

I think there's a very similar issue with Futures. Let's say I have an API to get some data, and I want it to fetch the data on the first call but return cached data immediately on subsequent calls. With Dart, it's unclear to me how to do this because a future always completes in the next runloop (see Future.value). I suppose I could return FutureOr but that seems like a weird return type.

@lrhn
Copy link
Member

lrhn commented Jun 26, 2018

It's similar with futures, and we deliberately did not give users a way to access the value synchronously, or even detect whether it was available. Again, it would make for multiple ways to do the same thing, and we'd see code that tried to do both in the name of "performance" (ignoring that they double their code and likely their bugs along with it).

What would work is an "async cache" like:

class AsyncCache<T> {
  bool get hasValue;
  T get value;  // throws if doesn't have value.
  Future<T> get eventualValue;  // When it's ready.
}

You could definitely populate something like that using a Future.

And yes, it's a duplicate of #30532. I'll close this one for now, and probably close #30532 later.

@lrhn lrhn closed this as completed Jun 26, 2018
@lrhn lrhn added the closed-duplicate Closed in favor of an existing report label Jun 26, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. area-language Dart language related items (some items might be better tracked at github.com/dart-lang/language). closed-duplicate Closed in favor of an existing report library-async
Projects
None yet
Development

No branches or pull requests

3 participants