
Pyspark: pass dynamic columns to a UDF

I am trying to pass a list of columns one by one into a UDF using a for loop, but I get an error saying the dataframe has no column named col_name. Currently the list list_col has two columns, but it can change, so I want code that works for any list of columns. In this code I concatenate the row values of one column at a time; the values are in struct format, i.e. a list inside a list. For every null I have to substitute a space.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

list_col = ['pcxreport', 'crosslinediscount']

def struct_generater12(row):
    """Concatenate the fields of a struct row, substituting ' ' for nulls."""
    list3 = []
    main_str = ''
    if row is None:
        list3.append(' ')
    else:
        for i in row:
            temp = ''
            if i is None:
                temp += ' '
            else:
                for j in i:
                    if j is None:
                        temp += ' '
                    else:
                        temp += str(j)
            list3.append(temp)
    for k in list3:
        main_str += k
    return main_str

A = udf(struct_generater12, returnType=StringType())
# z = addlinterestdetail_FDF1.withColumn("Concated_pcxreport", A(addlinterestdetail_FDF1.pcxreport))
for i in range(0, len(list_col) - 1):
    struct_col = 'Concate_'
    struct_col += list_col[i]
    col_name = list_col[i]
    # fails here: looks for a column literally named "col_name"
    z = addlinterestdetail_FDF1.withColumn(struct_col, A(addlinterestdetail_FDF1.col_name))
    struct_col = ''

z.show()

Answer


addlinterestdetail_FDF1.col_name means the column is literally named "col_name"; it does not access the string contained in the variable col_name.

When calling a UDF on a column, you can either:

  • pass its string name directly: A(col_name)
  • or use the pyspark SQL function col:

    import pyspark.sql.functions as psf 
    z = addlinterestdetail_FDF1.withColumn(struct_col,A(psf.col(col_name))) 
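Putting it together, here is a minimal sketch of the corrected loop (assuming the addlinterestdetail_FDF1 dataframe and the UDF A defined above). Note that the original range(0, len(list_col) - 1) skipped the last column, and each iteration overwrote z instead of chaining onto it:

import pyspark.sql.functions as psf

z = addlinterestdetail_FDF1
for col_name in list_col:                  # iterate over every column in the list
    struct_col = 'Concate_' + col_name     # name of the new concatenated column
    z = z.withColumn(struct_col, A(psf.col(col_name)))  # chain onto z

z.show()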
    

You should consider using pyspark sql functions for the concatenation instead of writing a UDF. First, let's create a sample dataframe with a nested structure:

import json 
j = {'pcxreport':{'a': 'a', 'b': 'b'}, 'crosslinediscount':{'c': 'c', 'd': None, 'e': 'e'}} 
jsonRDD = sc.parallelize([json.dumps(j)]) 
df = spark.read.json(jsonRDD) 
df.printSchema() 
df.show() 

    root
     |-- crosslinediscount: struct (nullable = true)
     |    |-- c: string (nullable = true)
     |    |-- d: string (nullable = true)
     |    |-- e: string (nullable = true)
     |-- pcxreport: struct (nullable = true)
     |    |-- a: string (nullable = true)
     |    |-- b: string (nullable = true)

    +-----------------+---------+
    |crosslinediscount|pcxreport|
    +-----------------+---------+
    |       [c,null,e]|    [a,b]|
    +-----------------+---------+

Next we'll build a dictionary mapping each column to its nested column names:

list_col=['pcxreport','crosslinediscount'] 
list_subcols = dict() 
for c in list_col: 
    list_subcols[c] = df.select(c+'.*').columns 
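With the sample dataframe above, list_subcols now maps each struct column to the names of its fields:

    {'pcxreport': ['a', 'b'], 'crosslinediscount': ['c', 'd', 'e']}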

Now we can "flatten" the StructType columns, replace None with ' ', and concatenate:

import itertools 
import pyspark.sql.functions as psf 
df.select([c + '.*' for c in list_col])\ 
    .na.fill({c:' ' for c in list(itertools.chain.from_iterable(list_subcols.values()))})\ 
    .select([psf.concat(*sc).alias(c) for c, sc in list_subcols.items()])\ 
    .show() 

    +---------+-----------------+
    |pcxreport|crosslinediscount|
    +---------+-----------------+
    |       ab|              c e|
    +---------+-----------------+
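A possible follow-up sketch (my assumption, building on the same df, list_col, and list_subcols as above): keep the original struct columns and alias the concatenated results with the asker's 'Concate_' prefix:

import itertools
import pyspark.sql.functions as psf

flat = df.select('*', *[c + '.*' for c in list_col])   # expand structs, keep originals
filled = flat.na.fill({c: ' ' for c in itertools.chain.from_iterable(list_subcols.values())})
result = filled.select(
    *list_col,                                          # the original struct columns
    *[psf.concat(*subcols).alias('Concate_' + c)        # one concatenated string per struct
      for c, subcols in list_subcols.items()]
)
result.show()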

Thanks mate, it worked for me –


@RahulKumarSingh maybe you should consider [accepting the answer](https://stackoverflow.com/help/someone-answers). – Prem


I have many dataframes in a list; how should I merge them all into one dataframe? The length of the list is not fixed... Thanks in advance –
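A minimal sketch for that last comment (my assumption, not part of the answer above): if all dataframes in the list share the same schema, they can be folded together with functools.reduce and DataFrame.union:

from functools import reduce

# df_list is a hypothetical list of dataframes with identical schemas
merged = reduce(lambda left, right: left.union(right), df_list)
merged.show()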