2017-06-04 209 views
1

我是rxjs的新手。我在下面調用一個函數,讀完整個流並打印讀取控制檯語句,但我從來沒有看到「Subscibe done」,我不知道爲什麼。要完成此流將需要什麼?是明顯的錯嗎?RxJS訂閱永遠不會完成

const readline$ = RxNode.fromReadLineStream(rl) 
    .filter((element, index, observable) => { 
     if (index >= range.start && index < range.stop) { 
      console.log(`kept line is ${JSON.stringify(element)}`); 
      return true; 
     } else { 
      console.log(`not keeping line ${JSON.stringify(element)}`); 
      return false; 
     } 
    }) 
    .concatMap(line => Rx.Observable.fromPromise(myFunction(line))) 
    .do(response => console.log(JSON.stringify(response))); 

readline$.subscribe(i => { console.log(`Subscribe object: ${util.inspect(i)}`); }, 
        err => { console.error(`Subscribe error: ${util.inspect(err)}`); }, 
       done => { console.log("Subscribe done."); // NEVER CALLED 
          anotherFunc();     // NEVER CALLED 
        } 
); 
+1

我不確定這個*特定的observable *,但不是每個observable都會完成。 –

+1

你傳遞給'RxNode.fromReadLineStream'的節點流是什麼?流本身結束了嗎? – cartant

+0

是的,它是一個有限長度文件的RxNode.fromReadLineStream。 –

回答

相關問題