2016-06-27 131 views
0

這是一個奇怪的錯誤,我嘗試使用「asyncio」安排「mongoimport」任務時遇到。一旦我啓動了mongod服務,並在終端上生成了粘貼的命令,它就起作用了。然而,當我嘗試使用python3.4 ASYNCIO,問題來了:當嘗試安排「mongoimport」任務時raise raise RunError('Event loop is closed')

File "/Users/wangyi/Documents/workspace/Math/machine_learning/ditech/io/collect.py", line 100, in <module> 
    parse_train_data() 
    File "/Users/wangyi/Documents/workspace/Math/machine_learning/ditech/io/collect.py", line 95, in parse_train_data 
    call_in_background(*targets) 
    File "/Users/wangyi/Documents/workspace/Math/machine_learning/ditech/io/collect.py", line 19, in wrapper 
    result = func(*args, **keywords) 
    File "/Users/wangyi/Documents/workspace/Math/machine_learning/ditech/io/collect.py", line 41, in call_in_background 
    loop.run_until_complete(asyncio.gather(*targets, loop=loop)) 
    File "/usr/local/lib/python3.4/asyncio/tasks.py", line 567, in gather 
    fut = async(arg, loop=loop) 
    File "/usr/local/lib/python3.4/asyncio/tasks.py", line 511, in async 
    task = loop.create_task(coro_or_future) 
    File "/usr/local/lib/python3.4/asyncio/base_events.py", line 211, in create_task 
    self._check_closed() 
    File "/usr/local/lib/python3.4/asyncio/base_events.py", line 265, in _check_closed 
    raise RuntimeError('Event loop is closed') 
RuntimeError: Event loop is closed 

這裏是我的代碼片段:

def parse_train_data(): 

    commandtpl = "mongoimport --host=127.0.0.1 -d DiDitech -c {table} -f {fields} --type tsv --file {target}" 

    for tb in tables: 
     tasks = list(map(lambda t: commandtpl.format(table=tb, fields=','.join(fields[tb]), target=t), train_dest[tb])) 
     print('commands of %s:' % tb) 
     print('' + '\n'.join(tasks)) 
     targets = [get_lines(ob) for ob in tasks] 
     call_in_background(*targets) 

call_in_background時(asyncio.gather的run_util_complete輕包裝用蒂默( *目標))。目標定義爲PEP中所述的coros。

def call_in_background(*targets): 
... 
    loop = get_loop() 
    print(loop.run_until_complete(asyncio.gather(*targets, loop=loop, return_exceptions=True))) 
    #loop.close() 

@asyncio.coroutine 
def get_lines(shell_command): 
    task = yield from asyncio.create_subprocess_shell(shell_command, 
      stdin=PIPE, stdout=PIPE, stderr=STDOUT) 

    return (yield from task.communicate())[0].splitlines() 
+0

'call_in_background'是什麼樣子的?它是否關閉了循環?請編輯該問題以包含[最小,完整和可驗證示例](https://stackoverflow.com/help/mcve)。 – dirn

+0

@dirn yes一旦run_until_complete返回,它將關閉循環。我應該打電話給asyncio Anrew的作者尋求幫助嗎? –

+0

這就是你的問題。在完成之前不要關閉循環。 – dirn

回答

0

現在我改變了代碼

... 

    flag = False 
    if loop is None: 
     loop = get_loop() 
     flag = True 
    print(loop.run_until_complete(asyncio.gather(*targets, loop=loop, return_exceptions=True))) 

    if flag == False: 
     loop.close() 

,並與重排碼結構終於作品的感謝。謝謝!!!

2016-06-28T16:39:59.128+0800 connected to: 127.0.0.1 
2016-06-28T16:39:59.136+0800 imported 279 documents 
2016-06-28T16:39:59.130+0800 connected to: 127.0.0.1 
2016-06-28T16:39:59.145+0800 imported 66 documents 
2016-06-28T16:39:59.134+0800 connected to: 127.0.0.1 
2016-06-28T16:39:59.142+0800 imported 288 documents 
2016-06-28T16:39:59.140+0800 connected to: 127.0.0.1 
2016-06-28T16:39:59.148+0800 imported 285 documents 
相關問題