我有一個Flowables的問題,並將它們添加到compositeDisposables。 我想從Observable切換到Flowable,因爲操作可能會發出1000或更多的值。林與rxjava2有些沒有經驗,所以請原諒我,如果這個問題是愚蠢的:)Android rxjava2流動與compositedisposable
到目前爲止,我所用的可觀察到的是這樣的:
public Observable<String> uploadPictureRx(String path)
{
return Observable.create(new ObservableOnSubscribe<String>()
{
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
});
}
,並呼籲像這樣的方法:
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new DisposableObserver<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
這工作正常!然而,當我試圖做同樣的可流動的,而不是觀察到的,它不會編譯:
public Flowable<String> uploadPictureRx(String path)
{
return Flowable.create(new FlowableOnSubscribe<String>()
{
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
}, BackpressureStrategy.BUFFER);
}
的錯誤是: 推斷型「E」的類型參數「E」是不在其綁定;應該實現'org.reactivestreams.Subscriber
我的猜測是,該Flowable不實現一次性,這就是爲什麼它不會編譯。如果那是真的,我不知道,只是我迄今爲止的最佳猜測。 或者我必須更改subscribeWith()來訂閱()?我不知道這種改變會產生什麼影響。
無論如何建議如何使這項工作,並得到這個Flowable到我compositedisposable真的很感激。
謝謝你們!
編輯:
試圖改變DisposableObserver到訂閱服務器。但是這會導致以下錯誤: Compiler Error
懸浮劑使用的訂閱而非一次性的背壓的原因。基本上使用Subscription.request()方法來告訴可觀察到那一刻我想要多少物品。 –
顯示您的用戶代碼。我想錯誤是沒有你的創造。 –
謝謝你的迴應!我的訂戶是不是代碼塊在中間?對不起,如果這是愚蠢的,但迄今爲止,我主要與單打或補充工作。 –