2010-11-01 91 views
3

對於預期會阻塞(並且不能輕易修改爲使用像Tornado的異步HTTP請求客戶端之類的東西)的Tornado服務器中的操作,我已經將工作卸載到使用multiprocessing模塊分離工作進程。具體來說,我使用的是多處理器Pool,因爲它提供了一種稱爲apply_async的方法,它與Tornado非常協調,因爲它將回調作爲其參數之一。Errno 9在Python中使用帶Tornado的多處理模塊

我最近意識到池預先分配進程的數量,所以如果它們全部變爲阻塞,那麼需要新進程的操作將不得不等待。我意識到服務器仍然可以連接,因爲apply_async通過將任務添加到任務隊列中工作,並且本身立即完成,但我期待產生n進程n我需要的阻塞任務的數量去表演。

我想我可以使用add_handler方法爲我的Tornado服務器的IOLoop添加一個處理程序,爲每個新創建的IOLoop創建一個PID。我以前做過類似的事情,但是它使用了popen和一個任意的命令。這種方法的一個例子是here。不過,我想將參數傳遞給我的範圍內的任意目標Python函數,所以我想堅持使用multiprocessing

但是,似乎有些東西不喜歡我的multiprocessing.Process對象具有的PID。我得到IOError: [Errno 9] Bad file descriptor。這些過程是否受到某種限制?我知道PID在我實際啓動過程之前是不可用的,但我做了啓動過程。下面是我做了演示此問題的示例的源代碼:

#!/usr/bin/env python 

"""Creates a small Tornado program to demonstrate asynchronous programming. 
Specifically, this demonstrates using the multiprocessing module.""" 

import tornado.httpserver 
import tornado.ioloop 
import tornado.web 
import multiprocessing as mp 
import random 
import time 

__author__ = 'Brian McFadden' 
__email__ = '[email protected]' 

def sleepy(queue): 
    """Pushes a string to the queue after sleeping for 5 seconds. 
    This sleeping can be thought of as a blocking operation.""" 

    time.sleep(5) 
    queue.put("Now I'm awake.") 
    return 

def random_num(): 
    """Returns a string containing a random number. 
    This function can be used by handlers to receive text for writing which 
    facilitates noticing change on the webpage when it is refreshed.""" 

    n = random.random() 
    return "<br />Here is a random number to show change: {0}".format(n) 

class SyncHandler(tornado.web.RequestHandler): 
    """Demonstrates handing a request synchronously. 
    It executes sleepy() before writing some more text and a random number to 
    the webpage. While the process is sleeping, the Tornado server cannot 
    handle any requests at all.""" 

    def get(self): 
     q = mp.Queue() 
     sleepy(q) 
     val = q.get() 
     self.write(val) 
     self.write('<br />Brought to you by SyncHandler.') 
     self.write('<br />Try refreshing me and then the main page.') 
     self.write(random_num()) 

class AsyncHandler(tornado.web.RequestHandler): 
    """Demonstrates handing a request asynchronously. 
    It executes sleepy() before writing some more text and a random number to 
    the webpage. It passes the sleeping function off to another process using 
    the multiprocessing module in order to handle more requests concurrently to 
    the sleeping, which is like a blocking operation.""" 

    @tornado.web.asynchronous 
    def get(self): 
     """Handles the original GET request (normal function delegation). 
     Instead of directly invoking sleepy(), it passes a reference to the 
     function to the multiprocessing pool.""" 

     # Create an interprocess data structure, a queue. 
     q = mp.Queue() 
     # Create a process for the sleepy function. Provide the queue. 
     p = mp.Process(target=sleepy, args=(q,)) 
     # Start it, but don't use p.join(); that would block us. 
     p.start() 
     # Add our callback function to the IOLoop. The async_callback wrapper 
     # makes sure that Tornado sends an HTTP 500 error to the client if an 
     # uncaught exception occurs in the callback. 
     iol = tornado.ioloop.IOLoop.instance() 
     print "p.pid:", p.pid 
     iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ) 

    def _finish(self, q): 
     """This is the callback for post-sleepy() request handling. 
     Operation of this function occurs in the original process.""" 

     val = q.get() 
     self.write(val) 
     self.write('<br />Brought to you by AsyncHandler.') 
     self.write('<br />Try refreshing me and then the main page.') 
     self.write(random_num()) 
     # Asynchronous handling must be manually finished. 
     self.finish() 

class MainHandler(tornado.web.RequestHandler): 
    """Returns a string and a random number. 
    Try to access this page in one window immediately after (<5 seconds of) 
    accessing /async or /sync in another window to see the difference between 
    them. Asynchronously performing the sleepy() function won't make the client 
    wait for data from this handler, but synchronously doing so will!""" 

    def get(self): 
     self.write('This is just responding to a simple request.') 
     self.write('<br />Try refreshing me after one of the other pages.') 
     self.write(random_num()) 

if __name__ == '__main__': 
    # Create an application using the above handlers. 
    application = tornado.web.Application([ 
     (r"/", MainHandler), 
     (r"/sync", SyncHandler), 
     (r"/async", AsyncHandler), 
    ]) 
    # Create a single-process Tornado server from the application. 
    http_server = tornado.httpserver.HTTPServer(application) 
    http_server.listen(8888) 
    print 'The HTTP server is listening on port 8888.' 
    tornado.ioloop.IOLoop.instance().start() 

這裏是回溯:

Traceback (most recent call last): 
    File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context 
    yield 
    File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext 
    yield 
    File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute 
    getattr(self, self.request.method.lower())(*args, **kwargs) 
    File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper 
    return method(self, *args, **kwargs) 
    File "./process_async.py", line 73, in get 
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ) 
    File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler 
    self._impl.register(fd, events | self.ERROR) 
IOError: [Errno 9] Bad file descriptor 

上面的代碼實際上是從使用進程池舊的範例修改。我曾經爲我的同事和我自己保存了一段時間的參考資料(因此有很多評論)。我以這樣的方式構建它,以便我可以並排打開兩個小型瀏覽器窗口向我的上司展示/ sync URI阻止連接,而/ async允許更多連接。爲了這個問題的目的,你所需要做的就是重新嘗試訪問/ async處理程序。它立即出錯。

該怎麼辦? PID如何「壞」?如果你運行該程序,你可以看到它被打印到標準輸出。

爲了記錄,我在Ubuntu 10.04上使用Python 2.6.5。龍捲風是1.1。

+0

我知道對這個主題沒有太多興趣,但對於未來的絆腳石:正如Tornado郵件列表中的某位人員指出的那樣,PID不等同於其管道的文件描述符。我找不到一種以'multiprocessing.Process'對象友好的編程方式訪問stdout的方法,因此我使用了'multiprocessing.Pipe'對象,併爲IOLoop和一個FD提供了一個FD作爲參數。只要你需要打開管道(垃圾收集 - >腐敗和崩潰),就要小心保持管道暢通無阻。 – Brian 2010-11-02 20:31:23

+2

也許你應該回答自己的問題,併發佈一個工作代碼示例?我會感激我的想法。 – oDDsKooL 2012-10-10 12:38:38

回答

2

add_handler需要一個有效的文件描述符,而不是一個PID。作爲預期的一個例子,龍捲風本身通常通過傳遞一個套接字對象的fileno()來使用add_handler,該對象的文件描述符返回該對象的文件描述符。 PID在這種情況下是無關緊要的。

+0

任何建議,使此代碼工作呢? – oDDsKooL 2012-10-10 12:09:16

+0

可能您需要添加類似於process.stdout.fileno的內容,以便您可以傳遞文件描述符編號而不是進程的PID,只是一個雖然,但並未嘗試自己。 – securecurve 2012-12-23 16:43:19

相關問題