
My streaming application does not save any data to Cassandra. I tried different things to find out why it is not working, using foreachRDD and stream.print, but nothing gets printed (a sketch of those attempts follows the code below).
For input data I use kafka-console-producer.sh.

object letsRun extends App { 
    import org.apache.spark.{SparkConf, SparkContext} 
    import org.apache.spark._ 
    import com.datastax.spark.connector._ 
    import org.apache.spark.sql._ 
    import com.datastax.spark.connector.writer._ 
    import org.apache.spark.streaming._ 
    import org.apache.spark.streaming.StreamingContext._ 
    import com.datastax.spark.connector.streaming._ 
    import org.apache.spark.streaming.kafka010._ 
    import org.apache.kafka.clients.consumer.ConsumerRecord 
    import org.apache.kafka.common.serialization.StringDeserializer 
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 


    val conf = new SparkConf().setMaster("local[*]").setAppName("test").set("spark.cassandra.connection.host", "192.168.1.44") 
    //val sc = new SparkContext(conf) 
    val ssc = new StreamingContext(conf, Seconds(5)) 
    ssc.sparkContext.setLogLevel("WARN") 

    // Kafka consumer configuration (broker address, deserializers, consumer group)
    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "192.168.1.46:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // direct stream over the topic, one Kafka partition per Spark partition
    val topics = Set[String]("testTopic")
    val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
    )
    implicit val sqlRowWriter = SqlRowWriter.Factory 

    val messages = stream.map(record => (record.key, record.value)) 
    messages.saveToCassandra("ks", "tb", SomeColumns("key", "value")) 
    ssc.start() 
    ssc.awaitTermination() 
} 
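
The debug attempts mentioned above looked roughly like this, a minimal sketch registered before ssc.start() and using the same stream as in the code; it printed nothing for me:

    // minimal debug sketch: check whether any records arrive at all
    stream.print()                                     // prints the first records of each batch
    stream.foreachRDD { rdd =>
        println(s"records in batch: ${rdd.count()}")   // batch size per interval
        rdd.map(record => (record.key, record.value)).take(5).foreach(println)
    }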

The output in Eclipse:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
17/06/12 20:23:25 INFO SparkContext: Running Spark version 2.1.1 
17/06/12 20:23:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
17/06/12 20:23:26 WARN Utils: Your hostname, dev resolves to a loopback address: 127.0.0.1; using 192.168.1.41 instead (on interface wlo1) 
17/06/12 20:23:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
17/06/12 20:23:26 INFO SecurityManager: Changing view acls to: dev 
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls to: dev 
17/06/12 20:23:26 INFO SecurityManager: Changing view acls groups to: 
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls groups to: 
17/06/12 20:23:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dev); groups with view permissions: Set(); users with modify permissions: Set(dev); groups with modify permissions: Set() 
17/06/12 20:23:26 INFO Utils: Successfully started service 'sparkDriver' on port 39585. 
17/06/12 20:23:26 INFO SparkEnv: Registering MapOutputTracker 
17/06/12 20:23:26 INFO SparkEnv: Registering BlockManagerMaster 
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 
17/06/12 20:23:26 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8eb85c8e-216f-4b69-a567-3e7833cd675d 
17/06/12 20:23:26 INFO MemoryStore: MemoryStore started with capacity 870.9 MB 
17/06/12 20:23:26 INFO SparkEnv: Registering OutputCommitCoordinator 
17/06/12 20:23:27 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
17/06/12 20:23:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.41:4040 
17/06/12 20:23:27 INFO Executor: Starting executor ID driver on host localhost 
17/06/12 20:23:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43841. 
17/06/12 20:23:27 INFO NettyBlockTransferService: Server created on 192.168.1.41:43841 
17/06/12 20:23:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 
17/06/12 20:23:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None) 
17/06/12 20:23:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.41:43841 with 870.9 MB RAM, BlockManagerId(driver, 192.168.1.41, 43841, None) 
17/06/12 20:23:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None) 
17/06/12 20:23:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.41, 43841, None) 
17/06/12 20:23:27 WARN KafkaUtils: overriding enable.auto.commit to false for executor 
17/06/12 20:23:27 WARN KafkaUtils: overriding auto.offset.reset to none for executor 
17/06/12 20:23:27 WARN KafkaUtils: overriding executor group.id to spark-executor-use_a_separate_group_id_for_each_stream 
17/06/12 20:23:27 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135 

Spark 2.1.1, kafka 0.10.2.1, spark-kafka 2.1.1, kafka-clients 0.10.2.1, spark cassandra connector 2.0.1 –
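
(For reference, those versions correspond roughly to build.sbt entries like the following; the exact artifact names are an assumption based on the standard Maven coordinates, not taken from my actual build file:)

    // hypothetical build.sbt sketch matching the versions listed above
    libraryDependencies ++= Seq(
        "org.apache.spark"   %% "spark-streaming"            % "2.1.1",
        "org.apache.spark"   %% "spark-streaming-kafka-0-10" % "2.1.1",
        "org.apache.spark"   %% "spark-sql"                  % "2.1.1",
        "com.datastax.spark" %% "spark-cassandra-connector"  % "2.0.1",
        "org.apache.kafka"   %  "kafka-clients"              % "0.10.2.1"
    )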


Do you see the Streaming tab in the Spark UI? – maasg


OK. Can you accept your own answer? – maasg

Answer


Sorry about my question, I solved the problem myself. I set

ssc.sparkContext.setLogLevel("DEBUG") 

and the debug output showed that Spark was trying to resolve my VM's hostname, while in the Kafka configuration I had used the IP address. So I added the address to /etc/hosts, and it works!
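
For illustration, the fix amounts to an /etc/hosts entry on the driver machine along these lines; "kafka-vm" is a placeholder, not the VM's real hostname, and the IP is the broker address from the config above:

    # hypothetical /etc/hosts line mapping the Kafka VM's hostname to its IP
    192.168.1.46    kafka-vm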