2016-12-16 54 views
6

當前正在編寫一個小型持久隊列庫,它將讀取/寫入行到文本文件。下面是add方法,例如:將觀察對象實現爲持久隊列庫

Queue.prototype.add = function(line, cb){ 

    getLock(this, err => { 
     if(err){ 
      this.emit('error', err); 
      releaseLock(err, cb); 
     } 
     else{ 
      fs.appendFile(this.filepath, line, err => { 
       err && this.emit('error', err); 
       releaseLock(err, cb); 
      }); 
     } 
    }); 
}; 

我覺得挺尷尬的,是支持事件發射器和回調(或事件發射器和承諾)。

換句話說,對隊列中的每個方法(添加,偷看,刪除),我需要返回/回調的結果這是具體到每一個電話。僅使用事件發射器意味着調用者可能會針對不屬於他們剛剛創建的調用的結果採取行動。因此回調或承諾似乎勢在必行 - 您不能僅使用事件發射器。

什麼我不知道是 - 可以觀測某種方式解決其配對回調與事件發射器或承諾與事件發射器的問題?

我期待找到一種方法來實現這個事件觸發/異步隊列,只有一種類型的異步回調機制。也許observables不是這裏的答案,但我正在尋找一個好的設計模式。

回答

2

我不太清楚你爲什麼需要事件發射這裏....如果使用可觀察每個用戶將自己的呼叫得到的結果/錯誤。

我會重寫你的方法,例如:

function appendFileObs(filePath, line){ 
    return Rx.Observable.create((obs) => { 
     fs.appendFile(filePath, line, (err, result) => { 
      if(err) obs.onError(err); 
      else { 
       obs.onNext(result); 
       obs.onCompleted(); 
      } 
     }); 
    }); 
}); 
// Similar for getLock and releaseLock 


Queue.prototype.add = function(line){ 
    return getLockObs(this) 
     .flatMap(() => appendFileObs(this.filePath, line)) 
     .flatMap(result => releaseLockObs(undefined).map(() => result)) 
     .catch((err) => { 
      return releaseLockObs(err); 
     }); 
}; 

在這個解決方案,我不感到自豪的是,流具有內部的副作用,這可能改善的,但你的想法。

這樣,當有人呼叫。新增(線).subscribe(),它會得到的結果和發生在他的電話的錯誤。

如果您需要廣播中發生的錯誤,你可以使用一個BehaviourSubject,這是在同一時間觀測和觀察的

+0

感謝這將幫助我(有用的東西!) - 有也是一種方式一個監聽器來監聽隊列發出的所有事件(使用可觀察事件)?至少這對調試很有用。 –

+0

哦,我想你已經回答了我的問題通過暗指「BehaviorSubject」 –

+0

我想與觀測是不可能鏈方法,例如:queue.add(一號線)。新增(2號線)。新增(3號線).subscribe()還是它? –