我試圖從隊列(RabbitMQ)讀取URL並進行有限數量的併發HTTP請求,即有10個工作人員對從隊列接收的URL進行併發請求的池(永遠)。限制處理來自RabbitMQ的消息時的併發性
到目前爲止,我已經實現了一個以消費者爲每RabbitMQ的教程: https://www.rabbitmq.com/tutorials/tutorial-one-go.html
,並嘗試了一些從發現網絡上的實例方法,在這裏的例子結束: http://jmoiron.net/blog/limiting-concurrency-in-go/
不幸的是,我當前的代碼運行了大約一分鐘,然後無限期地凍結。我嘗試過添加/移動例程,但似乎無法按預期工作(我對Go非常陌生)。
當前代碼:
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/Xide/bloom"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
var netClient = &http.Client{
Timeout: time.Second * 10,
}
func getRequest(url string) {
//resp, err := http.Get(string(url))
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf("HTTP request error: %s", err)
return
}
fmt.Println("StatusCode:", resp.StatusCode)
fmt.Println(resp.Request.URL)
}
func main() {
bf := bloom.NewDefaultScalable(0.1)
conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"urls", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, //global
)
failOnError(err, "Failed to set Qos")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
concurrency := 10
sem := make(chan bool, concurrency)
go func() {
for d := range msgs {
sem <- true
url := string(d.Body)
if bf.Match(url) == false {
bf.Feed(url)
log.Printf("Not seen: %s", d.Body)
go func(url string) {
defer func() { <-sem }()
getRequest(url)
}(url)
} else {
log.Printf("Already seen: %s", d.Body)
}
d.Ack(false)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
可能你的日誌輸出增加的問題,這將有助於人們看到是怎麼回事 – Lewis42
試着用'-race'標誌運行程序,它可以幫助您進行調試:https://開頭博客。 golang.org/race-detector – Nebril
如果將併發設置爲10,它會使大約60個HTTP請求(逐漸變慢)然後凍結。用-race編譯不提供任何信息。 – user3104123