1
因此我有一個使用Spotify/kafka(在Windows 10上運行)運行的本地Docker Container。 在這個容器中有數據發佈在一個名爲「數據」的主題中。在Docker上運行的Zeppelin中使用Spark的卡夫卡
我可以在Eclipse中像這樣運行的Java應用程序消耗的數據:
private String topicName;
private ConsumerConfig config;
public KafkaConsumer() throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("client.id", this.getClass().getSimpleName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("partition.assignment.strategy", "range");
this.config = new ConsumerConfig(props);
this.topicName = "data";
}
public void run() {
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = connector.createMessageStreams(ImmutableMap.of(topicName, 1));
List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
ExecutorService executor = Executors.newFixedThreadPool(streams.size());
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
String m = new String(messageAndMetadata.message());
System.out.println(m);
}
}
});
}
}
我現在想的是消耗在dylanmei /飛艇泊塢窗容器的同一主題。我想在一個齊柏林筆記本運行以下命令:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "example",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"partition.assignment.strategy" -> "range",
)
val topics = Array("data")
val stream = KafkaUtils.createDirectStream[String, String](
sc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
這是我從spark kafka integration guide
採取但是,所有我曾經得到的是一個連接被拒絕(當我插入了錯誤的端口,也會發生,所以告訴我什麼都沒有):
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51)
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37)
at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:96)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.init(RemoteInterpreter.java:216)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:385)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getFormType(LazyOpenInterpreter.java:105)
at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:306)
at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:329)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
希望有人能幫助我讓它運行,我沒有線索。
謝謝您的問候!
我會做的第一件事就是登錄到docker和telnet端口2181和9092,以確定進程是否正在偵聽 – user2230605
您是否曾嘗試過使用'--net =「host」'?我認爲默認是橋接,您需要在連接字符串中使用網橋適配器地址。 – Cooper6581