我有兩個可觀察對象(A,B),並且我希望第一個在第二次運行之前先完成運行。但是,這甚至不是我遇到的問題。問題是,當A在B之前被添加時,B除非在A之前放置B,否則B不會運行。但是,我在場景就像是這樣的:使用Observable Zip misblehaving
- A - 皮卡
- 乙 - 交貨
有三種類型的訂單。 Pickup Only
,Delivery Only
和Pickup And Delivery
。 Pickups
需要在各種情況下運行Deliveries
之前。 A Delivery
只有Pickup
標記爲true
。 A只有Pickup
,需要在picked up and delivered
上關閉。這就是爲什麼我需要Pickup
先發送所有本地保存的皮卡,然後再發送。所以,我這樣做:
皮卡
private Observable<UpdateMainResponse> getDeliveredOrders() {
String token = PrefUtil.getToken(context);
BehaviorSubject<Integer> pageControl = BehaviorSubject.create(1);
Observable<UpdateMainResponse> ret = pageControl.asObservable().concatMap(integer -> {
if (integer - 1 != deliveryUpdate.size()) {
Log.e(TAG, "DeliveredOrders: " + deliveryUpdate.size());
RealmOrderUpdate theDel = deliveryUpdate.get(integer-1);
Log.e(TAG, "DeliveryUpdate: " + theDel.toString());
DeliverOrder pickupOrder = new DeliverOrder();
pickupOrder.setUuid(theDel.getUuid());
pickupOrder.setCode(theDel.getDest_code());
pickupOrder.setDelivered_lat(theDel.getLoc_lat());
pickupOrder.setDelivered_long(theDel.getLoc_long());
return apiService.deliverOrder(theDel.getOrderId(), token, pickupOrder)
.subscribeOn(Schedulers.immediate())
.doOnNext(updateMainResponse -> {
try {
Log.e(TAG, updateMainResponse.toString());
realm.executeTransaction(realm1 -> theDel.deleteFromRealm());
} catch (Exception e) {
e.printStackTrace();
} finally {
pageControl.onNext(integer + 1);
}
});
} else {
return Observable.<UpdateMainResponse>empty().doOnCompleted(pageControl::onCompleted);
}
});
return Observable.defer(() -> ret);
}
交貨
private Observable<UpdateMainResponse> getPickedOrders() {
Log.e(TAG, "PickedOrders: " + pickUpdate.size());
String token = PrefUtil.getToken(context);
BehaviorSubject<Integer> pageControl = BehaviorSubject.create(1);
Observable<UpdateMainResponse> ret = pageControl.asObservable().concatMap(integer -> {
Log.e(TAG, "MainPickedInteger: " + integer);
if (integer - 1 != pickUpdate.size()) {
RealmOrderUpdate thePick = pickUpdate.get(integer - 1);
Log.e(TAG, "PickedUpdate: " + thePick.toString());
PickupOrder pickupOrder = new PickupOrder();
pickupOrder.setUuid(thePick.getUuid());
pickupOrder.setCode(thePick.getSource_code());
pickupOrder.setPicked_lat(thePick.getLoc_lat());
pickupOrder.setPicked_long(thePick.getLoc_long());
return apiService.pickupOrder(thePick.getOrderId(), token, pickupOrder)
.subscribeOn(Schedulers.immediate())
.doOnNext(updateMainResponse -> {
try {
Log.e(TAG, updateMainResponse.toString());
realm.executeTransaction(realm1 -> thePick.deleteFromRealm());
} catch (Exception e) {
e.printStackTrace();
} finally {
pageControl.onNext(integer + 1);
}
});
} else {
return Observable.<UpdateMainResponse>empty().doOnCompleted(pageControl::onCompleted);
}
});
return Observable.defer(() -> ret);
}
拉鍊
private Observable<ZipperResponse> batchedZip() {
return Observable.zip(getPickedOrders(), getDeliveredOrders(), (updateMainResponse, updateMainResponse2) -> {
List<UpdateMainResponse> orders = new ArrayList<>();
bakeries.add(updateMainResponse);
bakeries.add(updateMainResponse2);
return new ZipperResponse(orders);
});
}
利用拉鍊
public void generalUpload(APIRequestListener listener) {
batchedZip.subscribe(new Subscriber<ZipperResponse>() {
@Override
public void onCompleted() {
listener.didComplete();
unsubscribe();
}
@Override
public void onError(Throwable e) {
listener.handleDefaultError(e);
unsubscribe();
}
@Override
public void onNext(ZipperResponse zipperResponse) {
Log.e(TAG, zipperResponse.size());
}
});
}
問題
我不知道爲什麼
getDeliveredOrders()
不會被調用,除非我把它移動到第一個前getPickedOrders()
通過Rx Documentation for Zip閱讀我可以看到它不會像我預期的那樣在
getPickedOrders()
之前運行的所有運行。它必須一個接一個地做。例如:皮卡之一,然後交付之一
任何有助於瞭解發生了什麼,將不勝感激。由於
你是說我應該使用'.merge'而不是'.zip'。請注意,我不需要'pickup pickup和交付源()',因爲爲什麼?在這種情況下,我只是存儲'皮卡和交貨',所以,沒有必要這樣做。它始終落在確保'拾取'先交付'之前' –
然後不要使用第三個參數。在高層次上,我描述了你想要做什麼?或者你想先收集一切,運行所有取貨任務,然後執行所有交貨任務? –
其實,這就是我的想法和我所做的。因爲這個人'pickUpdate'是一個初始化的'List''pickUpdate = realm.where(RealmOrderUpdate.class).equalTo(「pickup」,true).findAll();'並且我使用'BehaviorSubject'循環到發送所有'皮卡' –