讓我道歉對我的問題的任何含糊開始我會努力,我可以提供有關這個主題的儘可能多的信息(希望不要太多),請讓我知道我是否應該提供更多。同樣,我對卡夫卡頗爲陌生,可能會在術語上磕磕絆絆。
因此,根據我對sink和source的工作原理的瞭解,我可以使用Kafka Quickstart指南提供的FileStreamSourceConnector將數據(Neo4j命令)寫入Kafka集羣中的主題。然後,我可以編寫我自己的Neo4j接收器連接器並讀取這些命令並將它們發送到一個或多個Neo4j服務器。爲了保持項目儘可能簡單,現在,我基於接收器連接器和任務關閉了Kafka快速入門指南的FileStreamSinkConnector和FileStreamSinkTask。
卡夫卡的FileStream:
我Neo4j的信宿連接:
package neo4k.sink;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Neo4jSinkConnector extends SinkConnector {
public enum Keys {
;
static final String URI = "uri";
static final String USER = "user";
static final String PASS = "pass";
static final String LOG = "log";
}
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(Keys.URI, Type.STRING, "", Importance.HIGH, "Neo4j URI")
.define(Keys.USER, Type.STRING, "", Importance.MEDIUM, "User Auth")
.define(Keys.PASS, Type.STRING, "", Importance.MEDIUM, "Pass Auth")
.define(Keys.LOG, Type.STRING, "./neoj4sinkconnecterlog.txt", Importance.LOW, "Log File");
private String uri;
private String user;
private String pass;
private String logFile;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> props) {
uri = props.get(Keys.URI);
user = props.get(Keys.USER);
pass = props.get(Keys.PASS);
logFile = props.get(Keys.LOG);
}
@Override
public Class<? extends Task> taskClass() {
return Neo4jSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>();
if (uri != null)
config.put(Keys.URI, uri);
if (user != null)
config.put(Keys.USER, user);
if (pass != null)
config.put(Keys.PASS, pass);
if (logFile != null)
config.put(Keys.LOG, logFile);
configs.add(config);
}
return configs;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
我Neo4j的水槽任務:
package neo4k.sink;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
public class Neo4jSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(Neo4jSinkTask.class);
private String uri;
private String user;
private String pass;
private String logFile;
private Driver driver;
private Session session;
public Neo4jSinkTask() {
}
@Override
public String version() {
return new Neo4jSinkConnector().version();
}
@Override
public void start(Map<String, String> props) {
uri = props.get(Neo4jSinkConnector.Keys.URI);
user = props.get(Neo4jSinkConnector.Keys.USER);
pass = props.get(Neo4jSinkConnector.Keys.PASS);
logFile = props.get(Neo4jSinkConnector.Keys.LOG);
driver = null;
session = null;
try {
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, pass));
session = driver.session();
} catch (Neo4jException ex) {
log.trace(ex.getMessage(), logFilename());
}
}
@Override
public void put(Collection<SinkRecord> sinkRecords) {
StatementResult result;
for (SinkRecord record : sinkRecords) {
result = session.run(record.value().toString());
log.trace(result.toString(), logFilename());
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void stop() {
if (session != null)
session.close();
if (driver != null)
driver.close();
}
private String logFilename() {
return logFile == null ? "stdout" : logFile;
}
}
的問題:
編寫後,我接下來構建了包括它有任何相關性,但不包括任何卡夫卡的依賴,到一個JAR(或Uber Jar?這是一個文件)。然後,我編輯了connect-standalone.properties中的插件路徑,以包含該工件併爲我的Neo4j接收器編寫了一個屬性文件。我這樣做是爲了試圖遵循這些guidelines。
我Neo4j的信宿連接屬性文件:
name=neo4k-sink
connector.class=neo4k.sink.Neo4jSinkConnector
tasks.max=1
uri=bolt://localhost:7687
user=neo4j
pass=Hunter2
topics=connect-test
但是一旦運行獨立,我得到的輸出這個錯誤關閉流(第5行錯誤):
[2017-08-14 12:59:00,150] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-08-14 12:59:00,150] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-08-14 12:59:00,153] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2017-08-14 12:59:00,153] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-08-14 12:59:00,153] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.IllegalArgumentException: Malformed \uxxxx encoding.
at java.util.Properties.loadConvert(Properties.java:574)
at java.util.Properties.load0(Properties.java:390)
at java.util.Properties.load(Properties.java:341)
at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:429)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:84)
[2017-08-14 12:59:00,156] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2017-08-14 12:59:00,156] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-08-14 12:59:00,168] INFO Stopped [email protected]{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-08-14 12:59:00,173] INFO Stopped [email protected]{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
編輯:我應該提到,在輸出聲明添加插件的連接器加載部分中,我沒有看到任何提及的早期構建的jar,併爲connect- standalone.properties創建了一個途徑。這裏有一個背景片段:
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,970] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
結論:
我在虧損,我已經做了測試和研究了大約兩個小時,我不認爲我完全相信問問題。所以我會說如果你已經得到了這麼多,感謝你的閱讀。如果您發現任何令人擔憂的事情,我可能在代碼或方法上做了錯誤的事情(例如,打包jar),或者認爲我應該提供更多的上下文或控制檯日誌或任何真正讓我知道的東西。再次感謝你。
Kafka Connect無法讀取您定義連接器配置的屬性文件,這是「connect-standalone'啓動命令的第二個參數。我建議在文件中尋找隱藏的字符(沒有在這篇文章的內容中),嘗試沒有你的JAR(s)出現在類路徑中,甚至嘗試一個不同的空文件來驗證它可以至少被讀入正確,然後逐漸添加一行,直到您定義了所有配置屬性。 –
或者,嘗試將啓動命令中的配置文件指定爲完全限定的路徑。 –
就是這樣!謝謝。 –