2012-04-26 67 views
1

我有執行廣度優先搜索資源的算法:螺紋〜廣度優先處理

def crawl(starting_node) 
    items=[starting_node] 
    until items.empty? 
    item = items.shift 
    kids = item.slow_network_action # takes seconds 
    kids.each{ |kid| items << kid } 
    end 
end 

我想用幾個併發線程並行化slow_network_action。
什麼是合理的方式來做到這一點?

下面是工作的技術,但我覺得一定是不正確的方法:

def crawl(starting_node) 
    mutex = Mutex.new 
    items = [starting_node] 
    4.times.map{ 
    loop do 
     unless item=mutex.synchronize{ items.shift } 
     sleep LONGER_THAN_LONGEST_NETWORK_ACTION 
     break unless item=mutex.synchronize{ items.shift } 
     end 
     kids = item.slow_network_action 
     mutex.synchronize{ 
     kids.each{ |kid| items << kid } 
     } 
    end 
    }.each(&:join) 
end 

我想這樣做有螺紋的睡眠時間實際上在等待一個項目被添加到隊列中,在添加項目時喚醒,並且當所有人都在等待時沒有添加任何線程時退出所有線程。


這個交替代碼差不多的作品,但對於可以(並)發生死鎖,總缺乏適當的退出策略:

require 'thread' 
def crawl(starting_node) 
    items = Queue.new 
    items << starting_node 
    4.times.map{ 
    while item=items.shift 
     kids = item.slow_network_action 
     kids.each{ |kid| items << kid } 
    end 
    }.each(&:join) 
end 
+0

您需要查看監視器和條件變量:http://www.ruby-doc.org/stdlib-1.9.3/libdoc/monitor/rdoc/Monitor.html。我會寫一個更詳細的答案,但我要去睡覺了。 – matt 2012-04-27 02:13:48

回答

2

這應該指向你在正確的方向:

require 'monitor' 

NUM_THREADS = 4 

def crawl(starting_node) 
    items = [starting_node] 
    items.extend MonitorMixin 
    item_cond = items.new_cond 

    threads = [] 
    working_threads = 0 
    finished = false 

    NUM_THREADS.times do 
    items.synchronize do 
     working_threads += 1 
    end 
    threads << Thread.new do 
     item = nil 
     kids = [] 
     loop do 
     items.synchronize do 

      #add any new items to array 
      items.concat kids 

      if (items.empty? && working_threads == 1) 
      #all other threads are waiting, and there's no more items 
      #to process, so we must be done 
      finished = true 
      end 

      #wake up all waiting threads, either to finish or do more work 
      #watch out for thundering herds 
      item_cond.broadcast unless (items.empty? && !finished) 

      #wait, but first decrement count of working threads 
      #so we can determine when to finish 
      working_threads -= 1 
      item_cond.wait_while { items.empty? && !finished} 
      Thread.exit if finished 
      working_threads += 1 

      #get next item 
      item = items.shift 
     end 

     kids = item.slow_network_action 
     end 

    end 
    end 

    threads.each(&:join) 
end 

這使得items陣列分成monitor並執行任何同步通過,伴隨着一個asociated從監視器創建的。

這與Queue如何在內部工作類似,只是這也檢查所有工作何時完成(實際上增加了一點複雜性)。

線程主循環以一個空的kids數組開頭,該數組被添加到items以避免在循環中需要兩個單獨的同步塊以及與它們一起使用的競爭條件。

請注意,這使用broadcast這導致所有等待線程喚醒,並可能會導致thundering herd。我不認爲這會在這裏引起任何問題。另一種方法是逐個添加kids的元素,併爲每個元素調用signal。當所有工作完成時,這會增加處理案件的複雜性。

+0

非常感謝!在得到驗收標記之前,我必須閱讀,測試和消化這個,但看起來很合理。 – Phrogz 2012-04-27 21:49:07