We receive records from Kafka, extract the card number from each record in Spark Streaming, and compare that Kafka card number against MemSQL records by selecting the count per card number (grouping by card number). But the count that comes back in Spark Streaming does not match what MemSQL itself reports (the count is not correct).

For example, when we execute the query below at the memsql command prompt, it gives the following output:

memsql> select card_number,count(*) from cardnumberalert5 where 
inserted_time <= now() and inserted_time >= NOW() - INTERVAL 10 MINUTE group 
by card_number; 
+------------------+----------+
| card_number      | count(*) |
+------------------+----------+
| 4556655960290527 |        2 |
| 6011255715328120 |        4 |
| 4532133676538232 |        2 |
| 6011614607071620 |        2 |
| 4024007117099605 |        2 |
|  347138718258304 |        4 |
+------------------+----------+

We noticed that the counts read back in Spark Streaming appear to be split up (distributed), rather than fully aggregated.

For example, the memsql output when we execute the query from the memsql command prompt:

+------------------+----------+
| card_number      | count(*) |
+------------------+----------+
| 4556655960290527 |        2 |

When the same SQL is performed in Spark Streaming, it prints the output as:

RECORDS FOUNDS **************************************** 
CARDNUMBER KAFKA ############### 4024007117099605 
CARDNUMBER MEMSQL ############### 4556655960290527 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 4556655960290527 
COUNT MEMSQL ############### 1 

Here the count should show as 2, but instead we get two records for that card number, each with a count of 1.

Output printed in Spark Streaming:

RECORDS FOUNDS **************************************** 
CARDNUMBER KAFKA ############### 4024007117099605 
CARDNUMBER MEMSQL ############### 4556655960290527 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 6011255715328120 
COUNT MEMSQL ############### 2 
CARDNUMBER MEMSQL ############### 4532133676538232 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 6011614607071620 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 4024007117099605 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 347138718258304 
COUNT MEMSQL ############### 2 
CARDNUMBER MEMSQL ############### 4556655960290527 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 6011255715328120 
COUNT MEMSQL ############### 2 
CARDNUMBER MEMSQL ############### 4532133676538232 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 6011614607071620 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 4024007117099605 
COUNT MEMSQL ############### 1 
CARDNUMBER MEMSQL ############### 347138718258304 
COUNT MEMSQL ############### 2 
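
The duplicated rows above look like per-partition partial aggregates: summing them per card number reproduces the MemSQL totals (1 + 1 = 2 for 4556655960290527, 2 + 2 = 4 for 6011255715328120, and so on). As a sketch of a possible workaround, the partials could be re-aggregated on the Spark side; `result` is the DataFrame from the program below, and the renamed column is illustrative:

    import org.apache.spark.sql.functions.sum

    // Collapse per-partition partial counts into one total per card number
    val totals = result
      .withColumnRenamed("count(*)", "partial_count") // illustrative rename
      .groupBy("card_number")
      .agg(sum("partial_count").as("count"))
    totals.show()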

Spark Streaming program:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

class SparkKafkaConsumer11(val ssc: StreamingContext,
                           val sc: SparkContext,
                           val spark: org.apache.spark.sql.SparkSession,
                           val topics: Array[String],
                           val kafkaParam: scala.collection.immutable.Map[String, Object]) {

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParam)
  )

  // Keep only the value from each (key, value) pair for processing
  val recordStream = stream.map(record => record.value)

  recordStream.foreachRDD { rdd =>

    val brokers = "174.24.154.244:9092" // broker for the alert producer
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "SparkKafkaConsumer__11")
    val producer = new KafkaProducer[String, String](props)

    // Per-card-number counts for the last 10 minutes, read through the MemSQL connector
    val result = spark.read
      .format("com.memsql.spark.connector")
      .options(Map(
        "query" -> "select card_number,count(*) from cardnumberalert5 where inserted_time <= now() and inserted_time >= NOW() - INTERVAL 10 MINUTE group by card_number",
        "database" -> "fraud"))
      .load()

    // Split each pipe-delimited record into an array of fields
    val record = rdd.map(line => line.split("\\|"))

    record.collect().foreach { recordRDD =>
      val now1 = System.currentTimeMillis
      val now = new java.sql.Timestamp(now1)
      val cardnumber_kafka = recordRDD(13).toString
      val sessionID = recordRDD(1).toString
      println("RECORDS FOUNDS ****************************************")
      println("CARDNUMBER KAFKA ############### " + cardnumber_kafka)

      result.collect().foreach { t =>
        val resm1 = t.getAs[String]("card_number")
        println("CARDNUMBER MEMSQL ############### " + resm1)
        val resm2 = t.getAs[Long]("count(*)")
        println("COUNT MEMSQL ############### " + resm2)

        // Alert when the card number from Kafka has been seen more than twice
        if (resm1.equals(cardnumber_kafka)) {
          if (resm2 > 2) {
            println("INSIDE IF CONDITION FOR MORE THAN 3 COUNT" + now)
            val messageToKafka = "---- THIRD OR MORE OCCURANCE ---- " + cardnumber_kafka
            val message = new ProducerRecord[String, String]("output1", 0, sessionID, messageToKafka)
            try {
              producer.send(message)
            } catch {
              case e: Exception =>
                e.printStackTrace
                System.exit(1)
            }
          }
        }
      }
    }

    producer.close()
  }
}

Not sure how to fix this; any suggestions or help would be highly appreciated. Thanks in advance.

Hi Bhavesh, it looks like the query is being pushed down incorrectly to the MemSQL partitions instead of running through the aggregator. To check whether this is happening, can you check the number of partitions of 'result'? I believe you can check it with 'result.rdd.getNumPartitions()'. Also, which versions of MemSQL, Spark, and the MemSQL Spark connector are you using? –
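
For reference, a minimal sketch of that check, assuming the `result` DataFrame from the question:

    // Number of underlying partitions of the MemSQL-backed DataFrame;
    // a value > 1 here suggests the aggregate ran per leaf partition
    // rather than through the aggregator
    println(result.rdd.getNumPartitions)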

Hi Carl, thank you very much for your reply. We checked the number of partitions of the result and got 4 partitions. We are using the MemSQL 5.5.8 source distribution, Spark 2.1.0, and the MemSQL Spark 2.0 connector (https://github.com/memsql/memsql-spark-connector), Maven artifact com.memsql:memsql-connector_2.11:2.0.2. – Bhavesh

Answer

Thanks, we were able to resolve this issue by setting the following property in the Spark configuration.

Code:

.set("spark.memsql.disablePartitionPushdown","true") 

Sorry for the late reply, Bhavesh - I'm going to file a task against our connector, since this looks like a bug in the pushdown logic. Thanks for raising it! https://github.com/memsql/memsql-spark-connector/issues/37 –
