2016-05-18 86 views
4

我想找到一個例子,我可以製作和訂閱卡夫卡的avro郵件。生產和消費卡夫卡Avro郵件沒有匯合組件

在這個時候,我想使用沒有任何融合附加組件的「香草」卡夫卡部署。

這是可能的嗎?我發現的所有例子都很快就開始使用融合特定工具來處理avro消息。

我相信我應該有一種方法可以在kafka平臺上發佈和使用avro消息,並使用任何「分發特定」無插件。

+0

如果我理解你的問題,沒有任何內置的方式來生成和加載卡夫卡的avro消息。基本上,您可以使用像fastavro這樣的avro客戶端,在生成Kafka之前立即將其序列化爲avro格式,並在從主題消費後立即加載。 –

+0

那麼像在自定義序列化程序中包裝avro序列化/反序列化邏輯? –

+0

嗯......或多或少,在生成主題之前將其序列化爲avro格式。根據您使用的客戶端,可能會有快捷方式來執行此操作。 –

回答

5

當然,你可以做到這一點,沒有任何Confluent工具。但是你必須在你身邊做額外的工作(例如在你的應用程序代碼中) - 這是提供Avro相關工具的最初動機,例如你提到的來自Confluent的工具。

一種方法是直接使用Apache Avro Java API手動序列化/反序列化Kafka消息的有效負載(例如從YourJavaPojobyte[])。 (我想你認爲Java是選擇的編程語言。)這將如何?這是一個例子。

  • 首先,您將手動序列化應用程序中的數據有效載荷,將數據寫入卡夫卡。在這裏,您可以使用Avro序列化API對有效負載進行編碼(從Java POjo到byte[]),然後使用Kafka的Java生產者客戶端將寫入編碼負載寫入Kafka主題。
  • 然後,在數據管道的下游,您將在另一個從Kafka讀取數據的應用程序中進行反序列化。在這裏,您可以使用Kafka的Java客戶端客戶端讀取來自同一個Kafka主題的(編碼)數據,並使用Avro反序列化API再次解碼有效載荷(從byte[]到Java pojo)。

當然,在使用流處理工具如Kafka Streams(將包含在即將推出的Apache Kafka 0.10中)或Apache Storm中時,您也可以直接使用Avro API。

最後,您還可以選擇使用某些實用程序庫(無論是來自Confluent還是其他地方),以便您不必直接使用Apache Avro API。關於它的價值,我已經在kafka-storm-starter,例如,如AvroDecoderBolt.scala所示。在這裏,Avro序列化/反序列化通過使用Scala庫Twitter Bijection完成。下面是AvroDecoderBolt.scala一個例子片斷給你的總體思路:

// This tells Bijection how to automagically deserialize a Java type `T`, 
    // given a byte array `byte[]`. 
    implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] = 
SpecificAvroCodecs.toBinary[T] 

    // Let's put Bijection to use. 
    private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) { 
    require(bytes != null, "bytes must not be null") 
    val decodeTry = Injection.invert(bytes) // <-- deserialization, using Twitter Bijection, happens here 
    decodeTry match { 
     case Success(pojo) => 
     log.debug("Binary data decoded into pojo: " + pojo) 
     collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers 
     () 
     case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e)) 
    } 
    } 

所以,是的,你當然可以選擇不使用任何額外的庫如匯合的Avro的串行器/解串器(目前提供作爲confluentinc/schema-registry部分)或Twitter's Bijection。是否值得額外的努力取決於你自己決定。