Converting graph traversal to multiprocessing in Python

I have been working through a graph traversal algorithm over a simple network, and I want to run it using multiprocessing, because it involves a large number of I/O-bound calls once I scale it across the whole network. The simple version runs quite fast:

# 'view' (a database view), 'GH' (the graph), 'logger', and 'max_depth'
# are defined elsewhere; GH exposes the networkx-style node/edge API used below.
already_seen = {}
already_seen_get = already_seen.get

GH_add_node = GH.add_node
GH_add_edge = GH.add_edge
GH_has_node = GH.has_node
GH_has_edge = GH.has_edge


def graph_user(user, depth=0):
    logger.debug("Searching for %s", user)
    logger.debug("At depth %d", depth)
    users_to_read = followers = following = []

    if already_seen_get(user):
        logging.debug("Already seen %s", user)
        return None

    result = [x.value for x in list(view[user])]

    if result:
        result = result[0]
        following = result['following']
        followers = result['followers']
        users_to_read = set().union(following, followers)

    if not GH_has_node(user):
        logger.debug("Adding %s to graph", user)
        GH_add_node(user)

    for follower in users_to_read:
        if not GH_has_node(follower):
            GH_add_node(follower)
            logger.debug("Adding %s to graph", follower)
            if depth < max_depth:
                graph_user(follower, depth + 1)

        if GH_has_edge(follower, user):
            GH[follower][user]['weight'] += 1
        else:
            GH_add_edge(user, follower, {'weight': 1})

It actually runs significantly faster than my multiprocessing version:

# Imports the snippet needs (Python 2 era; on Python 3 use 'from queue import Empty')
from multiprocessing import Pool, Queue
from Queue import Empty

to_write = Queue()
to_read = Queue()
to_edge = Queue()
already_seen = Queue()  # defined but unused in this version


def fetch_user():
    seen = {}
    read_get = to_read.get
    read_put = to_read.put
    write_put = to_write.put
    edge_put = to_edge.put
    seen_get = seen.get

    while True:
        try:
            logging.debug("Begging for a user")

            user = read_get(timeout=1)
            if seen_get(user):
                continue

            logging.debug("Adding %s", user)
            seen[user] = True
            result = [x.value for x in list(view[user])]
            write_put(user, timeout=1)

            if result:
                result = result.pop()
                logging.debug("Got user %s and result %s", user, result)
                following = result['following']
                followers = result['followers']
                users_to_read = list(set().union(following, followers))

                for x in users_to_read:
                    edge_put((user, x, {'weight': 1}))

                for y in users_to_read:
                    if not seen_get(y):
                        read_put(y, timeout=1)

        except Empty:
            logging.debug("Fetches complete")
            return


def write_node():
    users = []
    users_app = users.append
    write_get = to_write.get

    while True:
        try:
            user = write_get(timeout=1)
            logging.debug("Writing user %s", user)
            users_app(user)
        except Empty:
            logging.debug("Users complete")
            return users


def write_edge():
    edges = []
    edges_app = edges.append
    edge_get = to_edge.get

    while True:
        try:
            edge = edge_get(timeout=1)
            logging.debug("Writing edge %s", edge)
            edges_app(edge)
        except Empty:
            logging.debug("Edges complete")
            return edges


if __name__ == '__main__':
    pool = Pool(processes=1)
    to_read.put(me)  # 'me' is the seed user, defined elsewhere

    # NOTE: with a single worker process these three tasks run one after
    # another, not concurrently.
    pool.apply_async(fetch_user)
    users = pool.apply_async(write_node)
    edges = pool.apply_async(write_edge)

    # The edge tuples already carry an attribute dict, so add_edges_from
    # matches their (u, v, data) shape.
    GH.add_edges_from(edges.get())
    GH.add_nodes_from(users.get())

    pool.close()
    pool.join()

What I can't figure out is why the single-process version is so much faster. In theory, the multiprocessing version should be reading and writing simultaneously. I suspect there is lock contention on the queues and that this is the source of the slowdown, but I don't have any evidence for it. When I scale the number of fetch_user processes it seems to run faster, but then I have problems synchronizing the data seen across them. So a few things I have been wondering are:

  • Is this a good application for multiprocessing? I was originally using it because I wanted to be able to fetch from the database in parallel.
  • How can I avoid resource contention when reading from and writing to the same queue?
  • Am I missing some obvious design caveat?
  • What can I do to share a lookup table between the readers, so that I don't keep fetching the same user twice? (One option is sketched right after this list.)
  • When increasing the number of fetching processes, the writers eventually lock up. It looks like the write queue is not being written to while the read queue is full. Is there a better way to handle this situation than timeouts and exception handling?
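On the shared-lookup question, one option worth sketching is a multiprocessing.Manager dictionary, which every fetcher process reads and writes through a proxy. This is a minimal sketch under the question's setup, not the original code: fetch_user and to_read reuse the names above, while the Manager wiring, pool size, and seed id are assumptions.

from multiprocessing import Manager, Pool, Queue
from Queue import Empty  # Python 2; on Python 3: from queue import Empty

to_read = Queue()  # inherited by the forked workers, as in the code above


def fetch_user(seen):
    # 'seen' is a Manager-backed dict shared by all fetcher processes.
    while True:
        try:
            user = to_read.get(timeout=1)
        except Empty:
            return
        if seen.get(user):  # some fetcher already claimed this user
            continue
        # check-then-set through the proxy is not atomic, so a user may
        # occasionally be fetched twice; for de-duplication that is
        # usually an acceptable trade-off
        seen[user] = True
        # ... fetch 'user' here and put its followers on to_read ...


if __name__ == '__main__':
    manager = Manager()
    seen = manager.dict()
    to_read.put('seed_user')  # hypothetical seed id

    pool = Pool(processes=4)
    for _ in range(4):
        pool.apply_async(fetch_user, (seen,))
    pool.close()
    pool.join()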

Answer

Queues in Python are synchronized. This means that only one thread can read or write at a time, which will certainly introduce a bottleneck in your app.

A better solution is to distribute the processing based on a hash function and assign it to the threads with a simple modulo operation. So, for instance, if you have 4 threads you could have 4 queues:

thread_queues = []
for i in range(4):
    thread_queues.append(Queue())

for user in user_list:
    user_hash = hash(user.user_id)  # hash() here is just a shortcut to some standard hash utility
    thread_id = user_hash % 4
    thread_queues[thread_id].put(user)

# From here on, your pool of threads accesses thread_queues, but each thread
# ONLY accesses one queue, based on a numeric id given to each of them.

Most hash functions will distribute your data evenly. I normally use UMAC, but you might also try the hash function from Python's string implementation.
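As a quick, hedged illustration of that point (the sample ids below are made up), one can check how evenly Python's built-in hash spreads ten thousand string keys across four buckets:

from collections import Counter

user_ids = ['user%d' % i for i in range(10000)]
buckets = Counter(hash(uid) % 4 for uid in user_ids)
print(buckets)  # the four bucket counts should come out roughly equal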

Another improvement would be to avoid the use of Queues and use non-synchronized objects, such as lists.
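As a hedged sketch of that last idea (the partitioning helper and worker below are illustrative, not from the answer): give each worker its own plain-list partition up front, let it append to local lists with no synchronization at all, and merge the results in the parent.

from multiprocessing import Pool

NUM_WORKERS = 4


def partition(users):
    # Split users into one plain list per worker via hash % NUM_WORKERS.
    buckets = [[] for _ in range(NUM_WORKERS)]
    for user in users:
        buckets[hash(user) % NUM_WORKERS].append(user)
    return buckets


def process_bucket(bucket):
    # Each worker owns its bucket outright: local lists, no locks, no queues.
    nodes, edges = [], []
    for user in bucket:
        nodes.append(user)
        # ... fetch followers here and append (user, follower) pairs to edges ...
    return nodes, edges


if __name__ == '__main__':
    users = ['alice', 'bob', 'carol', 'dave']  # stand-in user ids
    pool = Pool(processes=NUM_WORKERS)
    results = pool.map(process_bucket, partition(users))
    pool.close()
    pool.join()
    all_nodes = [n for nodes, _ in results for n in nodes]
    all_edges = [e for _, edges in results for e in edges]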


I finally tried this, and synchronization was the bottleneck. The trouble is in deciding how to partition the data. – dcolish 2010-10-28 20:58:42