2016-10-03 51 views
0

我已將RabbitMQ pub/sub tutorial轉換爲下面的虛擬測試。不知何故,它不像預期的那樣工作。RabbitMQ pub/sub實施不起作用

amqpURL是有效的AMQP服務(即RabbitMQ)URL。我用隊列示例測試了它,它可以工作。不知何故,它在「交換」失敗

我希望TestDummy記錄「[x] Hello World」。不知何故,它沒有發生。只有發送一半按預期工作。

我得到了什麼問題?

import (
    "fmt" 
    "log" 
    "testing" 

    "github.com/streadway/amqp" 
) 

func TestDummy(t *testing.T) { 
    done := exchangeReceive() 
    exchangeSend("Hello World") 
    <-done 
} 

func exchangeSend(msg string) { 
    failOnError := func(err error, msg string) { 
     if err != nil { 
      log.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    log.Printf("exchangeSend: connect %s", amqpURL) 
    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    body := []byte(msg) 
    err = ch.Publish(
     "logs", // exchange 
     "",  // routing key 
     false, // mandatory 
     false, // immediate 
     amqp.Publishing{ 
      ContentType: "text/plain", 
      Body:  []byte(body), 
     }) 
    failOnError(err, "Failed to publish a message") 

    log.Printf(" [x] Sent %s", body) 
} 

func exchangeReceive() <-chan bool { 

    done := make(chan bool) 

    failOnError := func(err error, msg string) { 
     if err != nil { 
      log.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    log.Printf("exchangeReceive: connect %s", amqpURL) 
    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    q, err := ch.QueueDeclare(
     "", // name 
     false, // durable 
     false, // delete when usused 
     true, // exclusive 
     false, // no-wait 
     nil, // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.QueueBind(
     q.Name, // queue name 
     "",  // routing key 
     "logs", // exchange 
     false, 
     nil) 
    failOnError(err, "Failed to bind a queue") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     true, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    go func() { 
     for d := range msgs { 
      log.Printf(" [x] %s", d.Body) 
      done <- true 
     } 
    }() 

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C") 

    return done 
} 

回答

0

這裏的一些愚蠢的錯誤。當exchangeRecieve結束時,延期統計信息被觸發並關閉連接。這就是我重寫失敗的原因。

我已經改變了我的代碼,這樣一來,它的工作:

import (
    "fmt" 
    "os" 
    "testing" 
    "time" 

    "github.com/streadway/amqp" 
) 

func TestDummy(t *testing.T) { 
    amqpURL := os.Getenv("CLOUDAMQP_URL") 
    t.Logf(" [*] amqpURL: %s", amqpURL) 

    results1 := exchangeReceive(t, "consumer 1", amqpURL) 
    results2 := exchangeReceive(t, "consumer 2", amqpURL) 
    time.Sleep(50 * time.Millisecond) 

    exchangeSend(t, amqpURL, "Hello World") 
    if want, have := "Hello World", <-results1; want != have { 
     t.Errorf("expected %#v, got %#v", want, have) 
    } 
    if want, have := "Hello World", <-results2; want != have { 
     t.Errorf("expected %#v, got %#v", want, have) 
    } 
} 

func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string { 

    out := make(chan string) 

    failOnError := func(err error, msg string) { 
     if err != nil { 
      t.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    q, err := ch.QueueDeclare(
     "", // name 
     false, // durable 
     false, // delete when usused 
     true, // exclusive 
     false, // no-wait 
     nil, // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.QueueBind(
     q.Name, // queue name 
     "",  // routing key 
     "logs", // exchange 
     false, 
     nil) 
    failOnError(err, "Failed to bind a queue") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     true, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    go func() { 
     for d := range msgs { 
      t.Logf(" [x] %s received: %s", name, d.Body) 
      out <- string(d.Body) 
     } 
    }() 

    t.Logf(" [*] %s ready to receive", name) 
    return out 
} 

func exchangeSend(t *testing.T, amqpURL, msg string) { 
    failOnError := func(err error, msg string) { 
     if err != nil { 
      t.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    body := []byte(msg) 
    err = ch.Publish(
     "logs", // exchange 
     "",  // routing key 
     false, // mandatory 
     false, // immediate 
     amqp.Publishing{ 
      ContentType: "text/plain", 
      Body:  []byte(body), 
     }) 
    failOnError(err, "Failed to publish a message") 

    t.Logf(" [x] Sent %s", body) 
}