我正在構建一個服務器,它使用Twisted Python在Redis頂部存儲關鍵/值數據。 服務器通過HTTP接收JSON字典,將其轉換爲Python字典並放入緩衝區。每次存儲新數據時,服務器都會調度一個任務,該任務從緩衝區中彈出一個字典,並使用txredis客戶端將每個元組寫入Redis實例。Twisted Python中的另一個生產者/消費者問題
class Datastore(Resource):
isLeaf = True
def __init__(self):
self.clientCreator = protocol.ClientCreator(reactor, Redis)
d = self.clientCreator.connectTCP(...)
d.addCallback(self.setRedis)
self.redis = None
self.buffer = deque()
def render_POST(self, request):
try:
task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
except IndexError:
request.setResponseCode(503)
return '<html><body>Error reading task_id</body></html>'
data = json.loads(request.content.read())
self.buffer.append((task_id, data))
reactor.callLater(0, self.write_on_redis)
return ' '
@defer.inlineCallbacks
def write_on_redis(self):
try:
task_id, dic = self.buffer.pop()
log.msg('Buffer: %s' % len(self.buffer))
except IndexError:
log.msg('buffer empty')
defer.returnValue(1)
m = yield self.redis.sismember('DONE', task_id)
# Simple check
if m == '1':
log.msg('%s already stored' % task_id)
else:
log.msg('%s unpacking' % task_id)
s = yield self.redis.sadd('DONE', task_id)
d = defer.Deferred()
for k, v in dic.iteritems():
k = k.encode()
d.addCallback(self.redis.push, k, v)
d.callback(None)
基本上,我面對兩種不同的連接之間的生產者/消費者問題,但我不知道,目前的實施效果很好的扭曲paradygm。 我已經閱讀了關於Twisted中生產者/消費者接口的小文檔,但我不確定我是否可以在我的情況下使用它們。 歡迎任何評論家:在線程併發多年後,我試圖掌握事件驅動編程。