RxSwift 源码解析03:Observable 核心逻辑

本文主要分析 Observable 核心逻辑

Observable 三部曲

  • 序列三部曲:序列产生、订阅、销毁
  • 但是在实际代码中,如下所示,观察序列的整个流程是
    • 创建观察序列
    • 订阅信号(订阅观察)
    • 发送信号(发送事件)
    • 销毁观察序列
//1、创建序列
let ob = Observable<Any>.create { observer **in**
    //3、发送信号
    observer.onNext("发送信号")
    //error 和 complete 二选一
    //observer.onError(NSError.init(domain: "myError", code: 10010, userInfo: nil))
    observer.onCompleted()
    return Disposables.create()
}

//2、订阅信号
ob.subscribe { text **in**
    print("订阅到了: \(text)")
} onError: { error **in**
    print("error \(error)")
} onCompleted: {
    print("完成")
} onDisposed: {
    print("销毁")
}

.disposed(by: disposeBag)

//--------打印---------
订阅到了: 发送信号
完成
销毁

RxSwift的响应式核心逻辑类似于iOS中的UI事件响应

  • UIControl类调用addTarget响应#selector定义的事件
  • RxSwift 将响应事件留在 rx内部类进行处理
  • 在内部类由订阅中心执行onNext发送消息
  • 消息传送到subscribe接收

这里需要重点关注的是

  • 1、序列的创建方法 create() 中的闭包是什么时候执行的?
  • 2、订阅信号 subscribe() 中的闭包是什么时候执行的?
  • 3、信号从发送到接收的流程是怎样的?

下面我们带着这些问题来一一探索

RxSwift 核心逻辑分析

主要分三步进行分析

  • 创建序列 create()
  • 订阅信号 subscribe()
  • 发送信号 onNext()

创建序列 create()

  • 进入create 函数源码,其中是创建一个 AnonymousObservable 对象,AnonymousObservable 是匿名可观察序列

    create_01.png

  • 进入 AnonymousObservable 类,这个类主要用来存储产生事件的闭包(self.subscribeHandler = subscribeHandler)和 激活事件闭包的入口函数 run()

    create_02.png

  • 查看 AnonymousObservable的继承链:AnonymousObservable ---> Producer 生产者 --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议

    • ObservableConvertibleType 是一个协议,有一个关联类型(用于定义任意类型的序列,即创建序列时传入的类型Any),和函数 asObservable() (可将非序列对象转化为序列对象)
      create_03.png
    • ObservableType 是一个协议,提供一个基本功能:订阅信号


      create_04.png
    • Observable 是可观察序列,有两个函数,分别是 subscribe方法(无具体实现)、asObservable函数(返回一个 Observable 序列对象)


      create_05.png
    • Producer 生产者类(工厂设计模式),有两个函数:subscribe(订阅的具体实现,后面会讲解) 和 run(无具体实现)


      create_06.png

****create() 创建序列总结**

  • 从create()的源码中,可以看出创建序列的过程中,主要是保存了一个产生事件的闭包 subscribeHandler

  • AnonymousObservable 继承链:AnonymousObservable ---> Producer 生产者 --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议

    • ObservableConvertibleType:关联类型 Element + 函数 asObservable()
    • ObservableType:函数 subscribe()
    • Observable(可观察序列):函数 subscribe() + 函数 asObservable()
    • Producer(生产者):函数 subscribe() + 函数 run()
    • AnonymousObservable(匿名可观察序列):产生事件闭包 subscribeHandler + 函数 run()
  • create() 创建序列流程

    • 创建 AnonymousObservable 匿名可观察序列 A
    • A 保存发送信号的闭包,即产生事件的闭包(subscribeHandler)

订阅序列 subscribe()

  • 进入 subscribe() 源码

    • 创建 AnonymousObserver 匿名订阅者(观察者)对象
    • 将订阅回调闭包保存在 AnonymousObserver 的属性 eventHandler
    • 关键代码是 self.asObservable().subscribe(observer),其中 asObservable 是返回一个序列对象本身,然后调用 subscribe 将 AnonymousObserver匿名观察者对象传递过去
      subscribe_07.png
  • 为了确认 self.asObservable() 的类型,可以通过断点调试确认为 ,这里是一个 AnonymousObservable 匿名可观察序列对象


    subscribe_08.png
  • 查找 AnonymousObservable 类的 subscribe 方法,但是这个类中并没有这个函数,这个类的继承链为:AnonymousObservable --> ObserverBase --> 遵循 Disposable、ObserverType 协议

    • Disposable 是一个销毁协议,其中只有一个方法 dispose() 销毁方法


      subscribe_11.png
    • ObserverType 是一个观察者协议,有关联类型、on方法


      subscribe_12.png
    • ObserverBase 观察者基类,具备发送响应和销毁的功能


      subscribe_13.png
    • AnonymousObserver 匿名观察者类,主要用来保存事件响应闭包,以及发送响应、销毁等功能


      subscribe_14.png
  • AnonymousObservable 的父类 Producer 中查找,其源码如下

    • 从源码中发现,subscribe 中调用了 AnonymousObservablerun 函数,将 observer 订阅者作为参数传了进来
      subscribe_09.png
  • 进入 AnonymousObservable 类的 run 实现

    • 首先创建了一个 AnonymousObservableSink 对象,即管道对象,并持有 observer(订阅者/观察者对象)、cancel(销毁者对象)
    • 然后调用 AnonymousObservableSink 的 run函数,将 self(AnonymousObservable 匿名观察序列对象)作为参数传递过去
    • AnonymousObservableSink 类的主要作用就是链接 可观察者Observable观察者Observer,实现事件的传递,起到一个桥梁/通道的作用
      subscribe_10.png
  • 进入 AnonymousObservableSink 类的 run 实现

    • 这里的parent 就是传入的self,即 AnonymousObservable
    • subscribeHandler 是create函数产生的闭包,到这里我们解决了第一个疑问:create()的闭包什么时候执行
      subscribe_15.png
    • run中的 selfAnonymousObservableSink对象,简单理解就是将 AnonymousObservableSink对象 转换为 AnyObserver对象,那具体是如何转换的呢?
  • 进入 AnyObserver 的源码,AnyObserver是一个结构体,查看其第二个 init 方法,将 AnonymousObservableSink 的on函数赋值给了 AnyObserver 的observer属性,所以 self.observer 是一个函数 即on函数。所以 AnonymousObservableSink 中 run 函数中的 AnyObserver(self) 其实就是 create闭包中的参数 observer

    subscribe_16.png

    • 由于AnonymousObservable 是匿名类,仅内部使用,不暴露给外部,所以需要通过 AnyOberver 来调用 onNext等
    • AnyOberver 类遵循 ObserverType 协议,所以可以调用 onNext函数,此时 onNext函数 实现中的 on 就是 【AnyObserver - observer - on 函数块】中通过 observer保存的函数块
      subscribe_21.png
    • 这里就解释了 为什么发送信号的代码 observer.onNext("发送信号") 最终会触发 AnonymousObservableSink.on事件了
  • 进入 AnonymousObservableSinkon 函数,将event分解成不同的事件

    subscribe_17.png

  • 查找 AnonymousObservableSink 类的 forwardOn 函数实现,子类未找到,在其父类 Sink中找到了,源码如下

    • 这里的 observer 是 外部调用 subscribe() 函数,其内部生成的 AnonymousObserver 匿名观察者对象
      subscribe_18.png
  • 查找 AnonymousObserver 的 on 函数,未找到,在其父类 ObserverBase 中查找,实现如下

    • 内部调用了 onCore 函数,父类中是抽象方法,没有具体实现


      subscribe_19.png
    • 查找 AnonymousObserveronCore 函数实现,这里的 eventHandler 就是 调用subscribe 函数传入的闭包,到此就可以理解第三个问题 :subscribe 的闭包是什么时候调用的了。同时也将 创建的闭包和订阅的闭包进行了关联。
      subscribe_20.png

**subscribe() 订阅信号(订阅序列)总结

  • AnonymousObserver 继承链:AnonymousObservable --> ObserverBase --> 遵循 Disposable、ObserverType 协议

    • Disposable:函数 dispose()
    • ObserverType:关联类型 Element + 函数 on()
    • ObserverBase(观察者基类):函数 on() + 函数 onCore() + 函数 dispose()
    • AnonymousObservable(匿名观察者):事件回调闭包 eventHandler + 函数 onCore()
  • subscribe() 订阅信号(订阅序列)流程

    • 创建 AnonymousObserver匿名观察者 B,保存订阅信号的闭包,即事件回调闭包(eventHandler)
    • A 对象调用函数 asObservable() 转换为序列对象,序列调用函数 subscribe,将 B 传递过去,即 A.asObservable().subscribe(B) 等价于 A.subscribe(B)
    • 查找函数 subscribe() 实现,AnonymousObservable类中没有,从其父类 Producer 中找到,其中调用了 AnonymousObservablerun() 函数,将 B 作为参数传递,即 A.run(B)
    • 进入函数 run() 的源码,创建了一个 AnonymousObservableSink 通道对象,并持有 observer(观察者)、cancel(销毁者),同时调用了 AnonymousObservableSink 的函数 run(),即 AnonymousObservableSink.run(A)
    • 进入 AnonymousObservableSink类的函数 run() 源码, AsubscribeHandler 闭包回调,并传入一个 AnyObserver 对象,即 A.subscribeHandler(sink)
      • 这里是将 AnonymousObservableSink 转换为 AnyObserver 对象,并将 sink 的函数 on() 赋值给了 AnyObserver 的属性 observer(是一个 on 函数)AnyObserver(self) 等价于 create()闭包中的参数 ,即AnyObserver.observer = AnonymousObservableSink.on = create的闭包参数 observer
    • 进入 AnonymousObservableSink 的函数 on() 的实现,这里将 event 分解成了不同的事件,并调用 AnonymousObservableSink 的函数 forwardOn(),即 AnonymousObservableSink.on() --> AnonymousObservableSink.forwardOn()
    • 进入 AnonymousObservableSink 的函数 forwardOn()实现,其具体实现在父类 Sink 中,调用了 AnonymousObserver 的函数 on(),即 AnonymousObservableSink.forwardOn() --> AnonymousObserver.on()
    • 进入AnonymousObserver 的函数 on(),其具体实现在父类 ObserverBase 中,调用了 AnonymousObserver 的函数 onCore(),即 AnonymousObserver.on() --> AnonymousObserver.onCore()
    • 进入 AnonymousObserver 的函数 onCore() 的实现,BeventHandler 闭包回调,即 B.eventHandler(event)

发送信号 onNext()

  • 会走到创建 B 的回调闭包中发送信号
    onNext_22.png
  • 然后接着走函数 subscribe() 订阅信号的流程

发送信号 onNext()

  • 会走到创建 B 的回调闭包中发送信号
  • 然后接着走函数 subscribe() 订阅信号的流程

总结

创建 - 订阅 - 发送信号整体流程图

整体流程图.png

其核心逻辑思维导图

核心逻辑.png
  • create() 创建序列

    • 创建 AnonymousObservable 匿名可观察序列 A
    • 保存发送信号的闭包,即 A.subscribeHandler = subscribeHandler
  • subscribe() 订阅信号(订阅序列)

    • 创建 AnonymousObserver匿名观察者 B
    • 保存订阅信号的闭包,即 B.eventHandler = eventHandler
    • A 对象调用函数 asObservable() 转换为序列对象,序列调用函数 subscribe,将 B 传递过去,即 A.asObservable().subscribe(B) 等价于 A.subscribe(B)
    • 查找函数 subscribe() 实现,AnonymousObservable类中没有,从其父类 Producer 中找到,其中调用了 AnonymousObservablerun() 函数,将 B 作为参数传递,即 A.run(B)
    • 进入函数 run() 的源码,创建了一个 AnonymousObservableSink 通道对象,并持有 observer(观察者)、cancel(销毁者),同时调用了 AnonymousObservableSink 的函数 run(),即 AnonymousObservableSink.run(A)
    • 进入 AnonymousObservableSink类的函数 run() 源码, AsubscribeHandler 闭包回调,并传入一个 AnyObserver 对象,即 A.subscribeHandler(sink)
    • 进入 AnonymousObservableSink 的函数 on() 的实现,这里将 event 分解成了不同的事件,并调用 AnonymousObservableSink 的函数 forwardOn(),即 AnonymousObservableSink.on() --> AnonymousObservableSink.forwardOn()
    • 进入 AnonymousObservableSink 的函数 forwardOn()实现,其具体实现在父类 Sink 中,调用了 AnonymousObserver 的函数 on(),即 AnonymousObservableSink.forwardOn() --> AnonymousObserver.on()
    • 进入AnonymousObserver 的函数 on(),其具体实现在父类 ObserverBase 中,调用了 AnonymousObserver 的函数 onCore(),即 AnonymousObserver.on() --> AnonymousObserver.onCore()
    • 进入 AnonymousObserver 的函数 onCore() 的实现,BeventHandler 闭包回调,即 B.eventHandler(event)
  • 发送信号 onNext()

    • 会走到创建 B 的回调闭包中发送信号
    • 然后接着走函数 subscribe() 订阅信号的流程

继承链

  • AnonymousObservable 继承链:`AnonymousObservable ---> Producer 生产者 --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议


    AnonymousObservable 继承链.png
  • AnonymousObserver 继承链:AnonymousObservable --> ObserverBase --> 遵循 Disposable、ObserverType 协议

    AnonymousObserver 继承链.png

推荐阅读更多精彩内容