
I am trying to receive streaming data from Kafka. In the process, I am able to receive the streaming data and store it in a JavaPairInputDStream. Now I need to analyze that data and store it into a database, so I want to convert this JavaPairInputDStream into a Dataset/DataFrame. How do I convert a JavaPairInputDStream to a Dataset/DataFrame in Spark?

What I have tried so far:

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.serializer.StringDecoder; 

//Streaming Working Code 

public class KafkaToSparkStreaming 
{ 
    public static void main(String arr[]) throws InterruptedException 
    { 


     SparkConf conf = new SparkConf(); 
     conf.set("spark.app.name", "SparkReceiver"); //The name of the application. This will appear in the UI and in log data. 
     //conf.set("spark.ui.port", "7077"); //Port for the application's dashboard, which shows memory and workload data. 
     conf.set("spark.dynamicAllocation.enabled", "false"); //Scales the number of executors registered with this application up and down based on the workload. 
     //conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra host address/IP 
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //For serializing objects that will be sent over the network or need to be cached in serialized form. 
     //conf.setMaster("local"); 
     conf.set("spark.streaming.stopGracefullyOnShutdown", "true"); 

     JavaSparkContext sc = new JavaSparkContext(conf); 
     // Create the streaming context with a 2-second batch interval 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Map<String, String> kafkaParams = new HashMap<String, String>(); 

     kafkaParams.put("zookeeper.connect", "localhost:2181"); //Make all kafka data for this cluster appear under a particular path. 
     kafkaParams.put("group.id", "testgroup"); //String that uniquely identifies the group of consumer processes to which this consumer belongs 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); //Producer can find a one or more Brokers to determine the Leader for each topic. 
     kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the Broker. 
     kafkaParams.put("request.required.acks", "1"); //Producer to require an acknowledgement from the Broker that the message was received. 

     Set<String> topics = Collections.singleton("ny-2008.csv"); 

     //Create a direct Kafka input DStream 
     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, topics); 

    //System.out.println(directKafkaStream); 
     directKafkaStream.print(); 

     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 

Answers


Here is the complete working code using Spark 2.0.

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.VoidFunction; 
import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SparkSession; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 


public class KafkaToSparkStreaming { 
    public static void main(String arr[]) throws InterruptedException 
    { 


     SparkConf conf = new SparkConf(); 
     conf.set("spark.app.name", "SparkReceiver"); //The name of the application. This will appear in the UI and in log data. 
     //conf.set("spark.ui.port", "7077"); //Port for the application's dashboard, which shows memory and workload data. 
     conf.set("spark.dynamicAllocation.enabled", "false"); //Scales the number of executors registered with this application up and down based on the workload. 
     //conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra host address/IP 
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //For serializing objects that will be sent over the network or need to be cached in serialized form. 
     conf.setMaster("local"); 
     conf.set("spark.streaming.stopGracefullyOnShutdown", "true"); 

     JavaSparkContext sc = new JavaSparkContext(conf); 
     // Create the streaming context with a 2-second batch interval 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Map<String, String> kafkaParams = new HashMap<String, String>(); 

     kafkaParams.put("zookeeper.connect", "localhost:2181"); //Make all kafka data for this cluster appear under a particular path. 
     kafkaParams.put("group.id", "testgroup"); //String that uniquely identifies the group of consumer processes to which this consumer belongs 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); //Producer can find a one or more Brokers to determine the Leader for each topic. 
     kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the Broker. 
     kafkaParams.put("request.required.acks", "1"); //Producer to require an acknowledgement from the Broker that the message was received. 

     Set<String> topics = Collections.singleton("ny-2008.csv"); 

     //Create a direct Kafka input DStream 
     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, topics); 

     //Create JavaDStream<String> 
     JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() { 
      @Override 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
      }); 
     //Create JavaRDD<Row> 
     msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { 
      @Override 
      public void call(JavaRDD<String> rdd) { 
       JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() { 
        @Override 
        public Row call(String msg) { 
         Row row = RowFactory.create(msg); 
         return row; 
        } 
       }); 
       //Create Schema 
       StructType schema = DataTypes.createStructType(new StructField[] {DataTypes.createStructField("Message", DataTypes.StringType, true)}); 
       //Get Spark 2.0 session 
       SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); 
       Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema); 
       msgDataFrame.show(); 
      } 
     }); 

     ssc.start();    
     ssc.awaitTermination(); 
    } 

} 

class JavaSparkSessionSingleton { 
    private static transient SparkSession instance = null; 
    public static SparkSession getInstance(SparkConf sparkConf) { 
        if (instance == null) { 
            instance = SparkSession 
                .builder() 
                .config(sparkConf) 
                .getOrCreate(); 
        } 
        return instance; 
    } 
} 
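Since the goal in the question is to store the streamed data in a database, the per-batch msgDataFrame can also be written out with Spark's built-in JDBC writer instead of only calling show(). A minimal sketch of such a helper, assuming a reachable relational database; the JDBC URL, table name, and credentials below are placeholders:

import java.util.Properties; 

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SaveMode; 

class JdbcBatchWriter { 
    //Appends one micro-batch DataFrame to a relational table via JDBC. 
    //All connection details here are placeholders; replace them with your own. 
    static void writeBatch(Dataset<Row> msgDataFrame) { 
        Properties props = new Properties(); 
        props.setProperty("user", "spark");      //placeholder user 
        props.setProperty("password", "secret"); //placeholder password 
        msgDataFrame.write() 
            .mode(SaveMode.Append)                        //keep rows from earlier batches 
            .jdbc("jdbc:mysql://localhost:3306/streamdb", //placeholder URL 
                  "kafka_messages",                       //placeholder table 
                  props); 
    } 
} 

Calling JdbcBatchWriter.writeBatch(msgDataFrame) right after msgDataFrame.show() inside foreachRDD keeps the conversion above unchanged and only adds the write; the matching JDBC driver has to be on the classpath.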

The code is not working, and JavaSparkSessionSingleton gives a compilation error – kumar


I have added the complete code. It will work in Spark 2.0. If you want to run it on Spark 1.6.2, you need to modify JavaSparkSessionSingleton to get a SQLContext instead, as in examples/streaming/JavaSqlNetworkWordCount.java of the 1.6.2 release – abaghel
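For Spark 1.6.x, a minimal sketch of such a lazily instantiated SQLContext singleton, modeled on that JavaSqlNetworkWordCount example (the class name here is only illustrative):

import org.apache.spark.SparkContext; 
import org.apache.spark.sql.SQLContext; 

//Lazily instantiated singleton SQLContext for Spark 1.6.x (illustrative sketch). 
class JavaSQLContextSingleton { 
    private static transient SQLContext instance = null; 
    static SQLContext getInstance(SparkContext sparkContext) { 
        if (instance == null) { 
            instance = new SQLContext(sparkContext); 
        } 
        return instance; 
    } 
} 

Inside foreachRDD you would then call JavaSQLContextSingleton.getInstance(rdd.context()) and use sqlContext.createDataFrame(rowRDD, schema) in place of the SparkSession call.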


Thanks man, it works. I struggled with this code for 3 days and you solved it in just one hour; you are a very, very smart person. My mail id is: kumara1223 @ gmail.com, please contact me. Thank you, friend. – kumar


Technically, a DStream is a sequence of RDDs, so you don't convert the DStream itself into a DataFrame; instead you convert each individual RDD into a DataFrame/Dataset, as shown below (the code is Scala; please adapt it to Java for your case):

stream.foreachRDD { rdd => 
  val dataFrame = rdd.map { case (key, value) => (key, value) }.toDF("key", "value") 
} 
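A rough Java equivalent of that per-batch conversion, written against the directKafkaStream and imports from the Spark 2.0 answer above and using Java 8 lambdas; the "key"/"value" column names are only illustrative:

directKafkaStream.foreachRDD(rdd -> { 
    SparkSession spark = SparkSession.builder().config(rdd.context().getConf()).getOrCreate(); 
    //Schema with one column for the Kafka key and one for the value 
    StructType schema = DataTypes.createStructType(new StructField[] { 
        DataTypes.createStructField("key", DataTypes.StringType, true), 
        DataTypes.createStructField("value", DataTypes.StringType, true)}); 
    //Turn each Tuple2<String, String> into a Row, then into a DataFrame 
    JavaRDD<Row> rowRDD = rdd.map(tuple -> RowFactory.create(tuple._1(), tuple._2())); 
    Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema); 
    dataFrame.show(); 
});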
