2015-04-17 76 views
0

我試圖從1.2版本使用新的連接功能,但我得到repl的repartitionByCassandraReplica函數的錯誤。cassandra火花連接器錯誤與repartitionByCassandraReplica函數

我試圖複製網站的例子,一對夫婦的元素創造了一個卡桑德拉表(shopping_history): https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.mde

import com.datastax.spark.connector.rdd._ 
import com.datastax.spark.connector.cql.CassandraConnector 
import com.datastax.spark.connector._ 
import com.datastax.driver.core._ 

case class CustomerID(cust_id: Int) 
val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_)) 
val repartitioned = idsOfInterest.repartitionByCassandraReplica("cim_dev", "shopping_history", 10) 
repartitioned.first() 

我得到這個錯誤:

15/04/13 18:35:43 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, dev2-cim.aid.fr): java.lang.ClassNotFoundException: $line31.$read$$iwC$$iwC$CustomerID 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:344) 
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) 
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098) 
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:56) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

我使用spark 1.2.0和connector 1.2.0 RC 3. idsOfInterest上使用的joinWithCassandraTable函數可以工作。

我也很好奇joinWithCassandraTable/cassandraTable與In子句/ foreachPartition(withSessionDo)語法之間的區別。

它們是否都將數據請求到作爲協調器的本地節點? joinWithCassandraTable與repartitionByCassandraReplica結合如同異步查詢一樣高效,僅向本地節點請求數據?如果不應用repartitionByCassandraReplica會發生什麼情況?

我已經問過這個問題上卡桑德拉連接器的谷歌小組論壇: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/b615ANGSySc

感謝

+0

我不知道你的類加載器的問題,不知道你是如何運行這個代碼,你可以給我們提交你的提交命令或啓動命令嗎? – RussS

+0

@RussS,我的啓動命令是spark-shell :),其中spark.executor.extraClassPath/spark.driver.extraClassPath在spark-default.conf中設置到cassandra連接器jar中。奇怪的是,找不到的類是在shell中創建的... – Alex

+0

使用完整的程序集?另外嘗試--jar有時會在某些版本的spark上出現一些classloader怪異。 – RussS

回答

2

我會在這裏回答你的第二個問題,如果我能與第一部分隨動根據更多信息來了解情況。

我也好奇betwween的差異: joinWithCassandraTable/cassandraTable用在從句/ foreachPartition(withSessionDo)語法。

帶有in子句的cassandraTable將創建一個單獨的spark分區。所以對於非常小的子句可能沒問題,但子句必須從驅動程序序列化到火花應用程序。這對於大的子句可能是非常糟糕的,一般來說,如果我們不需要,我們不希望將數據從火花驅動程序來回發送給執行者。

joinWithCassandraTableforeachPartition(withSessionDo)非常相似。主要區別在於joinWithCassandraTable調用使用Connector轉換和閱讀代碼,這將使得從Cassandra行中獲取Scala對象變得更加容易。在這兩種情況下,您的數據都將保持RDD格式,並且不會被序列化回驅動程序。他們還將使用先前RDD的分區器(或最後一個公開preferredLocation方法的RDD),以便他們能夠使用repartitionByCassandraTable進行工作。

如果未應用repartitionByCassandraTable,則會在可能或可能不是您要請求的信息的協調者的節點上請求數據。這將在您的查詢中添加額外的網絡躍點,但這可能不是一個非常大的性能損失。在加入之前,您想重新分配的時間實際上取決於數據的總量和重新分配操作中火花洗牌的成本。

+0

感謝您的回答RussS。我仍然對「可能會或可能不是協調人」部分感到好奇。到目前爲止,我使用帶IN子句的foreachPartition(withSessionDo)語法來查詢關於一批customerID(Part.Key)的一些時間序列信息,並且遇到了非常大的查詢中的一些問題。我想知道是否每個執行者都在查詢本地cassandra作爲一個協調器,從而產生大量的網絡流量和CPU負載。這就是爲什麼我在joinWithCassandraTable中插入內容。那麼,執行者什麼時候需要協調員? 1.1和1.2之間的行爲有何不同? – Alex

+0

in子句還執行連接不會執行的服務器端multiget。至於協調員,如果只有一個多維數據集查詢,則每個協調器只能獲得一個協調器。 – RussS