pyspark: sorting inside reduceByKey fails with TypeError: 'int' object is not callable

I have the following code: for each `my_id`, I want to sort the `amount` field based on the `timestamp` field:
output_rdd = my_df.rdd.map(lambda r: (r['my_id'], [r['timestamp'],[r['amount']]]))\
.reduceByKey(lambda a, b: sorted(a+b, key=(a+b)[0]))\
.map(lambda r: r[1])
However, I get the following error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 52, ph-hdp-prd-dn02): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/data/0/yarn/nm/usercache/analytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 317, in func
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 1792, in combineLocally
File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
d[k] = comb(d[k], v) if k in d else creator(v)
File "<ipython-input-11-ec09929e01e4>", line 6, in <lambda>
TypeError: 'int' object is not callable
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Any idea what I'm missing? Thanks a lot!
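For reference, the traceback points at the `reduceByKey` lambda: `sorted`'s `key` parameter must be a callable, but `key=(a+b)[0]` evaluates the expression immediately and passes the first element of the concatenated list (an integer), which `sorted` then tries to call. A sketch of the fix, assuming the intent is to keep a list of `(timestamp, amount)` tuples per `my_id` (the `rows`, `map_fn`, and `reduce_fn` names below are illustrative; the merge loop just simulates `reduceByKey` locally so the lambdas can be checked without a Spark cluster):

```python
from collections import defaultdict

# Stand-in rows; the real code would start from my_df.rdd.
rows = [
    {'my_id': 1, 'timestamp': 3, 'amount': 30},
    {'my_id': 1, 'timestamp': 1, 'amount': 10},
    {'my_id': 2, 'timestamp': 2, 'amount': 20},
]

# Emit (key, [single (timestamp, amount) tuple]) so the reducer can
# concatenate lists of tuples.
map_fn = lambda r: (r['my_id'], [(r['timestamp'], r['amount'])])

# key must be a callable, e.g. lambda x: x[0], not (a+b)[0].
reduce_fn = lambda a, b: sorted(a + b, key=lambda x: x[0])

# Local simulation of reduceByKey's pairwise merging:
merged = defaultdict(list)
for k, v in map(map_fn, rows):
    merged[k] = reduce_fn(merged[k], v)
```

Applied to the original pipeline, this would read `my_df.rdd.map(lambda r: (r['my_id'], [(r['timestamp'], r['amount'])])).reduceByKey(lambda a, b: sorted(a + b, key=lambda x: x[0])).map(lambda r: r[1])`. Note that sorting inside the reducer re-sorts the accumulated list on every merge; grouping first and sorting each group once (e.g. `groupByKey` followed by a single `sorted` in a `mapValues`) may be cheaper for long lists.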