2017-06-21 141 views
2

我有一個應用程序運行在服務器上,它接受來自電話應用程序的請求,然後在工作服務器上負載平衡請求。我試圖添加一個超時的情況下,主服務器上的出站隊列中的超時時間長度的消息從隊列中刪除。更具體地說,主服務器上的應用程序是用golang編寫的,並實現了負載均衡的Paranoid Pirate Pattern。我公司目前擁有的代碼是:如何使ZeroMQ超時排出隊列,但尚未在設定時間內發送的消息?

import (
    "fmt" 
    zmq "github.com/pebbe/zmq4" 
    "time" 
) 

const (
    HEARTBEAT_LIVENESS = 3 
    HEARTBEAT_INTERVAL = 1500 * time.Millisecond 

    MESSAGE_READY  = "\001" 
    MESSAGE_HEARTBEAT = "\002" 
) 

var (
    client *zmq.Socket 
    backend *zmq.Socket 
    frontend *zmq.Socket 
    workerPoller *zmq.Poller 
    brokerPoller *zmq.Poller 
    workerQueue []Worker 
) 

type Worker struct { 
    Id string 
    Expire time.Time 
} 

type RequestWrapper { 
    RequestToSend Request 

} 

func NewWorker(id string) Worker { 
    return Worker{ 
     Id: id, 
     Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS), 
    } 
} 

func AddReadyWorker(workers []Worker, worker Worker) []Worker { 
    fmt.Println(worker.Id, " joined") 
    for i, w := range workers { 
     if worker.Id == w.Id { 
      if i == 0 { 
       workers = workers[1:] 
      } else if i == len(workers)-1 { 
       workers = workers[:i] 
      } else { 
       workers = append(workers[:i], workers[i+1:]...) 
      } 
      break 
     } 
    } 
    return append(workers, worker) 
} 

func PurgeInactiveWorkers() { 
    now := time.Now() 
    for i, worker := range workerQueue { 
     if now.Before(worker.Expire) { 
      workerQueue = workerQueue[i:] 
      return 
     } 
    } 

    workerQueue = workerQueue[0:0] 
} 

func LoadBalance() { 
// Loop: 
    heartbeat := time.Tick(HEARTBEAT_INTERVAL) 
    for { 
     var sockets []zmq.Polled 

     // If you have available workers, poll on the both front and backend 
     // If not poll on backend with infinite timeout 
     if len(workerQueue) > 0 { 
      sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL) 
     } else { 
      sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL) 
     } 

     for _, socket := range sockets { 
      switch socket.Socket { 
       // backend is a router 
       case backend: 
        workerId, _ := backend.Recv(0) 
        workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId)) 
        clientId, _ := backend.Recv(0) 
        if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT { 
         route, _ := backend.Recv(0) 
         message, _ := backend.RecvBytes(0) 

         fmt.Println("Received response") 
         RouteResponse(route, message) 

         // frontend.Send(clientId, zmq.SNDMORE) 
         // frontend.Send("", zmq.SNDMORE) 
         // frontend.SendBytes(message, 0) 
        } 
       // frontend is a dealer 
       case frontend: 
        clientId, _ := frontend.Recv(0) 
        route, _ := frontend.Recv(0) 
        message, _ := frontend.RecvBytes(0) 

        backend.Send(workerQueue[0].Id, zmq.SNDMORE) 
        backend.Send(clientId, zmq.SNDMORE) 
        backend.Send(route, zmq.SNDMORE) 
        backend.SendBytes(message, 0) 

        workerQueue = workerQueue[1:] 
      } 
     } 

     select { 
      case <-heartbeat: 
       for _, worker := range workerQueue { 
        backend.Send(worker.Id, zmq.SNDMORE) 
        backend.Send(MESSAGE_HEARTBEAT, 0) 
       } 
       break 
      default: 
     } 

     PurgeInactiveWorkers() 
    } 
} 

如果後端發送一條消息,但在一段時間內它沒有實際發送一個工人,我希望它過期,而不是以往任何時候都發送。 有沒有可以實現這個功能的套接字選項?如果沒有,我需要做些什麼才能做到這一點?

兩種方式,我想我能做到這一點沒有套接字選項有:

1)具有後端包裹郵件中的包裝和發送到golang隊列,而不是通過zeromq。包裝包含消息被「發送」的時間。後端併發地從golang隊列的前面一次拉出一個,並檢查消息是否過期。如果是這樣,不要發送,如果沒有,發送消息。我可以讓後端將消息添加到golang隊列中,然後在同一代碼塊中真正發送消息。這樣,我不需要鎖。

2)通過zeromq將包裝器消息發送到一個檢索器,並且檢索器檢查它是否過期並提前返回。我不喜歡這個,因爲它看起來對它的表現不好。

+1

而不是從隊列中刪除過期的消息,爲什麼不在隊列中添加「expires-at」屬性到隊列中。工作人員每次收到郵件時都可以檢查此屬性,並刪除已過期的郵件。 – colini

回答

0

最後,解決方案是添加一個像@colini和@bazza這樣expires-at屬性,並在每次心跳後從隊列中刪除超時消息。但是,這樣做並滿足我的應用程序的所有要求比第一眼看起來要困難得多,所以我最終使用了RabbitMQ,它的ttl-expires參數提供了所需的功能。

0

在較新的API版本中,有一個選項可以丟棄所有「舊」消息,並始終只提供「最新」消息。

如果這符合您的期望,並且所有同行都符合API v.4.0 +,那麼您就完成了。

ZMQ_CONFLATE:僅保留最後一條消息

如果設置,插座應在其入站/出站隊列只保留一個消息,收到此消息是最後消息/發送的最後一條消息。忽略ZMQ_RCVHWMZMQ_SNDHWM選項。不支持多部分消息,特別是只有一部分消息保存在套接字內部隊列中。
Option value typeint
Option value unitboolean
Default value0 (false)
Applicable socket typesZMQ_PULLZMQ_PUSHZMQ_SUBZMQ_PUBZMQ_DEALER

+1

該選項僅保留最後一條消息,這將有可能丟棄尚未過期的消息。另外,它不支持我發送的多部分消息。 –

+0

當然!這些正是使我複製API規範的原因。如果滿足其他條件(將有效負載包裝到BLOB中是一種常見的性能動機實踐),則這些事實都不是使用此選項的原因。 – user3666197

1

你想要做的是使用通信作爲執行集合。發送者想知道什麼時候接收者收到消息。

ZMQ實現Actor模型。您需要修改通信順序進程模型(其中發送超時)。基本上,您需要向工作人員添加控制消息流,但想法是服務器要求工作人員接收消息,服務器等待答覆。答覆意味着工作人員已經準備好接收消息,並且服務器和工作人員都在其程序流程中發送/接收。如果該答覆在超時秒內未能到達,則服務器不會發送實際消息。

或者你可以通過讓所有東西都去工作人員而作弊,不管是什麼東西,都會包含一條帶有「在X時間發送」字段的消息,並讓工作人員決定放棄舊消息。

相關問題