2016-11-10 63 views
0

有一些過濾器的功能是這樣的:如何在不同的pyspark腳本中共享相同的廣播變量包含的過濾器函數?

def filter1(x): 
    if broadcast_variable1.value[x] > 1: 
     return False 
    return True 

def filter2(x): 
    if broadcast_variable2.value[x] < 1: 
     error_accumulator_variable.add(1) 
     return False 
    return True 

這些功能都包含在我的主要pyspark腳本。現在我想將它們分成一個模塊文件以便於維護(我有兩個用於不同用途的pyspark腳本,但它們具有相同的過濾器),然後將其餘文件保存在不同的文件中。

如何在不同的pyspark腳本中共享這些類似的過濾功能?

感謝您的慷慨幫助

回答

0

用它作爲論據功能:

def filter1(x, bv): 
    ... 

def filter2(x, bv): 
    ... 

broadcast_variable1 = ... 

someRDD.filter(lambda x: filter1(x, broadcast_variable1)) 
1

我的基本目的是爲了使程序更易讀。所以我找到了一個簡單的方法來實現這一點。

def x1(x,broadcast_variable1): 
    if broadcast_variable1.value[x] > 1: 
    return False 

def filter1(x): 
    return x1(x,broadcast_variable1) 

,這也可以在鑽營的方式(下面的代碼是不是在Python)rewriten:

def x1(broadcast_variable)(x) = 
    if broadcast_variable.value[x] > 1: 
    return False 
def filter1(x) = x1(broadcast_variable1) 
現在

,我可以寫火花像這樣的代碼:

RDD.filter(filter1).map(map1)....