2016-11-06 86 views

Kafka with Spark in Zeppelin running on Docker

I have a local Docker container running Spotify/kafka (on Windows 10). Inside this container, data is being published to a topic named "data".

I can consume the data with a Java application like this one, running in Eclipse:

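import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;

// Uses Kafka's old ZooKeeper-based high-level consumer API (kafka.consumer.*)
public class KafkaConsumer {
    private String topicName;
    private ConsumerConfig config;

    public KafkaConsumer() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181"); // the old consumer locates brokers through ZooKeeper
        props.put("group.id", "test");
        props.put("client.id", this.getClass().getSimpleName());
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("partition.assignment.strategy", "range");

        this.config = new ConsumerConfig(props);
        this.topicName = "data";
    }

    public void run() {
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
        // Request one stream for the topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                connector.createMessageStreams(ImmutableMap.of(topicName, 1));
        List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());

        // One thread per stream, each printing every message as a string
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
                    while (iterator.hasNext()) { // blocks until the next message arrives
                        MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
                        String m = new String(messageAndMetadata.message());

                        System.out.println(m);
                    }
                }
            });
        }
    }
}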

What I want now is to consume the same topic from the dylanmei/zeppelin Docker container. I want to run the following in a Zeppelin notebook:

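import org.apache.kafka.clients.consumer.{ConsumerRecord, RangeAssignor}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// createDirectStream needs a StreamingContext; sc is the SparkContext Zeppelin provides
val ssc = new StreamingContext(sc, Seconds(5))

val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092", // kafka010 talks to the brokers directly, not via ZooKeeper
    "group.id" -> "example",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // the new consumer expects assignor class names here, not "range"
    "partition.assignment.strategy" -> classOf[RangeAssignor].getName
) // no trailing comma: Scala 2.11 rejects it

val topics = Array("data")
val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
)

// map is lazy: an output operation plus ssc.start() are needed before anything is consumed
stream.map(record => (record.key, record.value)).print()
ssc.start()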

I took this from the spark kafka integration guide.

However, all I ever get is a "Connection refused" (which also happens when I put in a wrong port, so it tells me nothing):

java.net.ConnectException: Connection refused 
at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at org.apache.thrift.transport.TSocket.open(TSocket.java:182) 
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51) 
at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37) 
at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60) 
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) 
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) 
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) 
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:96) 
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.init(RemoteInterpreter.java:216) 
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:385) 
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getFormType(LazyOpenInterpreter.java:105) 
at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:306) 
at org.apache.zeppelin.scheduler.Job.run(Job.java:176) 
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:329) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
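The trace does say which client was refused: the first frames outside the JDK are org.apache.thrift.transport.TSocket.open, called from org.apache.zeppelin.interpreter.remote.ClientFactory and RemoteInterpreterProcess.getClient. That points at the Zeppelin server failing to reach its remote interpreter process over Thrift, before any Kafka code runs. The sketch below automates that reading of a pasted trace; the class and method names are illustrative, and treating java.*, sun.* and jdk.* as "JDK frames" is a simplifying assumption.

```java
import java.util.Arrays;
import java.util.Optional;

public class TraceOrigin {
    // Returns the first "at ..." frame that does not belong to the JDK (java.*, sun.*, jdk.*),
    // i.e. the library code that actually opened the refused socket.
    static Optional<String> firstNonJdkFrame(String trace) {
        return Arrays.stream(trace.split("\\R"))      // split on any line break
                .map(String::trim)                     // real traces indent frames with a tab
                .filter(line -> line.startsWith("at "))
                .map(line -> line.substring(3))        // drop the "at " prefix
                .filter(frame -> !(frame.startsWith("java.")
                        || frame.startsWith("sun.")
                        || frame.startsWith("jdk.")))
                .findFirst();
    }

    public static void main(String[] args) {
        // A shortened copy of the trace above
        String trace = String.join("\n",
                "java.net.ConnectException: Connection refused",
                "at java.net.PlainSocketImpl.socketConnect(Native Method)",
                "at java.net.Socket.connect(Socket.java:589)",
                "at org.apache.thrift.transport.TSocket.open(TSocket.java:182)",
                "at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51)");
        System.out.println(firstNonJdkFrame(trace).orElse("(no non-JDK frame)"));
    }
}
```

Run on the shortened trace, it prints org.apache.thrift.transport.TSocket.open(TSocket.java:182).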

I hope someone can help me get this running; I have no clue.

Thanks and regards!


The first thing I would do is log in to the Docker container and telnet to ports 2181 and 9092 to determine whether the processes are listening. – user2230605
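If telnet is not available in the container, the same check can be scripted. This is a minimal sketch assuming Java 8+; the class name PortCheck and the 2-second timeout are illustrative choices. It only tests whether something accepts a TCP connection on each port, the same thing telnet would tell you.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Plain TCP connect, like `telnet host port`: true if something accepted the connection.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout and unknown host all end up here
            return false;
        }
    }

    public static void main(String[] args) {
        // ZooKeeper (2181) and the Kafka broker (9092), as used in the question
        for (int port : new int[] {2181, 9092}) {
            System.out.println("localhost:" + port + " listening: " + isListening("localhost", port, 2000));
        }
    }
}
```

Note that run inside the Zeppelin container, "localhost" is that container's own loopback interface, so a negative result there does not by itself mean the Kafka container is down.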


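Have you tried using `--net="host"`? I believe the default is bridge, in which case you need to use the bridge adapter's address in the connection string. – Cooper6581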

Answer


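I think this is a Spark interpreter issue. I had the same problem, and after I restarted my interpreter everything worked. You can also check your interpreter's configuration; sometimes you get this error when the interpreter is misconfigured.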
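Besides the restart button in the Interpreter page, Zeppelin's REST API has an interpreter restart endpoint (PUT /api/interpreter/setting/restart/{settingId}); check that your Zeppelin version documents it. The sketch below assumes Java 11+ (java.net.http), Zeppelin on its default port 8080, and a setting id you look up yourself, for example from GET /api/interpreter/setting. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestartInterpreter {
    // Builds a PUT request against Zeppelin's interpreter-restart REST endpoint.
    static HttpRequest restartRequest(String zeppelinBaseUrl, String settingId) {
        return HttpRequest.newBuilder(
                        URI.create(zeppelinBaseUrl + "/api/interpreter/setting/restart/" + settingId))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // args[0]: Zeppelin base URL, e.g. http://localhost:8080 (assumed default port)
        // args[1]: interpreter setting id of the spark interpreter (hypothetical, look it up first)
        HttpRequest request = restartRequest(args[0], args[1]);
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```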