Skip to content

Commit 89ff375

Browse files
Merge pull request #885 from abersnaze/observable-string-from
Fixed an issue with the from(Reader) added a bunch of unit tests.
2 parents ea9b73a + 7ffb0ef commit 89ff375

File tree

2 files changed

+128
-4
lines changed

2 files changed

+128
-4
lines changed

rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.nio.charset.CoderResult;
2828
import java.nio.charset.CodingErrorAction;
2929
import java.util.Arrays;
30+
import java.util.Objects;
3031
import java.util.regex.Pattern;
3132

3233
import rx.Observable;
@@ -37,10 +38,28 @@
3738
import rx.functions.Func2;
3839

3940
public class StringObservable {
41+
/**
42+
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
43+
* {@link byte[]}s
44+
*
45+
* @param i
46+
* Source {@link InputStream}
47+
* @return
48+
*/
4049
public static Observable<byte[]> from(final InputStream i) {
4150
return from(i, 8 * 1024);
4251
}
4352

53+
/**
54+
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
55+
* {@link byte[]}s
56+
*
57+
* @param i
58+
* Source {@link InputStream}
59+
* @param size
60+
* internal buffer size
61+
* @return
62+
*/
4463
public static Observable<byte[]> from(final InputStream i, final int size) {
4564
return Observable.create(new OnSubscribe<byte[]>() {
4665
@Override
@@ -65,10 +84,28 @@ public void call(Subscriber<? super byte[]> o) {
6584
});
6685
}
6786

87+
/**
88+
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
89+
* {@link String}s
90+
*
91+
* @param i
92+
* Source {@link Reader}
93+
* @return
94+
*/
6895
public static Observable<String> from(final Reader i) {
6996
return from(i, 8 * 1024);
7097
}
7198

99+
/**
100+
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
101+
* {@link String}s
102+
*
103+
* @param i
104+
* Source {@link Reader}
105+
* @param size
106+
* internal buffer size
107+
* @return
108+
*/
72109
public static Observable<String> from(final Reader i, final int size) {
73110
return Observable.create(new OnSubscribe<String>() {
74111
@Override
@@ -80,7 +117,7 @@ public void call(Subscriber<? super String> o) {
80117
int n = 0;
81118
n = i.read(buffer);
82119
while (n != -1 && !o.isUnsubscribed()) {
83-
o.onNext(new String(buffer));
120+
o.onNext(new String(buffer, 0, n));
84121
n = i.read(buffer);
85122
}
86123
} catch (IOException e) {
@@ -119,7 +156,7 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)
119156

120157
/**
121158
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
122-
* and where handles when a multibyte character spans two chunks.
159+
* and where it handles when a multibyte character spans two chunks.
123160
* This method allows for more control over how malformed and unmappable characters are handled.
124161
*
125162
* @param src
@@ -151,6 +188,9 @@ public void onNext(byte[] bytes) {
151188
}
152189

153190
public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
191+
if (o.isUnsubscribed())
192+
return false;
193+
154194
ByteBuffer bb;
155195
if (last != null) {
156196
if (next != null) {
@@ -270,8 +310,10 @@ public String call(String a, String b) {
270310
/**
271311
* Rechunks the strings based on a regex pattern and works on infinite stream.
272312
*
273-
* resplit(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
274-
* resplit(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
313+
* <pre>
314+
* split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
315+
* split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
316+
* </pre>
275317
*
276318
* See {@link Pattern}
277319
*
@@ -399,4 +441,56 @@ public void onNext(Object t) {
399441
}
400442
});
401443
}
444+
445+
public final static class Line {
446+
private final int number;
447+
private final String text;
448+
449+
public Line(int number, String text) {
450+
this.number = number;
451+
this.text = text;
452+
}
453+
454+
public int getNumber() {
455+
return number;
456+
}
457+
458+
public String getText() {
459+
return text;
460+
}
461+
462+
@Override
463+
public int hashCode() {
464+
return Objects.hash(number, text);
465+
}
466+
467+
@Override
468+
public boolean equals(Object obj) {
469+
if (!(obj instanceof Line))
470+
return false;
471+
return Objects.equals(number, ((Line) obj).number) && Objects.equals(text, ((Line) obj).text);
472+
}
473+
474+
@Override
475+
public String toString() {
476+
return number + ":" + text;
477+
}
478+
}
479+
480+
/**
481+
* Splits the {@link Observable} of Strings by lines and numbers them (zero based index)
482+
*
483+
* @param source
484+
* @return
485+
*/
486+
public static Observable<Line> byLine(Observable<String> source) {
487+
return split(source, System.getProperty("line.separator")).map(new Func1<String, Line>() {
488+
int lineNumber = 0;
489+
490+
@Override
491+
public Line call(String text) {
492+
return new Line(lineNumber++, text);
493+
}
494+
});
495+
}
402496
}

rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,23 @@
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.io.ByteArrayInputStream;
2223
import java.io.IOException;
24+
import java.io.NotSerializableException;
25+
import java.io.StringReader;
2326
import java.nio.charset.Charset;
2427
import java.nio.charset.CharsetDecoder;
2528
import java.nio.charset.MalformedInputException;
2629
import java.util.Arrays;
30+
import java.util.List;
31+
32+
import junit.framework.Assert;
2733

2834
import org.junit.Test;
2935

3036
import rx.Observable;
3137
import rx.Observer;
38+
import rx.observables.StringObservable.Line;
3239
import rx.observers.TestObserver;
3340
import rx.util.AssertObservable;
3441

@@ -221,4 +228,27 @@ public void testJoinThrows() {
221228
verify(observer, never()).onCompleted();
222229
verify(observer, times(1)).onError(any(Throwable.class));
223230
}
231+
232+
@Test
233+
public void testFromInputStream() {
234+
final byte[] inBytes = "test".getBytes();
235+
final byte[] outBytes = StringObservable.from(new ByteArrayInputStream(inBytes)).toBlockingObservable().single();
236+
assertNotSame(inBytes, outBytes);
237+
assertArrayEquals(inBytes, outBytes);
238+
}
239+
240+
@Test
241+
public void testFromReader() {
242+
final String inStr = "test";
243+
final String outStr = StringObservable.from(new StringReader(inStr)).toBlockingObservable().single();
244+
assertNotSame(inStr, outStr);
245+
assertEquals(inStr, outStr);
246+
}
247+
248+
@Test
249+
public void testByLine() {
250+
List<Line> lines = StringObservable.byLine(Observable.from(Arrays.asList("qwer", "\nasdf\n", "zx", "cv"))).toList().toBlockingObservable().single();
251+
252+
assertEquals(Arrays.asList(new Line(0, "qwer"), new Line(1, "asdf"), new Line(2, "zxcv")), lines);
253+
}
224254
}

0 commit comments

Comments
 (0)