2017-04-02 85 views
1

我有一個Flowables的問題,並將它們添加到compositeDisposables。 我想從Observable切換到Flowable,因爲操作可能會發出1000或更多的值。林與rxjava2有些沒有經驗,所以請原諒我,如果這個問題是愚蠢的:)Android rxjava2流動與compositedisposable

到目前爲止,我所用的可觀察到的是這樣的:

public Observable<String> uploadPictureRx(String path) 
    { 
     return Observable.create(new ObservableOnSubscribe<String>() 
     { 
      @Override 
      public void subscribe(ObservableEmitter<String> e) throws Exception 
      { 
       Uri file = Uri.fromFile(new File(path)); 
       String segment = file.getLastPathSegment(); 
       UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file); 


       uploadTask.addOnFailureListener(new OnFailureListener() 
       { 
        @Override 
        public void onFailure(@NonNull Exception exception) 
        { 
         e.onError(exception); 
        } 
       }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) 
        {       
         //noinspection VisibleForTests 
         downloadUrl = taskSnapshot.getDownloadUrl(); 
         String url = downloadUrl.getPath(); 
         e.onNext(url); 
         e.onComplete(); 
        } 
       }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onProgress(UploadTask.TaskSnapshot taskSnapshot) 
        { 
         //noinspection VisibleForTests 
         long bytes = taskSnapshot.getBytesTransferred(); 
         String bytesS = String.valueOf(bytes); 
         e.onNext(bytesS); 
        } 
       }); 
      } 
     }); 
    } 

,並呼籲像這樣的方法:

private void uploadPicToFireBaseStorage(String path) 
    { 
     compositeDisposable.add(storageService.uploadPictureRx(path) 
       .subscribeOn(Schedulers.io()) 
       .observeOn(mainScheduler) 
       .subscribeWith(new DisposableObserver<String>() 
       { 

        @Override 
        public void onNext(String s) 
        { 
         String ss = s; 
         System.out.println(ss); 
        } 

        @Override 
        public void onError(Throwable e) 
        { 
         e.printStackTrace(); 
        } 

        @Override 
        public void onComplete() 
        { 
         view.displayToast("Picture Upload completed"); 
        } 
       }) 
     ); 
    } 

這工作正常!然而,當我試圖做同樣的可流動的,而不是觀察到的,它不會編譯:

public Flowable<String> uploadPictureRx(String path) 
    { 
     return Flowable.create(new FlowableOnSubscribe<String>() 
     { 

      @Override 
      public void subscribe(FlowableEmitter<String> e) throws Exception 
      { 
       Uri file = Uri.fromFile(new File(path)); 
       String segment = file.getLastPathSegment(); 
       UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);  

       uploadTask.addOnFailureListener(new OnFailureListener() 
       { 
        @Override 
        public void onFailure(@NonNull Exception exception) 
        { 
         e.onError(exception); 
        } 
       }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) 
        {       
         //noinspection VisibleForTests 
         downloadUrl = taskSnapshot.getDownloadUrl(); 
         String url = downloadUrl.getPath(); 
         e.onNext(url); 
         e.onComplete(); 
        } 
       }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onProgress(UploadTask.TaskSnapshot taskSnapshot) 
        { 
         //noinspection VisibleForTests 
         long bytes = taskSnapshot.getBytesTransferred(); 
         String bytesS = String.valueOf(bytes); 
         e.onNext(bytesS); 
        } 
       }); 
      } 
     }, BackpressureStrategy.BUFFER); 

    } 

的錯誤是: 推斷型「E」的類型參數「E」是不在其綁定;應該實現'org.reactivestreams.Subscriber

我的猜測是,該Flowable不實現一次性,這就是爲什麼它不會編譯。如果那是真的,我不知道,只是我迄今爲止的最佳猜測。 或者我必須更改subscribeWith()來訂閱()?我不知道這種改變會產生什麼影響。

無論如何建議如何使這項工作,並得到這個Flowable到我compositedisposable真的很感激。

謝謝你們!

編輯:

試圖改變DisposableObserver到訂閱服務器。但是這會導致以下錯誤: Compiler Error

+0

懸浮劑使用的訂閱而非一次性的背壓的原因。基本上使用Subscription.request()方法來告訴可觀察到那一刻我想要多少物品。 –

+0

顯示您的用戶代碼。我想錯誤是沒有你的創造。 –

+0

謝謝你的迴應!我的訂戶是不是代碼塊在中間?對不起,如果這是愚蠢的,但迄今爲止,我主要與單打或補充工作。 –

回答

0

流動人員使用訂閱而不是一次性出於Backpressure的原因。基本上使用Subscription.request()方法來告訴可觀察到那一刻我想要多少物品。

更改代碼:

private void uploadPicToFireBaseStorage(String path) 
{ 
    compositeDisposable.add(storageService.uploadPictureRx(path) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(mainScheduler) 
      .subscribeWith(new DisposableObserver<String>() 
      { 

       @Override 
       public void onNext(String s) 
       { 
        String ss = s; 
        System.out.println(ss); 
       } 

       @Override 
       public void onError(Throwable e) 
       { 
        e.printStackTrace(); 
       } 

       @Override 
       public void onComplete() 
       { 
        view.displayToast("Picture Upload completed"); 
       } 
      }) 
    ); 
} 

private void uploadPicToFireBaseStorage(String path) 
{ 
    compositeDisposable.add(storageService.uploadPictureRx(path) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(mainScheduler) 
      .subscribeWith(new ResourceSubscriber<String>() 
      { 
       @Override 
       public void onNext(String s) 
       { 
        String ss = s; 
        System.out.println(ss); 
       } 

       @Override 
       public void onError(Throwable e) 
       { 
        e.printStackTrace(); 
       } 

       @Override 
       public void onComplete() 
       { 
        view.displayToast("Picture Upload completed"); 
       } 
      }) 
    ); 
} 
+0

謝謝Pheonix Wang Ill給出了一個嘗試並報告回來! –

+0

試過,但不幸的是我現在正在得到另一個錯誤。我將更新我的問題,並插入帶有錯誤的屏幕截圖。 –

+0

您不能將訂閱者添加到CompositeDisposable,因爲CompositeDisposable需要一次性訂閱者而不是訂閱者。它們是不同的類。或者你可以使用ResourceSubscriber來實現一次性使用。我更新我的回答 –