2016-06-09 121 views
1

同步例:Golang非阻塞緩衝

type job struct { 
    Id int 
    Message string 
} 

for { 
    // getJob() blocks until job is received 
    job := getJob() 
    doSomethingWithJob(job) 
} 

我希望處理的工作,因爲他們從getJob進來與doSomethingWithJob。例如getJob可以是從MessagingQueue(例如RabbitMQ/Beanstalkd)收到的有效內容或處理HTTP請求。

我不想阻擋getJob,而我是doSomethingWithJob &反之亦然。然而,我確實想要控制/緩衝作業的數量,這樣我就不會使系統過載。例如最大併發度爲5.

go例程的概念目前讓我困惑,所以任何指向正確方向的指針都會非常感謝,以幫助我學習。

更新:謝謝@JimB的幫助。爲什麼工人5總是會選擇工作?

jobCh := make(chan *job) 

// Max 5 Workers 
for i := 0; i < 5; i++ { 

    go func() { 

     for job := range jobCh { 
      time.Sleep(time.Second * time.Duration(rand.Intn(3))) 
      log.Println(i, string(job.Message)) 
     } 
    }() 
} 

for { 
    job, err := getJob() 
    if err != nil { 
     log.Println("Closing Channel") 
     close(jobCh) 
     break 
    } 

    jobCh <- job 
} 

log.Println("Complete") 

實施例輸出

2016/06/09 22:19:57 5 {"id":10692,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10687,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10699,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10701,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10703,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10704,"name":"Test Message"} 
+0

它不是第5個工人,每個工人都有i = 5。它應該是'去func(我int){...}(i)'https://golang.org/doc/faq#closures_and_goroutines – Darigaaz

回答

4

可以啓動5個夠程從信道讀取調用doSomethingWithJob。這樣就不會有超過5個作業同時處理。

jobCh := make(chan *job) 

// start 5 workers to process jobs 
for i := 0; i < 5; i++ { 
    go func() { 
     for job := range jobCh { 
      doSomethingWithJob(job) 
     } 
    }() 
} 

// send jobs to workers as fast as we can 
for { 
    jobCh <- getJob() 
} 
+0

哇...我一直在看這一切倒退和過度複雜! – Gravy

+0

我已經嘗試過上面的方法。它可以工作,但看起來只有工人5正在完成這項工作。查看有問題的更新。 – Gravy

+0

@Gravy:https://golang.org/doc/faq#closures_and_goroutines。每個goroutine使用相同的'i'變量,而我的最後一個值是'5'。 – JimB