2011-03-01 107 views
2

我正在構建一個服務器,它使用Twisted Python在Redis頂部存儲關鍵/值數據。 服務器通過HTTP接收JSON字典,將其轉換爲Python字典並放入緩衝區。每次存儲新數據時,服務器都會調度一個任務,該任務從緩衝區中彈出一個字典,並使用txredis客戶端將每個元組寫入Redis實例。Twisted Python中的另一個生產者/消費者問題

class Datastore(Resource): 

isLeaf = True 

def __init__(self): 
    self.clientCreator = protocol.ClientCreator(reactor, Redis) 
    d = self.clientCreator.connectTCP(...) 
    d.addCallback(self.setRedis) 
    self.redis = None 
    self.buffer = deque() 


def render_POST(self, request): 
    try: 
     task_id = request.requestHeaders.getRawHeaders('x-task-id')[0] 
    except IndexError: 
     request.setResponseCode(503) 
     return '<html><body>Error reading task_id</body></html>' 

    data = json.loads(request.content.read()) 
    self.buffer.append((task_id, data)) 
    reactor.callLater(0, self.write_on_redis) 
    return ' ' 

@defer.inlineCallbacks 
def write_on_redis(self): 
    try: 
     task_id, dic = self.buffer.pop() 
     log.msg('Buffer: %s' % len(self.buffer)) 
    except IndexError: 
     log.msg('buffer empty') 
     defer.returnValue(1) 

    m = yield self.redis.sismember('DONE', task_id) 
    # Simple check 
    if m == '1': 
     log.msg('%s already stored' % task_id) 
    else: 
     log.msg('%s unpacking' % task_id) 
     s = yield self.redis.sadd('DONE', task_id) 

     d = defer.Deferred() 
     for k, v in dic.iteritems(): 
      k = k.encode() 
      d.addCallback(self.redis.push, k, v) 

     d.callback(None) 

基本上,我面對兩種不同的連接之間的生產者/消費者問題,但我不知道,目前的實施效果很好的扭曲paradygm。 我已經閱讀了關於Twisted中生產者/消費者接口的小文檔,但我不確定我是否可以在我的情況下使用它們。 歡迎任何評論家:在線程併發多年後,我試圖掌握事件驅動編程。

回答

2

Twisted生產者和消費者API,IProducerIConsumer,都是關於流量控制。您似乎沒有任何流量控制,只是將消息從一種協議轉發給另一種協議。

由於沒有流量控制,緩衝區只是額外的複雜性。只需將數據直接傳遞給write_on_redis方法即可擺脫它。這種方式write_on_redis不需要處理空的緩衝區大小,你不需要額外的資源屬性,甚至可以擺脫callLater(即使你保留緩衝區也可以做到這一點)。

雖然我不知道這是否回答你的問題。至於這種做法是否「效果很好」,這裏是我發現的東西通過閱讀代碼:

  • 如果數據到達比redis的接受它快,你的優秀作業列表可能會變得任意大,導致你耗盡內存。這就是流量控制所能提供的幫助。
  • 沒有錯誤圍繞sismember來電或sadd呼叫處理,你可能會失去工作如果這些失敗,因爲你已經從工作緩衝區彈出他們。
  • 做一個推的回調對Deferredd也意味着,任何失敗的推動將防止數據的其餘部分被推動。它也經過push返回Deferred的結果(我假設它返回一個Deferred)作爲第一個參數傳遞給下一個電話,所以除非push或多或少地忽略了它的第一個參數,你會不會推正確的數據到redis。

如果你想實現流量控制,那麼你需要讓你的H​​TTP服務器檢查self.buffer長度和可能拒絕新的任務 - 將其添加到self.buffer並返回一些錯誤代碼到客戶端。你仍然不會使用IConsumerIProducer,但它有點類似。