我有一個應用程序運行在服務器上,它接受來自電話應用程序的請求,然後在工作服務器上負載平衡請求。我試圖添加一個超時的情況下,主服務器上的出站隊列中的超時時間長度的消息從隊列中刪除。更具體地說,主服務器上的應用程序是用golang編寫的,並實現了負載均衡的Paranoid Pirate Pattern。我公司目前擁有的代碼是:如何使ZeroMQ超時排出隊列,但尚未在設定時間內發送的消息?
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"time"
)
const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond
MESSAGE_READY = "\001"
MESSAGE_HEARTBEAT = "\002"
)
var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)
type Worker struct {
Id string
Expire time.Time
}
type RequestWrapper {
RequestToSend Request
}
func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}
func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, " joined")
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}
func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}
workerQueue = workerQueue[0:0]
}
func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled
// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) > 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}
for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)
fmt.Println("Received response")
RouteResponse(route, message)
// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send("", zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)
backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)
workerQueue = workerQueue[1:]
}
}
select {
case <-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}
PurgeInactiveWorkers()
}
}
如果後端發送一條消息,但在一段時間內它沒有實際發送一個工人,我希望它過期,而不是以往任何時候都發送。 有沒有可以實現這個功能的套接字選項?如果沒有,我需要做些什麼才能做到這一點?
兩種方式,我想我能做到這一點沒有套接字選項有:
1)具有後端包裹郵件中的包裝和發送到golang隊列,而不是通過zeromq。包裝包含消息被「發送」的時間。後端併發地從golang隊列的前面一次拉出一個,並檢查消息是否過期。如果是這樣,不要發送,如果沒有,發送消息。我可以讓後端將消息添加到golang隊列中,然後在同一代碼塊中真正發送消息。這樣,我不需要鎖。
2)通過zeromq將包裝器消息發送到一個檢索器,並且檢索器檢查它是否過期並提前返回。我不喜歡這個,因爲它看起來對它的表現不好。
而不是從隊列中刪除過期的消息,爲什麼不在隊列中添加「expires-at」屬性到隊列中。工作人員每次收到郵件時都可以檢查此屬性,並刪除已過期的郵件。 – colini