-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Trying to extend the Scheduler interface according to the comments at #229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
RxJava-pull-requests #85 SUCCESS |
Awesome. Looks like it doesn't break a I thought about two examples final Scheduler scheduler = Schedulers.currentThread();
Observable<Integer> obs = Observable.create(new Func1<Observer<Integer>, Subscription>() {
@Override
public Subscription call(final Observer<Integer> observer) {
return scheduler.schedule(0, new Func2<Scheduler, Integer, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, Integer i) {
if (i > 42) {
observer.onCompleted();
return Subscriptions.empty();
}
observer.onNext(i);
return scheduler.schedule(i + 1, this);
}
});
}
});
obs.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Exception e) {
System.out.println("Error");
}
@Override
public void onNext(Integer args) {
System.out.println(args);
}
}); and final Scheduler scheduler = Schedulers.threadPoolForComputation();
Observable<Integer> obs = Observable.create(new Func1<Observer<Integer>, Subscription>() {
@Override
public Subscription call(final Observer<Integer> observer) {
return scheduler.schedule(new BooleanSubscription(), new Func2<Scheduler, BooleanSubscription, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, BooleanSubscription cancel) {
if (cancel.isUnsubscribed()) {
observer.onCompleted();
return Subscriptions.empty();
}
observer.onNext(42);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduler.schedule(cancel, this);
return cancel;
}
});
}
});
Subscription subscribe = obs.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Exception e) {
System.out.println("Error");
}
@Override
public void onNext(Integer args) {
System.out.println(args);
}
});
Thread.sleep(10000);
subscribe.unsubscribe();
System.out.println("unsubscribe");
Thread.sleep(10000); And they worked well for me. |
I'm wondering if the Rx has 3 primary methods (the rest are extension methods that look like they do transformations). http://msdn.microsoft.com/en-us/library/hh211963(v=vs.103).aspx Thus the 2 key ones are: <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit); We don't have one with an explicit time to run, only relative at this time. |
It appears the overloads all make sense ... but again .Net is able to use extension methods to make the design much more elegant where only the main 3 are part of the interface and the rest come along for the ride. It means we end up with a Scheduler/AbstractScheduler Interface/Abstract pairing to make this work. Should we just make Scheduler an Abstract? I'm very tempted to do so because of the following problems:
Or should be remove all but the main 3 methods from Scheduler and put all the overloads as utility functions on the Schedulers class instead? The precedent for using abstract (or concrete) classes instead of interfaces (which .Net then augments with extension methods) is already done - Observable is a concrete class instead of interface for this very reason. All plugins are done as abstracts instead of interfaces for this reason as well. Thoughts? |
While reviewing and playing with this I considered some changes I'd like to propose. I have submitted another pull request (#235) that builds on top of your work @jmhofer and includes the unit tests from @mairbek I would appreciate your thoughts on it and whether they are beneficial changes or just pedantic. |
Merged manually via #235 |
Work done in ReactiveX#229 added the following methods: - Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)} - Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)} These are in fact the primary methods from RxNet (http://msdn.microsoft.com/en-us/library/hh211963(v=vs.103).aspx) and the others are just helper overloads. It seems it is better to set the precedent to use these 2 methods for actual implementation logic while all other methods are just decorating and forwarding from AbstractScheduler to these methods. I have updated the various implementations to achieve this. Unit tests are passing … but we don't have enough unit test coverage so I won't be surprised if bugs are found.
These tests came from @mairbek at ReactiveX#229 (comment)
I've tried to extend the
Scheduler
interface according to the discussion at issue #19. If I understand this correctly, most of it can be done via theAbstractScheduler
.I hope this helps. Please have a look.