2016-04-22 107 views
0

我正在嘗試使用Highland.js設計工作流程。我無法弄清楚Highland.js如何使用它。如何使用Highland.js編寫過濾器(使用數據庫)

我有一個基於流的工作流程如下(僞代碼),

read      //fs.createReadStream(...) 
    .pipe(parse)   //JSONStream.parse(...) 
    .pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0 
    .pipe(transform)  //fn(item) { return tranform(item); } 
    .pipe(write);   //mongoClient.db.collection.insert(doc) 

的filterDuplicate查找數據庫,以檢查是否存在(使用條件)讀取記錄,並返回一個布爾結果。爲了使過濾器工作,它需要一個活動的數據庫連接,我想要重複使用,直到數據流完成。一種方法是在讀取和關閉「完成」寫入事件之前打開一個連接;這意味着我需要將連接作爲參數傳遞給過濾器和寫入,如果兩個方法都使用相同的數據庫,則這會起作用。

在上面的工作流程中,filterDuplicate和write也可能使用不同的數據庫。所以我希望連接能夠在每個函數中包含和管理,這使得它成爲一個獨立的可重用單元。

我在尋找關於如何使用高地設計的任何輸入。

感謝。

回答

0

它不會像使用pipe一堆一樣簡單。你必須爲任務使用最合適的API方法。

這裏是什麼,你可能會最終接近一個粗略的例子:

read 
    .through(JSONStream.parse([true])) 
    .through((x) => { 
    h((next, push) => { // use a generator for async operations 
     h.wrapCallback(mongoCountQuery)(params) // you don't have to do it this way 
     .collect() 
     .toCallback((err, result) => { 
      if (result > 0) push(err, x); // if it met the criteria, hold onto it 
      return push(null, h.nil); // tell highland this stream is done 
     }); 
    }); 
    }) 
    .merge() // because you've got a stream of streams after that `through` 
    .map(transform) // just your standard map through a transform 
    .through((x) => { 
    h((next, push) => { // another generator for async operations 
     h.wrapCallback(mongoUpdateQuery)(params) 
     .toCallback((err, results) => { 
      push(err, results); 
      return push(null, h.nil); 
     }); 
    }); 
    }) 
    .merge() // another stream-of-streams situation 
    .toCallback(cb); // call home to say we're done 
+0

我爲高原機制,專門找(如果有的話)保持狀態。在這種情況下,一個打開的數據庫連接,爲mongoCountQuery和mongoUpdateQuery並在其適當關閉時觸發。經過一番閱讀後,我認爲這個狀態應該在外面維護,並且明確地作爲上下文傳遞給流處理函數。這樣,流處理功能只是使用上下文來完成他們的工作,而Highland則專注於協調流功能。 – Krishnan

+0

四個月後,我傾向於低估自己的答案。 – amsross