2012-07-15 58 views
1

我使用的應用程序引擎與python使用排隊的任務處理列表。如何確保列表中的每個值僅使用一次?

我有日常任務要做的值列表。這些值存儲在在目標物業的「項目」值:

myproject1.targets=['foo','bar','foo2','bar2','foo3','bar3','foo4','bar4','foo5','bar5'] 

我的目標是要排隊的URL電話:url_to_my_worker每個值,與值作爲參數。

我目前在我的數據庫中只有一個項目對象。

我跑schedule_daily_projects_tasks基本上排入schedule_daily_profile_tasks每個配置文件對象

class schedule_daily_projects_tasks(webapp.RequestHandler): 
    def post(self): 
     key=self.request.get('key') 
     pro=project.get(key) 
     profiles=my_profile.gql("WHERE project=:1",pro) 
     logging.info(profiles) 
     for profile in profiles: 
      taskqueue.add(url='/control/schedule_daily_profile_tasks', params={'key': profile.key()}) 

然後運行 ​​'schedule_daily_profile_tasks' 爲每一個配置文件。

class schedule_daily_profile_tasks(webapp.RequestHandler): 
    def post(self): 
     key=self.request.get('key') 
     profile=my_profile.get(key) 
     pro=profile.project 
     for i in range(1, 6): 
      now=datetime.now() 
      tim=datetime(year=now.year, month=now.month, day=now.day, hour=8+i) 
      screen_name=pro.targets.pop() 
      taskqueue.add(url='/url_to_my_worker', params={'profk': key, 'screen_name':screen_name}, eta=tim) 
      pro.put() 

比方說,我有我的數據庫5個配置文件對象:配置文件1,以規範5 所以,如果一切順利的井,我應該有排隊的URL「/ url_to_my_worker」 5個任務,用參數:

1) params={'profk': profile1.key(), 'screen_name':'bar5'} 
2) params={'profk': profile2.key(), 'screen_name':'foo5'} 
3) params={'profk': profile3.key(), 'screen_name':'bar4'} 
4) params={'profk': profile4.key(), 'screen_name':'foo4'} 
5) params={'profk': profile5.key(), 'screen_name':'bar3'} 

但是,相反,我得到:

1) params={'profk': profile1.key(), 'screen_name':'bar5'} 
2) params={'profk': profile2.key(), 'screen_name':'bar5'} 
3) params={'profk': profile3.key(), 'screen_name':'bar5'} 
4) params={'profk': profile4.key(), 'screen_name':'bar5'} 
5) params={'profk': profile5.key(), 'screen_name':'bar5'} 

相信任務跑的太快,所以N N°1°前2開始具有 「突出」。因此myproject1.targets具有相同的值。

如何確保列表中的每個值僅被使用一次?

非常感謝

+3

我建議閱讀以下IBM文章:www.ibm.com/developerworks/aix/library/au-threadingpython/ – 2012-07-15 14:58:25

+0

那麼,這將建議使用Pull隊列schedule_daily_profile_tasks,以便每個任務只在前一個任務完成時才啓動? – user375348 2012-07-15 15:17:50

+0

不一定。隊列的第一個示例支持在同一隊列上運行的多個「工作」線程。 – 2012-07-15 15:20:07

回答

0

Python帶有一個內置的線程安全隊列庫模塊。

http://docs.python.org/library/queue.html

隊列模塊實現多生產者,多消費者隊列。當信息必須在多個線程之間安全地交換時,它在線程編程中特別有用。此模塊中的Queue類實現了所有必需的鎖定語義。這取決於Python中線程支持的可用性;請參閱線程模塊。

你可以這樣做,當調用pop()時,它會阻塞,直到釋放任何鎖。

編輯:鏈接的示例頁面使用隊列模塊來完成其工作。

import Queue 
targets = ['foo','bar','foo2','bar2','foo3','bar3','foo4','bar4','foo5','bar5'] 
queue = Queue.LifoQueue() # Last in first out 
for target in targets: 
    queue.put(target) 
myproject1.targets = queue 
########################## 
class schedule_daily_profile_tasks(webapp.RequestHandler): 
    .... 
    screen_name=pro.targets.get(block=true) 
+0

這在這裏沒有關係。 OP正在使用任務隊列,因此不同的任務將在不同的機器上運行,而不是在同一臺機器上的不同線程中運行。 – 2012-07-16 03:10:59

1

你可能需要考慮增加你的任務的批次:

targets=pro.targets 
tasks=[] 
... 
    screen_name=targets.pop() 
    tasks.append(taskqueue.Task(url='/url_to_my_worker', params={'profk': key, 'screen_name':screen_name}, eta=tim)) 
... 
pro.put() 
taskqueue.Queue().add(tasks) 

注意,就可以避免因創建隊列同時運行在特定的隊列任務。YAML指定max_concurrent_requests爲1:

queue: 
- name: default 
    max_concurrent_requests: 1 
2

您遇到的問題是或多或少爲你描述:你已經入隊正試圖同時修改同一個數據存儲對象的多個任務。由於您沒有使用事務,因此多個任務最終會檢索相同的數據,執行相同的操作,然後覆蓋對方的結果。

您可以使用數據存儲事務來避免這種情況,但更好的解決方案是重構您的任務,以便只有一個任務正在修改每個數據存儲實體。這樣,您就不需要擔心同步或交易問題。

0

如果您的項目中沒有大量的配置元素recs:Schedule_daily_projects_tasks()可以將所有配置文件元素連接到序列化的字符串中。將該字符串作爲參數傳遞給任務隊列。每個任務進程從字符串中彈出一個元素,對其進行處理,如果沒有發生異常,則使用較短的序列化字符串對任務進行排隊。如果您正在對同一個實體進行更新,則會以大約兩秒的時間延遲將任務排入隊列。這也將有助於規定處理任務隊列的實例數量,這可以降低成本。

相關問題