下面是一些示例代碼展示瞭如何做幾個被問及的事情。這使用多線程而不是多處理,並顯示了一些使用隊列,啓動/停止工作線程以及用附加數據更新matplotlib圖的例子。
(部分代碼來自回答其他問題,包括this one和this one)
的代碼示出了可能的實現異步工人中,向其中的數據可以用於隨後的處理被髮送。工作人員使用內部隊列來緩衝數據,並且從隊列中讀取數據的內部線程(循環)進行一些處理併發送結果以進行顯示。
還顯示了一個異步繪圖儀的實現。結果可以從多名工作人員發送給該繪圖員。 (這也使用內部隊列進行緩衝;這樣做是爲了允許主程序線程本身調用更新圖的函數,這似乎是matplotlib的要求。)
NB這是爲Python 2.7編寫的在OSX上。希望其中的一些可能有用。
import time
import threading
import Queue
import math
import matplotlib.pyplot as plt
class AsynchronousPlotter:
"""
Updates a matplotlib data plot asynchronously.
Uses an internal queue to buffer results passed for plotting in x, y pairs.
NB the output_queued_results() function is intended be called periodically
from the main program thread, to update the plot with any waiting results.
"""
def output_queued_results(self):
"""
Plots any waiting results. Should be called from main program thread.
Items for display are x, y pairs
"""
while not self.queue.empty():
item = self.queue.get()
x, y = item
self.add_point(x, y)
self.queue.task_done()
def queue_result_for_output(self, x, y):
"""
Queues an x, y pair for display. Called from worker threads, so intended
to be thread safe.
"""
self.lock.acquire(True)
self.queue.put([x, y])
self.lock.release()
def redraw(self):
self.ax.relim()
self.ax.autoscale_view()
self.fig.canvas.draw()
plt.pause(0.001)
def add_point(self, x, y):
self.xdata.append(x)
self.ydata.append(y)
self.lines.set_xdata(self.xdata)
self.lines.set_ydata(self.ydata)
self.redraw()
def __init__(self):
self.xdata=[]
self.ydata=[]
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111)
self.lines, = self.ax.plot(self.xdata, self.ydata, 'o')
self.ax.set_autoscalex_on(True)
self.ax.set_autoscaley_on(True)
plt.ion()
plt.show()
self.lock = threading.Lock()
self.queue = Queue.Queue()
class AsynchronousWorker:
"""
Processes data asynchronously.
Uses an internal queue and internal thread to handle data passed in.
Does some processing on the data in the internal thread, and then
sends result to an asynchronous plotter for display
"""
def queue_data_for_processing(self, raw_data):
"""
Queues data for processing by the internal thread.
"""
self.queue.put(raw_data)
def _worker_loop(self):
"""
The internal thread loop. Runs until the exit signal is set.
Processes the supplied raw data into something ready
for display.
"""
while True:
try:
# check for any data waiting in the queue
raw_data = self.queue.get(True, 1)
# process the raw data, and send for display
# in this trivial example, change circle radius -> area
x, y = raw_data
y = y**2 * math.pi
self.ap.queue_result_for_output(x, y)
self.queue.task_done()
except Queue.Empty:
pass
finally:
if self.esig.is_set():
return
def hang_up(self):
self.esig.set() # set the exit signal...
self.loop.join() # ... and wait for thread to exit
def __init__(self, ident, ap):
self.ident = ident
self.ap = ap
self.esig = threading.Event()
self.queue = Queue.Queue()
self.loop = threading.Thread(target=self._worker_loop)
self.loop.start()
if __name__ == "__main__":
ap = AsynchronousPlotter()
num_workers = 5 # use this many workers
# create some workers. Give each worker some ID and tell it
# where it can find the output plotter
workers = []
for worker_number in range (num_workers):
workers.append(AsynchronousWorker(worker_number, ap))
# supply some data to the workers
for worker_number in range (num_workers):
circle_number = worker_number
circle_radius = worker_number * 4
workers[worker_number].queue_data_for_processing([circle_number, circle_radius])
# wait for workers to finish then tell the plotter to plot the results
# in a longer-running example we would update the plot every few seconds
time.sleep(2)
ap.output_queued_results();
# Wait for user to hit return, and clean up workers
raw_input("Hit Return...")
for worker in workers:
worker.hang_up()
考慮使用['multiprocessing'](https://docs.python.org/3.6/library/multiprocessing.html#module-multiprocessing)模塊,而不是線程。它創建了多個進程,而不是同一進程上的多個線程,這使得它對於CPU密集型任務(比如matplotlib所做的那種)更高效,因爲它可以利用多個處理器。 – tavnab
謝謝,我會研究多處理,這似乎更好。我基本上使用線程,因爲這是我聽說的唯一模塊。 – user169808
看看[使用工作人員池](https://docs.python.org/3.6/library/multiprocessing.html#using-a-pool-of-workers)示例,這應該是一個好地方學習如何使用自己的參數啓動多個並行函數調用,並將其擴展到您的問題。如果您在工作中遇到任何問題,請更新您的答案,我會盡力幫助您。 – tavnab