2017-05-03 115 views
1

我開始學習反應流,因爲我對使用RxJava替代更傳統的事件總線的新趨勢感到好奇。 This blog post是如何完成的典型描述。如果我理解正確,RxJava 1.x並不嚴格執行反應流,但它非常相似。版本2.0包含一些符合要求的類,或者至少通過TCK,因此此代碼的更新版本可能會有所不同。使用Reactive Streams Processor作爲事件總線通常可以嗎?

public class UserLocationModel { 

    private PublishSubject<LatLng> subject = PublishSubject.create(); 

    public void setLocation(LatLng latLng) { 
    subject.onNext(latLng); 
    } 

    public Observable<LatLng> getUserLocation() { 
    return subject; 
    } 
} 

在無流術語,我認爲subjectProcessor,這既是一種PublisherSubscriber

的問題是,呼籲未訂閱任何一個SubscriberonNext似乎違反了無流規範,特別rule 1.9

這僅僅是一個實現細節,它可以工作嗎?我是否正確地認爲你通常不能依賴於這個工作來實現兼容的Reactive Streams實現?

回答

4

Subject s和Processor s的標準RxJava 2是寬鬆的,所以您不必在調用其他方法之前先調用onSubscribe。這部分歸因於傳統性1.x主題沒有onSubscribe,部分原因是RxJava 2處理器不能通過選擇協調Subscriber端和Publisher端之間的請求,因此不能用於Subscription

如果您訂購RxJava Processor任何符合RS的Publisher,它們似乎會盡可能請求Long.MAX_VALUE和中繼信號。如果您訂購符合RS標準的Subscriber到RxJava Processor s,他們將承諾那些Subscriber s的背壓,並且不會溢出它們,但是,缺少請求可能導致個別發射MissingBackpressureException,並將Subscriber「扔」掉。 extensions library中有一個自定義的Publisher,用於協調請求。

我正確地認爲你一般不能依靠這個工作來完成兼容的Reactive Streams實現。

中沒有任何的規範,因此在TCK測試不應該用Processor未收到onSubscribe電話會怎麼樣但它需要它,因此,我認爲這已經成爲一個實現細節。

這裏有兩個更大的問題:

  1. 受試者發明與反應世界彌合勢在必行世界和GUI案件和非backpressured案件事件的multicasters很好地工作。在無功 - 無功多播中,它們是更好更直接的選擇,例如publish(Function)
  2. 在事件公交中思考是一個倒退,因爲您通過在單個「鐵路」上剷除和排除事件來創建單個堵塞點。相比之下,反應性的設計有利於個人和通常獨立的流,其中每個流可以根據需要在線程之間跳轉,並且可能在最後時刻避免主線程。