2013-03-24 54 views
7

是否有可能跨越多個消費者「管道」發電機的消耗?將一個迭代器傳遞給多個消費者?

例如,它是常見的有代碼與該圖案:

def consumer1(iterator): 
    for item in iterator: 
     foo(item) 

def consumer2(iterator): 
    for item in iterator: 
     bar(item) 

myiter = list(big_generator()) 
v1 = consumer1(myiter) 
v2 = consumer2(myiter) 

在這種情況下,多個功能完全消耗相同的迭代器,因此有必要緩存在一個列表中的迭代器。由於每個消費者用盡迭代器,因此itertools.tee是無用的。

我看到很多這樣的代碼,我總是希望我可以讓消費者一次消費一個項目,而不是緩存整個迭代器。例如: -

  1. consumer1消耗myiter[0]
  2. consumer2消耗myiter[0]
  3. consumer1消耗myiter[1]
  4. consumer2消耗myiter[1]
  5. 等...

如果我是馬磕了一個語法,它應該是這樣的:

c1_retval, c2_retval = iforkjoin(big_generator(), (consumer1, consumer2)) 

你可以親近與螺紋或多核和tee d迭代器,但線程以不同的速度意味着雙端隊列緩存內tee值可以得到非常大的消耗。這裏的重點不在於利用並行性或加速任務,而是爲了避免緩存大部分迭代器。

在我看來,如果不修改消費者,這可能是不可能的,因爲控制流程在消費者中。然而,當一個消費者實際使用迭代器控制進入迭代器的方法時,所以也許有可能以某種方式反轉控制流,以便迭代器一次一個地阻止消費者,直到它可以提供所有消費者爲止。

如果這是可能的,我不夠聰明,看看如何。有任何想法嗎?

+0

如果這個觀點不一定是並行性,那麼一種可能性就是集中迭代器,並將數據發送給每個* consumer *,也許使用一個帶有'position%task'列表的類,該列表返回每個'產量「呼叫。 – Rubens 2013-03-24 03:58:09

+0

也許這可以通過協程來完成?如果您有一箇中央迭代器,它將項目發送給單個協程,那麼可以獲得您想要的內容。無論如何,我認爲你應該檢查節省的內存是否很重要,因爲事情會變得複雜。我個人會簡單地使用'myiter = list(the_generator())'並傳遞序列。 – Bakuriu 2013-03-24 12:09:25

回答

1

有了不改變消費者的代碼(即具有在其中循環)的限制,你只剩下兩個選擇:

  1. 你已經包括在你的問題的辦法:緩存在內存中生成的項目,然後多次迭代它們。
  2. 在一個線程中運行每個消費者,並實現某種同步 - itertools.tee,其中一個緩衝區的大小= 1,這將阻止服務項目i+1,直到項目i已提供給所有消費者。

沒有其他選擇。你無法實現所有的下面,因爲它們是矛盾的:具有發電機

  • 具有環消耗它的所有
  • 然後

    1. ,(serially-)以前的循環後完成後,具有另一個循環以消耗所有的它再次
    2. 只保留O(1)在存儲器中(或盤等)的物品,同時消耗他們
    3. 未再生(即,不是重新創建所述發電機)

    生成的項目必須存儲在的某處如果您想重用它們。

    如果改變消費者的代碼是可以接受的,很明顯@猴子的解決方案是最簡單和最直接的。

  • 1

    這不工作?或者你是否需要整個迭代器,以便像這樣每個副本都行不通?如果是這樣,那麼我認爲你必須創建一個副本,否則生成兩次列表?

    for item in big_generator(): 
        consumer1.handle_item(item) 
        consumer2.handle_item(item) 
    
    +2

    這需要重寫消費者。許多消費者只是接受一個序列並自己消費。這種模式通過反轉控制,使消費者被告知何時消費。 (yield表達式的目的是爲了更容易使用這種風格的生成器進行編碼。)所以這不是我正在尋找的答案。 – 2013-03-24 04:02:27