Skip to content

Commit 9ded95d

Browse files
Merge pull request #883 from zsxwing/swing-thread-safe
Make Subscriptions of SwingObservable thread-safe
2 parents ef2da8a + 7babfaf commit 9ded95d

File tree

7 files changed

+375
-196
lines changed

7 files changed

+375
-196
lines changed

rxjava-contrib/rxjava-swing/src/main/java/rx/observables/SwingObservable.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Set;
2626

2727
import javax.swing.AbstractButton;
28+
import javax.swing.SwingUtilities;
2829

2930
import rx.Observable;
3031
import rx.functions.Func1;
@@ -140,4 +141,15 @@ public static Observable<ComponentEvent> fromComponentEvents(Component component
140141
public static Observable<Dimension> fromResizing(Component component) {
141142
return ComponentEventSource.fromResizing(component);
142143
}
144+
145+
/**
146+
* Check if the current thead is the event dispatch thread.
147+
*
148+
* @throws IllegalStateException if the current thread is not the event dispatch thread.
149+
*/
150+
public static void assertEventDispatchThread() {
151+
if (!SwingUtilities.isEventDispatchThread()) {
152+
throw new IllegalStateException("Need to run in the event dispatch thread, but was " + Thread.currentThread());
153+
}
154+
}
143155
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import javax.swing.SwingUtilities;
19+
20+
import rx.Scheduler.Inner;
21+
import rx.Subscription;
22+
import rx.schedulers.SwingScheduler;
23+
import rx.functions.Action0;
24+
import rx.functions.Action1;
25+
26+
public final class SwingSubscriptions {
27+
28+
private SwingSubscriptions() {
29+
// no instance
30+
}
31+
32+
/**
33+
* Create an Subscription that always runs <code>unsubscribe</code> in the event dispatch thread.
34+
*
35+
* @param unsubscribe
36+
* @return an Subscription that always runs <code>unsubscribe</code> in the event dispatch thread.
37+
*/
38+
public static Subscription unsubscribeInEventDispatchThread(final Action0 unsubscribe) {
39+
return Subscriptions.create(new Action0() {
40+
@Override
41+
public void call() {
42+
if (SwingUtilities.isEventDispatchThread()) {
43+
unsubscribe.call();
44+
} else {
45+
SwingScheduler.getInstance().schedule(new Action1<Inner>() {
46+
@Override
47+
public void call(Inner inner) {
48+
unsubscribe.call();
49+
}
50+
});
51+
}
52+
}
53+
});
54+
}
55+
}

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/AbstractButtonSource.java

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package rx.swing.sources;
1717

18-
import static org.mockito.Mockito.*;
18+
import static org.mockito.Mockito.mock;
19+
import static org.mockito.Mockito.never;
20+
import static org.mockito.Mockito.times;
21+
import static org.mockito.Mockito.verify;
1922

2023
import java.awt.event.ActionEvent;
2124
import java.awt.event.ActionListener;
@@ -26,76 +29,84 @@
2629
import org.mockito.Matchers;
2730

2831
import rx.Observable;
29-
import rx.Observable.OnSubscribeFunc;
30-
import rx.Observer;
32+
import rx.Observable.OnSubscribe;
33+
import rx.Subscriber;
3134
import rx.Subscription;
3235
import rx.functions.Action0;
3336
import rx.functions.Action1;
34-
import rx.subscriptions.Subscriptions;
37+
import rx.observables.SwingObservable;
38+
import rx.subscriptions.SwingSubscriptions;
3539

3640
public enum AbstractButtonSource { ; // no instances
3741

3842
/**
3943
* @see rx.observables.SwingObservable#fromButtonAction
4044
*/
4145
public static Observable<ActionEvent> fromActionOf(final AbstractButton button) {
42-
return Observable.create(new OnSubscribeFunc<ActionEvent>() {
46+
return Observable.create(new OnSubscribe<ActionEvent>() {
4347
@Override
44-
public Subscription onSubscribe(final Observer<? super ActionEvent> observer) {
48+
public void call(final Subscriber<? super ActionEvent> subscriber) {
49+
SwingObservable.assertEventDispatchThread();
4550
final ActionListener listener = new ActionListener() {
4651
@Override
4752
public void actionPerformed(ActionEvent e) {
48-
observer.onNext(e);
53+
subscriber.onNext(e);
4954
}
5055
};
5156
button.addActionListener(listener);
52-
53-
return Subscriptions.create(new Action0() {
57+
subscriber.add(SwingSubscriptions.unsubscribeInEventDispatchThread(new Action0() {
5458
@Override
5559
public void call() {
5660
button.removeActionListener(listener);
5761
}
58-
});
62+
}));
5963
}
6064
});
6165
}
6266

6367
public static class UnitTest {
6468
@Test
65-
public void testObservingActionEvents() {
66-
@SuppressWarnings("unchecked")
67-
Action1<ActionEvent> action = mock(Action1.class);
68-
@SuppressWarnings("unchecked")
69-
Action1<Throwable> error = mock(Action1.class);
70-
Action0 complete = mock(Action0.class);
71-
72-
final ActionEvent event = new ActionEvent(this, 1, "command");
73-
74-
@SuppressWarnings("serial")
75-
class TestButton extends AbstractButton {
76-
void testAction() {
77-
fireActionPerformed(event);
69+
public void testObservingActionEvents() throws Throwable {
70+
SwingTestHelper.create().runInEventDispatchThread(new Action0() {
71+
72+
@Override
73+
public void call() {
74+
@SuppressWarnings("unchecked")
75+
Action1<ActionEvent> action = mock(Action1.class);
76+
@SuppressWarnings("unchecked")
77+
Action1<Throwable> error = mock(Action1.class);
78+
Action0 complete = mock(Action0.class);
79+
80+
final ActionEvent event = new ActionEvent(this, 1, "command");
81+
82+
@SuppressWarnings("serial")
83+
class TestButton extends AbstractButton {
84+
void testAction() {
85+
fireActionPerformed(event);
86+
}
87+
}
88+
89+
TestButton button = new TestButton();
90+
Subscription sub = fromActionOf(button).subscribe(action, error, complete);
91+
92+
verify(action, never()).call(Matchers.<ActionEvent> any());
93+
verify(error, never()).call(Matchers.<Throwable> any());
94+
verify(complete, never()).call();
95+
96+
button.testAction();
97+
verify(action, times(1)).call(Matchers.<ActionEvent> any());
98+
99+
button.testAction();
100+
verify(action, times(2)).call(Matchers.<ActionEvent> any());
101+
102+
sub.unsubscribe();
103+
button.testAction();
104+
verify(action, times(2)).call(Matchers.<ActionEvent> any());
105+
verify(error, never()).call(Matchers.<Throwable> any());
106+
verify(complete, never()).call();
78107
}
79-
}
80-
81-
TestButton button = new TestButton();
82-
Subscription sub = fromActionOf(button).subscribe(action, error, complete);
83-
84-
verify(action, never()).call(Matchers.<ActionEvent>any());
85-
verify(error, never()).call(Matchers.<Throwable>any());
86-
verify(complete, never()).call();
87-
88-
button.testAction();
89-
verify(action, times(1)).call(Matchers.<ActionEvent>any());
90-
91-
button.testAction();
92-
verify(action, times(2)).call(Matchers.<ActionEvent>any());
93-
94-
sub.unsubscribe();
95-
button.testAction();
96-
verify(action, times(2)).call(Matchers.<ActionEvent>any());
97-
verify(error, never()).call(Matchers.<Throwable>any());
98-
verify(complete, never()).call();
108+
109+
}).awaitTerminal();
99110
}
100111
}
101112
}

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/ComponentEventSource.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,60 +15,59 @@
1515
*/
1616
package rx.swing.sources;
1717

18-
import static rx.swing.sources.ComponentEventSource.Predicate.*;
18+
import static rx.swing.sources.ComponentEventSource.Predicate.RESIZED;
1919

2020
import java.awt.Component;
2121
import java.awt.Dimension;
2222
import java.awt.event.ComponentEvent;
2323
import java.awt.event.ComponentListener;
2424

2525
import rx.Observable;
26-
import rx.Observable.OnSubscribeFunc;
27-
import rx.Observer;
28-
import rx.Subscription;
26+
import rx.Observable.OnSubscribe;
27+
import rx.Subscriber;
2928
import rx.functions.Action0;
3029
import rx.functions.Func1;
3130
import rx.observables.SwingObservable;
32-
import rx.subscriptions.Subscriptions;
31+
import rx.subscriptions.SwingSubscriptions;
3332

3433
public enum ComponentEventSource { ; // no instances
3534

3635
/**
3736
* @see rx.observables.SwingObservable#fromComponentEvents
3837
*/
3938
public static Observable<ComponentEvent> fromComponentEventsOf(final Component component) {
40-
return Observable.create(new OnSubscribeFunc<ComponentEvent>() {
39+
return Observable.create(new OnSubscribe<ComponentEvent>() {
4140
@Override
42-
public Subscription onSubscribe(final Observer<? super ComponentEvent> observer) {
41+
public void call(final Subscriber<? super ComponentEvent> subscriber) {
42+
SwingObservable.assertEventDispatchThread();
4343
final ComponentListener listener = new ComponentListener() {
4444
@Override
4545
public void componentHidden(ComponentEvent event) {
46-
observer.onNext(event);
46+
subscriber.onNext(event);
4747
}
4848

4949
@Override
5050
public void componentMoved(ComponentEvent event) {
51-
observer.onNext(event);
51+
subscriber.onNext(event);
5252
}
5353

5454
@Override
5555
public void componentResized(ComponentEvent event) {
56-
observer.onNext(event);
56+
subscriber.onNext(event);
5757
}
5858

5959
@Override
6060
public void componentShown(ComponentEvent event) {
61-
observer.onNext(event);
61+
subscriber.onNext(event);
6262
}
6363
};
6464
component.addComponentListener(listener);
65-
66-
return Subscriptions.create(new Action0() {
65+
subscriber.add(SwingSubscriptions.unsubscribeInEventDispatchThread(new Action0() {
6766
@Override
6867
public void call() {
6968
component.removeComponentListener(listener);
7069
}
71-
});
70+
}));
7271
}
7372
});
7473
}

0 commit comments

Comments
 (0)