Skip to content

SchedulePeriodically Signature #856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ trait Scheduler {
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
},
initialDelay.toNanos,
duration.NANOSECONDS,
period.toNanos,
duration.NANOSECONDS
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ public void testInvalidDelayValues() {
final Action1<Inner> action = mock(Action1.class);

exception.expect(IllegalArgumentException.class);
scheduler.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS);
scheduler.schedulePeriodically(action, -1L, TimeUnit.SECONDS, 100L, TimeUnit.SECONDS);

exception.expect(IllegalArgumentException.class);
scheduler.schedulePeriodically(action, 100L, -1L, TimeUnit.SECONDS);
scheduler.schedulePeriodically(action, 100L, TimeUnit.SECONDS, -1L, TimeUnit.SECONDS);

exception.expect(IllegalArgumentException.class);
scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS);
scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, TimeUnit.MILLISECONDS, 100L, TimeUnit.MILLISECONDS);

exception.expect(IllegalArgumentException.class);
scheduler.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS);
scheduler.schedulePeriodically(action, 100L, TimeUnit.SECONDS, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS);
}

@Test
Expand All @@ -80,7 +80,7 @@ public void call(Inner inner) {
}
};

Subscription sub = scheduler.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS);
Subscription sub = scheduler.schedulePeriodically(action, 50, TimeUnit.MILLISECONDS, 200, TimeUnit.MILLISECONDS);

if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
fail("timed out waiting for tasks to execute");
Expand Down
10 changes: 6 additions & 4 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ public abstract class Scheduler {
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param initialDelayUnit
* The time unit the interval above is given in.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* @param periodUnit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public Subscription schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
public Subscription schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, TimeUnit initialDelayUnit, long period, TimeUnit periodUnit) {
final long periodInNanos = periodUnit.toNanos(period);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather chose a common unit here instead of converting to nanos.

TimeUnit common = initialDelayUnit.compareTo(periodUnit) < 0 ? initialDelayUnit : periodUnit;

final long initial = common.convert(initialDelay, initialDelayUnit);
final long period = common.convert(period, periodUnit);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The common unit is nanos. It already was before this change.


final Action1<Scheduler.Inner> recursiveAction = new Action1<Scheduler.Inner>() {
@Override
Expand All @@ -91,7 +93,7 @@ public void call(Inner inner) {
}
}
};
return schedule(recursiveAction, initialDelay, unit);
return schedule(recursiveAction, initialDelay, initialDelayUnit);
}

public abstract static class Inner implements Subscription {
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/ChunkedOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ public TimeBasedChunkCreator(final NonOverlappingChunks<T, C> chunks, long time,
public void call(Inner inner) {
chunks.emitAndReplaceChunk();
}
}, 0, time, unit));
}, 0, TimeUnit.MILLISECONDS, time, unit));
}

public TimeBasedChunkCreator(final OverlappingChunks<T, C> chunks, long time, TimeUnit unit, Scheduler scheduler) {
Expand All @@ -582,7 +582,7 @@ public TimeBasedChunkCreator(final OverlappingChunks<T, C> chunks, long time, Ti
public void call(Inner inner) {
chunks.createChunk();
}
}, 0, time, unit));
}, 0, TimeUnit.MILLISECONDS, time, unit));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void call(Inner inner) {
observer.onNext(currentValue);
currentValue++;
}
}, period, period, unit);
}, period, unit, period, unit);

return Subscriptions.create(new Action0() {
@Override
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Subscription onSubscribe(final Observer<? super Long> t1) {
public void call(Inner inner) {
t1.onNext(count++);
}
}, initialDelay, period, unit);
}, initialDelay, unit, period, unit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit uni
}

@Override
public Subscription schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit) {
public Subscription schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, TimeUnit delayUnit, long period, TimeUnit periodUnit) {
if (executor instanceof ScheduledExecutorService) {
final InnerExecutorScheduler inner = new InnerExecutorScheduler();
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
Expand All @@ -72,12 +72,12 @@ public void run() {
}
action.call(inner);
}
}, initialDelay, period, unit);
}, delayUnit.toMillis(initialDelay), periodUnit.toMillis(period), TimeUnit.MILLISECONDS);

inner.innerSubscription.set(Subscriptions.from(f));
return inner;
} else {
return super.schedulePeriodically(action, initialDelay, period, unit);
return super.schedulePeriodically(action, initialDelay, delayUnit, period, periodUnit);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void call(Inner inner) {
System.out.println(scheduler.now());
calledOp.call(scheduler.now());
}
}, 1, 2, TimeUnit.SECONDS);
}, 1, TimeUnit.SECONDS, 2, TimeUnit.SECONDS);

verify(calledOp, never()).call(anyLong());

Expand Down