2017-06-30 14 views
1

我正在開發一個工具來刮取推文並對其進行處理,以便用戶建立詞頻分析。由於需要處理的數據量很大,我將文字處理部分從推文刮取部分分離。是否有一種直觀的方式來循環多處理.Connection對象直到達到EOF?

multiprocessing.Connectionrec, sen = multiprocessing.Pipe(False))提供了在進程間傳輸數據的有用工具。但是,當發送端顯式調用Connection.close()時,我無法找到接收端可能會告訴EOF何時到達的實用程序。我試過:

def yielder(conn): 
    yield conn.recv() 

但是,這只是在返回管道中的第一個項目後停止。 目前與我繞過該問題的嘗試,除了一個while True循環中的語句:

try: 
    status = rec.recv() 
    ...process data... 
except BrokenPipeError: 
    break 

我也看到這個可以通過發送一個特定的結束標誌,讓當它接收到接收端終止該進程完成。但是這些都是違反直覺和醜陋的做法,違反Python的禪宗:

美麗比醜陋更好。

...

應該有one--和最好只有一個--obvious辦法做到這一點。

我錯過了什麼嗎?有沒有簡單,優雅的方式,如C++的

while getline(istreamobject, line) 

來執行我的任務?

回答

0

您可以使用第二種形式的調用iteriter(callable, sentinel) -> iterator將其設置爲for循環。儘管如此,你仍然必須抓住例外。

try: 
    for status in iter(conn.recv, None): 
     ... 
except BrokenPipeError: 
    pass 

如果不是關閉的管道,你發送一個「EOF」下的管道,你可以刪除try/except並做for status in iter(conn.recv, 'EOF message'),當'EOF message'被收到(可以是任何東西),iter停止的循環。通常情況下,EOF消息是一個空字符串,所以經常可以看到喜歡的東西:

for line in iter(file.read, ''): 
    ... 

itertools recipes有這個功能稱爲iter_except。這基本上是你想用yielder功能

def iter_except(func, exception, first=None): 
    """ Call a function repeatedly until an exception is raised. 

    Converts a call-until-exception interface to an iterator interface. 
    Like builtins.iter(func, sentinel) but uses an exception instead 
    of a sentinel to end the loop. 

    Examples: 
     iter_except(functools.partial(heappop, h), IndexError) # priority queue iterator 
     iter_except(d.popitem, KeyError)       # non-blocking dict iterator 
     iter_except(d.popleft, IndexError)      # non-blocking deque iterator 
     iter_except(q.get_nowait, Queue.Empty)     # loop over a producer Queue 
     iter_except(s.pop, KeyError)        # non-blocking set iterator 

    """ 
    try: 
     if first is not None: 
      yield first()   # For database APIs needing an initial cast to db.first() 
     while True: 
      yield func() 
    except exception: 
     pass 

所以之前做什麼,你也可以做這樣的事情:

for status in iter_except(conn.recv, BrokenPipeError): 
    ... 

或者剛剛殺青的yielder功能:

def yielder(conn): 
    try: 
     while True: 
      yield conn.recv() 
    except BrokenPipeError: 
     pass 

for status in yielder(conn): 
    ... 
+0

yielder函數不會拋出任何異常。它只是產生管道中的第一個值然後停止。 – flymousechiu

+0

@flymousechiu它仍然只產生沒有'while True'循環的第一個值嗎? – Artyer

+0

是的,如果你嘗試下面的話,就可以明顯看出來:'import multiprocessing as mp','def yielder(conn):yield conn.recv()','rec,sen = mp.Pipe(False)','for i in範圍(10):sen.send(i)','list(yielder(rec))'。應該返回[0] – flymousechiu