2015-11-03 114 views
3

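Tracing exceptions in a large [rx] observable graph back to the source code

When you have a large observable graph (that is, observables composed many times over with merge, groupBy, join and so on) and an exception is thrown, it is sometimes very hard to work out where the exception originated. I would like to know whether it is possible to find out where in the source files the observable operators were invoked. An example should make this clearer.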

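For example, given the following IllegalStateException: Only one subscriber allowed! and its stack trace, I would like to know whether it is possible to find the line numbers in my source files from which OperatorMerge, OperatorFilter, OperatorGroupBy, etc. were invoked. Is there some way to do this, whether with a debugger, print statements, or anything else?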

java.lang.IllegalStateException: Only one subscriber allowed!
    at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
    at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.unsafeSubscribe(Observable.java:7531)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.java:251)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.java:236)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.unsafeSubscribe(Observable.java:7531)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:215)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:185)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:120)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.internal.operators.SingleDelayedProducer.emit(SingleDelayedProducer.java:80)
    at rx.internal.operators.SingleDelayedProducer.set(SingleDelayedProducer.java:63)
    at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:93)
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44)
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
    at rx.internal.operators.OperatorTakeUntilPredicate$ParentSubscriber.onNext(OperatorTakeUntilPredicate.java:54)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onNext(OperatorGroupBy.java:286)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:340)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:226)
    at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:560)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.java:258)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:98)
    at rx.Subscriber.setProducer(Subscriber.java:177)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:50)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    at rx.Observable.unsafeSubscribe(Observable.java:7531)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:215)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:185)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:120)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:560)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.java:258)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:112)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onNext(OperatorGroupBy.java:286)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:340)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:226)
    at rx.lang.scala.Subscriber$$anon$3.onNext(Subscriber.scala:198)
...

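The problem arises essentially because the whole point of an Observable is to separate (a) the code from (b) the moment it is executed. For debugging, though, that is a nightmare. So, to repeat the question above: is it possible to trace each composition back to the line in the source code where it was written?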

Answers

1

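There were some experiments with attaching extra debugging information, but the whole library ran 100x slower and they were abandoned.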

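The problem is probably in a flatMap applied to your groupBy, where you subscribe to the GroupedObservable yourself and also hand it back to flatMap, which then can no longer subscribe to it: a GroupedObservable can be consumed only once. You need to use one of the publish() or replay() operators and adjust the logic of your function accordingly.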
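As a rough illustration of that advice, the sketch below consumes each group twice through a single shared subscription by using the selector form of publish. It assumes RxScala is on the classpath; the object name GroupBySharingSketch, the sample data and the two branches (all items, and items greater than 3) are made up for the example and are not taken from the question's code.

```scala
import rx.lang.scala.Observable

// Illustrative sketch only: names and data are hypothetical.
object GroupBySharingSketch {

  // Each group is subscribed to exactly once by publish(selector);
  // both branches inside the selector read from that one subscription.
  def run(): List[String] =
    Observable.from(1 to 6)
      .groupBy(_ % 2)
      .flatMap { (kv: (Int, Observable[Int])) =>
        val (key, group) = kv
        group.publish[String] { (shared: Observable[Int]) =>
          val all = shared.map(x => s"$key:item:$x")
          val big = shared.filter(_ > 3).map(x => s"$key:big:$x")
          all.merge(big)
        }
      }
      .toBlocking
      .toList

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Subscribing to group twice directly, instead of going through shared, is the kind of usage that would be expected to raise "Only one subscriber allowed!".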

+0

Thanks for your comment; it was indeed fixed by "sharing" the groups and the nested observables. – Luciano

0

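A year later I am still struggling with this and still have not found a good way to trace execution. I find myself relying on print statements in the code to see what is going on. It is the only way I have been able to get a grip on what is happening.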

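The only thing I have found useful is a helper like the following, so that I do not have to write doOnNext(x => println(x)) every time I want to see what is happening: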

// Must be declared inside an object, class or trait, as with any implicit class.
implicit class ObservableTrace[T](o: rx.lang.scala.Observable[T]) {
    import java.time.LocalTime

    // Logs every lifecycle event of `o`, prefixed with a timestamp and `name`.
    def trace(name: String): rx.lang.scala.Observable[T] = {
        def print(s: String): Unit = println(s"${LocalTime.now} : $name : $s")
        o.doOnNext(x => print("next:" + x))
            .doOnSubscribe(print("subscribed"))
            .doOnCompleted(print("completed"))
            .doOnError(e => print("error: " + e))
            .doOnUnsubscribe(print("unsubscribed"))
    }
}

This makes editing the code quick: just add myobservable.trace("My Observable") to a couple of observables, and it becomes easy to see when the different lifecycle events happen.
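The label passed to trace could also be derived from the call site instead of being typed by hand, which gets closer to what the question asks for. The following is a minimal sketch in plain Scala with no Rx dependency; CallSite and here are hypothetical names, not part of RxJava or RxScala, and the sketch assumes the code was compiled with line-number debug information.

```scala
// Hypothetical helper; not part of RxJava or RxScala.
object CallSite {
  // Runtime class name of this helper, so its own frames can be skipped.
  private val self: String = getClass.getName

  // Returns "File.scala:line" for the first stack frame that is neither
  // this helper nor library code under rx.*, or "unknown" if none is found.
  def here(): String =
    new Throwable().getStackTrace
      .find(f => f.getClassName != self && !f.getClassName.startsWith("rx."))
      .map(f => s"${f.getFileName}:${f.getLineNumber}")
      .getOrElse("unknown")
}

object CallSiteDemo {
  def main(args: Array[String]): Unit = {
    // With the trace helper this might be used as:
    //   myobservable.trace(CallSite.here())
    println(CallSite.here())
  }
}
```

Capturing a stack trace on every call is costly, so a helper like this is probably best kept to debugging sessions.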