学习笔记

RxJava2 源码分析
Publish: 2018/7/23   

结合 Retrofit + RxJava2 的使用方式,逐步分析每一步涉及的相关源码

关联Rxjava

RxJava2CallAdapterFactory.create()

返回一个工厂实例:由它创建的 Observable 是同步执行的,并且默认不在任何调度器(Scheduler)上运行。

// Static factory: forwards a null scheduler and isAsync = false to the private constructor.
->public static RxJava2CallAdapterFactory create() {
    return new RxJava2CallAdapterFactory(null, false);
  }

// Private constructor: stores the scheduler (may be null, per @Nullable) and the isAsync flag on the instance.
->private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;
  }

//最终会调用RxJava2CallAdapterFactory#get方法
->return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
/* 参数对应的值:
     responseType:接口方法返回类型 Observable<T> 中的T
     scheduler:null
     isAsync:false
     isResult:false
     isBody:true
     isFlowable:false
     isSingle:false
     isMaybe:false
     isCompletable:false
   */

->RxJava2CallAdapter#adapt返回observable

RxJava2CallAdapter

根据上面创建对象时传入的参数,最终确定
responseObservable = new CallExecuteObservable<>(call);
observable= new BodyObservable<>(responseObservable);

    /**
     * Wraps {@code call} in an Observable and converts it to the reactive type
     * selected by this adapter's flags.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>{@code isAsync} picks CallEnqueueObservable, otherwise CallExecuteObservable.</li>
     *   <li>{@code isResult} wraps it in ResultObservable, {@code isBody} in BodyObservable;
     *       otherwise the response observable is used as-is.</li>
     *   <li>A non-null {@code scheduler} is applied through {@code subscribeOn}.</li>
     *   <li>{@code isFlowable}/{@code isSingle}/{@code isMaybe}/{@code isCompletable}
     *       convert the Observable to the matching type; otherwise it is returned unchanged.</li>
     * </ol>
     *
     * @param call the call to adapt
     * @return the adapted reactive object
     */
    @Override public Object adapt(Call<R> call) {
      Observable<Response<R>> responseObservable = isAsync
          ? new CallEnqueueObservable<>(call)
          : new CallExecuteObservable<>(call);

      Observable<?> observable;
      if (isResult) {
        observable = new ResultObservable<>(responseObservable);
      } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
      } else {
        observable = responseObservable;
      }

      // Only applied when the factory was created with a scheduler.
      if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
      }

      if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
      }
      if (isSingle) {
        return observable.singleOrError();
      }
      if (isMaybe) {
        return observable.singleElement();
      }
      if (isCompletable) {
        return observable.ignoreElements();
      }
      return observable;
    }

接口访问

 // Example call site analysed in the sections that follow:
 // subscribe on the IO scheduler, observe on the Android main thread
 // (second argument is observeOn's delayError flag), and subscribe a DisposableObserver.
 // NOTE(review): the observer's type argument was lost in extraction; Base is used
 // here to agree with the onNext parameter type — confirm the original element type.
 api.login(parameter)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread(), true)
    .subscribeWith(new DisposableObserver<Base>() {
        @Override
        public void onNext(@NonNull Base result) {
            // handle the emitted result
        }

        @Override
        public void onError(@NonNull Throwable e) {
            // handle the failure
        }

        @Override
        public void onComplete() {
            // stream finished
        }
    });

涉及到的类继承关系

/**
 * Base class for an Observable of {@code U} that holds a reference to an upstream
 * ObservableSource of {@code T} and exposes it through {@link #source()}.
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /* The upstream ObservableSource that can be consumed. */
    protected final ObservableSource<T> source;

    /**
     * Stores the upstream ObservableSource.
     *
     * @param source the upstream source to keep a reference to
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    /** Returns the upstream source passed to the constructor. */
    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

subscribeOn和observeOn都是Observable的方法,并且返回的仍然是Observable,所以可以重复链式调用

subscribeOn(Schedulers.io())

在指定的Scheduler上异步地将观察者订阅到此ObservableSource。

  /**
   * Wraps this Observable and {@code scheduler} in a new ObservableSubscribeOn and
   * passes the result through {@code RxJavaPlugins.onAssembly}.
   *
   * @param scheduler the scheduler to use; rejected by requireNonNull when null
   * @return the value returned by {@code RxJavaPlugins.onAssembly}
   */
  @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> subscribeOn(Scheduler scheduler) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
  }
->RxJavaPlugins#onAssembly:可以添加自定义变换的Function;这里f=null,直接返回source
  /**
   * Assembly hook: when the {@code onObservableAssembly} function is set, returns the
   * result of applying it to {@code source}; otherwise returns {@code source} unchanged.
   *
   * @param source the Observable being assembled
   * @return the hook's result, or {@code source} itself when no hook is set
   */
  @SuppressWarnings({ "rawtypes", "unchecked" })
  @NonNull
  public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
      Function<? super Observable, ? extends Observable> f = onObservableAssembly;
      if (f != null) {
          return apply(f, source);
      }
      return source;
  }

/*this指向BodyObservable
  scheduler指向IoScheduler*/
new ObservableSubscribeOn(this, scheduler)
// Passes the upstream source to the superclass constructor and keeps the scheduler;
// nothing is subscribed at construction time.
->public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
      super(source);
      this.scheduler = scheduler;
  }

将BodyObservable作为上游source包装进新建的ObservableSubscribeOn并返回(此时只是组装对象,尚未发生订阅)

observeOn(AndroidSchedulers.mainThread(), true)

修改ObservableSource以在指定调度程序上通知观察者

  /**
   * Two-argument overload: delegates to the three-argument overload using
   * {@code bufferSize()} as the buffer size.
   */
  @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
      return observeOn(scheduler, delayError, bufferSize());
  }
  /**
   * Wraps this Observable in a new ObservableObserveOn configured with the given
   * scheduler, delayError flag and buffer size.
   *
   * @param scheduler  the scheduler to use; rejected by requireNonNull when null
   * @param delayError forwarded unchanged to ObservableObserveOn
   * @param bufferSize checked by verifyPositive
   */
->@CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      ObjectHelper.verifyPositive(bufferSize, "bufferSize");
      // onAssembly is the hook where a custom transformation Function can be applied.
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
  }
参数说明:
delayError:为true时,onError通知不会越过调度边界另一侧尚未送达的onNext通知,以onError结束的序列会按上游发射的顺序依次送达;为false时,如果调度程序是真正异步的,onError通知可能抢在尚未分发的onNext通知之前到达下游。
bufferSize():
    // Returns Flowable.bufferSize() unchanged.
    ->public static int bufferSize() {
         return Flowable.bufferSize();
      }

/*this指向ObservableSubscribeOn
  scheduler指向HandlerScheduler(new Handler(Looper.getMainLooper()))*/
new ObservableObserveOn(this, scheduler, delayError, bufferSize)
->ObservableObserveOn
// Holds the upstream source plus the scheduler, delayError flag and buffer size
// that were passed to observeOn.
->public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source); // in this walkthrough, source is the ObservableSubscribeOn
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    ...
  }

ObservableObserveOn持有上游ObservableSubscribeOn作为source;订阅之后,由它在主线程调度器上把上游的发射和通知转交给下游观察者。

subscribeWith(new DisposableObserver<Base>(){…});

将给定的观察者(子类)订阅到此Observable并按原样返回给定的观察者。

->@CheckReturnValue
  @SchedulerSupport(SchedulerSupport.NONE)
  public final > E subscribeWith(E observer) {
      subscribe(observer);
      return observer;
  }
//observer指向DisposableObserver
subscribe(observer);
  ->ObservableObserveOn
  // Subscribes the downstream observer to the upstream source: directly when the
  // scheduler is a TrampolineScheduler, otherwise wrapped in an ObserveOnObserver
  // that carries a Worker created from the scheduler.
  ->@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // In this walkthrough: HandlerScheduler#createWorker -> new HandlerWorker(new Handler(Looper.getMainLooper()))
            Scheduler.Worker w = scheduler.createWorker();
            // In this walkthrough: source is the ObservableSubscribeOn,
            // observer is the DisposableObserver.
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

把包装了DisposableObserver(主线程上的最终观察者)的ObserveOnObserver,订阅到上游的ObservableSubscribeOn(其订阅任务交给IO调度器执行)
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));

  ->ObservableSubscribeOn
  // Validates the observer, lets RxJavaPlugins.onSubscribe replace it, validates the
  // replacement, then calls subscribeActual. A NullPointerException is rethrown as-is;
  // handling of other Throwables is elided in this excerpt.
  ->@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
        }
    }
  // Wraps the downstream observer in a SubscribeOnObserver, hands that wrapper to the
  // downstream via onSubscribe, then schedules a SubscribeTask on the scheduler and
  // stores the resulting Disposable on the wrapper.
  ->@Override
    public void subscribeActual(final Observer<? super T> s) {

        /* In this walkthrough s is the ObserveOnObserver; it is the constructor
           argument of the new SubscribeOnObserver. */
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // Pass the SubscribeOnObserver to the downstream observer's onSubscribe.
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

s.onSubscribe(parent);

->ObserveOnObserver 
      // When DisposableHelper.validate accepts the incoming Disposable: store it,
      // create the queue with the configured bufferSize, and notify the downstream
      // observer with this ObserveOnObserver as the argument.
      ->@Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                ...

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                /* In this walkthrough: this is the ObserveOnObserver and
                   actual is the DisposableObserver. */
                actual.onSubscribe(this);
            }
        }

actual.onSubscribe(this);
->DisposableObserver
// Calls onStart() only when EndConsumerHelper.setOnce(this.s, s, getClass()) returns true.
// NOTE(review): presumably setOnce succeeds only the first time a Disposable is set — confirm against EndConsumerHelper.
->public final void onSubscribe(@NonNull Disposable s) {
        if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
            onStart();
        }
    }

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

->scheduler指向IoScheduler

new SubscribeTask(parent)
// Runnable that, when run, subscribes the stored SubscribeOnObserver to the
// enclosing operator's upstream source.
->final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // In this walkthrough, source is the BodyObservable.
            source.subscribe(parent);
        }
    }
source.subscribe(parent);

// Observable of T backed by an upstream Observable of Response<T>; subscribing wraps
// the downstream observer in a BodyObserver and subscribes that wrapper upstream.
->final class BodyObservable<T> extends Observable<T> {
      private final Observable<Response<T>> upstream;

      BodyObservable(Observable<Response<T>> upstream) {
          // In this walkthrough, upstream is the responseObservable.
          this.upstream = upstream;
      }

      @Override protected void subscribeActual(Observer<? super T> observer) {
        // In this walkthrough, observer is the SubscribeOnObserver.
        upstream.subscribe(new BodyObserver<T>(observer));
      }
      ...
  }

upstream.subscribe(new BodyObserver(observer));
// Observable that, on subscription, clones the original Call, executes the clone
// with call.execute(), and emits the Response followed by onComplete, skipping each
// of those signals when the call reports isCanceled().
->final class CallExecuteObservable<T> extends Observable<Response<T>> {
  private final Call<T> originalCall;

  CallExecuteObservable(Call<T> originalCall) {
    this.originalCall = originalCall;
  }

  @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
    // Each subscription works on its own clone; in this walkthrough it is an OkHttpCall.
    Call<T> call = originalCall.clone();
    // Hand the observer (the BodyObserver in this walkthrough) a CallDisposable wrapping the call.
    observer.onSubscribe(new CallDisposable(call));

    // Set right before onComplete is invoked; its use in the catch block is elided in this excerpt.
    boolean terminated = false;
    try {
      Response<T> response = call.execute();
      if (!call.isCanceled()) {
        observer.onNext(response);
      }
      if (!call.isCanceled()) {
        terminated = true;
        observer.onComplete();
      }
    } catch (Throwable t) {
      ...
    }
  }
  ...
 }


← The Programmer's Oath okhttp 源码分析 →

Powered by Hexo, Theme designs by @hpcslag.
Style-Framework Tocas-UI designs by @yamioldmel