diff --git a/Sources/Action/Action.swift b/Sources/Action/Action.swift
index 707c8e39..82511e28 100644
--- a/Sources/Action/Action.swift
+++ b/Sources/Action/Action.swift
@@ -72,7 +72,7 @@ public final class Action {
return Observable.empty()
}
}
- .shareReplay(1)
+ .share()
elements = executionObservables
.flatMap { $0.catchError { _ in Observable.empty() } }
@@ -120,20 +120,26 @@ public final class Action {
@discardableResult
public func execute(_ value: Input) -> Observable {
- let subject = ReplaySubject.createUnbounded()
-
- executionObservables
+ defer {
+ inputs.onNext(value)
+ }
+
+ let execution = executionObservables
.take(1)
- .flatMap { $0.catchError { _ in Observable.never() } }
- .bindTo(subject)
- .addDisposableTo(disposeBag)
+ .flatMap { $0 }
+ .catchError { throw ActionError.underlyingError($0) }
- errors
- .map { throw $0 }
- .bindTo(subject)
- .addDisposableTo(disposeBag)
+ let notEnabledError = inputs
+ .takeUntil(executionObservables)
+ .withLatestFrom(enabled)
+ .flatMap { $0 ? Observable.empty() : Observable.error(ActionError.notEnabled) }
- inputs.onNext(value)
+ let subject = ReplaySubject.createUnbounded()
+ Observable
+ .of(execution, notEnabledError)
+ .merge()
+ .subscribe(subject)
+ .addDisposableTo(disposeBag)
return subject.asObservable()
}
diff --git a/Tests/ActionTests/ActionTests.swift b/Tests/ActionTests/ActionTests.swift
index a1e3a428..5b663bc5 100644
--- a/Tests/ActionTests/ActionTests.swift
+++ b/Tests/ActionTests/ActionTests.swift
@@ -374,7 +374,7 @@ class ActionTests: QuickSpec {
executionObservables = scheduler.createObserver(Observable.self)
}
- func bindAndExecute(action: Action) {
+ func bindAndExecuteTwice(action: Action) {
action.executionObservables
.bindTo(executionObservables)
.addDisposableTo(disposeBag)
@@ -385,73 +385,87 @@ class ActionTests: QuickSpec {
.addDisposableTo(disposeBag)
}
+ scheduler.scheduleAt(20) {
+ action.execute("b")
+ .bindTo(element)
+ .addDisposableTo(disposeBag)
+ }
+
scheduler.start()
}
context("single element action") {
beforeEach {
action = Action { Observable.just($0) }
- bindAndExecute(action: action)
+ bindAndExecuteTwice(action: action)
}
- it("element receives single value") {
+ it("element receives single value for each execution") {
XCTAssertEqual(element.events, [
next(10, "a"),
completed(10),
+ next(20, "b"),
+ completed(20),
])
}
- it("executes once") {
- expect(executionObservables.events.count) == 1
+ it("executes twice") {
+ expect(executionObservables.events.count) == 2
}
}
context("multiple element action") {
beforeEach {
action = Action { Observable.of($0, $0, $0) }
- bindAndExecute(action: action)
+ bindAndExecuteTwice(action: action)
}
- it("element receives mutiple values") {
+ it("element receives 3 values for each execution") {
XCTAssertEqual(element.events, [
next(10, "a"),
next(10, "a"),
next(10, "a"),
completed(10),
+ next(20, "b"),
+ next(20, "b"),
+ next(20, "b"),
+ completed(20),
])
}
- it("executes once") {
- expect(executionObservables.events.count) == 1
+ it("executes twice") {
+ expect(executionObservables.events.count) == 2
}
}
context("error action") {
beforeEach {
action = Action { _ in Observable.error(TestError) }
- bindAndExecute(action: action)
+ bindAndExecuteTwice(action: action)
}
it("element fails with underlyingError") {
XCTAssertEqual(element.events, [
- error(10, ActionError.underlyingError(TestError))
+ error(10, ActionError.underlyingError(TestError)),
+ error(20, ActionError.underlyingError(TestError)),
])
}
- it("executes once") {
- expect(executionObservables.events.count) == 1
+ it("executes twice") {
+ expect(executionObservables.events.count) == 2
}
}
context("disabled") {
beforeEach {
action = Action(enabledIf: Observable.just(false)) { Observable.just($0) }
- bindAndExecute(action: action)
+ bindAndExecuteTwice(action: action)
}
it("element fails with notEnabled") {
XCTAssertEqual(element.events, [
- error(10, ActionError.notEnabled)
+ error(10, ActionError.notEnabled),
+ error(20, ActionError.notEnabled),
])
}
@@ -459,6 +473,56 @@ class ActionTests: QuickSpec {
expect(executionObservables.events).to(beEmpty())
}
}
+
+ context("execute while executing") {
+ var secondElement: TestableObserver!
+ var trigger: PublishSubject!
+
+ beforeEach {
+ secondElement = scheduler.createObserver(String.self)
+ trigger = PublishSubject()
+ action = Action { Observable.just($0).sample(trigger) }
+
+ action.executionObservables
+ .bindTo(executionObservables)
+ .addDisposableTo(disposeBag)
+
+ scheduler.scheduleAt(10) {
+ action.execute("a")
+ .bindTo(element)
+ .addDisposableTo(disposeBag)
+ }
+
+ scheduler.scheduleAt(20) {
+ action.execute("b")
+ .bindTo(secondElement)
+ .addDisposableTo(disposeBag)
+ }
+
+ scheduler.scheduleAt(30) {
+ trigger.onNext()
+ }
+
+ scheduler.start()
+ }
+
+ it("first element receives single value") {
+ XCTAssertEqual(element.events, [
+ next(30, "a"),
+ completed(30),
+ ])
+ }
+
+ it("second element fails with notEnabled error") {
+ XCTAssertEqual(secondElement.events, [
+ error(20, ActionError.notEnabled)
+ ])
+ }
+
+ it("executes once") {
+ expect(executionObservables.events.count) == 1
+ }
+ }
}
}
}