2017-07-25 53 views
0

我有一個類型爲pandas.tslib.Timestamp的時間戳列的熊貓數據框。我通過從「createDataFrame」(link to source)的pyspark源代碼,看了看,似乎他們的數據轉換爲numpy的記錄數組列表:在Pandas數據框轉換過程中,Spark如何處理Timestamp類型?

data = [r.tolist() for r in data.to_records(index=False)] 

然而,時間戳類型被轉換在這個過程中的列表的多頭:

> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(),periods=5,freq='s')) 
> df 
0 2017-07-25 11:53:29.353923 
1 2017-07-25 11:53:30.353923 
2 2017-07-25 11:53:31.353923 
3 2017-07-25 11:53:32.353923 
4 2017-07-25 11:53:33.353923 
> df.to_records(index=False).tolist() 
[(1500983799614193000L,), (1500983800614193000L,), (1500983801614193000L,), (1500983802614193000L,), (1500983803614193000L,)] 

現在,如果我通過這樣一個列表到RDD,做一些操作(不接觸時間戳列),然後調用

> spark.createDataFrame(rdd,schema) // with schema mentioning that column as TimestampType 
TypeError: TimestampType can not accept object 1465197332112000000L in type <type 'long'> 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

w ^我應該做什麼(在將列表轉換爲RDD之前)以保留日期時間類型。

編輯1

的幾種方法,我知道,這將涉及到數據幀後製作處理是:

  1. 添加時區信息爲datetime對象大熊貓。然而,這似乎沒有必要,並可能導致錯誤取決於您自己的時區。

  2. 使用日期時間庫將long轉換爲時間戳。

假設tstampl是輸入:TSTAMP =日期時間(1970,1,1)+ timedelta(微秒= tstampl/1000)

  • 將日期時間轉換爲Pandas數據框一側的字符串,然後在Spark數據框一側轉換爲日期時間。
  • 正如Suresh的回答說明如下

    但是我正在尋找這會照顧所有處理的數據幀創作本身之前簡單的方法。

    回答

    0

    我試着將timestamp列轉換爲字符串類型,然後在pandas系列上應用tolist()。使用spark數據框中的列表並將其轉換回時間戳。

    >>> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(),periods=5,freq='s')) 
    >>> df 
            0 
    0 2017-07-25 21:51:53.963 
    1 2017-07-25 21:51:54.963 
    2 2017-07-25 21:51:55.963 
    3 2017-07-25 21:51:56.963 
    4 2017-07-25 21:51:57.963 
    
    >>> df1 = df[0].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S')) 
    >>> type(df1) 
    <class 'pandas.core.series.Series'> 
    >>> df1.tolist() 
    ['2017-07-25 21:51:53', '2017-07-25 21:51:54', '2017-07-25 21:51:55', '2017-07-25 21:51:56', '2017-07-25 21:51:57'] 
    
    from pyspark.sql.types import StringType,TimestampType 
    >>> sdf = spark.createDataFrame(df1.tolist(),StringType()) 
    >>> sdf.printSchema() 
    root 
        |-- value: string (nullable = true) 
    >>> sdf = sdf.select(sdf['value'].cast('timestamp')) 
    >>> sdf.printSchema() 
    root 
        |-- value: timestamp (nullable = true) 
    
    >>> sdf.show(5,False) 
    +---------------------+ 
    |value    | 
    +---------------------+ 
    |2017-07-25 21:51:53.0| 
    |2017-07-25 21:51:54.0| 
    |2017-07-25 21:51:55.0| 
    |2017-07-25 21:51:56.0| 
    |2017-07-25 21:51:57.0| 
    +---------------------+ 
    
    +0

    是的,我知道這種方法和1更多的方法涉及長時間戳重新轉換(目前我正在使用)。問題是所有這些方法都需要某種後置數據幀轉換處理。這是我希望避免的。 – tangy

    +0

    您是否正在使用/ 1000000000進行時間戳轉換,並將其轉換爲Timestamp? – Suresh

    +0

    不。假設tstampl是輸入:tstamp = datetime(1970,1,1)+ timedelta(微秒= tstampl/1000) – tangy

    相關問題