2016-03-28

Spark: using lookup inside a map

I would like to use lookup inside a map (in Spark, using PySpark) in the way described below, but I get an error.

Is this something that Spark just does not allow?

>>> rdd1 = sc.parallelize([(1,'a'),(2,'b'),(3,'c'),(4,'d')]).sortByKey() 
>>> rdd2 = sc.parallelize([2,4]) 
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x))) 
>>> rdd.collect() 

The reason for doing it this way is that in the actual problem I am working on, rdd1 is huge, so a solution such as converting it to a dictionary with collectAsMap does not work.

Both rdd1 and rdd2 are very large, so joining them is also extremely slow.

Thanks.

The error:

16/03/28 05:02:28 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/03/28 05:02:28 INFO DAGScheduler: Stage 1 (sortByKey at <stdin>:1) finished in 0.148 s 
16/03/28 05:02:28 INFO DAGScheduler: Job 1 finished: sortByKey at <stdin>:1, took 0.189587 s 
>>> rdd2 = sc.parallelize([2,4]) 
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x))) 
>>> rdd.collect() 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 676, in collect 
    bytesInJava = self._jrdd.collect().iterator() 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2107, in _jrdd 
    pickled_command = ser.dumps(command) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/serializers.py", line 402, in dumps 
    return cloudpickle.dumps(obj, 2) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 816, in dumps 
    cp.dump(obj) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 133, in dump 
    return pickle.Pickler.dump(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 562, in save_tuple 
    save(element) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function 
    self.save_function_tuple(obj, [themodule]) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple 
    save((code, closure, base_globals)) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple 
    save(element) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 600, in save_list 
    self._batch_appends(iter(obj)) 
    File "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends 
    save(tmp[0]) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function 
    self.save_function_tuple(obj, modList) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple 
    save(f_globals) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict 
    pickle.Pickler.save_dict(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems 
    save(v) 
    File "/usr/lib64/python2.6/pickle.py", line 331, in save 
    self.save_reduce(obj=obj, *rv) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 650, in save_reduce 
    save(state) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict 
    pickle.Pickler.save_dict(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems 
    save(v) 
    File "/usr/lib64/python2.6/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value 
py4j.protocol.Py4JError: An error occurred while calling o51.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) 
    at py4j.Gateway.invoke(Gateway.java:252) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 


>>> 

Answers

Answer (score: 1)

Is this something that Spark just does not allow doing?

Yes, it is. Spark does not support nested actions or transformations, so you cannot reference one RDD from inside a transformation of another. Since you have already covered join and local variables, pretty much the only option left is to use an external system, for example a database, for the lookups.

Answer (score: 0)

An RDD cannot be used inside the processing of another RDD. Suppose we have two RDDs, rdd1 and rdd2:
you can do rdd1.map(...)
but you cannot do rdd1.map(... rdd2 ...)
So when you need to do some complex combined operation, try union/join on the RDDs instead.