
pyspark: sorting in reduceByKey raises TypeError: 'int' object is not callable in <lambda>

I have the following code. For each my_id, I want to sort the records based on the timestamp and amount fields:

output_rdd = my_df.rdd.map(lambda r: (r['my_id'], [r['timestamp'],[r['amount']]]))\ 
         .reduceByKey(lambda a, b: sorted(a+b, key=(a+b)[0]))\ 
         .map(lambda r: r[1]) 

However, I get the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 52, ph-hdp-prd-dn02): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/data/0/yarn/nm/usercache/analytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 317, in func 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 1792, in combineLocally 
    File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues 
    d[k] = comb(d[k], v) if k in d else creator(v) 
    File "<ipython-input-11-ec09929e01e4>", line 6, in <lambda> 
TypeError: 'int' object is not callable 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Any idea what I am missing? Thank you very much!

Answers


Note the following from the Python documentation -

The value of the key parameter should be a function that takes a single argument and returns a key to use for sorting purposes. This technique is fast because the key function is called exactly once for each input record.

Convert the argument you pass to key into a Python function or a lambda, then try again.
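As a minimal plain-Python illustration of the difference (the sample list is made up):

pairs = [(3, 'c'), (1, 'a'), (2, 'b')]

# Wrong: pairs[0] is a tuple, not a function, so sorted() cannot call it.
# sorted(pairs, key=pairs[0])               # TypeError: 'tuple' object is not callable

# Right: key is a function that extracts the sort key from each element.
print(sorted(pairs, key=lambda x: x[0]))    # [(1, 'a'), (2, 'b'), (3, 'c')]

In the question's code, key=(a+b)[0] evaluates (a+b)[0] immediately to an int, and sorted() then tries to call that int as a function, which is exactly the reported error.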


key should be a function. Try

...  .reduceByKey(lambda a, b: sorted(a+b, key=lambda x: x[0])) \ 
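Putting it together, a minimal end-to-end sketch (the sample DataFrame is made up; each record is wrapped as a one-element list of (timestamp, amount) tuples so that a + b in reduceByKey concatenates the per-key records before sorting):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-in for my_df.
my_df = spark.createDataFrame(
    [(1, 30, 9.0), (1, 10, 7.0), (2, 20, 5.0)],
    ["my_id", "timestamp", "amount"],
)

output_rdd = (
    my_df.rdd
    # One-element list of (timestamp, amount) per record, so that
    # a + b in reduceByKey concatenates records for the same key.
    .map(lambda r: (r["my_id"], [(r["timestamp"], r["amount"])]))
    # key must be a function: sort the merged list by timestamp.
    .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: x[0]))
    .map(lambda r: r[1])
)

print(output_rdd.collect())
# e.g. [[(10, 7.0), (30, 9.0)], [(20, 5.0)]]  (key order may vary)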