2017-04-10 98 views
0

我正在將地板文件加載爲火花數據集。我可以查詢並從查詢中創建新的數據集。現在,我想向數據集(「hashkey」)添加一個新列並生成值(例如md5sum(nameValue))。我怎樣才能做到這一點?將列添加到火花數據集並轉換數據

public static void main(String[] args) { 

    SparkConf sparkConf = new SparkConf(); 

    sparkConf.setAppName("Hello Spark"); 
    sparkConf.setMaster("local"); 

    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example") 
      .config("spark.master", "local").config("spark.sql.warehouse.dir", "file:///C:\\spark_warehouse") 
      .getOrCreate(); 

    Dataset<org.apache.spark.sql.Row> df = spark.read().parquet("meetup.parquet"); 
    df.show(); 

    df.createOrReplaceTempView("tmpview"); 

    Dataset<Row> namesDF = spark.sql("SELECT * FROM tmpview where name like 'Spark-%'"); 

    namesDF.show(); 

} 

輸出看起來是這樣的:

+-------------+-----------+-----+---------+--------------------+ 
|   name|meetup_date|going|organizer|    topics| 
+-------------+-----------+-----+---------+--------------------+ 
| Spark-H20| 2016-01-01| 50|airisdata|[h2o, repeated sh...| 
| Spark-Avro| 2016-01-02| 60|airisdata| [avro, usecases]| 
|Spark-Parquet| 2016-01-03| 70|airisdata| [parquet, usecases]| 
+-------------+-----------+-----+---------+--------------------+ 

回答

1

只需添加火花SQL函數MD5在您查詢。

Dataset<Row> namesDF = spark.sql("SELECT *, md5(name) as modified_name FROM tmpview where name like 'Spark-%'"); 
0
Dataset<Row> ds = sqlContext.read() 
    .format("com.databricks.spark.csv") 
    .option("inferSchema", "true") 
    .option("header", "true") 
    .option("delimiter","|") 
    .load("/home/cloudera/Desktop/data.csv"); 
ds.printSchema(); 

將打印:將上述代碼之後

root 
|-- ReferenceValueSet_Id: integer (nullable = true) 
|-- ReferenceValueSet_Name: string (nullable = true) 
|-- Code_Description: string (nullable = true) 
|-- Code_Type: string (nullable = true) 
|-- Code: string (nullable = true) 
|-- CURR_FLAG: string (nullable = true) 
|-- REC_CREATE_DATE: timestamp (nullable = true) 
|-- REC_UPDATE_DATE: timestamp (nullable = true) 

Dataset<Row> df1 = ds.withColumn("Key", functions.lit(1)); 
     df1.printSchema(); 

,它將附加具有恆定值一列。

root 
|-- ReferenceValueSet_Id: integer (nullable = true) 
|-- ReferenceValueSet_Name: string (nullable = true) 
|-- Code_Description: string (nullable = true) 
|-- Code_Type: string (nullable = true) 
|-- Code: string (nullable = true) 
|-- CURR_FLAG: string (nullable = true) 
|-- REC_CREATE_DATE: timestamp (nullable = true) 
|-- REC_UPDATE_DATE: timestamp (nullable = true) 
|-- Key: integer (nullable = true) 

您可以看到名稱爲Key的列被添加到數據集中。

如果你想添加一些列的constunt值的位置,你可以使用下面的代碼來添加它。

Dataset<Row> df1 = ds.withColumn("Key", functions.lit(ds.col("Code"))); 
     df1.printSchema(); 
     df1.show(); 

現在,它將打印watever的值在列CODE中。到名爲Key的新列中。