2015-11-20 55 views
3

我有數據幀66列的處理(需要幾乎每個列的值被改變某種方式),所以我運行下面的語句星火斯卡拉2.10元組限制

val result = data.map(row=> (
     modify(row.getString(row.fieldIndex("XX"))), 
     (...) 
     ) 
    ) 

,直到第66列。 由於此版本的scala限制爲22對的最大元組,因此無法執行此類操作。 問題是,有沒有解決方法? 所有行操作後,我將它與具體列名的df,

result.toDf("c1",...,"c66") 
    result.storeAsTempTable("someFancyResult") 

「修改」功能僅僅是一個例子來說明我的觀點

+1

switch to scala 2.11? – Odomontois

+0

我想這可能是很簡單,但它不是 – Silverrose

+0

@Odomontois AFAIK斯卡拉2.11不支持基數> 22,即元組沒有Tuple23的情況。 – moem

回答

5

如果你要做的就是從現有的DataFrame修改值最好是使用UDF代替在RDD上映射:

import org.apache.spark.sql.functions.udf 

val modifyUdf = udf(modify) 
data.withColumn("c1", modifyUdf($"c1")) 

如果由於某種原因,上述方法不適合你的需求,你可以做最簡單的事情是從RDD[Row]重新DataFrame。例如像這樣:

import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructField, StructType, IntegerType} 


val result: RDD[Row] = data.map(row => { 
    val buffer = ArrayBuffer.empty[Any] 

    // Add value to buffer 
    buffer.append(modify(row.getAs[String]("c1"))) 

    // ... repeat for other values 

    // Build row 
    Row.fromSeq(buffer) 
}) 

// Create schema 
val schema = StructType(Seq(
    StructField("c1", StringType, false), 
    // ... 
    StructField("c66", StringType, false) 
)) 

sqlContext.createDataFrame(result, schema) 
+0

真正的問題,如果他已經有一個數據幀,這是簡單的路線 –

+0

@Ewan看起來是這樣。這不僅是最簡單的方法,而且更有效率。 – zero323

+0

非常感謝!這是救我:) – Silverrose

1

它周圍是相當繁瑣的方式,但它確實工作中,嘗試此示例代碼,讓你開始,你可以看到有超過22列被訪問:

object SimpleApp { 
    class Record(val x1: String, val x2: String, val x3: String, ... val x24:String) extends Product with Serializable { 
    def canEqual(that: Any) = that.isInstanceOf[Record] 

    def productArity = 24 

    def productElement(n: Int) = n match { 
     case 0 => x1 
     case 1 => x2 
     case 2 => x3 
     ... 
     case 23 => x24 
    } 
    } 

    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("Product Test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc); 

    val record = new Record("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x") 

    import sqlContext._ 
    sc.parallelize(record :: Nil).registerAsTable("records") 

    sql("SELECT x1 FROM records").collect() 
    } 
}