Ahoi almighty developers. I'm running some basic analytics in Spark against a multi-node Cassandra cluster, and I'm seeing a surprisingly big query time for Spark + Cassandra. The code I'm running is:

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 
import com.datastax.spark.connector._ 


object cassSpark { 
  def main(args: Array[String]): Unit = { 

    val conf = new SparkConf() 
      // The connector takes a comma-separated list of contact points; 
      // setting the same key twice only keeps the last value. 
      .set("spark.cassandra.connection.host", "192.168.56.101,192.168.56.102") 
      .set("spark.cassandra.connection.local_dc", "datacenter1") 
      .setMaster("local[*]") 
      .setAppName("cassSpark") 

    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    // Time the full read + count, including all one-time initialization. 
    time { 
      val df = sqlContext 
        .read 
        .format("org.apache.spark.sql.cassandra") 
        .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster")) 
        .load().cache() 

      df.count() 
    } 
  } 

  // Runs the given block, prints its wall-clock time in milliseconds, and returns its result. 
  def time[A](f: => A): A = { 
    val s = System.nanoTime 
    val ret = f 
    println("time: " + (System.nanoTime - s) / 1e6 + "ms") 
    ret 
  } 
} 

So, the Spark version is 1.6.0, Cassandra is v3.0.10, and the connector is also 1.6.0. The keyspace has replication_factor: 1, and the table has 5 columns but actually only 1 row. As you can see, there are two nodes (virtual machines created in Oracle VM).

My problem is that when I measure the time of the query from Spark to Cassandra, I get roughly 20 seconds, which doesn't seem normal to me given that there is only one row in the table. Am I missing something, am I measuring it wrong, or is my code the problem? Can anyone help me, or tell me how to do this efficiently or deal with it?

[EDIT]

As @Artem Aliev requested, here is the full INFO log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
16/12/06 12:31:04 INFO SparkContext: Running Spark version 1.6.0 
16/12/06 12:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/12/06 12:31:07 INFO SecurityManager: Changing view acls to: superbrainbug 
16/12/06 12:31:07 INFO SecurityManager: Changing modify acls to: Ivan Majnaric 
16/12/06 12:31:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(superbrainbug); users with modify permissions: Set(superbrainbug) 
16/12/06 12:31:12 INFO Utils: Successfully started service 'sparkDriver' on port 62101. 
16/12/06 12:31:14 INFO Slf4jLogger: Slf4jLogger started 
16/12/06 12:31:14 INFO Remoting: Starting remoting 
16/12/06 12:31:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:62114] 
16/12/06 12:31:15 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 62114. 
16/12/06 12:31:15 INFO SparkEnv: Registering MapOutputTracker 
16/12/06 12:31:15 INFO SparkEnv: Registering BlockManagerMaster 
16/12/06 12:31:15 INFO DiskBlockManager: Created local directory at C:\Users\superbrainbug\AppData\Local\Temp\blockmgr-8b664e71-ead1-4462-b171-bf542a5eb444 
16/12/06 12:31:15 INFO MemoryStore: MemoryStore started with capacity 1124.6 MB 
16/12/06 12:31:18 INFO SparkEnv: Registering OutputCommitCoordinator 
16/12/06 12:31:20 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
16/12/06 12:31:20 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040 
16/12/06 12:31:20 INFO Executor: Starting executor ID driver on host localhost 
16/12/06 12:31:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62136. 
16/12/06 12:31:20 INFO NettyBlockTransferService: Server created on 62136 
16/12/06 12:31:20 INFO BlockManagerMaster: Trying to register BlockManager 
16/12/06 12:31:21 INFO BlockManagerMasterEndpoint: Registering block manager localhost:62136 with 1124.6 MB RAM, BlockManagerId(driver, localhost, 62136) 
16/12/06 12:31:21 INFO BlockManagerMaster: Registered BlockManager 
16/12/06 12:31:23 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead. 
16/12/06 12:31:24 INFO Cluster: New Cassandra host /192.168.56.101:9042 added 
16/12/06 12:31:24 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster 
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: [] 
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: [] 
16/12/06 12:31:31 INFO SparkContext: Starting job: count at cassSpark.scala:69 
16/12/06 12:31:31 INFO DAGScheduler: Registering RDD 7 (count at cassSpark.scala:69) 
16/12/06 12:31:31 INFO DAGScheduler: Got job 0 (count at cassSpark.scala:69) with 1 output partitions 
16/12/06 12:31:31 INFO DAGScheduler: Final stage: ResultStage 1 (count at cassSpark.scala:69) 
16/12/06 12:31:31 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 
16/12/06 12:31:31 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 
16/12/06 12:31:31 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69), which has no missing parents 
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 20.0 KB, free 20.0 KB) 
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.2 KB, free 29.2 KB) 
16/12/06 12:31:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:62136 (size: 9.2 KB, free: 1124.6 MB) 
16/12/06 12:31:35 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006 
16/12/06 12:31:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69) 
16/12/06 12:31:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
16/12/06 12:31:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 20008 bytes) 
16/12/06 12:31:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 
16/12/06 12:31:35 INFO CacheManager: Partition rdd_4_0 not found, computing it 
16/12/06 12:31:36 INFO GenerateUnsafeProjection: Code generated in 395.192925 ms 
16/12/06 12:31:39 INFO MemoryStore: Block rdd_4_0 stored as values in memory (estimated size 1168.0 B, free 30.3 KB) 
16/12/06 12:31:39 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:62136 (size: 1168.0 B, free: 1124.6 MB) 
16/12/06 12:31:39 INFO GeneratePredicate: Code generated in 35.193967 ms 
16/12/06 12:31:39 INFO GenerateColumnAccessor: Code generated in 52.555004 ms 
16/12/06 12:31:39 INFO GenerateMutableProjection: Code generated in 14.129896 ms 
16/12/06 12:31:39 INFO GenerateUnsafeProjection: Code generated in 14.130749 ms 
16/12/06 12:31:40 INFO GenerateMutableProjection: Code generated in 19.217034 ms 
16/12/06 12:31:40 INFO GenerateUnsafeRowJoiner: Code generated in 72.736302 ms 
16/12/06 12:31:40 INFO GenerateUnsafeProjection: Code generated in 133.407346 ms 
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 3895 bytes result sent to driver 
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5452 ms on localhost (1/1) 
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/12/06 12:31:41 INFO DAGScheduler: ShuffleMapStage 0 (count at cassSpark.scala:69) finished in 5,542 s 
16/12/06 12:31:41 INFO DAGScheduler: looking for newly runnable stages 
16/12/06 12:31:41 INFO DAGScheduler: running: Set() 
16/12/06 12:31:41 INFO DAGScheduler: waiting: Set(ResultStage 1) 
16/12/06 12:31:41 INFO DAGScheduler: failed: Set() 
16/12/06 12:31:41 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69), which has no missing parents 
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.3 KB, free 39.6 KB) 
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 44.1 KB) 
16/12/06 12:31:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:62136 (size: 4.6 KB, free: 1124.6 MB) 
16/12/06 12:31:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006 
16/12/06 12:31:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69) 
16/12/06 12:31:41 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 
16/12/06 12:31:41 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1999 bytes) 
16/12/06 12:31:41 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms 
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 92.969655 ms 
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 11.48414 ms 
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1830 bytes result sent to driver 
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 602 ms on localhost (1/1) 
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/12/06 12:31:41 INFO DAGScheduler: ResultStage 1 (count at cassSpark.scala:69) finished in 0,605 s 
16/12/06 12:31:41 INFO DAGScheduler: Job 0 finished: count at cassSpark.scala:69, took 10,592173 s 
time: 19242.858679ms 
16/12/06 12:31:48 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster 
16/12/06 12:31:48 INFO SparkContext: Invoking stop() from shutdown hook 
16/12/06 12:31:48 INFO SerialShutdownHooks: Successfully executed shutdown hook: Clearing session cache for C* connector 
16/12/06 12:31:48 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040 
16/12/06 12:31:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/12/06 12:31:48 INFO MemoryStore: MemoryStore cleared 
16/12/06 12:31:48 INFO BlockManager: BlockManager stopped 
16/12/06 12:31:48 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/12/06 12:31:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/12/06 12:31:48 INFO SparkContext: Successfully stopped SparkContext 
16/12/06 12:31:48 INFO ShutdownHookManager: Shutdown hook called 
16/12/06 12:31:48 INFO ShutdownHookManager: Deleting directory C:\Users\superbrainbug\AppData\Local\Temp\spark-17606f05-6bb8-4144-acb5-015f15ea1ea9 

Process finished with exit code 0 

Also, @Yves DARMAILLAC: if I remove .cache(), the running time is 12 s.

Answers


20 seconds looks a bit bigger than expected for your environment; it should be around 2 seconds. You can enable INFO-level Spark logging and post the result here.
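(For reference, a minimal way to switch the driver's log level programmatically instead of editing log4j.properties; SparkContext.setLogLevel has been available since Spark 1.4:)

sc.setLogLevel("INFO")   // valid levels include ALL, DEBUG, INFO, WARN, ERROR, OFF 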

The main time consumer in your example is initialization; a second run of the same query will be much faster (see the sketch after the list below).

  1. Spark is a lazy framework, so a lot of initialization happens at the first query. You are using the "local" master, so it will also be slower than when you run on a Spark cluster.

  2. The connector is optimized for big queries, so it does a lot of preparation to make big queries run faster: it queries the C* cluster meta information for table size and token range locations, creates splits, and establishes C* connections.
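
A minimal sketch of how to see this, reusing the time helper, the sqlContext and the nth.nth1 table from the question: time the same count twice, so the one-time initialization cost only shows up in the first measurement.

val df = sqlContext 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster")) 
    .load() 

time { df.count() }   // cold run: includes connector metadata queries, connection setup, codegen 
time { df.count() }   // warm run: mostly the actual C* read, should be far below 20 s 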


Hmm, this switch from local to a standalone Spark cluster sounds like a nice trick, I'll give it a try. Do you have any more concrete tips to improve my code? –


Why do you persist the load() result? It isn't necessary, since you don't reuse it. Is the time still the same without the cache?

Try this:

time { 
    val df = sqlContext 
      .read 
      .format("org.apache.spark.sql.cassandra") 
      .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster")) 
      .load() 

    df.count() 
} 
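
If you did reuse the DataFrame, that would be the case where .cache() pays off. A small sketch of that situation, assuming the same sqlContext and nth.nth1 table as above:

val df = sqlContext 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster")) 
    .load() 
    .cache() 

df.count()   // first action: reads from Cassandra and populates the in-memory cache 
df.count()   // second action: served from the cache, no Cassandra read 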

I updated my question –