backpressure

    0熱度

    1回答

    在RxJava 1/RxScala中,如何在下列情況下節流/背壓可觀測源? def fast: Observable[Foo] // Supports backpressure def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => Observable.from(expensiveOp(foo)) // Signa

    0熱度

    1回答

    ---abcde-----f-------gh-----i----> //Events 我有一個「工作隊列」,我想觀察/訂閱。這是一個要處理的命令對象的數組。新的作品通常以連發形式出現,並且需要連續處理(按收到的順序逐個處理,直至完全處理)。 我正在使用RxJS 5.0.0-beta.6。 (由其他庫強加的版本) 下面是一個工作示例,說明我想要的行爲,但使用RxJS v4。 問題的主要代碼是這

    0熱度

    1回答

    我在我的android應用程序中使用RxJava。我正在使用interval()函數使用計時器,但即使添加了onBackPressureDrop(),我仍然得到Missing Backpressure異常。我還爲我的訂戶添加了onError(),並將異常記錄到Crashlytics,但它仍然崩潰。請幫忙。我花了一個星期的時間來解決問題,但無濟於事。代碼偶爾崩潰,我甚至一次都無法重現它。 Trace

    0熱度

    1回答

    我已經編寫了一個Akka應用程序,該應用程序從Kafka獲取輸入,然後使用分片演員處理數據並輸出到Kafka。 但在某些場合分片區域不能處理負載,我也得到: 你或許應該實行流量控制,以避免水浸 遠程連接。 如何在此鏈/流中實施背壓? 卡夫卡消費 - >共享演員 - >卡夫卡生產者 從代碼一些片斷: ReactiveKafka kafka = new ReactiveKafka(); Subsc

    2熱度

    1回答

    我一直讀了幾文檔的背壓RxJava,但我不能找到喜歡它是如何在庫內部發生的詳細解釋,每個人都只是概括它像「生產者」是太快, 「消費者」太慢了。 例如像下面的代碼: Observable.interval(1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.newThread()) .subscribe( i -> {

    0熱度

    1回答

    我一直在掙扎一段時間,我相信這是一個非常基本的問題。 我有一個Flowable從網絡中檢索一捆物品併發出它們。 Flowable .create(new FlowableOnSubscribe<Item>() { @Override public void subscribe(FlowableEmitter<Item> emitter) throws Except

    2熱度

    1回答

    我用阿卡流「ActorPublisher演員作爲流每個連接的數據Source發送到傳入的WebSocket或HTTP連接。 ActorPublisher的contract是定期通過提供需求請求數據 - 下游可接受的元素數量。如果需求爲0,我不應該發送更多元素。我觀察到,如果我緩衝元素,當消費者速度緩慢時,緩衝區大小在1到60之間波動,但大多數在40-50之間。 要流我使用阿卡-HTTP「s到的We

    4熱度

    1回答

    最近我意識到我不明白RxJava2背壓是如何工作的。 我做了小測試,我希望它應該會失敗,MissingBackpressureException例外: @Test public void testBackpressureWillFail() { Observable.<Integer>create(e -> { for (int i = 0; i < 10000; i++)

    3熱度

    1回答

    我想測試一些Akka流功能,如conflate。爲此,我需要在簡單的單元測試中構建一個不受背壓影響的源。天真的嘗試,如 Source.tick(1.milli, 1.milli, "tick").map(_ => Random.nextDouble()) 由於背壓不起作用。 OTOH通過HTTP可能是矯枉過正。 如何創建一個簡單的Source對於不受背壓影響的單元測試?

    4熱度

    1回答

    我正在分析Spark結構化流式處理中的背壓功能。有誰知道細節?是否有可能通過代碼調整處理傳入記錄? 謝謝