2017-08-16 69 views
0

我有這樣在給定列名稱和值多列拆分列pyspark

+-----+-------+------------+---+---+----+------+--------------------+ 
|CHROM| POS|   ID|REF|ALT|QUAL|FILTER|    INFO| 
+-----+-------+------------+---+---+----+------+--------------------+ 
| 1|1014143| rs786201005| C| T| .|  .|RS=786201005;RSPO...| 
| 1|1014228|  rs1921| G|A,C| .|  .|RS=1921;RSPOS=101...| 
| 1|1014316| rs672601345| C| CG| .|  .|RS=672601345;RSPO...| 
| 1|1014359| rs672601312| G| T| .|  .|RS=672601312;RSPO...| 
| 1|1020183| rs539283387| G| C| .|  .|RS=539283387;RSPO...| 
| 1|1020216| rs764659938| C| G| .|  .|RS=764659938;RSPO...| 
| 1|1020217| rs115173026| G| T| .|  .|RS=115173026;RSPO...| 
| 1|1020221|rs1057523287| C| T| .|  .|RS=1057523287;RSP...| 
| 1|1020239| rs201073369| G|A,C| .|  .|RS=201073369;RSPO...| 
| 1|1022188| rs115704555| A| G| .|  .|RS=115704555;RSPO...| 
+-----+-------+------------+---+---+----+------+--------------------+ 

一個DF我的信息欄已被分隔的多個值「;」它們的形式是'column_name = value'。我希望我的df信息列在多個列中以相應的值分隔,如下所示

Pre_Col| Info    |  RS | RSPOS |dbSNPBuildID| SSR |...| 
-------+--------------------+------------+-------+------------+-----+---+ 
... |RS=786201005;RSPO...| 786201005 |1012143| 144  | 0 |...| 
... |RS=115173026;RSPO...| 115173026 |9043523| 123  | 2 |...| 

info列可以有多個變量值。有可能RS值不能在其他行中,相同的情況可能與其他值相同。在這種情況下,我想RS值爲'null'。我通過地圖駕駛這個df。建議

後我有我的編輯代碼,並得到如下結果

+-----+-------+------------+---+---+----+------+--------------------+-----+ 
|CHROM| POS|   ID|REF|ALT|QUAL|FILTER|    INFO| kvs| 
+-----+-------+------------+---+---+----+------+--------------------+-----+ 
| 1|1014143| rs786201005| C| T| .|  .|RS=786201005;RSPO...|Map()| 
| 1|1014228|  rs1921| G|A,C| .|  .|RS=1921;RSPOS=101...|Map()| 
| 1|1014316| rs672601345| C| CG| .|  .|RS=672601345;RSPO...|Map()| 
| 1|1014359| rs672601312| G| T| .|  .|RS=672601312;RSPO...|Map()| 
| 1|1020183| rs539283387| G| C| .|  .|RS=539283387;RSPO...|Map()| 
| 1|1020216| rs764659938| C| G| .|  .|RS=764659938;RSPO...|Map()| 
| 1|1020217| rs115173026| G| T| .|  .|RS=115173026;RSPO...|Map()| 
| 1|1020221|rs1057523287| C| T| .|  .|RS=1057523287;RSP...|Map()| 
| 1|1020239| rs201073369| G|A,C| .|  .|RS=201073369;RSPO...|Map()| 
| 1|1022188| rs115704555| A| G| .|  .|RS=115704555;RSPO...|Map()| 
+-----+-------+------------+---+---+----+------+--------------------+-----+ 

和我的模式是

root 
|-- CHROM: string (nullable = true) 
|-- POS: string (nullable = true) 
|-- ID: string (nullable = true) 
|-- REF: string (nullable = true) 
|-- ALT: string (nullable = true) 
|-- QUAL: string (nullable = true) 
|-- FILTER: string (nullable = true) 
|-- INFO: string (nullable = true) 
|-- kvs: map (nullable = true) 
| |-- key: string 
| |-- value: string (valueContainsNull = true) 

我可以進一步分裂這些映射值變爲列?

任何幫助將不勝感激。

+0

請在當前狀態下你的代碼,所以我們可以幫助你提高,並得到所需的解決方案。 –

+0

當RS爲空時,你會有'RS =; RSPO..'嗎? 'RS,RSPOS,dbSNPBuildID,SSR'是'Info'中唯一存在的列嗎? – philantrovert

+0

@philantrovert,沒有可以有很多列裏面的信息可以是27或更多 –

回答

1

PySpark converting a column of type 'map' to multiple columns in a dataframe調整答案:

from pyspark.sql.functions import col, udf, explode 

@udf("map<string,string>") 
def to_map(s): 
    if s: 
     kvs = [x.split("=") for x in s.split(";")] 
     return {kv[0]: kv[1] for kv in kvs if len(kvs) == 2} 

with_map = df.withColumn("kvs", to_map("INFO")) 

keys = (with_map 
    .select(explode("kvs")) 
    .select("key") 
    .distinct() 
    .rdd.flatMap(lambda x: x) 
    .collect()) 

with_map.select(*["*"] + [col("kvs").getItem(k).alias(k) for k in keys]) 

對於舊版本:

from pyspark.sql.types import * 

def to_map_(s): 
    if s: 
     kvs = [x.split("=") for x in s.split(";")] 
     return {kv[0]: kv[1] for kv in kvs if len(kvs) == 2} 

to_map = udf(to_map_, MapType(StringType(), StringType()))