So far, we've seen most of the methods that allow us to create a sequence and transform it into what we want. However, most applications will have more than one source of input. We need a way of combining sequences. We've already seen a few operators that use more than one observable. In this chapter, we will see the most important operators that use multiple sequences to produce one.
The most straightforward combination of sequences is to have one run after the other.
The `concat` operator concatenates sequences one after the other. There are many overloads of `concat`, which allow you to provide source observables in different numbers and formats.
public static final <T> Observable<T> concat(
Observable<? extends Observable<? extends T>> observables)
public static final <T> Observable<T> concat(
Observable<? extends T> t1,
Observable<? extends T> t2)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3,
Observable<? extends T> t4)
// All the way to 10 observables
Concatenating two (or more) given observables is straightforward.
Observable<Integer> seq1 = Observable.range(0, 3);
Observable<Integer> seq2 = Observable.range(10, 3);
Observable.concat(seq1, seq2)
.subscribe(System.out::println);
0
1
2
10
11
12
If the number of sequences to be combined is dynamic, you can provide an observable that emits the sequences to be concatenated. In this example, we will use our familiar `groupBy` to create a sequence that emits words starting with the same letter together.
Observable<String> words = Observable.just(
"First",
"Second",
"Third",
"Fourth",
"Fifth",
"Sixth"
);
Observable.concat(words.groupBy(v -> v.charAt(0)))
.subscribe(System.out::println);
First
Fourth
Fifth
Second
Sixth
Third
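The ordering of that output can be reproduced without RxJava. The following is a minimal plain-Java sketch of the same idea, assuming all groups are known up front (a simplification that a real `groupBy` does not need): words are bucketed by first letter in order of first appearance, then the buckets are appended one after another. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RxJava: models "group, then concatenate".
final class GroupConcatSketch {
    static List<String> groupThenConcat(List<String> words) {
        // LinkedHashMap keeps groups in the order their first member appeared.
        Map<Character, List<String>> groups = new LinkedHashMap<>();
        for (String word : words) {
            groups.computeIfAbsent(word.charAt(0), k -> new ArrayList<>()).add(word);
        }
        // Concatenation: drain each group completely before moving to the next.
        List<String> out = new ArrayList<>();
        for (List<String> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words =
                Arrays.asList("First", "Second", "Third", "Fourth", "Fifth", "Sixth");
        System.out.println(groupThenConcat(words));
        // prints [First, Fourth, Fifth, Second, Sixth, Third]
    }
}
```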
`concat` behaves like the flattening phase of `concatMap`. In fact, `concatMap` is equivalent to applying `map` and then `concat`.
The `concatWith` operator is an alternative style of doing `concat`, which allows you to combine sequences one by one in a chain:
public void exampleConcatWith() {
Observable<Integer> seq1 = Observable.range(0, 3);
Observable<Integer> seq2 = Observable.range(10, 3);
Observable<Integer> seq3 = Observable.just(20);
seq1.concatWith(seq2)
.concatWith(seq3)
.subscribe(System.out::println);
}
0
1
2
10
11
12
20
`repeat` allows you to concatenate a sequence after itself, either an infinite or a finite number of times. `repeat` doesn't cache the values to repeat them. When the time comes, it starts a new subscription and disposes of the old one.
public final Observable<T> repeat()
public final Observable<T> repeat(long count)
Its application is very simple:
Observable<Integer> values = Observable.range(0, 2);
values.repeat(2)
.subscribe(System.out::println);
0
1
0
1
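The point that `repeat` resubscribes rather than replaying cached values can be illustrated in plain Java. This is only a sketch under the assumption that a "subscription" is modelled as one call to a `Supplier`; `RepeatSketch` and its method are hypothetical names, not RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper, not part of RxJava: models repeat(count) as re-running the source.
final class RepeatSketch {
    static <T> List<T> repeat(Supplier<List<T>> source, int count) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Each round asks the source again instead of replaying cached values.
            out.addAll(source.get());
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        Supplier<List<Integer>> source = () -> {
            subscriptions.incrementAndGet(); // counts how often the source is started
            return Arrays.asList(0, 1);
        };
        System.out.println(repeat(source, 2));   // prints [0, 1, 0, 1]
        System.out.println(subscriptions.get()); // prints 2
    }
}
```

Because the source is started once per round, any side effects it has (a network call, a file read) would also happen once per round.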
If you need more control than `repeat` gives, you can control when the repetition starts with the `repeatWhen` operator. The "when" is defined by an observable that you provide. When the original sequence completes, it waits for the handling observable to emit something (the value is irrelevant) and only then does it repeat. If the handling observable terminates, that means that the repetitions should stop.
It may be useful for the signal to know when a repetition has been completed. `repeatWhen` provides a special observable that emits a `Void` item when a repetition terminates. You can use that observable to construct your signal.
public final Observable<T> repeatWhen(
Func1<? super Observable<? extends java.lang.Void>,? extends Observable<?>> notificationHandler)
The argument of `repeatWhen` is a function that takes an observable and returns an observable. The types emitted by both objects do not matter. The input is the observable that signals the end of a repetition and the returned observable will be used to signal a restart.
In the next example, we create our version of `repeat(n)` using `repeatWhen`.
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values
.take(2)
.repeatWhen(ob -> {
return ob.take(2);
})
.subscribe(new PrintSubscriber("repeatWhen"));
repeatWhen: 0
repeatWhen: 1
repeatWhen: 0
repeatWhen: 1
repeatWhen: Completed
Here the repetition happens immediately: `ob` emits when a repetition has ended, so the returned observable also emits right after a completed repetition. That signals the new repetition to begin.
In the next example, we create a sequence that repeats every two seconds, forever.
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values
.take(5)
.repeatWhen((ob)-> {
ob.subscribe();
return Observable.interval(2, TimeUnit.SECONDS);
})
.subscribe(new PrintSubscriber("repeatWhen"));
Note that the sequence repeats every 2 seconds regardless of when it completed. That's because we created an independent `interval` observable that sends a signal every 2 seconds. In the next chapter, [Time-shifted sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md), we will see ways of dealing with sequences in time with more control.
Another thing to note is the `ob.subscribe()` statement, which appears to be useless. It is necessary because it forces `ob` to be created. In the current implementation of `repeatWhen`, if `ob` is not subscribed to, then repetitions never begin.
`startWith` takes a sequence and concatenates it before the observable it is applied to.
public final Observable<T> startWith(java.lang.Iterable<T> values)
public final Observable<T> startWith(Observable<T> values)
public final Observable<T> startWith(T t1)
public final Observable<T> startWith(T t1, T t2)
public final Observable<T> startWith(T t1, T t2, T t3)
// up to 10 values
Here is an example:
Observable<Integer> values = Observable.range(0, 3);
values.startWith(-1,-2)
.subscribe(System.out::println);
-1
-2
0
1
2
`startWith` is a shorthand for using `concat` with a `just` and our source sequence.
Observable.concat(
Observable.just(-1,-2,-3),
values)
// Same as
values.startWith(-1,-2,-3)
Observables aren't always emitting values at predictable moments in time. We will now see some operators intended for combining sequences that emit values concurrently.
`amb` takes a number of observables and returns the one that emits a value first. The rest are discarded.
public static final <T> Observable<T> amb(
java.lang.Iterable<? extends Observable<? extends T>> sources)
public static final <T> Observable<T> amb(
Observable<? extends T> o1,
Observable<? extends T> o2)
public static final <T> Observable<T> amb(
Observable<? extends T> o1,
Observable<? extends T> o2,
Observable<? extends T> o3)
// Up to 10 observables
In the following example, `amb` will mirror the second observable, because it waits less to start.
Observable.amb(
Observable.timer(100, TimeUnit.MILLISECONDS).map(i -> "First"),
Observable.timer(50, TimeUnit.MILLISECONDS).map(i -> "Second"))
.subscribe(System.out::println);
Second
Its usefulness may not be obvious at first.
The amb feature can be useful if you have multiple cheap resources that can provide values, but latency is widely variable. For an example, you may have servers replicated around the world. Issuing a query is cheap for both the client to send and for the server to respond, however due to network conditions the latency is not predictable and varies considerably. Using the Amb operator, you can send the same request out to many servers and consume the result of the first that responds. -Lee Campbell www.introtorx.com
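The "first that responds" idea in that quote can be sketched with the JDK alone. The example below is an analogy rather than `amb` itself: it assumes each server reply is a single-value `CompletableFuture`, and unlike `amb` it does not unsubscribe from the losers. The futures are completed by hand so the outcome does not depend on timing; `AmbSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

// Illustration only: single-value futures stand in for observables.
final class AmbSketch {
    static Object firstToRespond() {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        // anyOf completes with the result of whichever input completes first.
        CompletableFuture<Object> winner = CompletableFuture.anyOf(first, second);
        // Completing by hand keeps the example deterministic: "Second" answers first.
        second.complete("Second");
        first.complete("First");
        return winner.join();
    }

    public static void main(String[] args) {
        System.out.println(firstToRespond()); // prints Second
    }
}
```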
An alternative style of doing `amb` is the `ambWith` operator. `ambWith` allows you to combine the observables one by one in a chain. This is more convenient when using `amb` in the middle of a chain of operators.
Observable.timer(100, TimeUnit.MILLISECONDS).map(i -> "First")
.ambWith(Observable.timer(50, TimeUnit.MILLISECONDS).map(i -> "Second"))
.ambWith(Observable.timer(70, TimeUnit.MILLISECONDS).map(i -> "Third"))
.subscribe(System.out::println);
Second
`merge` combines a set of observables into one. The resulting observable emits the values that the source observables emit, as they emit them. This means that values from different sequences can be mixed.
public static final <T> Observable<T> merge(
java.lang.Iterable<? extends Observable<? extends T>> sequences)
public static final <T> Observable<T> merge(
java.lang.Iterable<? extends Observable<? extends T>> sequences,
int maxConcurrent)
public static final <T> Observable<T> merge(
Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> merge(
Observable<? extends Observable<? extends T>> source,
int maxConcurrent)
public static final <T> Observable<T> merge(
Observable<? extends T> t1,
Observable<? extends T> t2)
public static final <T> Observable<T> merge(
Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3)
...
The many overloads are different ways of supplying a set of observables to merge. Here is an example of what `merge` does:
Observable.merge(
Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First"),
Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
.take(10)
.subscribe(System.out::println);
Second
First
Second
Second
First
Second
Second
First
Second
First
The difference between `concat` and `merge` is that `merge` does not wait for the current observable to terminate before moving to the next. `merge` subscribes to every observable available to it and emits items as they come. In that way, `merge` is similar to the flattening part of `flatMap`.
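The contrast between the two operators can be made concrete with a small plain-Java model. This is a sketch, not RxJava: it assumes each source can be reduced to a finite list of timestamped values, and `MergeSketch`, `Event` and `ticks` are hypothetical names. Merging interleaves by emission time, while concatenating drains one source before touching the next.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not RxJava: each source is a list of timestamped values.
final class MergeSketch {
    static final class Event {
        final long timeMs;
        final String value;
        Event(long timeMs, String value) { this.timeMs = timeMs; this.value = value; }
    }

    // Interval-like source: emits `label` at period, 2 * period, ... `count` times.
    static List<Event> ticks(String label, long periodMs, int count) {
        List<Event> events = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            events.add(new Event(i * periodMs, label));
        }
        return events;
    }

    // merge: interleave values from all sources by emission time.
    static List<String> merge(List<List<Event>> sources) {
        List<Event> all = new ArrayList<>();
        for (List<Event> source : sources) all.addAll(source);
        all.sort((a, b) -> Long.compare(a.timeMs, b.timeMs));
        List<String> out = new ArrayList<>();
        for (Event e : all) out.add(e.value);
        return out;
    }

    // concat: everything from the first source, then the second, and so on.
    static List<String> concat(List<List<Event>> sources) {
        List<String> out = new ArrayList<>();
        for (List<Event> source : sources) {
            for (Event e : source) out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Event>> sources =
                Arrays.asList(ticks("First", 250, 2), ticks("Second", 150, 3));
        System.out.println(merge(sources));  // prints [Second, First, Second, Second, First]
        System.out.println(concat(sources)); // prints [First, First, Second, Second, Second]
    }
}
```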
Like other combinators that are static methods, `merge` has an alternative that allows you to merge sequences one by one in a chain. The operator is called `mergeWith` and the behaviour is the same. The following example has the same result as the one above.
Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First")
.mergeWith(Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
.take(10)
.subscribe(System.out::println);
With `merge`, as soon as any of the source sequences fails, the merged sequence fails as well. An alternative to that behaviour is `mergeDelayError`, which will postpone the emission of an error and continue to merge values from sequences that haven't failed.
public static final <T> Observable<T> mergeDelayError(
Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> mergeDelayError(
Observable<? extends T> t1,
Observable<? extends T> t2)
public static final <T> Observable<T> mergeDelayError(
Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3)
...
In the next example, we merge two observables which emit every 100ms. One fails early while the other runs to completion.
Observable<Long> failAt200 =
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 =
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(4);
Observable.mergeDelayError(failAt200, completeAt400)
.subscribe(
System.out::println,
System.out::println);
0
0
1
1
2
3
java.lang.Exception: Failed
In the beginning, both observables emit the same value. After value 1, the first sequence fails, and the merged sequence continues with values only from the second sequence.
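The difference between failing fast and delaying the error can be sketched in plain Java. The model below is an assumption-laden simplification, not RxJava: the already-merged timeline is given as an ordered list of signals, and `DelayErrorSketch`, `Signal` and `deliver` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not RxJava: a merged timeline of values and errors in arrival order.
final class DelayErrorSketch {
    static final class Signal {
        final Long value;   // null when this signal is an error
        final String error; // null when this signal is a value
        private Signal(Long value, String error) { this.value = value; this.error = error; }
        static Signal value(long v) { return new Signal(v, null); }
        static Signal error(String message) { return new Signal(null, message); }
    }

    static List<String> deliver(List<Signal> timeline, boolean delayErrors) {
        List<String> out = new ArrayList<>();
        List<String> held = new ArrayList<>();
        for (Signal s : timeline) {
            if (s.error == null) {
                out.add(String.valueOf(s.value));
            } else if (delayErrors) {
                held.add(s.error);             // remember the failure, keep going
            } else {
                out.add("error: " + s.error);  // fail fast: later values are lost
                return out;
            }
        }
        if (!held.isEmpty()) {
            // One terminal error after every value has been delivered.
            out.add("error: " + String.join(", ", held));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Signal> timeline = Arrays.asList(
                Signal.value(0), Signal.value(0), Signal.value(1), Signal.value(1),
                Signal.error("Failed"), Signal.value(2), Signal.value(3));
        System.out.println(deliver(timeline, false)); // prints [0, 0, 1, 1, error: Failed]
        System.out.println(deliver(timeline, true));  // prints [0, 0, 1, 1, 2, 3, error: Failed]
    }
}
```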
When merging more than two sequences, the merged sequence will go on until all of the sources have terminated, successfully or with an error. If more than one sequence fails, the error in the merged sequence will be of type `CompositeException`.
Observable<Long> failAt200 =
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
Observable.error(new Exception("Failed")));
Observable<Long> failAt300 =
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 =
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(4);
Observable.mergeDelayError(failAt200, failAt300, completeAt400)
.subscribe(
System.out::println,
System.out::println);
0
0
0
1
1
1
2
2
3
rx.exceptions.CompositeException: 2 exceptions occurred.
The `switchOnNext` operator takes an observable that emits observables. The returned observable emits items from the most recent observable. As soon as a new observable comes, the old one is discarded and values from the newer one are emitted.
Observable.switchOnNext(
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i ->
Observable.interval(30, TimeUnit.MILLISECONDS)
.map(i2 -> i)
)
)
.take(9)
.subscribe(System.out::println);
0
0
0
1
1
1
2
2
2
This example may be a bit confusing. What we've done is create an observable that emits a new observable every 100ms. Every inner observable emits its index in the outer sequence every 30ms. Within 100ms, each inner observable has enough time to emit its number 3 times. Then a new observable arrives, and it replaces the old one.
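The timing argument can be checked with a small simulation. This is a plain-Java sketch, not RxJava: it assumes idealised clocks with no jitter, treats an inner emission that coincides with the next switch as dropped, and uses the hypothetical name `SwitchSketch`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timeline model, not RxJava: outer ticks start inner intervals,
// and each new inner interval replaces the previous one.
final class SwitchSketch {
    static List<Long> switchOnNext(long outerPeriodMs, long innerPeriodMs, int take) {
        if (innerPeriodMs <= 0 || innerPeriodMs >= outerPeriodMs) {
            // With a slower inner interval nothing would ever be emitted.
            throw new IllegalArgumentException("need 0 < innerPeriodMs < outerPeriodMs");
        }
        List<Long> out = new ArrayList<>();
        for (long i = 0; out.size() < take; i++) {
            long start = outerPeriodMs * (i + 1);   // when inner observable i is created
            long nextStart = start + outerPeriodMs; // when it gets replaced
            for (long t = start + innerPeriodMs;
                    t < nextStart && out.size() < take;
                    t += innerPeriodMs) {
                out.add(i); // inner observable i emits its own index
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(switchOnNext(100, 30, 9));
        // prints [0, 0, 0, 1, 1, 1, 2, 2, 2]
    }
}
```

Inner observable 0 would emit at 130, 160 and 190ms; its fourth emission at 220ms never happens because the switch at 200ms has already replaced it.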
Where `flatMap` internally uses `merge` to combine the generated sequences and `concatMap` uses `concat`, there is `switchMap` to use `switchOnNext` for the flattening phase.
public final <R> Observable<R> switchMap(Func1<? super T,? extends Observable<? extends R>> func)
Every value from the source sequence is mapped through `func` to an observable. The values from the generated observable are emitted by the returned observable. Every time a new value arrives, `func` generates a new observable and switches to it, dropping the old one. The example we showed for `switchOnNext` can also be implemented with `switchMap`:
Observable.interval(100, TimeUnit.MILLISECONDS)
.switchMap(i ->
Observable.interval(30, TimeUnit.MILLISECONDS)
.map(l -> i))
.take(9)
.subscribe(System.out::println);
0
0
0
1
1
1
2
2
2
So far, we've seen operators which, in one way or the other, flattened multiple sequences into one of the same type. The next operators put the source sequences side-by-side and use the values to create a composite value.
`zip` is a very basic function from functional programming. It takes two or more sequences and matches their values one-to-one by index. A function is required to combine the values. Unlike what you might expect from other environments, in RxJava `zip` doesn't default to combining all the values in a tuple.
In the next example, we have two sources that emit items at different rates.
Observable.zip(
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext(i -> System.out.println("Left emits " + i)),
Observable.interval(150, TimeUnit.MILLISECONDS)
.doOnNext(i -> System.out.println("Right emits " + i)),
(i1,i2) -> i1 + " - " + i2)
.take(6)
.subscribe(System.out::println);
Left emits 0
Right emits 0
0 - 0
Left emits 1
Right emits 1
Left emits 2
1 - 1
Left emits 3
Right emits 2
2 - 2
Left emits 4
Left emits 5
Right emits 3
3 - 3
Left emits 6
Right emits 4
4 - 4
Left emits 7
Right emits 5
Left emits 8
5 - 5
As we can see, `zip` matched values based on index.
`zip` has multiple overloads for zipping more than two sequences together.
public static final <R> Observable<R> zip(
java.lang.Iterable<? extends Observable<?>> ws,
FuncN<? extends R> zipFunction)
public static final <R> Observable<R> zip(
Observable<? extends Observable<?>> ws,
FuncN<? extends R> zipFunction)
public static final <T1,T2,R> Observable<R> zip(
Observable<? extends T1> o1,
Observable<? extends T2> o2,
Func2<? super T1,? super T2,? extends R> zipFunction)
public static final <T1,T2,T3,R> Observable<R> zip(
Observable<? extends T1> o1,
Observable<? extends T2> o2,
Observable<? extends T3> o3,
Func3<? super T1,? super T2,? super T3,? extends R> zipFunction)
// etc
When zipping more than two sequences, the operator will wait until all of the sources have emitted the next value before it emits the next zipped value. In the next example, we add another source with its own frequency again.
Observable.zip(
Observable.interval(100, TimeUnit.MILLISECONDS),
Observable.interval(150, TimeUnit.MILLISECONDS),
Observable.interval(50, TimeUnit.MILLISECONDS),
(i1,i2,i3) -> i1 + " - " + i2 + " - " + i3)
.take(6)
.subscribe(System.out::println);
0 - 0 - 0
1 - 1 - 1
2 - 2 - 2
3 - 3 - 3
4 - 4 - 4
5 - 5 - 5
The zipped sequence terminates when any of the source sequences terminates successfully. Further values from the other sequences will be ignored. We can see that in the next example, where we zip sequences of different sizes and count the elements in the zipped sequence.
Observable.zip(
Observable.range(0, 5),
Observable.range(0, 3),
Observable.range(0, 8),
(i1,i2,i3) -> i1 + " - " + i2 + " - " + i3)
.count()
.subscribe(System.out::println);
3
The zipped sequence contains as many elements as the shortest source sequence.
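Index-based matching and the "shortest source wins" rule can be seen in a few lines of plain Java. This is a sketch over finite lists rather than observables, and `ZipSketch` is a hypothetical name; it only illustrates the pairing rule, not RxJava's buffering or timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper, not part of RxJava: zips two finite lists by index.
final class ZipSketch {
    static <A, B, R> List<R> zip(
            List<A> left, List<B> right,
            BiFunction<? super A, ? super B, ? extends R> combiner) {
        // The result is only as long as the shorter input; extra values are ignored.
        int n = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
                Arrays.asList(0, 1, 2, 3, 4),
                Arrays.asList(0, 1, 2),
                (i1, i2) -> i1 + " - " + i2);
        System.out.println(zipped);        // prints [0 - 0, 1 - 1, 2 - 2]
        System.out.println(zipped.size()); // prints 3
    }
}
```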
There is also the `zipWith` operator, which is an alternative style of zipping 2 sequences. `zipWith` allows you to zip in a chain, but it can be inconvenient for zipping more than two sequences.
Observable.interval(100, TimeUnit.MILLISECONDS)
.zipWith(
Observable.interval(150, TimeUnit.MILLISECONDS),
(i1,i2) -> i1 + " - " + i2)
.take(6)
.subscribe(System.out::println);
0 - 0
1 - 1
2 - 2
3 - 3
4 - 4
5 - 5
The `zipWith` operator also has an overload that allows you to zip your observable sequence with an iterable.
Observable.range(0, 5)
.zipWith(
Arrays.asList(0,2,4,6,8),
(i1,i2) -> i1 + " - " + i2)
.subscribe(System.out::println);
0 - 0
1 - 2
2 - 4
3 - 6
4 - 8
Where `zip` uses indices, `combineLatest` will use time. Every time one of the observables being combined emits a value, that value is combined with the latest value by the other observable. Once again, a function is required to combine the values.
Observable.combineLatest(
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext(i -> System.out.println("Left emits")),
Observable.interval(150, TimeUnit.MILLISECONDS)
.doOnNext(i -> System.out.println("Right emits")),
(i1,i2) -> i1 + " - " + i2
)
.take(6)
.subscribe(System.out::println);
Left emits
Right emits
0 - 0
Left emits
1 - 0
Left emits
2 - 0
Right emits
2 - 1
Left emits
3 - 1
Right emits
3 - 2
As we can see, `combineLatest` first waits for every sequence to have a value. After that, every value emitted by either observable results in a combined value being emitted.
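The two rules just described (wait until both sides have a value, then emit on every new value) can be traced with a small simulation. This is a plain-Java sketch with idealised clocks, not RxJava; when both sides tick at the same instant it assumes the left one is processed first, and `CombineLatestSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timeline model, not RxJava: two interval-like sources combined by latest value.
final class CombineLatestSketch {
    static List<String> combineLatest(long leftPeriodMs, long rightPeriodMs, int take) {
        if (leftPeriodMs <= 0 || rightPeriodMs <= 0) {
            throw new IllegalArgumentException("periods must be positive");
        }
        List<String> out = new ArrayList<>();
        long leftNext = leftPeriodMs;    // time of the next left emission
        long rightNext = rightPeriodMs;  // time of the next right emission
        long leftCount = 0;              // values emitted so far; latest value is count - 1
        long rightCount = 0;
        while (out.size() < take) {
            if (leftNext <= rightNext) { // assumption: left wins ties
                leftCount++;
                leftNext += leftPeriodMs;
            } else {
                rightCount++;
                rightNext += rightPeriodMs;
            }
            // Nothing is emitted until both sides have produced at least one value.
            if (leftCount > 0 && rightCount > 0) {
                out.add((leftCount - 1) + " - " + (rightCount - 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(combineLatest(100, 150, 6));
        // prints [0 - 0, 1 - 0, 2 - 0, 2 - 1, 3 - 1, 3 - 2]
    }
}
```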
Just like every combinator we've seen in this chapter, there are overloads that allow you to combine more than two sequences.
I like to think of `combineLatest` as one event occurring in the context of another. `combineLatest` is very useful when consuming input from GUIs, where multiple stateful GUI controls affect the same output. Imagine a text input field, a paragraph that echoes the text and a checkbox that signals to capitalise it or not. Every time the text field or the checkbox changes, `combineLatest` will combine the text with the decision to capitalise it or not. The end result is ready to be written to the output.