2016-02-19 43 views
1

我試圖將數十億數據值的MultiMap轉換爲Spark DataFrame來運行計算,然後將結果寫入cassandra表。Spark - MultiMap可以在JAVA中轉換爲DataFrame

我從下面的cassandra查詢和循環生成multimap。如果能有更好的方式來獲取和處理這些數據到DataFrame中,就像我使用循環一樣,我會很樂意接受建議。

代碼更新了與答案:

//Build ResultSet from cassandra query for data manipulation. 
     Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";"); 
     //Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;"); 
     stmt.setFetchSize(1000); 
     ResultSet results = session.execute(stmt); 

// Get the Variables from each Row of Cassandra Data   
Multimap<Double, Float> data = LinkedListMultimap.create(); 
     for (Row row : results){  
      // Column Names in Cassandra (Case Sensitive) 
      start_frequency = row.getDouble("Start_Frequency"); 
      power = row.getFloat("Power"); 
      bandwidth = row.getDouble("Bandwidth"); 

// Create Channel Power Buckets, place information into prepared statement binding, write to cassandra.    
       for(channel = 1.6000E8; channel <= channel_end; ){ 
        if((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {  
        data.put(channel, power); 
        } // end if 
        channel+=increment; 
       } // end for  
     } // end "row" for 

// Create Spark List for DataFrame   
     List<Value> values = data.asMap().entrySet() 
      .stream() 
      .flatMap(x -> x.getValue() 
        .stream() 
        .map(y -> new Value(x.getKey(), y))) 
      .collect(Collectors.toList()); 

// Create DataFrame and Calculate Results 
    sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel")) 
     .agg(min("power"), max("power"), avg("power")) 
     .write().mode(SaveMode.Append)  
     .option("table", "results") 
     .option("keyspace", "model") 
     .format("org.apache.spark.sql.cassandra").save(); 

    } // end session 
} // End Compute 

public class Value implements Serializable { 
    public Value(Double channel, Float power) { 
     this.channel = channel; 
     this.power = power; 
    } 
    Double channel; 
    Float power; 

    public void setChannel(Double channel) { 
     this.channel = channel; 
    } 
    public void setPower(Float power) { 
     this.power = power; 
    } 
    public Double getChannel() { 
     return channel; 
    } 
    public Float getPower() { 
     return power; 
    } 

    @Override 
    public String toString() { 
     return "[" +channel +","+power+"]"; 
    } 
} 

樣品多重映射具有類型{雙} = [浮點]其中可以存在多個浮動項對於每個雙

{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11] 

我需要使用火花來獲得每個這些的最小值,最大值和平均值。例如,對於第一個1.50ED將分10,最高20,平均15

我已經有,我可以用一次,我可以在一個不是Temptable得到它和操作上的數據幀代碼:

queryMV.groupBy(col("channel")) 
.agg(min("power"), max("power"), avg("power")) 
.write().mode(SaveMode.Append)  
.option("table", "results") 
.option("keyspace", "model") 
.format("org.apache.spark.sql.cassandra").save(); 

我將不勝感激關於如何使用JAVA將multimap轉換爲DataFrame的一些技巧。我一直無法找到任何有關使用multimaps火花的文檔。

我目前正在使用一個解決方案,執行初始查詢並使用for循環將原始數據寫入新表,我可以直接映射到一個臨時/數據框,但這需要很長時間,因爲我必須寫入計算之前數十億行到cassandra。我想使用一個multimap或類似的東西,並直接轉換爲火花進行計算。

+0

我想使用火花,因爲這個計算將被處理超過十億個不同的值。該表格將如下所示:'key:value,value,value'我需要獲取關鍵值並獲取值的最小值,最大值和平均值。例如,如果我的密鑰是1.50E8,我的值是10,20我的輸出應該是1.50E8最小10,最大20,平均15 – mithrix

回答

1

唉了Java parallelize方法它可以是一個的T列表或parallelizePairsTuple<K, V>列表。所以你需要轉換。雖然createDataFrame僅適用於RDD和Scala Seq,並且需要架構(bean或StructType)。

爲了讓它更有趣com.google.common.collect.ImmutableEntry不是可序列化的,所以您需要使用Java進行轉換,因此,Java版本的@Pankaj Arora解決方案將無法工作,除非您將轉換邏輯轉換爲Java。即

public class Value implements Serializable { 
    public Value(Double a, Float b) { 
     this.a = a; 
     this.b = b; 
    } 
    Double a; 
    Float b; 

    public void setA(Double a) { 
     this.a = a; 
    } 
    public void setB(Float b) { 
     this.b = b; 
    } 
    public Double getA() { 
     return a; 
    } 
    public Float getB() { 
     return b; 
    } 

    public String toString() { 
     return "[" +a +","+b+"]"; 
    } 
} 


    Multimap<Double, Float> data = LinkedListMultimap.create(); 
    data.put(1d, 1f); 
    data.put(1d, 2f); 
    data.put(2d, 3f); 

    List<Value> values = data.asMap().entrySet() 
      .stream() 
      .flatMap(x -> x.getValue() 
        .stream() 
        .map(y -> new Value(x.getKey(), y))) 
      .collect(Collectors.toList()); 

    sqlContext.createDataFrame(sc.parallelize(values), Value.class).show(); 

鑑於你的編輯,我會看看從關閉創建對象(而不是多圖)。

+0

你能提供一個循環的例子嗎? – mithrix

+0

完成。但是你有沒有使用這個multimap?我建議你跳過這一步,直接在加載步驟中創建(更好地命名)「Value」對象。 –

+0

我只使用multimap,因爲我不確定在將數據發送到數據框之前如何正確編譯所有數據。如果你知道更好的方法,我會喜歡這個建議,因爲這對數十億個對象的運行是非常重要的。順便說一下,您的multimap版本的解決方案完美地工作。 – mithrix

0
case class Output(a : Double ,b : Int) 
val input = Map(1.50E8-> List(10, 20) , 1.51E8-> List(-10, -13, -14, -15), 1.52E8-> List(-10, -11)).toArray 
val inputRdd = sc.parallelize(input) 
val queryMV = inputRdd.flatMap(x=> x._2.map(y=> Output(x._1, y))).toDF 
+0

這看起來很有希望。我需要做的一件事就是使用Map而不知道提前值是什麼。這似乎是SCALA它是相同的JAVA除了沒有val前綴? – mithrix

相關問題