2017-06-01 69 views
3

是否有人在Spark 2+中使用from_json解析了毫秒時間戳?它是如何完成的?用Spark 2解析json的時代毫秒

因此Spark changedTimestampType解析時代數值爲秒,而不是v2毫秒。

我輸入的是,有一列其中我試圖解析這樣的JSON格式的字符串蜂巢表:

val spark = SparkSession 
    .builder 
    .appName("Problematic Timestamps") 
    .enableHiveSupport() 
    .getOrCreate() 
import spark.implicits._ 
val schema = StructType(
    StructField("categoryId", LongType) :: 
    StructField("cleared", BooleanType) :: 
    StructField("dataVersion", LongType) :: 
    StructField("details", DataTypes.createArrayType(StringType)) :: 
    … 
    StructField("timestamp", TimestampType) :: 
    StructField("version", StringType) :: Nil 
) 
val item_parsed = 
    spark.sql("select * FROM source.jsonStrInOrc") 
    .select('itemid, 'locale, 
      from_json('internalitem, schema) 
       as 'internalitem, 
      'version, 'createdat, 'modifiedat) 
val item_flattened = item_parsed 
    .select('itemid, 'locale, 
      $"internalitem.*", 
      'version as'outer_version, 'createdat, 'modifiedat) 

這可以解析一行包含列:

{ 「時間戳」:1494790299549, 「清」:假, 「版本」: 「V1」, 「dataVersion」:2 「的categoryId」:2641, 「細節」:[],...}

這給了我timestamp個領域,如49338-01-08 00:39:09.0從價值1494790299549,我寧願理解爲:2017-05-14 19:31:39.549

現在我可以設置時間戳的模式是一個長期的,然後通過1000和投劃分值的時間戳,但後來我d有2017-05-14 19:31:39.000而不是2017-05-14 19:31:39.549。我無法找出如何我既可以:

  • 告訴from_json解析一個毫秒時間戳(可能以某種方式在架構中使用的子類TimestampType)
  • 使用LongType在架構並將其轉換爲保留毫秒的時間戳

回答

2

現在我可以設置架構時間戳是一個漫長的,然後通過1000

劃分值其實這正是你需要的,只要保持正確的類型。比方說,你只有Longtimestamp領域:

val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp")) 
// df: org.apache.spark.sql.DataFrame = [timestamp: bigint] 

如果除以1000:

val inSeconds = df.withColumn("timestamp_seconds", $"timestamp"/1000) 
// org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double] 

你會得到秒鐘雙時間戳(注意,這是SQL,不是Scala的行爲)。

所有剩下的就是cast

inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false) 
// +-----------------------+ 
// |timestamp_seconds  | 
// +-----------------------+ 
// |2017-05-14 21:31:39.549| 
// +-----------------------+ 
+0

噢,所以這不是將其劃分成另一個長整數?大! – dlamblin

-1

我發現,試圖做的選擇分工,然後澆注看上去並不乾淨對我來說,雖然它是一個完全有效的方法。我選擇了使用java.sql.timestamp的UDF,它實際上是在紀元毫秒中指定的。

import java.sql.Timestamp 

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions.{explode, from_json, udf} 
import org.apache.spark.sql.types. 
{BooleanType, DataTypes, IntegerType, LongType, 
StringType, StructField, StructType, TimestampType} 

val tsmillis = udf { t: Long => new Timestamp (t) } 

val spark = SparkSession 
    .builder 
    .appName("Problematic Timestamps") 
    .enableHiveSupport() 
    .getOrCreate() 
import spark.implicits._ 
val schema = StructType(
    StructField("categoryId", LongType) :: 
    StructField("cleared", BooleanType) :: 
    StructField("dataVersion", LongType) :: 
    StructField("details", DataTypes.createArrayType(StringType)) :: 
    … 
    StructField("timestamp", LongType) :: 
    StructField("version", StringType) :: Nil 
) 
val item_parsed = 
    spark.sql("select * FROM source.jsonStrInOrc") 
    .select('itemid, 'locale, 
      from_json('internalitem, schema) 
       as 'internalitem, 
      'version, 'createdat, 'modifiedat) 
val item_flattened = item_parsed 
    .select('itemid, 'locale, 
      $"internalitem.categoryId", $"internalitem.cleared", 
      $"internalitem.dataVersion", $"internalitem.details", 
      tsmillis($"internalitem.timestamp"), 
      $"internalitem.version", 
      'version as'outer_version, 'createdat, 'modifiedat) 

看看如何在選擇。 我認爲這將是值得做一個性能測試,看看是否使用withcolumn劃分和鑄造比udf更快。