如何首先從查詢中獲取所有數據,然後增量式地僅使用kafka連接器進行更改。
也許這可能對你有幫助。例如,我有一個表:
╔════╦═════════════╦═══════════╗
║ Id ║ Name ║ Surname ║
╠════╬═════════════╬═══════════╣
║ 1 ║ Martin ║ Scorsese ║
║ 2 ║ Steven ║ Spielberg ║
║ 3 ║ Christopher ║ Nolan ║
╚════╩═════════════╩═══════════╝
在這種情況下,我將創建一個視圖:
CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;
屬性文件中對卡夫卡的JDBC連接器,你可以使用:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS
所以kafka jdbc連接器將採取以下步驟:
- 起初EXID = 0的所有數據;
- 它將在connector.offsets文件中存儲偏移值= 0;
- 新行將被插入到DIRECTORS表中。
- 卡夫卡JDBC連接器將執行:
Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS
並將 注意到EXID已增加。
- 數據將在Kafka Streams中更新。
不完全是我問的。目前即時通訊使用時間戳列。我必須將模式更改爲批量才能重新加載所有內容,然後更改回時間戳以讓卡夫卡逐漸加載已更改的數據或新數據(它會將查詢附加到來回時間戳以執行此操作)。我希望避免每次我想從「清潔」石板開始切換模式。 – mike01010