
Hi guys, I have a dataframe that is updated every day: each day I need to add the new qte and the new ca to the old values and update the date. So I need to update the rows that already exist and append the new ones. Here is an example; I want to merge the dataframes at the end and perform that operation.

import org.apache.spark.sql.types.{DateType, DoubleType, LongType}
import spark.implicits._

val histocaisse = spark.read
  .format("csv")
  .option("header", "true") // read the headers
  .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")

val hist = histocaisse
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("article_id", 'article_id.cast(LongType))
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(DoubleType))
  .withColumn("ca", 'ca.cast(DoubleType))

val histocaisse2 = spark.read
  .format("csv")
  .option("header", "true") // read the headers
  .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")

val hist2 = histocaisse2
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("article_id", 'article_id.cast(LongType))
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(DoubleType))
  .withColumn("ca", 'ca.cast(DoubleType))

hist.show(false)
hist2.show(false)

hist (day 1):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-07|2.5 |3.5 |
|2     |2         |2000-01-07|14.7|12.0|
|3     |3         |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+

hist2 (day 2):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|2.5 |3.5 |
|2     |2         |2000-01-08|14.7|12.0|
|3     |3         |2000-01-08|3.5 |1.2 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

Expected result (old values plus new values, with the updated date):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|5.0 |7.0 |
|2     |2         |2000-01-08|29.4|24.0|
|3     |3         |2000-01-08|7.0 |2.4 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

Here is what I did:

val histoCombinaison2 = hist2.join(hist, Seq("article_id", "pos_id"), "left")
  .groupBy("article_id", "pos_id")
  .agg((hist2("qte") + hist("qte")).as("qte"), (hist2("ca") + hist("ca")).as("ca"), hist2("date"))

histoCombinaison2.show()

And I got the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression '`qte`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.; 
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58) 
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:218) 
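For context: the analyzer rejects this because hist2("qte") + hist("qte") and hist2("date") appear inside agg as plain column expressions, and everything in agg must either be an aggregate function or be part of the groupBy key. A minimal sketch that keeps the original groupBy shape and passes analysis wraps each stray column in an aggregate such as sum or first; note it still does not handle the nulls a left join produces, which the answers below address:

import org.apache.spark.sql.functions.{first, sum}

// compiles, but when a key is missing from hist, hist("qte") is null
// and null + x is null, so this alone does not produce the desired merge
val combined = hist2.join(hist, Seq("article_id", "pos_id"), "left")
  .groupBy("article_id", "pos_id")
  .agg(
    sum(hist2("qte") + hist("qte")).as("qte"),
    sum(hist2("ca") + hist("ca")).as("ca"),
    first(hist2("date")).as("date"))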

Answers

// import the functions used below
import org.apache.spark.sql.functions.{coalesce, lit}
import spark.implicits._

// We might not need groupBy at all: after the join, all the information
// for an (article_id, pos_id) pair is already on the same row, so instead
// of an aggregate function we simply combine the related fields into new columns.
// hist1 here is the question's hist (the day-1 dataframe).
val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
  .select($"pos_id", $"article_id",
    coalesce(hist2("date"), hist1("date")).alias("date"),
    (coalesce(hist2("qte"), lit(0)) + coalesce(hist1("qte"), lit(0))).alias("qte"),
    (coalesce(hist2("ca"), lit(0)) + coalesce(hist1("ca"), lit(0))).alias("ca"))
  .orderBy("pos_id", "article_id")

// df.show()
+------+----------+----------+----+----+
|pos_id|article_id|      date| qte|  ca|
+------+----------+----------+----+----+
|     1|         1|2000-01-08| 5.0| 7.0|
|     2|         2|2000-01-08|29.4|24.0|
|     3|         3|2000-01-08| 7.0| 2.4|
|     4|         4|2000-01-08| 3.5| 1.2|
|     5|         5|2000-01-08|14.5| 1.2|
|     6|         6|2000-01-08| 2.0|1.25|
+------+----------+----------+----+----+
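One caveat with the left join above: a (pos_id, article_id) pair that exists only in hist1 is silently dropped. In this example hist2 contains every pair, but if an old row can be absent from the new day's file, an outer join keeps both sides. A sketch under that assumption:

// "outer" keeps pairs that appear in only one of the two snapshots;
// coalesce then picks whichever side is present
val dfOuter = hist2.join(hist1, Seq("article_id", "pos_id"), "outer")
  .select($"pos_id", $"article_id",
    coalesce(hist2("date"), hist1("date")).alias("date"),
    (coalesce(hist2("qte"), lit(0)) + coalesce(hist1("qte"), lit(0))).alias("qte"),
    (coalesce(hist2("ca"), lit(0)) + coalesce(hist1("ca"), lit(0))).alias("ca"))
  .orderBy("pos_id", "article_id")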

Thanks.


@LLi, I have tested it, and I get an error when the table hist is empty. What should the code be if hist is empty? In that case I just want to insert the elements of hist2. – maher


As I just mentioned in the comments, you should define your schema and use it when reading the csv into a dataframe, as follows:

import sqlContext.implicits._
import org.apache.spark.sql.types._

val schema = StructType(Seq(
    StructField("pos_id", LongType, true),
    StructField("article_id", LongType, true),
    StructField("date", DateType, true),
    StructField("qte", DoubleType, true),
    StructField("ca", DoubleType, true)
))

val hist1 = sqlContext.read 
    .format("csv") 
    .option("header", "true") 
    .schema(schema) 
    .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv") 

hist1.show 

val hist2 = sqlContext.read 
    .format("csv") 
    .option("header", "true") //reading the headers 
    .schema(schema) 
    .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv") 

hist2.show 

Then you should use the when function to implement the logic you need:
import org.apache.spark.sql.functions.{lit, when}

val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
  .select($"pos_id", $"article_id",
    when(hist2("date").isNotNull, hist2("date")).otherwise(when(hist1("date").isNotNull, hist1("date")).otherwise(lit(null))).alias("date"),
    (when(hist2("qte").isNotNull, hist2("qte")).otherwise(lit(0)) + when(hist1("qte").isNotNull, hist1("qte")).otherwise(lit(0))).alias("qte"),
    (when(hist2("ca").isNotNull, hist2("ca")).otherwise(lit(0)) + when(hist1("ca").isNotNull, hist1("ca")).otherwise(lit(0))).alias("ca"))

I hope the answer helps.