2016-09-19 117 views
1

有點概念性問題Nodejs批處理

我有15(例如)需要處理的文件。但我不想一次處理它們一個。相反,我想開始處理其中的5個(任何5個順序並不重要),只要這5個文件中的一個被處理另一個文件即可啓動。這個想法是在處理所有文件之前同時處理最多5個文件。

試圖在節點,但一般我在想念的想法如何能夠實現

+0

我現在不能右出一個完整的答案,但檢查出的子進程API。您基本上想要啓動5個子進程,並使用主進程在需要時爲其提供工作。 – david

+0

除非你正在談論在單線程中做所有這些事情,在這種情況下我不確定我看到了好處 – david

+0

我認爲你需要給我們一些關於你的處理類型的想法(代碼最好)正試圖在這些文件上做。它是CPU密集型還是磁盤密集型?換句話說,你是否需要涉及多個CPU,或者你只是想最大限度地使用主JS線程? – jfriend00

回答

2

你可以做你想要通過下面的代碼是什麼工作了這一點,但我很困惑,爲什麼要這麼做?

function handle(file) { 
    new Promise(function(resolve, reject) { 
     doSomething(file, function(err) { 
     if(err) 
      reject(err); 
     else 
      resolve(); 
     }); 
    }) 
    .then(function() { 
     handle(files.shift()); 
    }); 
    } 

    var files = [1, 2, ....., 15]; 
    var max = 5; 
    while(max--) { 
    handle(files.shift()); 
    } 
1

下面是模擬多個工人工作的中心隊列讀一個小例子:https://jsfiddle.net/ctrlfrk/jsvyg69h/1/

// Fake "work" that is simply a task that takes as many milliseconds as its value. 
const workQueue = [1000,4000,2000,4000,5000,3000,7000,1000,9000,9000,4000,2000,1000,3000,8000,2000,3000,7000,6000,30000]; 


const Worker = (name) => (channel) => { 
    const history = []; 
    const next =() => { 
    const job = channel.getWork(); 
    if (!job) { // All done! 
     console.log('Worker ' + name + ' completed'); 
     return; 
    } 
    history.push(job); 
    console.log('Worker ' + name + ' grabbed new job:' + job +'. History is:', history); 

    window.setTimeout(next, job); //job is just the milliseconds. 
    }; 
    next(); 
} 

const Channel = (queue) => { 
    return { getWork:() => { 
    return queue.pop(); 
    }}; 
}; 

let channel = Channel(workQueue); 
let a = Worker('a')(channel); 
let b = Worker('b')(channel); 
let c = Worker('c')(channel); 
let d = Worker('d')(channel); 
2

對於這種類型的處理的更準確的名稱可能是「有限的並行執行」。 Mario Casciaro在他的書中介紹了這一點,即從第77頁開始的Node.js設計模式。這種模式的一個用例是當您想要控制一組可能會導致負載過重的並行任務時。下面的例子來自他的書。

有限公司並行執行模式

function TaskQueue(concurrency) { 
    this.concurrency = concurrency; 
    this.running = 0; 
    this.queue = []; 
} 

TaskQueue.prototype.pushTask = function(task, callback) { 
    this.queue.push(task); 
    this.next(); 
} 

TaskQueue.prototype.next = function() { 
    var self = this; 
    while(self.running < self.concurrency && self.queue.length) { 
    var task = self.queue.shift(); 
    task(function(err) { 
     self.running--; 
     self.next(); 
    }); 
    self.running++; 
    } 
}