2013-02-28 158 views
11

我在試圖找到如何異步使用Redis和Tornado。我找到了tornado-redis,但我需要的不僅僅是在代碼中添加yield如何異步使用Tornado和Redis?

我有以下代碼:

import redis 
import tornado.web 

class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = redis.StrictRedis(port=6279) 
     pubsub = client.pubsub() 
     pubsub.subscribe('test_channel') 

     for item in pubsub.listen(): 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 

     self.write(item['data']) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    tornado.ioloop.IOLoop.instance().start() 

我需要獲得訪問/ URL,並獲得了「Hello World」的同時,有一個在/wait掛起的請求。 我該怎麼辦?

+1

Redis的發佈/訂閱不應在'web.RequestHandler'使用,因爲它會阻止ioloop同時'發佈訂閱等待。聽()'。看看http://tornadogists.org/532067/瞭解一個可用的websocket示例。 – 2013-02-28 21:19:33

+0

websocket是一個不錯的選擇,但是我的應用程序需要在瀏覽器中工作,這些瀏覽器不支持websockets。我正在使用長輪詢。這就是我需要「異步獲取」的原因。 – 2013-03-01 13:16:21

+0

@HelieelsonSantos在這種情況下,您最好的選擇是保持訂閱渠道歷史的本地狀態(由單獨的線程提供),然後立即將該狀態寫入響應並完成「get」操作。客戶應該保留最後獲取索引的某些記錄,或者最後獲得時間等,這樣可以保持不同客戶的連續性。當我得到時間時,我會在幾個小時內用一個例子寫一個答案。 – 2013-03-01 16:24:48

回答

5

您不應該在主龍捲風線程中使用Redis pub/sub,因爲它會阻止IO循環。您可以在主線程中處理來自Web客戶端的長輪詢,但您應該創建一個單獨的線程來監聽Redis。當您收到消息時,您可以使用ioloop.add_callback()和/或threading.Queue與主線程進行通信。

1

好了,所以這是我的我怎麼會用GET請求做例子。

我增加兩個主要組件:

首先是一個簡單的螺紋發佈訂閱監聽器,追加新的消息到本地列表對象。 我還向類中添加了列表訪問器,因此您可以像監聽器線程一樣從常規列表中讀取數據。就您的WebRequest而言,您只是從本地列表對象讀取數據。這會立即返回,並且不會阻止當前請求完成或將來的請求被接受和處理。

class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 

第二個是ApplicationMixin類。這是繼承了Web請求類的輔助對象,以便添加功能和屬性。在這種情況下,它會檢查通道偵聽器是否已存在所請求的通道,如果找不到任何通道偵聽器,則會創建一個,並將偵聽器句柄返回給WebRequest。

# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

WebRequest類現在把聽者彷彿它是一個靜態列表(銘記你需要給self.write字符串)

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 

最後,創建應用程序後,我加入一個空的字典作爲一個屬性

# add a dictionary containing channels to your application 
application.channels = {} 

除了正在運行的線程的一些清理,一旦你退出應用程序

# clean up the subscribed channels 
for channel in application.channels: 
    application.channels[channel].stop() 
    application.channels[channel].join() 

的完整代碼:

import threading 
import redis 
import tornado.web 



class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 


# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/channel/(?P<channel>\S+)", ReadChannel), 
]) 


# add a dictionary containing channels to your application 
application.channels = {} 


if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    try: 
     tornado.ioloop.IOLoop.instance().start() 
    except KeyboardInterrupt: 
     pass 

    # clean up the subscribed channels 
    for channel in application.channels: 
     application.channels[channel].stop() 
     application.channels[channel].join() 
+0

您可以輕鬆地將列表替換爲支持非阻塞訪問的隊列或其他對象,並僅返回自上一請求以來收到的消息。但是,您必須爲每個客戶端維護一個隊列,並確保使用非阻塞獲取並正確處理「Empty」異常。 – 2013-03-01 19:26:30

2

對於Python> = 3。3,我建議你使用aioredis。 我沒有測試下面的代碼,但它應該是類似的東西:

import redis 
import tornado.web 
from tornado.web import RequestHandler 

import aioredis 
import asyncio 
from aioredis.pubsub import Receiver 


class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop) 

     ch = redis.channels['test_channel'] 
     result = None 
     while await ch.wait_message(): 
      item = await ch.get() 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 
       result = item['data'] 

     self.write(result) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    print 'running' 
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop') 
    server = tornado.httpserver.HTTPServer(application) 
    server.bind(8888) 
    # zero means creating as many processes as there are cores. 
    server.start(0) 
    tornado.ioloop.IOLoop.instance().start()