2017-08-27 135 views
0

我想要做的是這樣的:如何使用concurrent.futures.ThreadPoolExecutor或multiprocessing.pool.ThreadPool將某些變量綁定到線程?

class MyThread(threading.Thread): 
    def __init__(self, host, port): 
     threading.Thread.__init__(self) 
     # self._sock = self.initsocket(host, port) 
     self._id = random.randint(0, 100) 

    def run(self): 
     for i in range(3): 
      print("current id: {}".format(self._id)) 

def main(): 
    ts = [] 
    for i in range(5): 
     t = MyThread("localhost", 3001) 
     t.start() 
     ts.append(t) 

    for t in ts: 
     t.join() 

我這些輸出:

current id: 10 
current id: 10 
current id: 13 
current id: 43 
current id: 13 
current id: 10 
current id: 83 
current id: 83 
current id: 83 
current id: 13 
current id: 98 
current id: 43 
current id: 98 
current id: 43 
current id: 98 

該輸出我想要的。正如你所看到的,我的_id在不同的線程中是不同的,但是在單線程中,我共享相同的_id_id只是其中的一個變量,我有很多其他類似的變量)。

現在,我想做同樣的事情multiprocessing.pool.ThreadPool

class MyProcessor(): 
    def __init__(self, host, port): 
     # self._sock = self.initsocket(host, port) 
     self._id = random.randint(0, 100) 

    def __call__(self, i): 
     print("current id: {}".format(self._id)) 
     return self._id * i 

def main(): 
    with ThreadPool(5) as p: 
     p.map(MyProcessor("localhost", 3001), range(15)) 

但現在_id將被所有線程共享:

current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 
current id: 58 

而且隨着concurrent.futures.ThreadPoolExecutor,我也嘗試着做同樣的東西:

class MyProcessor(): 
    def __init__(self, host, port): 
     # self.initsocket(host, port) 
     self._id = random.randint(0, 100) 

    def __call__(self, i): 
     print("current id: {}".format(self._id)) 
     return self._id * i 

def main(): 
    with ThreadPoolExecutor(max_workers=5) as executor: 
     func = MyProcessor("localhost", 3001) 
     futures = [executor.submit(func, i) for i in range(15)] 
     for f in as_completed(futures): 
      pass 

產量是這樣的:

current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 
current id: 94 

當然,我得到這樣的結果並不陌生,因爲我只是叫__init__一次。但我所要求的是:

我該怎麼做concurrent.futures.ThreadPoolExecutormultiprocessing.pool.ThreadPool(也請不要再有全局變量)。

+0

在__init__中設置的任何內容都是共享的。在'__call__'中設置的任何東西都是線程本地的。在'__init__'中設置端口和其他共享的東西。在'__call__'中設置ID。和我對你的其他問題的回答一樣。 –

+0

你只需看看呼叫屬於哪個線程並根據該線索進行調整。 '__init__'是主線程。 '__call__'由單個任務線程運行。 –

+0

我會在這裏起草答案並更新另一個,現在我明白了一旦我今晚到達計算機後會出現什麼困惑。 –

回答

2

有幾個問題在這裏進行,我會盡我所能解決所有問題。

在第一個示例中,您可以完全控制您創建的所有Thread,因此每個線程都會在初始化程序中獲得唯一ID。這個問題當然是你一次啓動所有的線程,這對於大量的線程來說可能是非常低效的。

在這個問題的兩個線程池示例中,您都爲可調用對象初始化標識,因此當然,您沒有爲每個線程分配標識。做正確的方法是初始化每個線程的ID,通過在__call__方法做:

 
class MyProcessor(): 
    def __init__(self, host, port): 
     self.initsocket(host, port) 

    def __call__(self, i): 
     id_ = random.randint(0, 100) 
     print("current id: {}".format(id_)) 
     return id_ * i 

def main(): 
    func = MyProcessor("localhost", 3001) 
    with ThreadPoolExecutor(max_workers=5) as executor: 
     collections.deque(executor.map(MyProcessor, range(15)), maxlen=0) 

請注意,您可以通過使用map方法有作爲縮短concurrent.futures.ThreadPoolExecutor例如,如果你關心的是最終結果,而不是中間對象Future。調用deque(..., maxlen=0)是使用迭代器的標準習慣用法。

鑑於你在評論中鏈接到的要點,我明白你爲什麼要擁有線程本地數據。然而,你當然不需要一個全局變量來實現這個結果。這裏有幾個選擇:

  1. 只要您thread-local數據添加到self在初始化,瞧,這是所有調用訪問而不全球:

    def __init__(self, host, port): 
        self.thread_local = threading.local() 
    
    def __call__(self, i): 
        try: 
         id_ = self.thread_local.id_ 
        except AttributeError: 
         id_ = random.randint(0, 100) 
        ... 
    
  2. 使用功能局部數據而不是線程本地數據。您正在使用線程本地數據來避免將您的連接(在要點中)傳遞給一些私有函數。這不是真正的需要,只是一種美學選擇。你總是可以有def _send_data(self, conn, **kwargs)def _recv_data(self, conn),因爲連接實際來自的唯一地方是__call__

雖然有可能的情況下選擇1號是一種可能性,我高度建議您這樣做與任何類型的線程池管理器的使用。線程池可能會重複使用同一個線程從它們提交到的隊列中順序運行任務。這意味着你將在一個本應該打開它的任務中結束相同的連接。在原始示例中,您可以獨立創建所有線程,但在回收池線程上多次調用MyProcessor時可能會不正常。

+0

我已經更新了我對你的其他問題的回答。我認爲你正在使用'local'對象使事情變得不必要的複雜,但是我已經向你展示了一種非全局使用它的方法。 –

+0

是的,根據[源](https://github.com/uqfoundation/multiprocess/blob/master/py3.6/multiprocess/pool.py#L204),套接字可能沒有清理,如果'線程'被清理。無論如何,謝謝 – roger

+0

所有更多的理由讓你的套接字住在'__call__'中,而不是在線程本地存儲中。 –

相關問題