pySpark DataFrames with SciPy aggregate functions

2015-05-19

I've tried a few different approaches to using Spark 1.3 DataFrames for things like scipy's kurtosis or numpy's std. Below is the sample code, but it just hangs on a 10x10 dataset (10 rows, 10 columns). I've tried:

print df.groupBy().agg(kurtosis(df.offer_id)).collect() 

print df.agg(kurtosis(df.offer_id)).collect() 

But this works fine:

print df.agg(F.min(df.offer_id), F.min(df.decision_id)).collect() 

My guess is that this is because F (from "from pyspark.sql import functions as F") is a set of built-in SQL functions that run as Column expressions on the JVM, whereas kurtosis is an ordinary Python function. How would I do something like kurtosis on a dataset with DataFrames?

This also just hangs:

print df.map(kurtosis(df.offer_id)).collect() 
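
For contrast, the only way I can get scipy's kurtosis to run at all is to collect the column back to the driver first, which works for a small dataset but obviously isn't a distributed aggregate. A minimal sketch, assuming df fits on the driver:

from scipy.stats import kurtosis 

# Driver-side workaround: not a real aggregate, just scipy on a collected column. 
values = [row.offer_id for row in df.select('offer_id').collect()] 
print kurtosis(values) 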

Answer


Sadly, Spark SQL's current UDF support for Python is somewhat lacking. I've been looking at adding some UDFs in Scala and making them callable from Python for a project I'm working on, so I used kurtosis as a quick proof of concept of a UDAF implementation. The branch currently lives at https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support

The first step is defining our UDAF in Scala - this is probably less than ideal, but here is an implementation:

// Assumed imports for Spark 1.3's Catalyst internals; EvilSqlTools is a 
// helper from the branch linked above. 
import org.apache.spark.sql.Column 
import org.apache.spark.sql.catalyst.expressions._ 
import org.apache.spark.sql.types.{DataType, DoubleType, NumericType} 
// Apache Commons Math provides the actual kurtosis computation. 
import org.apache.commons.math3.stat.descriptive.moment.{Kurtosis => ApacheKurtosis} 

object functions { 
  def kurtosis(e: Column): Column = new Column(Kurtosis(EvilSqlTools.getExpr(e))) 
} 

case class Kurtosis(child: Expression) extends AggregateExpression { 
  def this() = this(null) 

  override def children = child :: Nil 
  override def nullable: Boolean = true 
  override def dataType: DataType = DoubleType 
  override def toString: String = s"Kurtosis($child)" 
  override def newInstance() = new KurtosisFunction(child, this) 
} 

case class KurtosisFunction(child: Expression, base: AggregateExpression) extends AggregateFunction { 
  def this() = this(null, null) 

  // Buffer every value seen for the group; kurtosis needs the whole sample. 
  var data = scala.collection.mutable.ArrayBuffer.empty[Any] 
  override def update(input: Row): Unit = { 
    data += child.eval(input) 
  } 

  // This function seems shaaady 
  // TODO: Do something more reasonable 
  private def toDouble(x: Any): Double = { 
    x match { 
      case x: NumericType => EvilSqlTools.toDouble(x.asInstanceOf[NumericType]) 
      case x: Long => x.toDouble 
      case x: Int => x.toDouble 
      case x: Double => x 
    } 
  } 

  override def eval(input: Row): Any = { 
    if (data.isEmpty) { 
      println("No data???") 
      null 
    } else { 
      val inputAsDoubles = data.toList.map(toDouble) 
      println("computing on input " + inputAsDoubles) 
      val inputArray = inputAsDoubles.toArray 
      val apacheKurtosis = new ApacheKurtosis() 
      val result = apacheKurtosis.evaluate(inputArray, 0, inputArray.size) 
      println("result " + result) 
      Cast(Literal(result), DoubleType).eval(null) 
    } 
  } 
} 
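
One thing worth noting about this proof of concept: KurtosisFunction buffers every value in the group in memory and only calls Commons Math's Kurtosis.evaluate at the end, because kurtosis doesn't decompose into simple partial aggregates the way min or sum does. That's fine for a demo, but a production version would want to accumulate the first four moments incrementally instead.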

Then, using logic similar to Spark SQL's functions.py, we can expose it in Python:

"""Our magic extend functions. Here lies dragons and a sleepy holden.""" 
from py4j.java_collections import ListConverter 

from pyspark import SparkContext 
from pyspark.sql.dataframe import Column, _to_java_column 

__all__ = [] 
def _create_function(name, doc=""): 
    """ Create a function for aggregator by name""" 
    def _(col): 
     sc = SparkContext._active_spark_context 
     jc = getattr(sc._jvm.com.sparklingpandas.functions, name)(col._jc if isinstance(col, Column) else col) 
     return Column(jc) 
    _.__name__ = name 
    _.__doc__ = doc 
    return _ 

_functions = { 
    'kurtosis': 'Calculate the kurtosis, maybe!', 
} 


for _name, _doc in _functions.items(): 
    globals()[_name] = _create_function(_name, _doc) 
del _name, _doc 
__all__ += _functions.keys() 
__all__.sort() 
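
The net effect is that kurtosis behaves like any other function in pyspark.sql.functions: it returns a Column wrapping the JVM-side expression, so it composes directly with groupBy(...).agg(...).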

And then we can go ahead and use it as a UDAF like so:

from sparklingpandas.custom_functions import * 
from pyspark.sql import Row 
import random 

input = range(1, 6) * 6 
df1 = sqlContext.createDataFrame(sc.parallelize(input) 
                                 .map(lambda i: Row(single=i, rand=random.randint(0, 100000)))) 
df1.collect() 

import pyspark.sql.functions as F 
x = df1.groupBy(df1.single).agg(F.min(df1.rand)) 
x.collect() 

j = df1.groupBy(df1.single).agg(kurtosis(df1.rand)) 
j.collect() 
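
As a rough sanity check, you can compare one group against scipy on the driver. A sketch, using the names from the snippet above; note that Commons Math computes the unbiased excess kurtosis, which should correspond to scipy's bias=False (with the default fisher=True):

from scipy.stats import kurtosis as scipy_kurtosis 

# Compare the UDAF's output for the group single == 1 against scipy locally. 
rows = df1.where(df1.single == 1).select('rand').collect() 
print scipy_kurtosis([r.rand for r in rows], bias=False) 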

I don't think the UDF solution works, because when I do the following: kert = udf(lambda x: kurtosis(x), FloatType()) and then print df.select(kert(df.offer_id)).collect(), it doesn't work, since it passes each value in separately. You can't do an .agg with it, so I'm trying to think about it a different way. – theMadKing


That's true indeed. I'm actually working on Sparkling Pandas as a side project, and this is exactly the kind of thing it's interested in, so I've started some work to implement support for this. I'll update my answer with the details. – Holden


Updated (it's a lot of code, mostly because we need to do things on both the Scala side and the Python side). – Holden