2017-02-13 42 views
1

假設,我有在後續的結構線JSON文件中現有的嵌套列的值:PySpark - 添加新的嵌套列或更改

{ 
"a": 1, 
"b": { 
     "bb1": 1, 
     "bb2": 2 
     } 
} 

我想改變的關鍵bb1值或添加新密鑰,如:bb3。 目前,我使用spark.read.json將json文件加載到spark中,作爲DataFrame和df.rdd.map將RDD的每一行映射到dict。然後,更改嵌套鍵值或添加嵌套鍵並將該詞典轉換爲行。最後,將RDD轉換爲DataFrame。 工作流程的工作原理如下:

def map_func(row): 
    dictionary = row.asDict(True) 
    adding new key or changing key value 
    return as_row(dictionary) # as_row convert dict to row recursively 

df = spark.read.json("json_file") 
df.rdd.map(map_func).toDF().write.json("new_json_file") 

這會爲我工作。但我擔心轉換DataFrame - > RDD(行 - >字典 - >行) - > DataFrame會殺死效率。 有沒有其他方法可以滿足這種需求,但不以犧牲效率爲代價?


我使用的最終解決方案是使用withColumn並動態構建b的模式。 首先,我們可以從DF模式得到b_schema

b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b') 

之後,b_schema是字典,我們可以通過新的字段添加到其中:

b_schema['fields'].append({"metadata":{},"type":"string","name":"bb3","nullable":True}) 

然後,我們可以把它轉換到StructType由:

new_b = StructType.fromJson(b_schema) 

在map_func,我們可以轉換行與dict和填充新的領域:

def map_func(row): 
    data = row.asDict(True) 
    data['bb3'] = data['bb1'] + data['bb2'] 
    return data 

map_udf = udf(map_func, new_b) 
df.withColumn('b', map_udf('b')).collect() 

感謝@Mariusz

回答

3

您可以使用map_func作爲UDF,因此省去轉換DF - > RDD - > DF,仍然有蟒蛇的靈活性,以實現業務邏輯。所有你需要的是創建模式對象:

>>> from pyspark.sql.types import * 
>>> new_b = StructType([StructField('bb1', LongType()), StructField('bb2', LongType()), StructField('bb3', LongType())]) 

然後定義map_func和UDF:

>>> from pyspark.sql.functions import * 
>>> def map_func(data): 
...  return {'bb1': 4, 'bb2': 5, 'bb3': 6} 
... 
>>> map_udf = udf(map_func, new_b) 

這個UDF最後應用到數據幀:

>>> df = spark.read.json('sample.json') 
>>> df.withColumn('b', map_udf('b')).first() 
Row(a=1, b=Row(bb1=4, bb2=5, bb3=6)) 

編輯

根據評論:你可以以更簡單的方式將字段添加到現有StructType,例如:

>>> df = spark.read.json('sample.json') 
>>> new_b = df.schema['b'].dataType.add(StructField('bb3', LongType())) 
+0

似乎,map_func會得到一個Row。我如何修改行b。我想將bb1和bb2的總和設置爲新列的值bb3 – ryan

+0

列'b'將由'withColumn('b',something_here)'修改(替換)。在'map_func'裏面你可以訪問'b'的所有變量,例如求和bb1和bb2 udf可以看作如下:'map_udf = udf(lambda數據:{'bb1':data.bb1,'bb2':data。 bb2,'bb3':data.bb1 + data.bb2},new_b)' – Mariusz

+0

謝謝!但是,我們不能硬編碼模式new_b和dict對象,因爲當數據結構發生變化時,我們需要更改代碼?我們可以從列「b」推斷出模式並動態構建字典對象嗎? – ryan