
I am new to Scala and I am trying to create a mixed Scala and Java project, but I am running into some problems when running the test code. Here is my pom.xml:

<properties> 
    <scala.version>2.12.2</scala.version> 
</properties> 

<dependencies> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>${scala.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-compiler</artifactId> 
     <version>${scala.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-reflect</artifactId> 
     <version>${scala.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.1.1</version> 
    </dependency> 
</dependencies> 

<build> 
    <plugins> 
     <plugin> 
      <groupId>org.scala-tools</groupId> 
      <artifactId>maven-scala-plugin</artifactId> 
      <version>2.15.2</version> 
      <executions> 
       <execution> 
        <id>compile</id> 
        <goals> 
         <goal>compile</goal> 
        </goals> 
        <phase>compile</phase> 
       </execution> 
       <execution> 
        <id>test-compile</id> 
        <goals> 
         <goal>testCompile</goal> 
        </goals> 
        <phase>test-compile</phase> 
       </execution> 
       <execution> 
        <phase>process-resources</phase> 
        <goals> 
         <goal>compile</goal> 
        </goals> 
       </execution> 
      </executions> 
     </plugin> 
     <plugin> 
      <artifactId>maven-compiler-plugin</artifactId> 
      <configuration> 
       <source>1.5</source> 
       <target>1.5</target> 
      </configuration> 
     </plugin> 
    </plugins> 
</build> 

My code is as follows:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

class BptConsumer {

  def consumeLogevent(): Unit = {
    // Local streaming context with a 5-second batch interval
    val conf = new SparkConf().setMaster("local[2]").setAppName("PVStatistics")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.20.13.196:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fd-blogs-tst")

    // Direct stream against the Kafka 0.10 consumer API
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    /*val rdd = stream.transform(x=>RDD[String]);*/
    val lines = stream.map(record => (record.key, record.value))

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Could someone help? When I run the test, I get the following error:

[]

I am trying to figure out whether the problem is in my pom.xml.

Answer


You are using Spark libraries built for Scala 2.11 while compiling with Scala 2.12.2. Change your Scala version to a 2.11 release:

<properties> 
    <scala.version>2.11.11</scala.version> 
</properties>
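
To make it harder for this kind of mismatch to come back, one common pattern (a sketch, not part of the original answer) is to declare the Scala binary version as its own property and reuse it in the Spark artifact suffixes, so the compiler version and the `_2.xx` suffix cannot drift apart:

<properties> 
    <scala.version>2.11.11</scala.version> 
    <!-- binary (major.minor) version used as the suffix of Spark artifact names --> 
    <scala.binary.version>2.11</scala.binary.version> 
</properties> 

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_${scala.binary.version}</artifactId> 
     <version>2.1.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId> 
     <version>2.1.1</version> 
    </dependency> 
</dependencies> 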