2017-07-25 121 views
0

我試圖使用Kafka Connect從Oracle數據庫接收數據。 Kafka連接器提供的默認對象是「GenericRecord」類型。這使得它的方式過於具體,導致通過執行record.getAsString(「someIDENTIFIER」)獲取數據的情況。是否有可能獲取特定類型的對象而不是GenericRecord類型。Kafka Connect集成

+0

您正在使用哪個Connect插件? JDBC? DBVisit?加洲的金門大橋? –

+0

我正在使用融合JDBC。 「connector.class」:「io.confluent.connect.jdbc.JdbcSourceConnector」 –

回答

2

卡夫卡連接信號源連接器與SourceRecord對象一起工作,並且卡夫卡連接工人被配置爲使用一個變換器序列化的SourceRecord爲二進制形式,其隨後寫入到卡夫卡主題。 Kafka Connect附帶JSON轉換器,Confluent提供Avro轉換器。所以,寫給Kafka的消息的二進制形式取決於你使用的是哪個轉換器。

(同樣地,水槽連接器與SinkRecord對象一起工作,並且卡夫卡連接工人使用其轉換器反序列化從卡夫卡讀取消息的二進制形式爲SinkRecord對象與所述連接器的交易。)

聽起來就像你正在寫一個卡夫卡消費者,並在那裏看到GenericRecord對象。如果是這樣,那麼你可能已經配置了卡夫卡連接工作者使用匯合的Avro的轉換器,這對於信號源連接器像JDBC連接器SourceRecord轉換成Avro的二進制格式卡夫卡連接,然後寫入到卡夫卡的話題。您的客戶端,然後有可能使用與Avro的解串器配置了卡夫卡的消費者,除非你給解串器的Avro的模式與它的工作將反序列化的Avro編碼消息到Avro的GenericRecord

但是,您可以將應用程序配置爲了解Avro架構的特定版本,並讓構建系統爲該版本的Avro架構生成代碼,以創建將反序列化Avro編碼的特定代碼消息轉換爲由模式描述的內存中形式。在Java中,這意味着您需要從模式生成類,然後在代碼中使用生成的類將GenericRecord複製到類的實例中。請參閱this complete consumer example,特別是this line,用於從GenericRecord轉換。在這個例子中,LogLine從Avro的模式生成的類:Avro公司的

GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message(); 
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent); 

一個顯著的好處是,它直接支持模式演化,並匯合的模式註冊採取的這種優勢。因此,儘管源連接器可能會演變生成的表的Avro模式,以響應數據庫中表的結構更改,但只要數據庫模式發生變化以便Avro模式向後兼容,則Avro庫您的客戶端應用程序使用將自動從消息的Avro模式轉換爲您的應用程序使用的Avro模式。

當然,在某些時候,你會改變你的應用程序使用新的Avro的模式,但是,這並不必須是在同一時間。事實上,如果你配置模式註冊爲強制執行模式版本是向前和向後兼容,你可以或前更改客戶端應用程序後數據庫中更改和JDBC源連接器開始使用的Avro的新版本架構。

+0

謝謝你這麼多你的迴應。但我正在使用Scala編寫消費者。另外,我們使用Avro生成所有模式並生成相應的類。即使在使用scala或者有不同的方式來模擬這個時,這可以實現嗎? –

+0

請參閱[此問題](https://stackoverflow.com/questions/31763571/importing-avro-schema-in-scala),瞭解如何使用[avro4s庫](https:/ /github.com/sksamuel/avro4s)來生成你的課程。也可能有其他的圖書館。 –

+0

非常感謝你... –