2014-09-19 79 views
3

我需要在一組任務之間分派作業。 std::sync::deque足以解決此問題,但如果隊列爲空,則需要阻止該任務。阻止一個任務,直到隊列在Rust中不爲空

下面的代碼(在GitHub gist可用)是std::sync::deque如何使用一個工作示例:

extern crate time; 

use std::io::timer::sleep; 
use std::sync::deque::{BufferPool, Empty, Abort, Data}; 
use std::time::Duration; 

fn main() { 

    let start = time::precise_time_s(); 
    let pool = BufferPool::new(); 
    let (worker, stealer) = pool.deque(); 

    for task_id in range(1i, 5) { 
    let sc = stealer.clone(); 
    spawn(proc() { 
     loop { 
     let elapse = time::precise_time_s() - start; 
     match sc.steal() { 
      Empty  => { println!("[{} @ {:#7.4}] No items", task_id, elapse); sleep(Duration::milliseconds(300)) }, 
      Abort  => println!("[{} @ {:#7.4}] ABORT. Retrying.", task_id, elapse), 
      Data(item) => println!("[{} @ {:#7.4}] Found {}", task_id, elapse, item) 
     } 
     } 
    }); 
    } 

    for item in range(1i, 1000) { 
    for n in range(1i, 20) { 
     worker.push(item * n); 
    } 
    sleep(Duration::milliseconds(1000)); 
    } 

} 

我看到有一個std::sync::TaskPool,但current implementation如果將作業發送給一個任務,甚至線程忙於較舊的工作。

我的問題是:什麼是最好的方式來阻止任務,直到隊列中有任何項目?

回答

3

在可能的解決方案是使用一個信號:

extern crate time; 

use std::io::timer::sleep; 
use std::sync::deque::{BufferPool, Empty, Abort, Data}; 
use std::sync::{Semaphore, Arc}; 
use std::time::Duration; 

fn main() { 

    let start = time::precise_time_s(); 
    let pool = BufferPool::new(); 
    let (worker, stealer) = pool.deque(); 
    let sem = Arc::new(Semaphore::new(0)); 

    for task_id in range(1i, 5) { 
    let sc = stealer.clone(); 
    let s = sem.clone(); 
    spawn(proc() { 
     loop { 
     let elapse = time::precise_time_s() - start; 
     s.acquire(); 
     match sc.steal() { 
      Empty  => { 
       println!("[{} @ {:#7.4}] No items", task_id, elapse); 
       sleep(Duration::milliseconds(300)) 
      }, 
      Abort  => { 
       println!("[{} @ {:#7.4}] ABORT. Retrying.", task_id, elapse); 
       s.release(); 
      }, 
      Data(item) => println!("[{} @ {:#7.4}] Found {}", task_id, elapse, item) 
     } 
     } 
    }); 
    } 

    for item in range(1i, 1000) { 
    for n in range(1i, 20) { 
     worker.push(item * n); 
     sem.release(); 
    } 
    sleep(Duration::milliseconds(1000)); 
    } 

} 

正如你可以在這裏看到你發佈在生產的每一個價值信號資源,並從隊列中取一個值之前獲取它。在這種情況下,返回的值永遠不會爲空,但Abort仍然是可能的,您必須釋放資源,因爲沒有任何內容被讀取,但值仍然在隊列中。

另一種可能的解決方案是使用通道,在沒有任何值的情況下進行阻塞。對於性能,你將不得不基準這兩個解決方案。

+1

謝謝!最後,我爲所有工作人員使用了一個全局信號量,每個工作都是「釋放()」。代碼在https://gist.github.com/ayosec/aee066d5e5a3f38c66b9#file-jobs_semaphore-rs – Ayose 2014-09-22 09:07:01

+1

@Ayose:你忘了在'Abort'上發佈資源。這樣,當任務阻塞時,您將爲隊列中的每次中止留下一個值。 – Arjan 2014-09-22 13:23:05

+0

或者只是在循環中調用'steal()',直到獲得數據。 – Arjan 2014-09-22 16:01:37