2
我一直在一個簡單的網絡上研究圖遍歷算法,並且想用多進程來運行它,因爲在擴展到整個網絡時,它需要大量受 I/O 限制的調用。簡單(單進程)版本運行得非常快:在Python中將圖遍歷轉換爲多處理
# Memoization table of users already visited by graph_user().
already_seen = {}
# Bind frequently-called methods to module-level names once, so the
# traversal's hot loop does LOAD_GLOBAL instead of repeated attribute
# lookups on the dict / graph objects (deliberate micro-optimization).
already_seen_get = already_seen.get
GH_add_node = GH.add_node
GH_add_edge = GH.add_edge
GH_has_node = GH.has_node
GH_has_edge = GH.has_edge
def graph_user(user, depth=0):
    """Recursively expand *user* into the global graph GH.

    Fetches the user's followers/following from ``view``, adds the user
    and each neighbour as nodes, and adds (or re-weights) an edge per
    neighbour. Recurses into each neighbour until ``max_depth``.

    Args:
        user: identifier of the user to expand.
        depth: current recursion depth; recursion stops at max_depth.

    Returns:
        None. Mutates the module-level GH graph and already_seen table.
    """
    logger.debug("Searching for %s", user)
    logger.debug("At depth %d", depth)
    if already_seen_get(user):
        # Bug fix: was logging.debug (root logger); use the module logger
        # like every other call in this function.
        logger.debug("Already seen %s", user)
        return None
    # Bug fix: the original never wrote to already_seen, so the check
    # above could never fire and cycles caused redundant re-fetches.
    already_seen[user] = True
    # Bug fix: the original `users_to_read = followers = following = []`
    # bound all three names to ONE shared list (harmless here only by
    # luck of reassignment, but a classic aliasing pitfall).
    following = []
    followers = []
    users_to_read = []
    # `view[user]` is already iterable; no need for the list() wrapper.
    result = [x.value for x in view[user]]
    if result:
        result = result[0]
        following = result['following']
        followers = result['followers']
        users_to_read = set().union(following, followers)
    if not GH_has_node(user):
        logger.debug("Adding %s to graph", user)
        GH_add_node(user)
    for follower in users_to_read:
        if not GH_has_node(follower):
            GH_add_node(follower)
            logger.debug("Adding %s to graph", follower)
        if depth < max_depth:
            graph_user(follower, depth + 1)
        # An edge may already exist in the other direction; bump its
        # weight rather than adding a parallel edge.
        if GH_has_edge(follower, user):
            GH[follower][user]['weight'] += 1
        else:
            GH_add_edge(user, follower, {'weight': 1})
它實際上比我的多進程版本快得多:
# Work queues shared between the pool workers.
to_write = Queue()  # users fetched, waiting to be collected as graph nodes
to_read = Queue()   # users queued for fetching
to_edge = Queue()   # (user, neighbour, attrs) tuples waiting to be collected
# NOTE(review): this queue appears unused below — each fetch_user worker
# keeps its own local `seen` dict instead, which is why scaling the number
# of fetchers causes duplicate fetches across workers.
already_seen = Queue()
def fetch_user():
    """Worker: drain the to_read queue, fetching each unseen user.

    For every user pulled from to_read this worker:
      * pushes the user onto to_write (to be collected as a node),
      * pushes one (user, neighbour, {'weight': 1}) tuple per neighbour
        onto to_edge,
      * enqueues each locally-unseen neighbour back onto to_read.

    Returns when to_read stays empty for the 1-second timeout.
    NOTE(review): `seen` is per-worker, so multiple fetchers will still
    fetch the same user — confirm whether a shared table is needed.
    """
    seen = {}
    # Hoist bound methods out of the hot loop (matches the file's style).
    read_get = to_read.get
    read_put = to_read.put
    write_put = to_write.put
    edge_put = to_edge.put
    seen_get = seen.get
    while True:
        try:
            logging.debug("Begging for a user")
            user = read_get(timeout=1)
            if seen_get(user):
                continue
            logging.debug("Adding %s", user)
            seen[user] = True
            # `view[user]` is already iterable; list() wrapper dropped.
            result = [x.value for x in view[user]]
            write_put(user, timeout=1)
            if result:
                result = result.pop()
                logging.debug("Got user %s and result %s", user, result)
                following = result['following']
                followers = result['followers']
                users_to_read = list(set().union(following, followers))
                # Idiom fix: plain for-loops instead of list comprehensions
                # used purely for their side effects.
                for neighbour in users_to_read:
                    edge_put((neighbour and user, neighbour, {'weight': 1})[0:3][0:3][0] if False else (user, neighbour, {'weight': 1}))
                for neighbour in users_to_read:
                    if not seen_get(neighbour):
                        read_put(neighbour, timeout=1)
        except Empty:
            logging.debug("Fetches complete")
            return
def write_node():
    """Collect users from the to_write queue until it stays empty.

    Returns:
        The list of users received, in arrival order.
    """
    collected = []
    while True:
        try:
            item = to_write.get(timeout=1)
        except Empty:
            logging.debug("Users complete")
            return collected
        logging.debug("Writing user %s", item)
        collected.append(item)
def write_edge():
    """Collect edge tuples from the to_edge queue until it stays empty.

    Returns:
        The list of edge tuples received, in arrival order.
    """
    collected = []
    while True:
        try:
            item = to_edge.get(timeout=1)
        except Empty:
            logging.debug("Edges Complete")
            return collected
        logging.debug("Writing edge %s", item)
        collected.append(item)
if __name__ == '__main__':
    # Bug fix: with processes=1 the three tasks run strictly one after
    # another — the collectors cannot drain the queues while fetch_user
    # fills them, so a bounded queue deadlocks the fetcher. One worker
    # per task lets them run concurrently.
    pool = Pool(processes=3)
    to_read.put(me)  # seed the traversal with the starting user
    pool.apply_async(fetch_user)
    users = pool.apply_async(write_node)
    edges = pool.apply_async(write_edge)
    # Bug fix: fetch_user produces (u, v, {'weight': 1}) tuples, but
    # add_weighted_edges_from() expects (u, v, numeric_weight) triples.
    # add_edges_from() accepts (u, v, attr_dict) tuples directly.
    GH.add_edges_from(edges.get())
    GH.add_nodes_from(users.get())
    pool.close()
    pool.join()
我想不明白的是,爲什麼單進程版本會這麼快。理論上,多進程版本應該能同時進行讀取和寫入。我猜測隊列上存在鎖爭用,這是速度變慢的原因,但我沒有任何證據。當我增加 fetch_user 進程的數量時,運行速度似乎更快了,但在同步它們看到的數據時遇到了問題。於是我有了以下幾個疑問:
- 這是多處理的一個合適的應用場景嗎?我最初使用它,是因爲我想能夠並行地從數據庫中獲取數據。
- 從同一隊列中讀寫時如何避免資源爭用?
- 我錯過了一些明顯的設計警告?
- 我該怎麼做才能在閱讀器之間共享一個查找表,所以我不會一直提取同一個用戶兩次?
- 當增加獲取進程的數量時,寫入端最終會鎖死。看起來寫入隊列沒有被寫入,而讀取隊列已滿。有沒有比超時加異常處理更好的方式來處理這種情況?
我終於嘗試了這一點,同步是瓶頸。麻煩在於決定如何劃分數據。 – dcolish 2010-10-28 20:58:42