1

我想弄清楚如何從查詢中初始獲取所有數據,然後增量只使用kafka連接器進行更改。原因是我想將所有數據加載到彈性搜索中,然後保持與我的kafka流同步。 目前,我通過首先使用mode = bulk的連接器來完成此操作,然後將其更改爲時間戳。這工作正常。但是,如果我們想要將所有數據重新加載到Streams和ES,這意味着我們必須編寫一些腳本來清除或刪除kafka流和es索引數據,修改連接ini以將模式設置爲批量,重新啓動一切,給它時間加載所有數據,然後再次將腳本修改爲時間戳模式,然後再次重新啓動所有內容(原因是偶爾需要這樣的腳本,批量更新通過etl進程糾正歷史數據,我們不但有控制權,並且此過程不更新時間戳)卡夫卡JDBC連接器加載所有數據,然後增量

是否有人正在做類似的事情,並且找到了更優雅的解決方案?

回答

0

很久以後又回來了。該方法能夠解決這一點,從來沒有使用散裝模式

  1. 站連接器
  2. 擦拭偏移文件爲每個連接器JVM
  3. (可選)如果你想要做一個完整的擦拭和負荷情況,想要可能還刪除您的主題使用kafka/connect utils/rest api(並且不要忘記狀態主題)
  4. 重新啓動連接。
0

如何首先從查詢中獲取所有數據,然後增量式地僅使用kafka連接器進行更改。

也許這可能對你有幫助。例如,我有一個表:

╔════╦═════════════╦═══════════╗ 
║ Id ║ Name  ║ Surname ║ 
╠════╬═════════════╬═══════════╣ 
║ 1 ║ Martin  ║ Scorsese ║ 
║ 2 ║ Steven  ║ Spielberg ║ 
║ 3 ║ Christopher ║ Nolan  ║ 
╚════╩═════════════╩═══════════╝ 

在這種情況下,我將創建一個視圖:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS 
SELECT 0 AS EXID, ID, NAME, SURNAME 
FROM DIRECTORS WHERE ID =< 2 
UNION ALL 
SELECT ID AS EXID, ID, NAME, SURNAME 
FROM DIRECTORS WHERE ID > 2; 

屬性文件中對卡夫卡的JDBC連接器,你可以使用:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
mode=incrementing 
incrementing.column.name=EXID 
topic.prefix= 
tasks.max=1 
name=gv-jdbc-source-connector 
connection.url= 
table.types=VIEW 
table.whitelist=EDGE_DIRECTORS 

所以kafka jdbc連接器將採取以下步驟:

  1. 起初EXID = 0的所有數據;
  2. 它將在connector.offsets文件中存儲偏移值= 0;
  3. 新行將被插入到DIRECTORS表中。
  4. 卡夫卡JDBC連接器將執行:Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS並將 注意到EXID已增加。
  5. 數據將在Kafka Streams中更新。
+0

不完全是我問的。目前即時通訊使用時間戳列。我必須將模式更改爲批量才能重新加載所有內容,然後更改回時間戳以讓卡夫卡逐漸加載已更改的數據或新數據(它會將查詢附加到來回時間戳以執行此操作)。我希望避免每次我想從「清潔」石板開始切換模式。 – mike01010

相關問題