2017-07-18

I want to run a Storm topology in local mode to test it. The project essentially pulls some logs from a Kafka topic, does some preprocessing on them, and then prints the contents to the screen. I get an error when running the Storm topology locally.

Here is my topology code:

import org.apache.storm.*; 
import org.apache.storm.generated.*; 
import org.apache.storm.kafka.*; 
import org.apache.storm.spout.*; 
import org.apache.storm.topology.*; 


public class LogProcessingTopology { 
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 

     // zookeeper hosts for the Kafka cluster 
     ZkHosts zkHosts = new ZkHosts("10.0.10.70:2181"); 

     // Create the KafkaSpout configuration 
     // Second argument is the topic name 
     // Third argument is the zookeeper root for Kafka 
     // Fourth argument is consumer group id 
     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "access_log", "", "0"); 

     // Specify that the kafka messages are String 
     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

     // We want to consume all the first messages 
     // in the topic every time we run the topology 
     // to help in debugging. In production, this 
     // property should be false 
     //kafkaConfig.forceFromStart = true; 

     // Now we create the topology 
     TopologyBuilder builder = new TopologyBuilder(); 

     // set the kafka spout class 
     builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1); 

     // set the LogSplitter, IpToCountry, Keyword, 
     // and PersistenceBolt bolts 
     // class. 
     builder.setBolt("LogSplitter", new LogSplitterBolt(), 1) 
       .globalGrouping("KafkaSpout"); 
     builder.setBolt("IpToCountry", new UserInformationGetterBolt("./home/clio/Desktop/GeoLiteCity.dat"), 1) 
       .globalGrouping("LogSplitter"); 
     builder.setBolt("LogPrinter", new LogPrinterBolt(), 1).globalGrouping("LogSplitter"); 


      // create an instance of the LocalCluster class 
      // for executing the topology in the local mode. 
      LocalCluster cluster = new LocalCluster(); 
      Config conf = new Config(); 
      conf.setDebug(true); 

      // Submit topology for execution 
      cluster.submitTopology("KafkaTopology", conf, builder.createTopology()); 

      try { 
       // Wait for some time before exiting 
       System.out.println("**********************Waiting to consume from kafka"); 
       Thread.sleep(10000); 

      } catch (Exception exception) { 
       System.out.println("******************Thread interrupted exception : " + exception); 
      } 

      // kill KafkaTopology 
      cluster.killTopology("KafkaTopology"); 

      // shut down the storm test cluster 
      cluster.shutdown(); 

     } 

} 
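One common reason nothing is printed in local mode is that, by default, the spout resumes from the last committed offset rather than the beginning of the topic. In storm-kafka 1.x the old `forceFromStart` flag (commented out above) no longer exists; a minimal sketch of forcing a read from the earliest offset, assuming the same `kafkaConfig` object as above, looks like this:

```java
// Ignore any offsets this consumer group previously committed to Zookeeper,
// and start from the earliest message in the topic. Useful for debugging;
// in production you normally want to resume from the committed offset.
kafkaConfig.ignoreZkOffsets = true;
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
```

These two fields would be set before the `KafkaSpout` is constructed.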

But after running `mvn compile exec:java -Dexec.classpathScope=compile`, nothing related to my logs is printed to the screen; I only get the following output:

8110 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.nimbus - Activating KafkaTopology: KafkaTopology-1-1500410373 
**********************Waiting to consume from kafka 
17849 [timer] INFO o.a.s.s.EvenScheduler - Available slots: (["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1025] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1026] ["edd1760f-f9b0-4804-813c-61744b693d31" 1027] ["edd1760f-f9b0-4804-813c-61744b693d31" 1028] ["edd1760f-f9b0-4804-813c-61744b693d31" 1029]) 
17868 [timer] INFO o.a.s.d.nimbus - Setting new assignment for topology id KafkaTopology-1-1500410373: #org.apache.storm.daemon.common.Assignment{:master-code-dir "/tmp/634969ab-11d9-4385-ae59-e17d04f50e40", :node->host {"b676dc68-345b-4474-b97c-7aadbbf9a6f6" "Terminus"}, :executor->node+port {[4 4] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [3 3] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [2 2] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [1 1] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [5 5] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024]}, :executor->start-time-secs {[1 1] 1500410383, [2 2] 1500410383, [3 3] 1500410383, [4 4] 1500410383, [5 5] 1500410383}, :worker->resources {["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024] [0.0 0.0 0.0]}} 
18146 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.nimbus - Delaying event :remove for 30 secs for KafkaTopology-1-1500410373 
18153 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.nimbus - Adding topo to history log: KafkaTopology-1-1500410373 
18156 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.nimbus - Shutting down master 
18158 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18159 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490003 
18164 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490003 closed 
18164 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18164 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18164 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47772 which had sessionid 0x15d576d60490003 
18165 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490004 
18175 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18175 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490004 closed 
18175 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47774 which had sessionid 0x15d576d60490004 
18176 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18176 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490001 
18186 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18186 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490001 closed 
18187 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47770 which had sessionid 0x15d576d60490001 
18187 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.zookeeper - closing zookeeper connection of leader elector. 
18188 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18188 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490000 
18197 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18197 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490000 closed 
18197 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47766 which had sessionid 0x15d576d60490000 
18198 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.nimbus - Shut down master 
18198 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18199 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490006 
18208 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18208 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490006 closed 
18208 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47778 which had sessionid 0x15d576d60490006 
18208 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18209 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490008 
18220 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18220 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490008 closed 
18220 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47782 which had sessionid 0x15d576d60490008 
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1024,5,com.loggingprocess.LogProcessingTopology] assignment to null 
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1025,5,com.loggingprocess.LogProcessingTopology] assignment to null 
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1026,5,com.loggingprocess.LogProcessingTopology] assignment to null 
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1024,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY 
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1025,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY 
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1026,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY 
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.Supervisor - Shutting down supervisor b676dc68-345b-4474-b97c-7aadbbf9a6f6 
18223 [Thread-10] INFO o.a.s.e.EventManagerImp - Event manager interrupted 
18224 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18225 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d6049000a 
18231 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d6049000a closed 
18231 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18232 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47786 which had sessionid 0x15d576d6049000a 
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1027,5,com.loggingprocess.LogProcessingTopology] assignment to null 
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1028,5,com.loggingprocess.LogProcessingTopology] assignment to null 
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1029,5,com.loggingprocess.LogProcessingTopology] assignment to null 
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1027,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY 
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1028,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY 
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1029,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY 
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.Supervisor - Shutting down supervisor edd1760f-f9b0-4804-813c-61744b693d31 
18233 [Thread-14] INFO o.a.s.e.EventManagerImp - Event manager interrupted 
18233 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
18233 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d6049000c 
18241 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
18241 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d6049000c closed 
18242 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47790 which had sessionid 0x15d576d6049000c 
18243 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.testing - Shutting down in process zookeeper 
18243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - NIOServerCnxn factory exited run method 
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - shutting down 
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.s.SessionTrackerImpl - Shutting down 
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Shutting down 
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.s.SyncRequestProcessor - Shutting down 
18244 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - PrepRequestProcessor exited loop! 
18244 [SyncThread:0] INFO o.a.s.s.o.a.z.s.SyncRequestProcessor - SyncRequestProcessor exited! 
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.s.FinalRequestProcessor - shutdown of request processor complete 
18245 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.testing - Done shutting down in process zookeeper 
18245 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.testing - Deleting temporary path /tmp/634969ab-11d9-4385-ae59-e17d04f50e40 
18255 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.testing - Deleting temporary path /tmp/6a839f7c-1379-4112-b0bb-51138fd98f33 
18255 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.testing - Deleting temporary path /tmp/8c4c315a-9689-49b6-be65-659922ede1cb 
18256 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.testing - Deleting temporary path /tmp/30ba6e95-5040-4162-9a0d-ed95c4fc1778 
18288 [SessionTracker] INFO o.a.s.s.o.a.z.s.SessionTrackerImpl - SessionTrackerImpl exited loop! 

At this point the topology should read the Kafka content (the logs) and print it to the screen, but this does not happen.

Finally, here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.learningstorm</groupId> 
    <artifactId>streaming</artifactId> 
    <version>1.0-SNAPSHOT</version> 

    <properties> 
     <storm.version>1.1.0</storm.version> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <junit.version>4.12</junit.version> 
     <log4j.version>1.2.17</log4j.version> 
    </properties> 

    <name>loggingprocess</name> 

    <repositories> 
     <repository> 
      <id>clojars.org</id> 
      <url>http://clojars.org/repo</url> 
     </repository> 
     <repository> 
      <id>geoip</id> 
      <url>http://snambi.github.com/maven/</url> 
     </repository> 
     <repository> 
      <releases> 
       <enabled>true</enabled> 
      </releases> 
      <snapshots> 
       <enabled>false</enabled> 
      </snapshots> 
      <id>central</id> 
      <url>http://repo1.maven.org/maven2/</url> 
     </repository> 
    </repositories> 




    <dependencies> 

     <!-- Storm Dependencies --> 
     <dependency> 
      <groupId>org.apache.storm</groupId> 
      <artifactId>storm-core</artifactId> 
      <version>${storm.version}</version> 
      <scope>provided</scope> 
      <exclusions> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>log4j-over-slf4j</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>slf4j-log4j12</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 


     <!-- Storm Kafka Dependencies --> 

     <dependency> 
      <groupId>org.apache.storm</groupId> 
      <artifactId>storm-kafka</artifactId> 
      <version>${storm.version}</version> 
      <exclusions> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>log4j-over-slf4j</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>slf4j-log4j12</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.11.0.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>0.11.0.0</version> 
      <exclusions> 
       <exclusion> 
        <groupId>javax.jms</groupId> 
        <artifactId>jms</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>com.sun.jdmk</groupId> 
        <artifactId>jmxtools</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>com.sun.jmx</groupId> 
        <artifactId>jmxri</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 


     <!-- Storm Hbase Dependencies --> 
     <dependency> 
      <groupId>org.apache.storm</groupId> 
      <artifactId>storm-hbase</artifactId> 
      <version>${storm.version}</version> 
      <exclusions> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>log4j-over-slf4j</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>slf4j-log4j12</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 

     <!-- Test --> 
     <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>${junit.version}</version> 
      <scope>test</scope> 
     </dependency> 

     <!--Other Dependencies --> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-core</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>commons-collections</groupId> 
      <artifactId>commons-collections</artifactId> 
      <version>3.2.2</version> 
     </dependency> 
     <dependency> 
      <groupId>com.google.guava</groupId> 
      <artifactId>guava</artifactId> 
      <version>22.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.geomind</groupId> 
      <artifactId>geoip</artifactId> 
      <version>1.2.8</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.zookeeper</groupId> 
      <artifactId>zookeeper</artifactId> 
      <version>3.4.10</version> 
      <type>pom</type> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.17</version> 
     </dependency> 
     <dependency> 
      <groupId>org.scala-lang.modules</groupId> 
      <artifactId>scala-parser-combinators_2.11</artifactId> 
      <version>1.0.4</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-jar-plugin</artifactId> 
      <version>3.0.2</version> 
     </dependency> 

    </dependencies> 


    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.3</version> 
       <configuration> 
        <source>1.7</source> 
        <target>1.7</target> 
       </configuration> 
       <executions> 
        <execution> 
         <id>make-assembly</id> 
         <phase>package</phase> 
         <goals> 
          <goal>compile</goal> 
         </goals> 
        </execution> 
       </executions> 
      </plugin> 
      <plugin> 
       <artifactId>maven-assembly-plugin</artifactId> 
       <configuration> 
        <descriptorRefs> 
         <descriptorRef>jar-with-dependencies</descriptorRef> 
        </descriptorRefs> 
        <archive> 
         <manifest> 
          <mainClass></mainClass> 
         </manifest> 
        </archive> 
       </configuration> 
       <executions> 
        <execution> 
         <id>make-assembly</id> 
         <phase>package</phase> 
         <goals> 
          <goal>single</goal> 
         </goals> 
        </execution> 
       </executions> 
      </plugin> 
      <plugin> 
       <groupId>org.codehaus.mojo</groupId> 
       <artifactId>exec-maven-plugin</artifactId> 
       <version>1.2.1</version> 
       <executions> 
        <execution> 
         <goals> 
          <goal>exec</goal> 
         </goals> 
        </execution> 
       </executions> 
       <configuration> 
        <executable>java</executable> 
        <includeProjectDependencies>true</includeProjectDependencies> 
        <includePluginDependencies>false</includePluginDependencies> 
        <classpathScope>compile</classpathScope> 
        <mainClass>com.loggingprocess.LogProcessingTopology</mainClass> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 

    </project> 

I am using Storm 1.1.0, Kafka 0.11.0 and Zookeeper 3.4.10. The Zookeeper ip:port I reference is a real machine running a Zookeeper instance, and in my spout I also point to an actual running Kafka instance, BUT the same thing happens even if I switch them to localhost.
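One way to rule out the Kafka side (a sketch, assuming a stock Kafka 0.11 install directory and a broker reachable on the default port; adjust `10.0.10.70:9092` to the real broker host:port) is to consume the topic directly with the console consumer. If this prints nothing either, the problem is upstream of Storm:

```shell
# Read the access_log topic from the beginning with Kafka's console consumer
bin/kafka-console-consumer.sh --bootstrap-server 10.0.10.70:9092 \
    --topic access_log --from-beginning
```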

What could be the cause of this error?

EDIT: Also, if I set up my topology like this:

builder.setBolt("LogSplitter", new LogSplitterBolt(), 1) 
     .globalGrouping("KafkaSpout"); 
builder.setBolt("IpToCountry", new UserInformationGetterBolt("./home/clio/Desktop/GeoLiteCity.dat"), 1) 
     .globalGrouping("LogSplitter"); 
builder.setBolt("LogPrinter", new LogPrinterBolt(), 1).globalGrouping("IpToCountry"); 

then I get an actual error message:

java.lang.RuntimeException: java.lang.InterruptedException 
    at org.apache.storm.utils.Utils.wrapInRuntime(Utils.java:1531) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.zookeeper.Zookeeper.getChildren(Zookeeper.java:265) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.cluster.ZKStateStorage.get_children(ZKStateStorage.java:174) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.cluster.StormClusterStateImpl.assignments(StormClusterStateImpl.java:153) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.daemon.supervisor.ReadClusterState.run(ReadClusterState.java:126) [storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.event.EventManagerImp$1.run(EventManagerImp.java:54) [storm-core-1.1.0.jar:1.1.0] 
Caused by: java.lang.InterruptedException 
    at java.lang.Object.wait(Native Method) ~[?:1.8.0_131] 
    at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_131] 
    at org.apache.storm.shade.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1588) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1625) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:226) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:219) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:216) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:207) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:40) ~[storm-core-1.1.0.jar:1.1.0] 
    at org.apache.storm.zookeeper.Zookeeper.getChildren(Zookeeper.java:260) ~[storm-core-1.1.0.jar:1.1.0] 
    ... 4 more 
24506 [Thread-10] INFO o.a.s.e.EventManagerImp - Event manager interrupted 
24506 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 
24507 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d57baaf05000a 
24548 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d57baaf05000a closed 
24548 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 
24549 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:51562 which had sessionid 0x15d57baaf05000a 
24549 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1027,5,com.loggingprocess.LogProcessingTopology] assignment to null 
24549 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1028,5,com.loggingprocess.LogProcessingTopology] assignment to null 
24549 [com.loggingprocess.LogProcessingTopology.main()] INFO o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1029,5,com.loggingprocess.LogProcessingTopology] assignment to null 

Answer


I managed to solve this problem after a few tests.

Running with the following test environment:
- Windows 7 SP1
- Apache Storm 1.0.3
- Java 1.8.0_111
- Eclipse Mars.2 (4.5.2)

Example method to run a topology on the local cluster:

private void runTopology(final StormTopology topology, final String topologyName, final long timeout) { 
    LocalCluster localCluster = new LocalCluster(); 

    // topology configuration. 
    final Config configuration = configureTopology(); 
    configuration.setDebug(true); 

    // submit the topology to local cluster. 
    localCluster.submitTopology(topologyName, configuration, topology); 

    if (timeout >= 0) { 
     Utils.sleep(timeout); 

     // kill the topology 
     final KillOptions killOptions = new KillOptions(); 
     killOptions.set_wait_secs(0); 
     localCluster.killTopologyWithOpts(topologyName, killOptions); 

     // wait until the topology is removed from the cluster 
     while (topologyExists(topologyName)) { 
     // avoid cpu overuse 
      Utils.sleep(1000); 
     } 

     // for some reason I have to wait to be sure the topology is stopped and the local cluster can be shut down 
     Utils.sleep(5000); 
     localCluster.shutdown(); 

    } 
} 

Example to check whether the topology is still running on the local cluster:

private final boolean topologyExists(final String topologyName) { 

    // list all the topologies on the local cluster 
    final List<TopologySummary> topologies = localCluster.getClusterInfo().get_topologies(); 

    // search for a topology with the topologyName 
    if (null != topologies && !topologies.isEmpty()) { 
     final List<TopologySummary> collect = topologies.stream() 
       .filter(p -> p.get_name().equals(topologyName)).collect(Collectors.toList()); 
     if (null != collect && !collect.isEmpty()) { 
      return true; 
     } 
    } 
    return false; 
} 
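The stream-filter check above can be tightened with `anyMatch`, which avoids collecting an intermediate list. Here is a standalone sketch of the same name-matching logic, with plain strings standing in for the `TopologySummary` objects so it runs without a Storm cluster (the class and method names are mine, for illustration only):

```java
import java.util.Arrays;
import java.util.List;

public class TopologyLookup {

    // Stand-in for iterating TopologySummary.get_name(): the cluster's
    // topology names are represented here as plain strings.
    static boolean topologyExists(List<String> topologyNames, String wanted) {
        // Same check as the stream filter above: true if any running
        // topology's name equals the one we are looking for.
        return topologyNames != null
                && topologyNames.stream().anyMatch(n -> n.equals(wanted));
    }

    public static void main(String[] args) {
        List<String> running = Arrays.asList("KafkaTopology", "WordCount");
        System.out.println(topologyExists(running, "KafkaTopology")); // true
        System.out.println(topologyExists(running, "Gone"));          // false
    }
}
```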

I hope this helps.