2017-10-20 75 views
0

我想讀取浮點數字流,做一些簡單的計算並將該值附加到全局列表中。你能說出我錯了嗎?該列表不附加。更新Dask中的全局列表

from random import random 
from time import sleep 

def process(x): 
    from random import random 
    sleep(random()*2) 
    t = x * 2 
    processed_queue.append(t) 
    print(processed_queue) 
    return t 

if __name__ == "__main__": 

    from distributed import Client 
    from queue import Queue 

    client = Client() 

    processed_queue = [] 
    input_q = Queue() 

    remote_q = client.scatter(input_q) 
    processed_q = client.map(process, remote_q) 
    result_q = client.gather(processed_q) 

    for i in [random() for x in range(100)]: 
     sleep(random()) 
     input_q.put(i) 
     print(i) 
     print(processed_queue) 
     print(result_q.qsize()) 

回答

0

Whilest queue.Queuemultiprocessing.Queue可以用來發送線程和進程,通常這種編程逐副作用的不被DASK鼓勵模型之間的數據。

您可以將數據傳遞給羣集執行的函數,並使用client.submit實時獲得它們的返回值,那麼隊列會爲您做些什麼,否則您無法做到這一點?另外,還有一些dask構造,比如共享變量,可能可以做到這一點,但(再次)很少使用,我認爲你不太可能成爲正確的範例。

由於代碼不適用於您的具體原因,因此:Client()爲調度程序創建至少一個獨立進程,爲具有一個或多個線程的工作程序創建至少一個進程(請參閱您的任務管理器,頂層或其他系統 - 觀看工具)。 queue.Queue是流程本地的,因此每個進程都會看到空的隊列並添加到該進程中,但是在主進程中沒有看到該信息,並且在工作人員中看不到輸入隊列上的操作。