2016-02-28 80 views
1

我寫了下面MyPythonGateway.java,這樣我可以從Python中叫我的定製Java類:pyspark:從pyspark調用一個自定義的java函數。我需要Java_Gateway嗎?

public class MyPythonGateway { 

    public String findMyNum(String input) { 
     return MyUtiltity.parse(input).getMyNum(); 
    } 

    public static void main(String[] args) { 
     GatewayServer server = new GatewayServer(new MyPythonGateway()); 
     server.start(); 
    } 
} 

,這裏是我如何在我的Python代碼使用它:

def main(): 

    gateway = JavaGateway()     # connect to the JVM 
    myObj = gateway.entry_point.findMyNum("1234 GOOD DAY") 
    print(myObj) 


if __name__ == '__main__': 
    main() 

現在我想使用PySpark中的MyPythonGateway.findMyNum()函數,而不僅僅是一個獨立的python腳本。我做了以下內容:

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY") 
print(myNum) 

但是,我得到了以下錯誤:

... line 43, in main: 
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY") 
    File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__ 
py4j.protocol.Py4JError: Trying to call a package. 

所以我怎麼錯過這裏?我不知道是否應該在使用pyspark時運行單獨的JavaApplication of MyPythonGateway來啓動網關服務器。請指教。謝謝!


下面是正是我需要的:

input.map(f) 

def f(row): 
    // call MyUtility.java 
    // x = MyUtility.parse(row).getMyNum() 
    // return x 

什麼是接近這一目標的最佳方式是什麼?謝謝!

回答

2

首先,您看到的錯誤通常意味着您嘗試使用的類無法訪問。所以很可能這是一個CLASSPATH問題。

關於總體思路有兩個重要的問題:

  • 你不能訪問SparkContext所以使用PySpark網關將無法正常工作的行爲或改造內(見How to use Java/Scala function from an action or a transformation?的一些細節))。如果你想從工人使用Py4J,你必須在每個工人機器上啓動一個單獨的網關。
  • 你真的不想在Python和JVM之間傳遞數據。 Py4J不適用於數據密集型任務。
+0

謝謝!基本上,MyUtitlity.java有點複雜,我們真的不想在python中重新編寫它。有沒有辦法從pyspark工作中調用MyUtility.java?如果還有其他選擇,我們不一定需要使用Py4J ... – Edamame

+0

嗯,很多取決於您的體系結構和代碼。可能最簡單和相對高效的解決方案是將數據「管道化」到Java代碼並讀取輸出。或者,你可以通過磁盤傳遞數據(這基本上是PySpark驅動程序用來處理事物的方式,儘管我認爲它不再是這種情況,或者也許是這樣)。最複雜的解決方案是具有處理請求的持久性(或臨時性,例如執行程序的生命週期期間)Java進程。 – zero323

+0

如何在駕駛員和工人身上正確地註冊罐子?然後讓Python包裝器能夠在驅動程序上正確調用jar包? –