博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rxjava 中的源码解析(二)线程切换
阅读量:4551 次
发布时间:2019-06-08

本文共 6670 字,大约阅读时间需要 22 分钟。

在 Android 开发中,使用最多就是把耗时任务放到其他线程去执行,其他线程执行完了后,就切换到 UI 线程(也叫主线程)进行接收数据;常用代码如下:

声明一个 Observer

class IntegerObserver : Observer
{ override fun onError(e: Throwable?) { Log.d("Observable", "onError: $e") } override fun onCompleted() { Log.d("Observable", "onCompleted:" + Thread.currentThread()) Log.d(tag, "onCompleted") } override fun onNext(t: Int) { Log.d("Observable", "onNext: $t") }}复制代码

实例代码

var observable: Observable
= Observable.create { Log.d("Observable", "call:" + Thread.currentThread()) it.onNext(1) it.onNext(2) it.onNext(3) it.onCompleted() } // 1.生成一个 observer observable = observable.subscribeOn(Schedulers.io())// 2.工作线程池设置, observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看 var subscription: Subscription = observable.subscribe(IntegerObserver())// 4.触发任务(订阅开始) subscription.isUnsubscribed复制代码

打印结果:

D/Observable: call:Thread[RxIoScheduler-2,5,main]D/Observable: onNext: 1D/Observable: onNext: 2D/Observable: onNext: 3D/Observable: onCompleted:Thread[main,5,main]D/Observable: onCompleted复制代码

以上这种模式大家都非常熟悉,那么我想从源码里面去看看,为什么,他是怎么做到线程之间的切换的呢?代码中 14 已经说明过了; 为了便于理解, 我把上一篇文章中的主要几个角色的图再贴一下; 命名: 图一

好了那么我们就来分析一下源码中
2
3 ;

一、分析一下 "2.工作线程池设置" 代码

1.1 源码分析

做这个代码分析的时候,为了便于理解,可以先把 3 这个代码注释掉;

observable = observable.subscribeOn(Schedulers.io()) // 2.工作线程池设置//observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看复制代码

图一中我们可以看出来,创建一个 Observerable 需要一个 OnSubscribe;以上这个地方创建 Observable 主要是需要 rx.internal.operators.OperatorSubscribeOn 这么一个 OnSubscribe对象,然后我们可以看下这个OperatorSubscribeOn 这个对象的源码:

之前设置的 scheduler 就是在这里的,然后在看call 方法中会创建 Woker inner , 这个 inner执行 schedule, 就会在异步线程执行到 source.unsafeSubscribe();

代码的 8666 行,这里执行的是 onSubscribe.call() 方法,这个方法其实就是会执行到自己的
1 这个地方的代码;
1 代码执行完后,又会回调执行到
OperatorSubscribeOn 中的第50行这里的代码,这里代码就是会执行onNext(), 这样就完整的走完一个异步任务的过程;这个线程切换最关键的代码在
OperatorSubscribeOn 中;是由这个
OperatorSubscribeOn call() 方法去触发异步线程工作的;

1.2 结论

从 1.1 上分析得到一个结论: subscribeOn 这个代码过程中主要是对 OnSubscibe 进行封装;然后在 subscribe() 触发的时候直接执行 OnSubscibe 的 call 方法;call 方法里面参数是 Subscriber, 可以直接执行 Subscriber 的 3 个生命周期方法;这就完成了回调过程;

subcriber()   —> OnSubscribe.call(subscriber)   —> schedule()   —> 内部类的 Subscriber   —> 触发内部类中 onNext()   —> 触发被内部类包裹的 subscriber.onNext()复制代码

二、分析一下 "3.把查看数据,放到主线程中看"

2.1 源码分析

observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看复制代码

同理以上的 observable 的创建过程需要创建一个 OnSubscribe, 那我们就先看看 observable.observeOn() 创建的 OnSubscribe 创建的对象是由哪个实现类完成, 我们一起来跟踪一下源码。

到这里个地方需要注意一下这里有个 lift 方法,lift() 方法接收的参数是 Operator 这个一个重要的角色类; 这里先留放着,等会回来说;我们再看看 lift() 方法;

从上面的代码里面,我可以看出来这个 Observable 的创建需要 rx.internal.operators.OnSubscribeLift 这个实现类,这个实现类是 implements OnSubscribe<R> ,所以我们去看下这个类的 call 方法;

这里的call 方法里面调用了红色箭头指定的地方,这里看到没有,有个 operator, 这里的 operator 就是之前在 observable.observeOn() 中实例化了一个 rx.internal.operators.OperatorObserveOn 这个类的对象;那我们继续看下这个对象的 call() 方法;

@Override    public Subscriber
call(Subscriber
child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber
parent = new ObserveOnSubscriber
(scheduler, child, delayError, bufferSize); // parent.init(); // 初始化操作 return parent; } }复制代码

这个 call 的作用就是把一个 Subscriber 包装一下,换成另外一个 Subscriber 是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber 这是一个内部类,同样的他会被触发 call 方法, 再看下 call() 方法;

从上面那个 for 可以看出来,这个想要一个死循环;从 localChild.onNext(localOn.getValue(v)) 这个对于这个 Subscriber 进行传递回调;那这里还有一个问题,就是这个call 怎么回调到主线程中呢?或者怎么进行线程之间的切换呢?在回退一步到 rx.internal.operators.OperatorObserveOn 的 call 方法

@Override    public Subscriber
call(Subscriber
child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber
parent = new ObserveOnSubscriber
(scheduler, child, delayError, bufferSize); // parent.init(); // 初始化操作 return parent; } }复制代码

这里有个 parent.init()

void init() {            // don't want this code in the constructor because `this` can escape through the             // setProducer call            Subscriber
localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); schedule(); // 这个地方启动的线程切换操作 } } }); localChild.add(recursiveScheduler); localChild.add(this); }复制代码

从上面这个地方来看,schedule() 就是把接收消息的事情,切换到自己的线程池里面去操作了;这个线程池中主要是 Scheduler.Worker 在工作, Worker 主要是接收 Action0, Action0 的回调就是 call(), 也就是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber 中的 call(), 这个 call() 里面有个循环,用来接收和处理所有的消息,代码就是上面说的那个 localChild.onNext(localOn.getValue(v)) 这个调用地方是在循环体内,所以这个地方会被调用多次,就是把值传递给 subsciber.onNext(), 也就是我们的订阅者(Subscriber),我们自己的注册的回调;这个步骤主要是一直在封装 Subscriber;

2.2 结论

从 2.1 代码分析上可以看出来, observable.observeOn() 里面的代码来看,主要也是对 OnSubscibe 进行装饰;装饰完后,然后去触发执行 Subscriber;这里有个特别需要注意的地方,在装饰 OnSubscibe 的时候,引入了一个 Operator 角色;这个角色是 OnSubscribeLift 的必要成员变量, OnSubscribeLift 的 call 方法会触发 Operator 的 call 方法;这个 call 方法里面会包装一个新的 Subscriber 给 OnSubscribe 使用;归纳一下这个触发过程

subscribe()   —> OnSubscribeLift.call()   —> hook.onLift(operator).call(o)   —> OperatorObserveOn.call()   —> rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber. call()    —> 创建线程循环接收可能被创建了   —> ObserveOnSubscriber.onNext, onComplete, onNext   —> work.schedule()   -> 这个在循环体内 localChild.onNext(localOn.getValue(v));复制代码

三、总结

  • Observable 创建必须要有一个OnSubscribe, OnSubscribe 的 call 方法里面传递的必须是一个 Subscriber 对象;
  • 在 rxjava 中 OnSubscribe 会因为多次创建 Observable; 也需要对 OnSubscribe 进行多次创建,OnSubscribe 的 call 方法可以把执行任务放到一个线程池里面进行;
  • 当在 observable.observeOn() 做线程切换的时候,会有一个 Operator 角色出现,这个类的内部类会有一个死循环,用来接听自己的Subscriber 下发 onNext, onComplete, onError 触发监听这些回调函数会触发 schedule(), 这个schedule() 本质会按照一定的条件触发那个死循环,并且把收到的数据在这个死循环里面传递下去;
  • 看源码的过程中,所有的关键代码都在 call() 方法里面,这点很重要;
  • 从目前看源码的感受,最重要是 3 个角色,Observable, OnSubscribe, Subscriber; 大部分情况下,主要是对这3种角色对象,进行装饰;

转载于:https://juejin.im/post/5d391c46f265da1bba594877

你可能感兴趣的文章
Vue的简单入门
查看>>
使用最快的方法计算2的16次方是多少?
查看>>
urllib 中的异常处理
查看>>
【SQL Server高可用性】高可用性概述
查看>>
通过SQL Server的扩展事件来跟踪SQL语句在运行时,时间都消耗到哪儿了?
查看>>
SQL优化:重新编译存储过程和表
查看>>
PCB“有铅”工艺将何去何从?
查看>>
Solr环境搭建
查看>>
ASP.NET的URL过滤
查看>>
自己写的Web服务器
查看>>
自定义定时组件
查看>>
2-素数打比表
查看>>
性能测试
查看>>
浅谈 Python 的 with 语句
查看>>
使用koa+angular+mysql 完成了一个企业站
查看>>
SQL使用范例
查看>>
转 SQL集合函数中利用case when then 技巧
查看>>
WEB ICON 的探讨
查看>>
[内核编程] 键盘过滤第一个例子ctrl2cap(4.1~4.4)汇总,测试
查看>>
Java读书笔记05 类与对象
查看>>