2016-04-22 89 views
0

我們使用protobuf編碼消息處理kafka/samza作業的管道。對於某些數據集,流水線可能相當長,我們希望爲流水線中的每個階段添加時間戳/ id以監視效率和服務健康狀況。您可以將序列化消息注入另一個protobuf消息中嗎

附加信息將被添加到架構中稱爲接觸點的重複字段。顯然,在java/samza中解碼消息,添加附加消息並再次序列化具有隨消息大小而增加的開銷(一些可能相當大,增加了反序列化時間),管道的某些部分僅僅是檢查消息的過濾器關鍵,甚至可能甚至不需要反序列化,所以這些上的開銷越小越好。

是否有可能在不反序列化的情況下將第二個序列化消息注入到現有消息中,如果是的話這是非常糟糕的做法(我只能認爲會這樣),是否有更好的解決方案,反序列化/添加/序列化以監視消息路徑/流程的時間

回答

2

一般來說,這將是非常棘手的,並且由於以下原因無法以「流式傳輸」方式完成:子消息的前綴大小爲編碼在一個可變長度的整數。因此,注入某些內容意味着將所有父級大小遞歸調整到根,並且由於大小的可變長度編碼,大小更改可能會再次移動內容。

爲避免此問題,您可以做的一件事情可能是使用固定大小的字段作爲時間戳,並確保它們在第一階段構建原型時充滿了值,因此您已經分配了原型中的相應空間。這應該允許您使用CodedInputStream掃描原始文件(理想情況下唯一的)時間戳字段ID,並使用CodedOutputStream將修補後的流寫回。獲得這個正確的將仍然需要了解內部格式。我建議首先從一個空的通過「過濾器」開始,並檢查輸出是否與輸入相匹配(如果遇到任何問題,請更新問題)

+0

完全忘記了消息大小前綴確實複雜有一點關係。由於我們不知道消息可能具有的階段數量,因此不確定固定值是否可行。將爲一個lib創建一個好的未來項目,所以我將來可能會回來。歡呼的信息! –