2011-11-20 44 views
11

由於沒有人提供瞭解決方案this post加上我急需一種解決方法的事實,這裏是我的情況和一些抽象的解決方案/辯論的想法。龍捲風芹菜集成hacks

我的堆棧:

  1. 龍捲風
  2. 芹菜
  3. MongoDB的
  4. Redis的
  5. 的RabbitMQ

我的問題:查找龍捲風派遣一個芹菜的方式任務(解決)一個然後異步收集結果(任何想法? )。

方案1:(請求/響應劈加網絡掛接)

  • 龍捲風接收(用戶)請求,然後保存在本地存儲器中(或在Redis的)一個{作業ID:(用戶)請求}要記住傳播的響應,並觸發一個芹菜任務與作業ID
  • 當芹菜完成任務,它在某個網址進行網絡掛接,並告訴龍捲風,這JOBID已完成(加上結果)
  • 龍捲風檢索(用戶)請求並將響應轉發給(用戶)

會發生這種情況嗎?它有沒有邏輯?

方案2:(龍捲風加長輪詢)

  • 龍捲風調度芹菜任務和一些主JSON數據返回給客戶端(jQuery的)
  • jQuery不會在接收一些長輪詢根據某些數據庫標誌,每x微秒和龍捲風應答。當芹菜任務完成時,這個數據庫標誌被設置爲True,然後jQuery「循環」結束。

這是否高效?

任何其他想法/模式?

回答

4

我偶然發現了這個問題,反覆敲擊結果後端看起來並不理想。所以我使用Unix Sockets實現了類似於場景1的Mixin。

任務完成後立即通知Tornado(準確時,只要鏈下一個任務運行),並且僅後端結果一次。這裏是link

+0

偉大的工作Eren! – hymloth

9

我的解決方案包括從投票龍捲風芹菜:

class CeleryHandler(tornado.web.RequestHandlerr): 

    @tornado.web.asynchronous 
    def get(self):  

     task = yourCeleryTask.delay(**kwargs) 

     def check_celery_task(): 
      if task.ready(): 
       self.write({'success':True}) 
       self.set_header("Content-Type", "application/json") 
       self.finish() 
      else: 
       tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

     tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

這裏是post它。

+0

你可以請你重新發布博客鏈接,它被取下來! – vgoklani

+1

編輯爲archive.org鏈接 – rbu

8

這是我們對該問題的解決方案。由於我們在應用程序中查找了幾個處理程序的結果,因此我們將芹菜查找作爲混合類。

這也使得代碼更易於讀取tornado.gen模式。

from functools import partial 

class CeleryResultMixin(object): 
    """ 
    Adds a callback function which could wait for the result asynchronously 
    """ 
    def wait_for_result(self, task, callback): 
     if task.ready(): 
      callback(task.result) 
     else: 
      # TODO: Is this going to be too demanding on the result backend ? 
      # Probably there should be a timeout before each add_callback 
      tornado.ioloop.IOLoop.instance().add_callback(
       partial(self.wait_for_result, task, callback) 
      ) 


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler): 
    """Execute a task asynchronously over a celery worker. 
    Wait for the result without blocking 
    When the result is available send it back 
    """ 
    @tornado.web.asynchronous 
    @tornado.web.authenticated 
    @tornado.gen.engine 
    def post(self): 
     """Test the provided Magento connection 
     """ 
     task = expensive_task.delay(
      self.get_argument('somearg'), 
     ) 

     result = yield tornado.gen.Task(self.wait_for_result, task) 

     self.write({ 
      'success': True, 
      'result': result.some_value 
     }) 
     self.finish() 
3

現在,https://github.com/mher/tornado-celery來搶救......

class GenAsyncHandler(web.RequestHandler): 
    @asynchronous 
    @gen.coroutine 
    def get(self): 
     response = yield gen.Task(tasks.sleep.apply_async, args=[3]) 
     self.write(str(response.result)) 
     self.finish()