2016-10-10 55 views
1

我有兩個可觀察對象(A,B),並且我希望第一個在第二次運行之前先完成運行。但是,這甚至不是我遇到的問題。問題是,當A在B之前被添加時,B除非在A之前放置B,否則B不會運行。但是,我在場景就像是這樣的:使用Observable Zip misblehaving

  • A - 皮卡
  • 乙 - 交貨

有三種類型的訂單。 Pickup OnlyDelivery OnlyPickup And DeliveryPickups需要在各種情況下運行Deliveries之前。 A Delivery只有Pickup標記爲true。 A只有Pickup,需要在picked up and delivered上關閉。這就是爲什麼我需要Pickup先發送所有本地保存的皮卡,然後再發送。所以,我這樣做:

皮卡

private Observable<UpdateMainResponse> getDeliveredOrders() { 

    String token = PrefUtil.getToken(context); 

    BehaviorSubject<Integer> pageControl = BehaviorSubject.create(1); 
    Observable<UpdateMainResponse> ret = pageControl.asObservable().concatMap(integer -> { 

     if (integer - 1 != deliveryUpdate.size()) { 
      Log.e(TAG, "DeliveredOrders: " + deliveryUpdate.size()); 
      RealmOrderUpdate theDel = deliveryUpdate.get(integer-1); 
      Log.e(TAG, "DeliveryUpdate: " + theDel.toString()); 
      DeliverOrder pickupOrder = new DeliverOrder(); 
      pickupOrder.setUuid(theDel.getUuid()); 
      pickupOrder.setCode(theDel.getDest_code()); 
      pickupOrder.setDelivered_lat(theDel.getLoc_lat()); 
      pickupOrder.setDelivered_long(theDel.getLoc_long()); 
      return apiService.deliverOrder(theDel.getOrderId(), token, pickupOrder) 
        .subscribeOn(Schedulers.immediate()) 
        .doOnNext(updateMainResponse -> { 
         try { 
          Log.e(TAG, updateMainResponse.toString()); 
          realm.executeTransaction(realm1 -> theDel.deleteFromRealm()); 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } finally { 
          pageControl.onNext(integer + 1); 
         } 
        }); 
     } else { 
      return Observable.<UpdateMainResponse>empty().doOnCompleted(pageControl::onCompleted); 
     } 
    }); 

    return Observable.defer(() -> ret); 
} 

交貨

private Observable<UpdateMainResponse> getPickedOrders() { 

    Log.e(TAG, "PickedOrders: " + pickUpdate.size()); 

    String token = PrefUtil.getToken(context); 

    BehaviorSubject<Integer> pageControl = BehaviorSubject.create(1); 
    Observable<UpdateMainResponse> ret = pageControl.asObservable().concatMap(integer -> { 

     Log.e(TAG, "MainPickedInteger: " + integer); 
     if (integer - 1 != pickUpdate.size()) { 
      RealmOrderUpdate thePick = pickUpdate.get(integer - 1); 
      Log.e(TAG, "PickedUpdate: " + thePick.toString()); 
      PickupOrder pickupOrder = new PickupOrder(); 
      pickupOrder.setUuid(thePick.getUuid()); 
      pickupOrder.setCode(thePick.getSource_code()); 
      pickupOrder.setPicked_lat(thePick.getLoc_lat()); 
      pickupOrder.setPicked_long(thePick.getLoc_long()); 
      return apiService.pickupOrder(thePick.getOrderId(), token, pickupOrder) 
        .subscribeOn(Schedulers.immediate()) 
        .doOnNext(updateMainResponse -> { 
         try { 
          Log.e(TAG, updateMainResponse.toString()); 
          realm.executeTransaction(realm1 -> thePick.deleteFromRealm()); 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } finally { 
          pageControl.onNext(integer + 1); 
         } 
        }); 
     } else { 
      return Observable.<UpdateMainResponse>empty().doOnCompleted(pageControl::onCompleted); 
     } 
    }); 

    return Observable.defer(() -> ret); 
} 

拉鍊

private Observable<ZipperResponse> batchedZip() { 
    return Observable.zip(getPickedOrders(), getDeliveredOrders(), (updateMainResponse, updateMainResponse2) -> { 
     List<UpdateMainResponse> orders = new ArrayList<>(); 
     bakeries.add(updateMainResponse); 
     bakeries.add(updateMainResponse2); 
     return new ZipperResponse(orders); 
    }); 
} 

利用拉鍊

public void generalUpload(APIRequestListener listener) { 

    batchedZip.subscribe(new Subscriber<ZipperResponse>() { 
     @Override 
     public void onCompleted() { 
      listener.didComplete(); 
      unsubscribe(); 
     } 

     @Override 
     public void onError(Throwable e) { 
      listener.handleDefaultError(e); 
      unsubscribe(); 
     } 

     @Override 
     public void onNext(ZipperResponse zipperResponse) { 
      Log.e(TAG, zipperResponse.size()); 
     } 
    }); 
} 

問題

  1. 我不知道爲什麼getDeliveredOrders()不會被調用,除非我把它移動到第一個前getPickedOrders()

  2. 通過Rx Documentation for Zip閱讀我可以看到它不會像我預期的那樣在getPickedOrders()之前運行的所有運行。它必須一個接一個地做。例如:皮卡之一,然後交付之一

任何有助於瞭解發生了什麼,將不勝感激。由於

回答

1

好了,如果我說對了:

  • 皮卡只:需要通過分揀過程中運行,那麼他們完成。
  • 僅供貨:需要通過交貨流程,然後完成。
  • 取貨和交貨:需要先通過取件,然後通過交貨。

在非常高的水平上,幾乎preudo代碼,爲什麼這個過程不起作用?

Observable<Item> performPickup(Item item); 
Observable<Item> performDelivery(Item item); 
Observable<Items> items = ...; 

items 
.flatMap(item -> item.needsPickup() ? performPickup(item) : Observable.just(item)) 
.flatMap(item -> item.needsDelivery() ? performDelivery(item) : Observable.just(item)) 
.doOnNext(completedItem -> ...) 

如果您有三種不同的來源:

Observable<Item> items = Observable.merge(
    pickupSource(), 
    deliverySource(), 
    pickupAndDeliverySource()); 
+0

你是說我應該使用'.merge'而不是'.zip'。請注意,我不需要'pickup pickup和交付源()',因爲爲什麼?在這種情況下,我只是存儲'皮卡和交貨',所以,沒有必要這樣做。它始終落在確保'拾取'先交付'之前' –

+0

然後不要使用第三個參數。在高層次上,我描述了你想要做什麼?或者你想先收集一切,運行所有取貨任務,然後執行所有交貨任務? –

+0

其實,這就是我的想法和我所做的。因爲這個人'pickUpdate'是一個初始化的'List ''pickUpdate = realm.where(RealmOrderUpdate.class).equalTo(「pickup」,true).findAll();'並且我使用'BehaviorSubject'循環到發送所有'皮卡' –