2017-08-29 137 views
1

我試圖在GO使用GO語言在gnatsd中實現請求/響應functinonality,並且我意識到gnatsd不會以異步方式回覆請求。對請求的NATS異步回覆不是異步

我開始使用NATS github示例https://github.com/nats-io/go-nats/tree/master/examples進行調查 - 示例nats-req.go和nats-rply.go。這些例子效果很好。

然後,我修改它們只是爲了測試gnatsd上的並行請求,並提供一些調試信息,其中處理異步回覆的goroutine ID。 有修改示例的源代碼。

nats-rply.go已修改爲僅返回傳入請求的文本以及有關當前goroutine ID的信息。我還增加了異步處理功能1秒睡眠模擬一些處理時間。

package main 
import (
    "fmt" 
    "github.com/nats-io/go-nats" 
    "flag" 
    "log" 
    "runtime" 
    "time" 
    "bytes" 
    "strconv" 
) 

// NOTE: Use tls scheme for TLS, e.g. nats-rply -s tls://demo.nats.io:4443 foo hello 
func usage() { 
    log.Fatalf("Usage: nats-rply [-s server][-t] <subject> \n") 
} 

func printMsg(m *nats.Msg, i int) { 
    log.Printf("[#%d] Received on [%s]: '%s'\n", i, m.Subject, string(m.Data)) 
} 

func main() { 
    log.Printf("Main goroutine ID:%d\n", getGID()) 
    var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") 
    var showTime = flag.Bool("t", false, "Display timestamps") 

    //log.SetFlags(0) 
    flag.Usage = usage 
    flag.Parse() 

    args := flag.Args() 
    if len(args) < 1 { 
     usage() 
    } 

    nc, err := nats.Connect(*urls) 
    if err != nil { 
     log.Fatalf("Can't connect: %v\n", err) 
    } 

    subj, i := args[0], 0 

    nc.Subscribe(subj, func(msg *nats.Msg) { 
     i++ 
     printMsg(msg, i) 
     //simulation of some processing time 
     time.Sleep(1 * time.Second) 
     newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID())) 
     nc.Publish(msg.Reply, []byte(newreply)) 
    }) 
    nc.Flush() 

    if err := nc.LastError(); err != nil { 
     log.Fatal(err) 
    } 

    log.Printf("Listening on [%s]\n", subj) 
    if *showTime { 
     log.SetFlags(log.LstdFlags) 
    } 

    runtime.Goexit() 
} 

func getGID() uint64 { 
    b := make([]byte, 64) 
    b = b[:runtime.Stack(b, false)] 
    b = bytes.TrimPrefix(b, []byte("goroutine ")) 
    b = b[:bytes.IndexByte(b, ' ')] 
    n, _ := strconv.ParseUint(string(b), 10, 64) 
    return n 
} 

NATS-req.go已被修改爲發送在單獨的10個夠程10個請求開始並行地,請求超時已被設置爲3,5秒。我嘗試了使用共享NATS連接(函數oneReq())的goroutines,並且還使用自己的NATS連接(函數onReqSeparateConnect())構造了goroutine - 同樣的失敗結果。

package main 

import (
    "flag" 
    "fmt" 
    "github.com/nats-io/go-nats" 
    "sync" 
    "time" 
    "log" 
) 

// NOTE: Use tls scheme for TLS, e.g. nats-req -s tls://demo.nats.io:4443 foo hello 
func usage() { 
    log.Fatalf("Usage: nats-req [-s server (%s)] <subject> \n", nats.DefaultURL) 
} 

func main() { 
    //var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") 

    //log.SetFlags(0) 
    flag.Usage = usage 
    flag.Parse() 

    args := flag.Args() 
    if len(args) < 1 { 
     usage() 
    } 

    nc, err := nats.Connect(nats.DefaultURL) 
    if err != nil { 
     log.Fatalf("Can't connect: %v\n", err) 
    } 
    defer nc.Close() 
    subj := args[0] 

    var wg sync.WaitGroup 
    wg.Add(10) 
    for i := 1; i <= 10; i++ { 
     //go oneReq(subj, fmt.Sprintf("Request%d", i), nc, &wg) 
     go oneReqSeparateConnect(subj, fmt.Sprintf("Request%d", i), &wg) 
    } 

    wg.Wait() 

} 

func oneReq(subj string, payload string, nc *nats.Conn, wg *sync.WaitGroup) { 
    defer wg.Done() 
    msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond) 
    if err != nil { 
     if nc.LastError() != nil { 
      log.Printf("Error in Request: %v\n", nc.LastError()) 
     } 
     log.Printf("Error in Request: %v\n", err) 
    } else { 
     log.Printf("Published [%s] : '%s'\n", subj, payload) 
     log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data)) 
    } 
} 

func oneReqSeparateConnect(subj string, payload string, wg *sync.WaitGroup) { 
    defer wg.Done() 
    nc, err := nats.Connect(nats.DefaultURL) 
    if err != nil { 
     log.Printf("Can't connect: %v\n", err) 
     return 
    } 
    defer nc.Close() 
    msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond) 
    if err != nil { 
     if nc.LastError() != nil { 
      log.Printf("Error in Request: %v\n", nc.LastError()) 
     } 
     log.Printf("Error in Request: %v\n", err) 
    } else { 
     log.Printf("Published [%s] : '%s'\n", subj, payload) 
     log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data)) 
    } 
} 

而且有結果 - 不受歡迎的行爲,它看起來是NATS-rply.go處理傳入reqests只創建一個夠程和請求以串行的方式進行處理。 nats-req.go一次發送全部10個請求,超時設置爲3,5秒。 nats-rply.go以串行方式以一秒間隔開始響應請求,因此只有3個請求被滿足,直到超過3,5秒超時 - 其餘請求超時。響應消息還包含GoroutineID,對於所有傳入的請求都是相同的!即使再次啓動nats-req時,goroutine id也是一樣,只有在重新啓動nats-rply.go服務器時,ID纔會更改。

在NATS

NATS-req.go日誌

D:\PRAC\TSP\AMON>nats-req foo 
2017/08/29 18:46:48 Sending: 'Request9' 
2017/08/29 18:46:48 Sending: 'Request7' 
2017/08/29 18:46:48 Sending: 'Request10' 
2017/08/29 18:46:48 Sending: 'Request4' 
2017/08/29 18:46:48 Sending: 'Request8' 
2017/08/29 18:46:48 Sending: 'Request6' 
2017/08/29 18:46:48 Sending: 'Request1' 
2017/08/29 18:46:48 Sending: 'Request5' 
2017/08/29 18:46:48 Sending: 'Request2' 
2017/08/29 18:46:48 Sending: 'Request3' 
2017/08/29 18:46:49 Published [foo] : 'Request9' 
2017/08/29 18:46:49 Received [_INBOX.xrsXYOB2QmW1f52pkfLHya.xrsXYOB2QmW1f52pkfLHzJ] : 'REPLY TO request "Request9", GoroutineId:36' 
2017/08/29 18:46:50 Published [foo] : 'Request7' 
2017/08/29 18:46:50 Received [_INBOX.xrsXYOB2QmW1f52pkfLI02.xrsXYOB2QmW1f52pkfLI0l] : 'REPLY TO request "Request7", GoroutineId:36' 
2017/08/29 18:46:51 Published [foo] : 'Request10' 
2017/08/29 18:46:51 Received [_INBOX.xrsXYOB2QmW1f52pkfLI1U.xrsXYOB2QmW1f52pkfLI2D] : 'REPLY TO request "Request10", GoroutineId:36' 
2017/08/29 18:46:52 Error in Request: nats: timeout 
2017/08/29 18:46:52 Error in Request: nats: timeout 
2017/08/29 18:46:52 Error in Request: nats: timeout 
2017/08/29 18:46:52 Error in Request: nats: timeout 
2017/08/29 18:46:52 Error in Request: nats: timeout 
2017/08/29 18:46:52 Error in Request: nats: timeout 
2017/08/29 18:46:52 Error in Request: nats: timeout 

NATS-rply.go日誌

C:\Users\belunek>nats-rply foo 
2017/08/29 18:46:46 Main goroutine ID:1 
2017/08/29 18:46:46 Listening on [foo] 
2017/08/29 18:46:48 [#1] Received on [foo]: 'Request9' 
2017/08/29 18:46:49 [#2] Received on [foo]: 'Request7' 
2017/08/29 18:46:50 [#3] Received on [foo]: 'Request10' 
2017/08/29 18:46:51 [#4] Received on [foo]: 'Request4' 
2017/08/29 18:46:52 [#5] Received on [foo]: 'Request8' 
2017/08/29 18:46:53 [#6] Received on [foo]: 'Request6' 
2017/08/29 18:46:54 [#7] Received on [foo]: 'Request1' 
2017/08/29 18:46:55 [#8] Received on [foo]: 'Request5' 
2017/08/29 18:46:56 [#9] Received on [foo]: 'Request2' 
2017/08/29 18:46:57 [#10] Received on [foo]: 'Request3' 

請任何想法,如何正確地執行請求/響應通信與asyns (並行)響應處理? 感謝您的任何信息。

回答

2

Gnatsd回覆Request在異步的方式,但它不會啓動夠程爲每個請求,只是純粹的異步。而且,由於您使用time.Sleep來模擬處理負載,這會暫停調用goroutine,它看起來像是同步處理。如果您修改您的示例以使用goroutine,則一切正常。

... 
nc.Subscribe(subj, func(msg *nats.Msg) { 
    go handler(msg, i, nc) 
}) 
... 

func handler(msg *nats.Msg, i int, nc *nats.Conn) { 
    i++ 
    printMsg(msg, i) 
    //simulation of some processing time 
    time.Sleep(1 * time.Second) 
    newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID())) 
    nc.Publish(msg.Reply, []byte(newreply)) 
} 

輸出:

./nats-rply test 
2017/08/30 00:17:05 Main goroutine ID:1 
2017/08/30 00:17:05 Listening on [test] 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request6' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request5' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request1' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request8' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request3' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request7' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request9' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request4' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request2' 
2017/08/30 00:17:11 [#1] Received on [test]: 'Request10' 

./nats-req test 
2017/08/30 00:17:12 Published [test] : 'Request3' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Bq] : 'REPLY TO request "Request3", GoroutineId:37' 
2017/08/30 00:17:12 Published [test] : 'Request7' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5z6] : 'REPLY TO request "Request7", GoroutineId:42' 
2017/08/30 00:17:12 Published [test] : 'Request10' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5wY] : 'REPLY TO request "Request10", GoroutineId:43' 
2017/08/30 00:17:12 Published [test] : 'Request5' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6EO] : 'REPLY TO request "Request5", GoroutineId:34' 
2017/08/30 00:17:12 Published [test] : 'Request8' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm66k] : 'REPLY TO request "Request8", GoroutineId:36' 
2017/08/30 00:17:12 Published [test] : 'Request1' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm64C] : 'REPLY TO request "Request1", GoroutineId:35' 
2017/08/30 00:17:12 Published [test] : 'Request2' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Gw] : 'REPLY TO request "Request2", GoroutineId:41' 
2017/08/30 00:17:12 Published [test] : 'Request4' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm69I] : 'REPLY TO request "Request4", GoroutineId:40' 
2017/08/30 00:17:12 Published [test] : 'Request9' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm61e] : 'REPLY TO request "Request9", GoroutineId:39' 
2017/08/30 00:17:12 Published [test] : 'Request6' 
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5u0] : 'REPLY TO request "Request6", GoroutineId:38' 
+0

非常感謝您的澄清和工作示例!它工作完美無瑕。 –

+0

一個小問題,如果你想讓你的計數器正常工作,你需要用一個互斥體來包裹它,例如, https://stackoverflow.com/questions/16783273/golang-best-way-to-implement-global-counters-for-highly-concurrent-applications –

1

請記住,通過啓動從消息處理程序去規,你的處理順序就走出了窗外。這是NATS連續調用消息處理程序的原因,爲用戶提供了有保證的順序。如果訂單對您而言並不重要,那麼確實很容易在一個單獨的go-routine(或go-routines池)中開始處理消息。