
How can I convert a pipelined RDD of dictionaries into a DataFrame in PySpark? The RDD contains dictionaries like this one:

[{'ACARS 20170507/20170506085012209001.rcv': 'QU SOUTA8X\r\n.BJSXCXA 060849\r\nM12\r\nFI CX731/AN B-LAN\r\nDT BJS HKG 060849 M63A\r\n- OFF,V01,CX 731 20170506 1,VHHH,OMDB,0833,0849,----, 600', 'ACARS 20170507/20170502020906017001.rcv': 'QU SOUTA8X\r\n.BJSXCXA 020209\r\nM12\r\nFI KA876/AN B-LAB\r\nDT BJS HKG 020209 M11A\r\n- OFF,V01,KA 876 20170502 1,VHHH,ZSPD,0149,0208,----, 294', 'ACARS 20170507/20170505050124358002.rcv': 'QU SOUTA8X\r\n.BKKXCXA 050501\r\nCFD\r\nFI CX690/AN B-LAJ\r\nDT BKK XSP 050501 C10A\r\n- .1/WRN/DBN17D/WN1705050500 261707002SMOKE LAVATORY DET FAULT'}] 

Answers


The following snippet should work:

>>> from pyspark.sql import Row 
>>> 
>>> data = [{'foo': 'bar', 'hello': 'world'}] 
>>> rdd = spark.sparkContext.parallelize(data) 
>>> df = rdd.map(lambda x: Row(**x)).toDF() 

>>> df.show() 
+---+-----+ 
|foo|hello| 
+---+-----+ 
|bar|world| 
+---+-----+ 

This returns the DF as 'ACARS 20170507/20170507235838492001.rcv: string, ACARS 20170507/20170507235911543001.rcv: string, ACARS 20170507/20170507235933392001.rcv: string, ACARS 20170507/20170507235957177001.rcv: string': every filename key becomes its own column, and the values only show up as type 'string'.
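If the goal is one row per (key, value) pair rather than one column per key, a minimal sketch (reusing the rdd above; the column names are only illustrative) is to flatten each dictionary into its items first:

>>> # flatten each dict into (key, value) tuples, one row per entry
>>> pairs = rdd.flatMap(lambda d: list(d.items()))
>>> df = pairs.toDF(["key", "value"])
>>> df.show()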


Starting from:

>>> a = [{'ACARS 20170507/20170506085012209001.rcv': 'QU SOUTA8X\r\n.BJSXCXA 060849\r\nM12\r\nFI CX731/AN B-LAN\r\nDT BJS HKG 060849 M63A\r\n- OFF,V01,CX 731 20170506 1,VHHH,OMDB,0833,0849,----, 600', 'ACARS 20170507/20170502020906017001.rcv': 'QU SOUTA8X\r\n.BJSXCXA 020209\r\nM12\r\nFI KA876/AN B-LAB\r\nDT BJS HKG 020209 M11A\r\n- OFF,V01,KA 876 20170502 1,VHHH,ZSPD,0149,0208,----, 294', 'ACARS 20170507/20170505050124358002.rcv': 'QU SOUTA8X\r\n.BKKXCXA 050501\r\nCFD\r\nFI CX690/AN B-LAJ\r\nDT BKK XSP 050501 C10A\r\n- .1/WRN/DBN17D/WN1705050500 261707002SMOKE LAVATORY DET FAULT'}] 
>>> rdd = sc.parallelize(a) 

Get an RDD of the keys:

>>> rdd_k = rdd.flatMap(lambda x: x.keys()) 
>>> rdd_k.take(3) 
['ACARS 20170507/20170506085012209001.rcv', 'ACARS 20170507/20170505050124358002.rcv', 'ACARS 20170507/20170502020906017001.rcv'] 

Get an RDD of the values:

>>> rdd_v = rdd.flatMap(lambda x: x.values()) 
>>> rdd_v.take(3) 
['QU SOUTA8X\r\n.BJSXCXA 060849\r\nM12\r\nFI CX731/AN B-LAN\r\nDT BJS HKG 060849 M63A\r\n- OFF,V01,CX 731 20170506 1,VHHH,OMDB,0833,0849,----, 600', 'QU SOUTA8X\r\n.BKKXCXA 050501\r\nCFD\r\nFI CX690/AN B-LAJ\r\nDT BKK XSP 050501 C10A\r\n- .1/WRN/DBN17D/WN1705050500 261707002SMOKE LAVATORY DET FAULT', 'QU SOUTA8X\r\n.BJSXCXA 020209\r\nM12\r\nFI KA876/AN B-LAB\r\nDT BJS HKG 020209 M11A\r\n- OFF,V01,KA 876 20170502 1,VHHH,ZSPD,0149,0208,----, 294'] 

Zip the two RDDs and you will have an RDD of tuples, where each tuple is a (key, value) pair from your starting dictionary. The pairs line up because both RDDs come from the same parent with the same partitioning, and within a single dictionary keys() and values() iterate in matching order:

>>> newRdd = rdd_k.zip(rdd_v) 
>>> newRdd.first() 
('ACARS 20170507/20170506085012209001.rcv', 'QU SOUTA8X\r\n.BJSXCXA 060849\r\nM12\r\nFI CX731/AN B-LAN\r\nDT BJS HKG 060849 M63A\r\n- OFF,V01,CX 731 20170506 1,VHHH,OMDB,0833,0849,----, 600') 

Convert to a DataFrame (the embedded \r\n characters in the values wrap the rows of the show() output):

>>> df = newRdd.toDF() 
>>> df.show() 
+--------------------+--------------------+ 
|     _1|     _2| 
+--------------------+--------------------+ 
|ACARS 20170507/20...|QU SOUTA8X 
.BJSX...| 
|ACARS 20170507/20...|QU SOUTA8X 
.BKKX...| 
|ACARS 20170507/20...|QU SOUTA8X 
.BJSX...| 
+--------------------+--------------------+ 
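The default _1 and _2 column names can be replaced by passing a list of names to toDF; the names below are only illustrative:

>>> df = newRdd.toDF(["filename", "message"])
>>> df.printSchema()
root
 |-- filename: string (nullable = true)
 |-- message: string (nullable = true)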

First create a function that works on a single dictionary, then apply it to the RDD of dictionaries.

helpin = [{'ACARS 20170507/20170506085012209001.rcv': 'QU SOUTA8X\r\n.BJSXCXA 060849\r\nM12\r\nFI CX731/AN B-LAN\r\nDT BJS HKG 060849 M63A\r\n- OFF,V01,CX 731 20170506 1,VHHH,OMDB,0833,0849,----, 600', 'ACARS 20170507/20170502020906017001.rcv': 'QU SOUTA8X\r\n.BJSXCXA 020209\r\nM12\r\nFI KA876/AN B-LAB\r\nDT BJS HKG 020209 M11A\r\n- OFF,V01,KA 876 20170502 1,VHHH,ZSPD,0149,0208,----, 294', 'ACARS 20170507/20170505050124358002.rcv': 'QU SOUTA8X\r\n.BKKXCXA 050501\r\nCFD\r\nFI CX690/AN B-LAJ\r\nDT BKK XSP 050501 C10A\r\n- .1/WRN/DBN17D/WN1705050500 261707002SMOKE LAVATORY DET FAULT'}] 

from pyspark.sql import SparkSession

# a SparkSession is needed to convert an RDD to a DataFrame
spark = SparkSession(sc)

def helpfunc(dicin):
    # parallelize the keys, then pair each key with its value
    dicout = sc.parallelize(list(dicin.keys())).map(lambda x: (x, dicin[x])).toDF()
    return dicout

helpdic = helpin[0]
helpfunc(helpdic).show()

When helpin is actually an RDD rather than a list, use:

helpin.map(helpfunc)
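One caveat: helpfunc uses sc.parallelize and toDF, which are only available on the driver, so mapping it over a distributed RDD will fail at runtime. A minimal workaround, assuming the RDD of dictionaries is small enough to collect, is to apply the function on the driver instead:

# sc and toDF exist only on the driver, so helpfunc cannot run inside map();
# collect the dictionaries and convert each one on the driver
for d in helpin.collect():
    helpfunc(d).show()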