结合Retrofit中RxJava2的使用,分析每一步相关源码
返回创建同步观测值的实例,该实例默认情况下不在任何调度器上运行。
->public static RxJava2CallAdapterFactory create() {
return new RxJava2CallAdapterFactory(null, false);
}
->private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
this.scheduler = scheduler;
this.isAsync = isAsync;
}
//最终会调用RxJava2CallAdapterFactory#get方法
->return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
参数对应的值:
responseType:接口方法返回类型 Obeservable中的T
scheduler:null
isAsync:false
isResult:false
isBody:true
isFlowable:false
isSingle:false
isMaybe:false
isCompletable:false
*/
->RxJava2CallAdapter#adapt返回observable
根据上创建对象时传入的参数,最终确定
responseObservable = new CallExecuteObservable<>(call);
observable= new BodyObservable<>(responseObservable);
@Override public Object adapt(Call call) {
Observable> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
api.login(parameter)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread(),true)
.subscribeWith(new DisposableObserver >() {
@Override
public void onNext(@NonNull Base result) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable
public interface ObservableSource{ /** * 将给定的Observer订阅到此ObservableSource实例。 */ void subscribe(@NonNull Observer super T> observer); }
ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/* 可消费的Observable */
protected final ObservableSource<T> source;
/**
* 持有上游ObservableSource。
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
subscribeOn和observeOn都是Observable的方法,所以可以重复链式调用
在指定的Scheduler上异步地将观察者订阅到此ObservableSource。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final ObservablesubscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn (this, scheduler)); } ->RxJavaPlugins#onAssembly可以添加自定义变换的Function这里f=null返回source @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static Observable onAssembly(@NonNull Observable source) { Function super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } /*this指向BodyObservable scheduler指向IoScheduler*/ new ObservableSubscribeOn (this, scheduler) ->public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) { super(source); this.scheduler = scheduler; }
将BodyObservable订阅到ObservableSubscribeOn并返回
修改ObservableSource以在指定调度程序上通知观察者
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final ObservableobserveOn(Scheduler scheduler, boolean delayError) { return observeOn(scheduler, delayError, bufferSize()); } ->@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); //可以添加自定义变换的Function return RxJavaPlugins.onAssembly(new ObservableObserveOn (this, scheduler, delayError, bufferSize)); } 参数说明: delayError:如果调度程序是真正异步的,onError通知将在发射线程上的onNext通知之前调用。 bufferSize(): ->public static int bufferSize() { return Flowable.bufferSize(); } /*this指向ObservableSubscribeOn scheduler指向HandlerScheduler(new Handler(Looper.getMainLooper()))*/ new ObservableObserveOn (this, scheduler, delayError, bufferSize) ->ObservableObserveOn ->public final class ObservableObserveOn extends AbstractObservableWithUpstream { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source);//source指向ObservableSubscribeOn this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } ... }
订阅主线程ObservableObserveOn接收上游ObservableSubscribeOn的发射任务和通知。
将给定的观察者(子类)订阅到此Observable并按原样返回给定的观察者。
->@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final> E subscribeWith(E observer) { subscribe(observer); return observer; } //observer指向DisposableObserver subscribe(observer); ->ObservableObserveOn ->@SchedulerSupport(SchedulerSupport.NONE) @Override protected void subscribeActual(Observer super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //HandlerScheduler#createWorker->new HandlerWorker(new Handler(Looper.getMainLooper())); Scheduler.Worker w = scheduler.createWorker(); //source指向ObservableSubscribeOn //observer指向DisposableObserver source.subscribe(new ObserveOnObserver (observer, w, delayError, bufferSize)); } }
将主线程DisposableObserver的观察者ObserveOnObserver订阅到工作线程的ObservableSubscribeOn
source.subscribe(new ObserveOnObserver
->ObservableSubscribeOn
->@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
}
}
->@Override
public void subscribeActual(final Observer super T> s) {
/*s指向ObserveOnObserver
SubscribeOnObserver#actual指向ObserveOnObserver
创建新的SubscribeOnObserver*/
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
//将SubscribeOnObserver订阅到ObserveOnObserver
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
s.onSubscribe(parent);
->ObserveOnObserver
->@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
...
queue = new SpscLinkedArrayQueue(bufferSize);
/*actual指向ObserveOnObserver
ObserveOnObserver#actual指向DisposableObserver*/
actual.onSubscribe(this);
}
}
actual.onSubscribe(this);
->DisposableObserver
->public final void onSubscribe(@NonNull Disposable s) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
->scheduler指向IoScheduler
new SubscribeTask(parent)
->final class SubscribeTask implements Runnable {
private final SubscribeOnObserver parent;
SubscribeTask(SubscribeOnObserver parent) {
this.parent = parent;
}
@Override
public void run() {
//source指向BodyObservable
source.subscribe(parent);
}
}
source.subscribe(parent);
->final class BodyObservable extends Observable {
private final Observable> upstream;
BodyObservable(Observable> upstream) {
//upstream指向responseObservable
this.upstream = upstream;
}
@Override protected void subscribeActual(Observer super T> observer) {
//observer指向SubscribeOnObserver
upstream.subscribe(new BodyObserver(observer));
}
...
}
upstream.subscribe(new BodyObserver(observer));
->final class CallExecuteObservable extends Observable> {
private final Call originalCall;
CallExecuteObservable(Call originalCall) {
this.originalCall = originalCall;
}
@Override protected void subscribeActual(Observer super Response> observer) {
//call指向OkhttpCall
Call call = originalCall.clone();
//BodyObserver订阅OkhttpCall
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
Response response = call.execute();
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
...
}
}
...
}