2017-06-06 102 views
1

我在Json中有一個由Websocket提供的流數據,其大小在每秒1MB和60MB之間變化。通過Kafka和Spark消耗大數據

我得解碼數據然後解析它,最後寫入到mysql。

我想2個想法:

1)從插槽中讀取數據,然後對數據進行解碼,並通過Avro公司發送給消費者的生產者, 然後來獲取數據並寫入到MySQL的星火地圖,減少消費

2)從Socket讀取數據然後將數據發送到Consumer in Producer, 然後在Consumer中獲取數據,然後在Spark上解碼並將解析的數據發送到Spark Job以寫入到mysql。

你有什麼想法嗎?

生產者

/* 
* To change this license header, choose License Headers in Project Properties. 
* To change this template file, choose Tools | Templates 
* and open the template in the editor. 
*/ 
package com.tan; 


import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.clients.producer.ProducerConfig; 

import java.util.Properties; 


import java.util.stream.Stream; 

import java.io.BufferedReader; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 
/** 
* 
* @author Tan 
*/ 
public class MainKafkaProducer { 

    /** 
    * @param args the command line arguments 
    */ 
    public static void main(String[] args) throws InterruptedException { 
     // TODO code application logic here 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 

     //props.put("group.id", "mygroup"); 
     //props.put("max.partition.fetch.bytes", "100000000"); 
     //props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     //props.put("partitioner.class","kafka.producer.DefaultPartitioner"); 
     //props.put("request.required.acks", "1"); 

     KafkaProducer<String, String> producer = new KafkaProducer<>(props); 

     // Read the data from websocket and send it to consumer 
     //for (int i = 0; i < 100; i++) { 
      String fileName = "/Users/Tan/Desktop/feed.json"; 
      try{ 
       BufferedReader file = new BufferedReader(new FileReader(fileName)); 
       String st = file.readLine(); 
       for(int i = 0; i < 100; i++) 
       { 
        ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", st); 
        producer.send(record); 
       } 
      }catch(IOException e){ 
       e.printStackTrace(); 
      } 
     //} 

     /* 
     for(int i = 0; i < 100; i++) 
     { 
      ProducerRecord<String, String> record2 = new ProducerRecord<>("mytopic", "Hasan-" + i); 
      producer.send(record2); 
     } 
     */ 


     producer.close(); 
    } 

} 

消費者

/* 
* To change this license header, choose License Headers in Project Properties. 
* To change this template file, choose Tools | Templates 
* and open the template in the editor. 
*/ 
package com.tan; 

import kafka.serializer.DefaultDecoder; 
import kafka.serializer.StringDecoder; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 
/** 
* 
* @author Tan 
*/ 
public class MainKafkaConsumer { 
    /** 
    * @param args the command line arguments 
    */ 
    public static void main(String[] args) { 

     SparkConf conf = new SparkConf() 
       .setAppName(MainKafkaConsumer.class.getName()) 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Set<String> topics = Collections.singleton("mytopic"); 
     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 

     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
       String.class, String.class, 
       StringDecoder.class, StringDecoder.class, 
       kafkaParams, topics); 

     directKafkaStream.foreachRDD(rdd -> { 

      rdd.foreach(records -> { 

       System.out.println(records._2); 

      }); 

     }); 
     /* 
     directKafkaStream.foreachRDD(rdd -> { 
      System.out.println("--- New RDD with " + rdd.partitions().size() 
        + " partitions and " + rdd.count() + " records"); 
      rdd.foreach(record -> { 
       System.out.println(record._2); 
      }); 
     }); 
     */ 



     ssc.start(); 
     ssc.awaitTermination(); 

    } 

} 

回答

2

你的過程是好的,關鍵是隻爲Avro的轉換。你的數據不是那麼大,1Mb到60Mb。

在這裏,我有一個類似的過程,從MQ讀取數據,轉換爲avro,發送到kafka,從kafka消耗,解析數據並在其他MQ中發佈。

當我們的數據非常龐大時,Avro可以提供很多幫助,例如> = 1Gb。但在某些情況下,我們的數據非常小,如< 10Mb。在這種情況下,Avro使我們的處理速度有點慢,網絡傳輸沒有任何收益。

我建議你,如果你的網絡足夠好,不能轉換成AVRO,最好不要AVRO。爲了提高Spark端的性能,需要配置大量分區的kafka主題,因爲如果你只有一個分區,你的spark就不會正確地進行parallization。檢查this文字,可以幫助你。

+0

感謝您的評論,我刪除了avro,我發送了kafka的數據,但我無法使用Spark的數據。 (JSON格式和3 MB的數據)我添加了我的代碼 – Tan