2017-08-24 159 views
0

我試圖從隊列(RabbitMQ)讀取URL並進行有限數量的併發HTTP請求,即有10個工作人員對從隊列接收的URL進行併發請求的池(永遠)。限制處理來自RabbitMQ的消息時的併發性

到目前爲止,我已經實現了一個以消費者爲每RabbitMQ的教程: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

,並嘗試了一些從發現網絡上的實例方法,在這裏的例子結束: http://jmoiron.net/blog/limiting-concurrency-in-go/

不幸的是,我當前的代碼運行了大約一分鐘,然後無限期地凍結。我嘗試過添加/移動例程,但似乎無法按預期工作(我對Go非常陌生)。

當前代碼:

package main 

import (
    "fmt" 
    "log" 
    "net/http" 
    "time" 

    "github.com/Xide/bloom" 
    "github.com/streadway/amqp" 
) 

func failOnError(err error, msg string) { 
    if err != nil { 
     log.Fatalf("%s: %s", msg, err) 
     panic(fmt.Sprintf("%s: %s", msg, err)) 
    } 
} 

var netClient = &http.Client{ 
    Timeout: time.Second * 10, 
} 

func getRequest(url string) { 
    //resp, err := http.Get(string(url)) 
    resp, err := netClient.Get(string(url)) 
    if err != nil { 
     log.Printf("HTTP request error: %s", err) 
     return 
    } 
    fmt.Println("StatusCode:", resp.StatusCode) 
    fmt.Println(resp.Request.URL) 
} 

func main() { 
    bf := bloom.NewDefaultScalable(0.1) 

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/") 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    q, err := ch.QueueDeclare(
     "urls",   // name 
     true,    // durable 
     false,    // delete when unused 
     false,    // exclusive 
     false,    // no-wait 
     nil,    // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.Qos(
     1,  // prefetch count 
     0,  // prefetch size 
     false, //global 
    ) 
    failOnError(err, "Failed to set Qos") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     false, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    forever := make(chan bool) 

    concurrency := 10 
    sem := make(chan bool, concurrency) 
    go func() { 
     for d := range msgs { 
      sem <- true 
      url := string(d.Body) 
      if bf.Match(url) == false { 
       bf.Feed(url) 
       log.Printf("Not seen: %s", d.Body) 
       go func(url string) { 
        defer func() { <-sem }() 
        getRequest(url) 
       }(url) 
      } else { 
       log.Printf("Already seen: %s", d.Body) 
      } 
      d.Ack(false) 
     } 
     for i := 0; i < cap(sem); i++ { 
      sem <- true 
     } 
    }() 

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C") 
    <-forever 
} 
+0

可能你的日誌輸出增加的問題,這將有助於人們看到是怎麼回事 – Lewis42

+0

試着用'-race'標誌運行程序,它可以幫助您進行調試:https://開頭博客。 golang.org/race-detector – Nebril

+0

如果將併發設置爲10,它會使大約60個HTTP請求(逐漸變慢)然後凍結。用-race編譯不提供任何信息。 – user3104123

回答

2

你沒有正確地處理你的HTTP響應,導致一組開放連接的增長。試試這個:

func getRequest(url string) { 
    resp, err := netClient.Get(string(url)) 
    if err != nil { 
     log.Printf("HTTP request error: %s", err) 
     return 
    } 
    // Add this bit: 
    defer func() { 
     io.Copy(ioutil.Discard, resp.Body) 
     resp.Body.Close() 
    }() 
    fmt.Println("StatusCode:", resp.StatusCode) 
    fmt.Println(resp.Request.URL) 
} 

這一點,你從通道看完信息後,似乎是不必要的和潛在問題:

for i := 0; i < cap(sem); i++ { 
     sem <- true 
    } 

爲什麼填補sem通道你從隊列中讀取所有的消息後, ?您已將盡可能多的消息添加到您希望從中讀取的消息中,因此最好是,如果您對其餘代碼進行了錯誤更改,則可能會導致問題。

完全無關的問題,但是這是多餘的:

if err != nil { 
    log.Fatalf("%s: %s", msg, err) 
    panic(fmt.Sprintf("%s: %s", msg, err)) 
} 

the documentationFatalf已經存在,所以panic將永遠不會被調用。如果你想記錄和panic,請嘗試log.Panicf,這是專門爲此目的而設計的。

0

當您收到一條消息時,您將添加到sem,但只有在您沒有看到url時從sem中刪除。

因此,一旦你已經「看到」10個網址,你的應用就會鎖定。 因此,您需要將<-sem添加到記錄「已見過」的else語句中。

無論哪種方式,這是一種相當奇怪的方式來做這種併發。 這是一個更加習慣於on Play的版本。

注意,在這個版本中,我們只產生了10個聽兔子頻道的goroutines。

包主

進口( 「FMT」 「日誌」 「淨/ HTTP」 「時間」

"github.com/Xide/bloom" 
"github.com/streadway/amqp" 

FUNC failOnError(ERR出錯,味精串){ 若ERR =零{ log.Fatalf(! 「%S%S」,味精,ERR) } }

變種netClient = & http.Client { 超時:time.Second * 10, }

FUNC調用getRequest(URL串){// RESP,ERR:= http.Get(字符串(URL) ) RESP,ERR:!= netClient.Get(字符串(URL)) 如果ERR =零{ log.Printf( 「HTTP請求錯誤:%s」,ERR) 返回 } resp.Body.Close( ) fmt.Println( 「的StatusCode:」,resp.StatusCode) fmt.Println(resp.Request.URL) }

func main() { 
    bf := bloom.NewDefaultScalable(0.1) 

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/") 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    q, err := ch.QueueDeclare(
     "urls", // name 
     true, // durable 
     false, // delete when unused 
     false, // exclusive 
     false, // no-wait 
     nil, // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.Qos(
     1,  // prefetch count 
     0,  // prefetch size 
     false, //global 
    ) 
    failOnError(err, "Failed to set Qos") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     false, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    concurrency := 10 
    var wg sync.Waitgroup    // used to coordinate when they are done, ie: if rabbit conn was closed 
    for x := 0; x < concurrency; x++ { // spawn 10 goroutines, all reading from the rabbit channel 
     go func() { 
      defer wg.Done() // signal that this goroutine is done 
      for d := range msgs { 
       url := string(d.Body) 
       if bf.Match(url) == false { 
        bf.Feed(url) 
        log.Printf("Not seen: %s", d.Body) 
        getRequest(url) 
       } else { 
        log.Printf("Already seen: %s", d.Body) 
       } 
       d.Ack(false) 
      } 
      log.Println("msgs channel closed") 
     }() 
    } 

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C") 
    wg.Wait() // when all goroutine's exit, the app exits 
} 
+0

上面的例子與退出:'恐慌:同步:負WaitGroup counter' @更新的例子大衛-budworth – user3104123

+0

,我忽略與工人的數量(併發性)來初始化Waitgroup。我實際上無法運行應用程序,因爲我沒有提交任何項目,所以您可能需要調整一下。重點更多的是展示另一種方式,並解釋爲什麼您的解決方案掛起。 –

相關問題