2

我有一個EC2實例,它有一個使用事件發射器處理數據的數據流。例如。處理數據流日誌

stream.on('new event', function doSomething(event){ do more stuff...}) 

該數據流具有潛在事件每秒幾萬,我想記錄這些事件的處理以有效的方式。換句話說,我不願意在每次發生新事件時發出日誌條目。

因此,我想我會批量發送日誌。例如。

let logArray = []; 
function sendToLogs(logs) {\** send stuff *\} 

stream.on('new event', function doSomething(event){ 
    \\do some stuff 

    logArray.push({newLog: event}) 
    if (logArray.length >= 500) { 
    sendToLogs(logArray) 
    logArray = []; 
    } 
}) 

不過,恐怕有這麼多的事件,同時進來的時候,上面的代碼可能會導致異常行爲。我已經在本地日誌中看到過這種情況:這個數組的長度相當戲劇性地跳躍,並且可以同時爲不同的事件設置相同的值。

此外,使用cloudwatch日誌需要我在不同的日誌函數調用之間傳遞'sequenceTokens'。如果兩個事件同時觸發日誌條件,事情可能會變得很奇怪。 (即使我單獨記錄每個事件,也會存在此問題。)

我應該如何處理這類數據流的日誌記錄?

回答

1

我會將登錄分隔成一個或多個單獨的進程。您的主應用程序會將日誌消息放置在SQS隊列中,並使用「即忘即忘」類型的邏輯。然後,您的日誌記錄應用程序將讀取隊列並寫入您選擇的日誌。優點是活動的爆發會被隊列吸收。隊列的長度沒有直接的限制,所以應該能夠處理。實際上,你不再排隊消息,SQS是。

此外,如果隊列增長超過你想要的,你可以有多個日誌記錄應用程序來處理負載。

的缺點是:

  1. 你需要寫這個單獨的進程來處理記錄到的CloudWatch或者永遠。
  2. 您的日誌不會是實時的。在您的主應用程序日誌和將日誌消息放入CloudWatch時,至少會有一些延遲。有了額外的日誌記錄過程,您應該關閉,但這不是保證。
+0

SQS肯定可以工作。如果需要更接近實時行爲,則Lambda函數將是另一種選擇。如果fire和forget進程只是委託給lambda,那麼你會得到一個「fan out」模式,lambda可以動態擴展到所需的日誌進程數。 –