public final class Flowables extends Object
| Modifier and Type | Method and Description |
|---|---|
static <T> CachedFlowable<T> |
cache(io.reactivex.Flowable<T> source)
Returns a cached
Flowable like Flowable.cache()
except that the cache can be reset by calling
CachedFlowable.reset(). |
static <T> io.reactivex.Flowable<T> |
cache(io.reactivex.Flowable<T> source,
long duration,
TimeUnit unit,
io.reactivex.Scheduler.Worker worker)
Returns a cached
Flowable like Flowable.cache()
except that the cache can be reset by calling
CachedFlowable.reset() and the cache will be automatically
reset an interval after first subscription (or first subscription after
reset). |
static <T> CloseableFlowableWithReset<T> |
cache(io.reactivex.Flowable<T> source,
long duration,
TimeUnit unit,
io.reactivex.Scheduler scheduler)
Returns a cached
Flowable like Flowable.cache()
except that the cache may be reset by the user calling
CloseableFlowableWithReset.reset(). |
static <T> io.reactivex.Flowable<T> |
fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch) |
static <T> io.reactivex.Flowable<T> |
fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch,
long start) |
static <T> io.reactivex.Flowable<T> |
fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch,
long start,
int maxConcurrent)
Creates a Flowable that is aimed at supporting calls to a service that provides data in pages where the page sizes are determined by requests from downstream (requests are a part of the backpressure machinery of RxJava).
|
static <A,B,K,C> io.reactivex.Flowable<C> |
match(io.reactivex.Flowable<A> a,
io.reactivex.Flowable<B> b,
io.reactivex.functions.Function<? super A,K> aKey,
io.reactivex.functions.Function<? super B,K> bKey,
io.reactivex.functions.BiFunction<? super A,? super B,C> combiner) |
static <A,B,K,C> io.reactivex.Flowable<C> |
match(io.reactivex.Flowable<A> a,
io.reactivex.Flowable<B> b,
io.reactivex.functions.Function<? super A,K> aKey,
io.reactivex.functions.Function<? super B,K> bKey,
io.reactivex.functions.BiFunction<? super A,? super B,C> combiner,
int requestSize) |
static <T> io.reactivex.Flowable<T> |
repeat(T t) |
static <T> io.reactivex.Flowable<T> |
repeat(T t,
long count) |
public static <A,B,K,C> io.reactivex.Flowable<C> match(io.reactivex.Flowable<A> a,
io.reactivex.Flowable<B> b,
io.reactivex.functions.Function<? super A,K> aKey,
io.reactivex.functions.Function<? super B,K> bKey,
io.reactivex.functions.BiFunction<? super A,? super B,C> combiner,
int requestSize)
public static <A,B,K,C> io.reactivex.Flowable<C> match(io.reactivex.Flowable<A> a,
io.reactivex.Flowable<B> b,
io.reactivex.functions.Function<? super A,K> aKey,
io.reactivex.functions.Function<? super B,K> bKey,
io.reactivex.functions.BiFunction<? super A,? super B,C> combiner)
public static <T> io.reactivex.Flowable<T> repeat(T t)
public static <T> io.reactivex.Flowable<T> repeat(T t,
long count)
public static <T> io.reactivex.Flowable<T> fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch, long start, int maxConcurrent)
Creates a Flowable that is aimed at supporting calls to a service that provides data in pages where the page sizes are determined by requests from downstream (requests are a part of the backpressure machinery of RxJava).
Here's an example.
Suppose you have a stateless web service, say a rest service that returns JSON/XML and supplies you with
The service supports paging in that you can pass it a start number and a page size and it will return just that slice from the list.
Now I want to give a library with a Flowable definition of this service to my colleagues that they can call in their applications whatever they may be. For example,
Let's see how we can efficiently support those use cases. I'm going to assume that the movie data returned by the service are mapped conveniently to objects by whatever framework I'm using (JAXB, Jersey, etc.). The fetch method looks like this:
// note that start is 0-based
List<Movie> mostPopularMovies(int start, int size);
Now I'm going to wrap this synchronous call as a Flowable to give to my colleagues:
Flowable<Movie> mostPopularMovies(int start) {
return Flowables.fetchPagesByRequest(
(position, n) -> Flowable.fromIterable(mostPopular(position, n)),
start)
// rebatch requests so that they are always between
// 5 and 100 except for the first request
.compose(Transformers.rebatchRequests(5, 100, false));
}
Flowable<Movie> mostPopularMovies() {
return mostPopularMovies(0);
}
Note particularly that the method above uses a variant of rebatchRequests to limit both minimum and maximum requests. We particularly don't want to allow a single call requesting the top 100,000 popular movies because of the memory and network pressures that arise from that call.
Righto, Fred now uses the new API like this:
Movie top = mostPopularMovies()
.compose(Transformers.maxRequest(1))
.first()
.blockingFirst();
The use of maxRequest above may seem unnecessary but strangely enough the first operator requests Long.MAX_VALUE of upstream and cancels as soon as one arrives. The take, elemnentAt and firstXXX operators all have this counter-intuitive characteristic.
Greta uses the new API like this:
mostPopularMovies()
.rebatchRequests(20)
.doOnNext(movie -> addToUI(movie))
.subscribe(subscriber);
A bit more detail about fetchPagesByRequest:
If the fetch function returns a Flowable that delivers fewer than the requested number of items then the overall stream completes.
T - item typefetch - a function that takes a position index and a length and returns a Flowablestart - the start indexmaxConcurrent - how many pages to request concurrentlypublic static <T> io.reactivex.Flowable<T> fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch, long start)
public static <T> io.reactivex.Flowable<T> fetchPagesByRequest(io.reactivex.functions.BiFunction<? super Long,? super Long,? extends io.reactivex.Flowable<T>> fetch)
public static <T> CachedFlowable<T> cache(io.reactivex.Flowable<T> source)
Flowable like Flowable.cache()
except that the cache can be reset by calling
CachedFlowable.reset().T - the generic type of the sourcesource - the observable to be cached.public static <T> io.reactivex.Flowable<T> cache(io.reactivex.Flowable<T> source,
long duration,
TimeUnit unit,
io.reactivex.Scheduler.Worker worker)
Flowable like Flowable.cache()
except that the cache can be reset by calling
CachedFlowable.reset() and the cache will be automatically
reset an interval after first subscription (or first subscription after
reset). The interval is defined by duration and unit .T - the generic type of the sourcesource - the source observableduration - duration till next resetunit - units corresponding to the durationworker - worker to use for scheduling reset. Don't forget to
unsubscribe the worker when no longer required.public static <T> CloseableFlowableWithReset<T> cache(io.reactivex.Flowable<T> source, long duration, TimeUnit unit, io.reactivex.Scheduler scheduler)
Flowable like Flowable.cache()
except that the cache may be reset by the user calling
CloseableFlowableWithReset.reset().T - generic type of source observablesource - the source observableduration - duration till next resetunit - units corresponding to the durationscheduler - scheduler to use for scheduling reset.CloseableFlowableWithReset that should be closed once
finished to prevent worker memory leak.Copyright © 2013–2018. All rights reserved.