
I want to connect Kafka + Cassandra to Spark 1.5.1. Spark 1.5.1 + Scala 2.10 + Kafka + Cassandra = java.lang.NoSuchMethodError:

Library versions:

scalaVersion := "2.10.6" 

libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10" % "1.5.1", 
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.1", 
    "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.5.0-M2" 
) 
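For reference (not part of the original question): a `NoSuchMethodError` on `TypeToken.isPrimitive` usually means an older Guava (the one bundled with Spark) is shadowing the Guava ≥ 16 that the DataStax driver needs. One way to reduce the conflict is to pin Guava explicitly in the build, a sketch assuming 16.0.1 is the version the Cassandra Java driver expects:

```scala
// Force a single, driver-compatible Guava version across the dependency
// graph (assumption: 16.0.1, matching the guava jar seen in the error below).
dependencyOverrides += "com.google.guava" % "guava" % "16.0.1"
```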

Initialization, as used in the application:

val sparkConf = new SparkConf(true) 
     .setMaster("local[2]") 
     .setAppName("KafkaStreamToCassandraApp") 
     .set("spark.executor.memory", "1g") 
     .set("spark.cores.max", "1") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 

The Cassandra schema is created like this:

CassandraConnector(sparkConf).withSessionDo { session => 
     session.execute(s"DROP KEYSPACE IF EXISTS kafka_streaming") 
     session.execute(s"CREATE KEYSPACE IF NOT EXISTS kafka_streaming WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") 
     session.execute(s"CREATE TABLE IF NOT EXISTS kafka_streaming.wordcount (word TEXT PRIMARY KEY, count COUNTER)") 
     session.execute(s"TRUNCATE kafka_streaming.wordcount") 
    } 
Preparing the jar

I also created a few merge strategies:

assemblyMergeStrategy in assembly := {
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("io", "netty", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first
    case x => (assemblyMergeStrategy in assembly).value(x)
}

I think the problem is connected with

case PathList("com", "google", xs @ _*) => MergeStrategy.first

versus bundling it with MergeStrategy.last.

Any ideas?

The exception:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.reflect.TypeToken.isPrimitive()Z 
     at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:142) 
     at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:136) 
     at com.datastax.driver.core.TypeCodec$BlobCodec.<init>(TypeCodec.java:609) 
     at com.datastax.driver.core.TypeCodec$BlobCodec.<clinit>(TypeCodec.java:606) 
     at com.datastax.driver.core.CodecRegistry.<clinit>(CodecRegistry.java:147) 
     at com.datastax.driver.core.Configuration$Builder.build(Configuration.java:259) 
     at com.datastax.driver.core.Cluster$Builder.getConfiguration(Cluster.java:1135) 
     at com.datastax.driver.core.Cluster.<init>(Cluster.java:111) 
     at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:178) 
     at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1152) 
     at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:85) 
     at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155) 
Is this your whole code??? – eliasah

Sorry, I've expanded it, please check it again – scalaz

How are you building your app? Are you using the assembly plugin? – eliasah

Answer

Based on the error

[error] /home/user/.ivy2/cache/org.apache.spark/spark-network-common_2.10/jars/spark-network-common_2.10-1.5.0.jar:com/google/common/base/Optional.class 
[error] /home/user/.ivy2/cache/com.google.guava/guava/bundles/guava-16.0.1.jar:com/google/common/base/Optional.class 

it seems the last one is the newer one, so perhaps you can put:

case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.last
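An alternative that often resolves Guava clashes more robustly than per-path merge strategies is shading Guava inside the assembly, so the driver's Guava can no longer be shadowed by Spark's older copy. A sketch using sbt-assembly's shade rules (requires sbt-assembly 0.14+; the `shadeio` prefix is an arbitrary choice, not from the question):

```scala
assemblyShadeRules in assembly := Seq(
  // Rename Guava's packages inside the fat jar, so the Cassandra driver
  // links against this renamed copy instead of whatever Guava version
  // happens to win on Spark's runtime classpath.
  ShadeRule.rename("com.google.common.**" -> "shadeio.guava.@1").inAll
)
```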