2016-06-22 53 views
0

繼承人的交易,我有我的數據庫中的某些條目。我打電話一個Django:For循環python信息

variables = Variable.objects.order_by('foo').values('foo') 

然後,我有一個for語句上的每個變量執行發現:

for x in variables: 
    #doing something.... 

我的問題是,「做一些事情」是一項持續的任務...即它不停止。那麼我怎麼能夠在第二個變量上運行for循環呢?

我認爲這與池化有關,但這並不意味着我只能同時進行4個進程?如果我想要說50個單獨的過程爲每個50個變量運行,並且每個過程不會停止,直到某個時間或以前,我該怎麼做?

甚至可以這樣做。

這裏是我的多碼:

if __name__ == '__main__': 
x = Variable.objects.order_by('foo').values('foo') 
for t in x: 
    t = t.values() 
    foo = "".join(t) 
    info('Starting...') 
    p = Process(target=myfunction, args=(foo,)) 
    p.start() 
    p.join() 

myFunction是什麼在一個無限循環運行...

@samuel:

# globals 
my_queue = multiprocessing.Manager().Queue() # queue to store our values 
stop_event = multiprocessing.Event() # flag which signals processes to stop 
my_pool = None 

def my_function(foo): 
    while not stop_event.is_set(): 
     print("starting %s" % foo) 
     try: 
      var = my_queue.get_nowait() # getting value from queue 
     except Queue.Empty: 
      print "No more items in queue" 
     # do you logic here 


# Since `t` could have unlimited size but do wan't to limit processes 
# we'll put all `t` value in queue 

x = Company.objects.order_by('ticker').values('ticker') 
for t in x: 
    foo = t.values() 
    my_queue.put(foo) 

MAX_PROCESSES = len(x) 
my_pool = multiprocessing.Pool(MAX_PROCESSES) 

for i in range(MAX_PROCESSES): 
    my_pool.apply_async(my_function, args=(foo,)) 
my_pool.close() 
my_pool.join() 
+1

['multithreading'(https://docs.python.org/2/library/threading.html) –

+0

我想這是一個他們。感謝您的快速響應 –

+0

@MoonCheesez - 對於50個併發任務,'threading'通常是一個糟糕的選擇,因爲python GIL只允許運行一次。 'multiprocessing'將是更好的選擇。那麼「多線程」應該是什麼呢? – tdelaney

回答

0

這就是解決方案可以實現的方式使用多處理庫。

我們將使用Poolapply_asyncQueue

# globals 
MAX_PROCESSES = 50 
my_queue = multiprocessing.Manager().Queue() # queue to store our values 
stop_event = multiprocessing.Event() # flag which signals processes to stop 
my_pool = None 

def my_function(proc_name, var): 
    while not stop_event.is_set(): 
     # do you logic here with var variable 


def var_scanner_process(): 
    # Since `t` could have unlimited size we'll put all `t` value in queue 
    while not stop_event.is_set(): # forever scan `values` for new items 
     x = Variable.objects.order_by('foo').values('foo') 
     for t in x: 
      t = t.values() 
      my_queue.put(t) 
     time.sleep(10) 

try: 
    var_scanner_process = Process(target=var_scanner) 
    var_scanner_process.start() 
    my_pool = multiprocessing.Pool(MAX_PROCESSES) 

    while not stop_event.is_set(): 
     try: # if queue isn't empty, get value from queue and create new process 
      var = my_queue.get_nowait() # getting value from queue 
      p = Process(target=my_function, args=("process-%d" % i, var)) 
      p.start() 
     exception Queue.Empty: 
      print "No more items in queue" 

except KeyboardInterrupt as stop_test_exception: 
    print(" CTRL+C pressed. Stopping test....") 
    stop_event.set() 
+0

我真的很感謝幫助的人。我可以澄清一下嗎?首先找到變量,然後放入「隊列」中,然後加入?那麼對於我在範圍內(「50」)它適用於該函數的異步並傳遞「進程ID」?什麼是進程ID?以及如何將「t」變爲「my_function」 –

+0

異常Queue.Empty:是一種無效的語法。你能幫助我嗎? –

+0

@DenisAngell你必須導入'Queue'模塊來處理'Queue.Empty'異常 – Samuel