[RxJava] SimpleRxJava2
针对Rxjava2的简单封装switchThread
map
concatMap
zip
等操作
**
* Created by ngliaxl on 2018/4/9.
*
* SimpleRxJava2.create(new SimpleRxJava2.EmitterCallback<String>() {
* @Override
* public void getEmitDatas(List<String> datas) throws Exception {
* Log.d("OWEN", "getEmitDatas: 1111 " + Thread.currentThread().getName());
* datas.add("1");
* }
* }).switchThread().concatMap(new SimpleRxJava2.FunctionCallback<String, Integer>() {
* @Override
* public SimpleRxJava2<Integer> apply(String s) throws Exception {
* Log.d("OWEN", "apply:1111 " + Thread.currentThread().getName());
* if(s.equals("")){
* return null;
* }else{
* return SimpleRxJava2.create(new SimpleRxJava2.EmitterCallback<Integer>() {
* @Override
* public void getEmitDatas(List<Integer> datas) throws Exception {
* Log.d("OWEN", "getEmitDatas: 2222 " + Thread.currentThread().getName());
* datas.add(2);
* }
* }).switchThread();
* }
* }
* }).concatMap(new SimpleRxJava2.FunctionCallback<Integer, Boolean>() {
* @Override
* public SimpleRxJava2<Boolean> apply(Integer integer) throws Exception {
* Log.d("OWEN", "apply:2222 " + Thread.currentThread().getName());
*
* return SimpleRxJava2.create(new SimpleRxJava2.EmitterCallback<Boolean>() {
* @Override
* public void getEmitDatas(List<Boolean> datas) throws Exception {
* Log.d("OWEN", "getEmitDatas: 3333 " + Thread.currentThread().getName());
* datas.add(Boolean.TRUE);
* }
* }).switchThread();
* }
* })
* .deliveryResult(new SimpleRxJava2.ConsumerCallback<Boolean>(){
* @Override
* public void onReceive(Boolean data) {
* Log.d("OWEN", "============= : " + data + Thread.currentThread().getName());
* }
* });
*/
public class SimpleRxJava2<T> {
private Observable<T> mObservable;
public SimpleRxJava2() {
}
public SimpleRxJava2(Observable<T> observable) {
this.mObservable = observable;
}
public static <T> SimpleRxJava2<T> create(final EmitterCallback<T> callback) {
Observable<T> observable = Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(ObservableEmitter<T> emitter) {
List<T> datas = new ArrayList<>();
Throwable trowable = null;
try {
callback.getEmitDatas(datas);
} catch (Exception e) {
e.printStackTrace();
trowable = e;
} finally {
for (T data : datas) {
emitter.onNext(data);
}
if (!emitter.isDisposed() && trowable != null) emitter.onError(trowable);
else emitter.onComplete();
}
}
});
return new SimpleRxJava2<>(observable);
}
public static <T> SimpleRxJava2<T> create2(final EmitterCallback<T> callback) {
return create(callback).switchThread();
}
public static <T> SimpleRxJava2<T> empty() {
return SimpleRxJava2.create(new SimpleRxJava2.EmitterCallback<T>() {
@Override
public void getEmitDatas(List<T> datas) {
}
}).switchThread();
}
public static <T> SimpleRxJava2<T> createFrom(final FutureCallback<T> callback) {
ExecutorService service = Executors.newSingleThreadExecutor();
final FutureTask<T> future = new FutureTask<>(new Callable<T>() {
@Override
public T call() throws InterruptedException {
return callback.call();
}
});
service.submit(future);
Observable<T> observable = Observable.fromFuture(future);
return new SimpleRxJava2<>(observable);
}
public static <T> T getFromFuture(final FutureCallback<T> callback) {
final List<T> t = new ArrayList<>();
SimpleRxJava2.createFrom(callback).deliveryResult(new ConsumerCallback<T>() {
@Override
public void onReceive(T data) {
t.add(data);
}
});
return t.isEmpty() ? null : t.get(0);
}
public <Q> SimpleRxJava2<Q> transform(final Transform<T, Q> callback) {
Observable<Q> observable = getObservable().map(new Function<T, Q>() {
@Override
public Q apply(T t) throws Exception {
return callback.apply(t);
}
});
return new SimpleRxJava2<>(observable);
}
public <Q> SimpleRxJava2<Q> concatMap(final FunctionCallback<T, Q> callback) {
Observable<Q> observable = getObservable().concatMap(new Function<T, ObservableSource<Q>>() {
@Override
public ObservableSource<Q> apply(T t) throws Exception {
SimpleRxJava2<Q> apply = callback.apply(t);
if (apply != null) return apply.getObservable();
return null;
}
});
return new SimpleRxJava2<>(observable);
}
public <H, Q> SimpleRxJava2<T> compose(SimpleRxJava2<H> s1, SimpleRxJava2<Q> s2, final
ApplyCallback<H, Q, T> callback) {
Observable<T> observable = Observable.zip(s1.getObservable(), s2.getObservable(), new
BiFunction<H, Q, T>() {
@Override
public T apply(H h, Q q) throws Exception {
return callback.apply(h, q);
}
});
return new SimpleRxJava2<>(observable);
}
public SimpleRxJava2<T> deliveryResult(final ConsumerCallback<T> callback) {
getObservable().subscribe(new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
if (callback != null) {
callback.onStart(d);
}
}
@Override
public void onNext(T t) {
if (callback != null) {
callback.onReceive(t);
}
}
@Override
public void onError(Throwable e) {
if (!e.getMessage().contains("Future returned null")
&& !e.getMessage().contains("onNext called with null")) {
Timber.e(e);
}
if (callback != null) {
callback.onError(e);
}
}
@Override
public void onComplete() {
if (callback != null)
callback.onComplete();
}
});
return this;
}
public interface EmitterCallback<T> {
void getEmitDatas(List<T> datas) throws Exception;
}
public interface FutureCallback<T> {
T call();
}
public interface FunctionCallback<T, Q> {
SimpleRxJava2<Q> apply(@NonNull T t) throws Exception;
}
public interface Transform<T, Q> {
Q apply(T t);
}
public interface ApplyCallback<H, Q, T> {
T apply(H h, Q q);
}
public static class ConsumerCallback<T> {
public void onStart(Disposable d) {
}
public void onReceive(T data) {
}
public void onError(Throwable t) {
}
public void onComplete() {
}
}
public static Scheduler getIoScheduler(){
return Schedulers.io();
}
public SimpleRxJava2<T> subscribeOnIO() {
return new SimpleRxJava2<>(getObservable().subscribeOn(Schedulers.io()));
}
public SimpleRxJava2<T> subscribeOnMain() {
return subscribeOn(AndroidSchedulers.mainThread());
}
public SimpleRxJava2<T> subscribeOnLine() {
return subscribeOn(Schedulers.trampoline());
}
public SimpleRxJava2<T> subscribeOn(Scheduler scheduler) {
return new SimpleRxJava2<>(getObservable().subscribeOn(scheduler));
}
public SimpleRxJava2<T> observeOnIO() {
return observeOn(Schedulers.io());
}
public SimpleRxJava2<T> observeOnMain() {
return observeOn(AndroidSchedulers.mainThread());
}
public SimpleRxJava2<T> observeOn(Scheduler scheduler) {
return new SimpleRxJava2<>(getObservable().observeOn(scheduler));
}
public SimpleRxJava2<T> switchThread() {
return new SimpleRxJava2<>(getObservable().subscribeOn(Schedulers.io()).observeOn
(AndroidSchedulers.mainThread()));
}
public SimpleRxJava2<T> singleThread() {
Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
return new SimpleRxJava2<>(getObservable().subscribeOn(scheduler).observeOn
(scheduler));
}
public Observable<T> getObservable() {
return mObservable;
}
}