2017-02-16 38 views
0

我想在一個pyspark數據框中使用我寫的類和udf處理URL的。我知道urllib和其他URL解析庫,但對於這種情況下,我需要使用我自己的代碼。使用python類與火花DataFrame解析URL的

爲了得到一個URL的tld我交叉檢查它對iana公共後綴列表。

這裏是我的代碼的簡化

class Parser: 

    # list of available public suffixes for extracting top level domains 
    file = open("public_suffix_list.txt", 'r') 

    data = [] 
    for line in file: 
     if line.startswith("//") or line == '\n': 
      pass 
     else: 
      data.append(line.strip('\n')) 

    def __init__(self, url): 
     self.url = url 

     #the code here extracts port,protocol,query etc. 

     #I think this bit below is causing the error 
     matches = [r for r in self.data if r in self.hostname] 

     #extra functionality in my actual class 

     i = matches.index(self.string) 

     try: 
      self.tld = matches[i] 

     # logic to find tld if no match 

類純Python工程,以便例如我可以運行

import Parser 

x = Parser("www.google.com") 
x.tld #returns ".com" 

然而,當我嘗試做

import Parser 
from pyspark.sql.functions import udf 

parse = udf(lambda x: Parser(x).url) 

df = sqlContext.table("tablename").select(parse("column")) 

當我打電話給我得到的行動

File "<stdin>", line 3, in <lambda> 
    File "<stdin>", line 27, in __init__ 
TypeError: 'in <string>' requires string as left operand 

所以我的猜測是它沒有將數據解釋爲字符串列表?

我還試圖用

file = sc.textFile("my_file.txt")\ 
     .filter(lambda x: not x.startswith("//") or != "")\ 
     .collect() 

data = sc.broadcast(file) 

打開我的文件,而不是,但導致

例外:看來,你正試圖從廣播變量,動作或引用SparkContext transforamtion。 SparkContext只能在驅動程序上使用,而不能在其上運行的代碼中使用。有關更多信息,請參閱SPARK-5063。

任何想法?

在此先感謝

編輯:抱歉,我沒有我的碼放在手邊,以我的測試代碼沒有很好地解釋我有問題。我最初報告的錯誤是我使用的測試數據的結果。

我已經更新了我的問題,以更加反思我面臨的挑戰。

回答

0

爲什麼在這種情況下你需要一個類(定義你的類的代碼是不正確的,你在init方法中使用它之前從未聲明過self.data)影響你想要的輸出的唯一相關行是self.string=string,所以你基本上是以udf身份傳遞身份函數。

UnicodeDecodeError是由於您的文件中的編碼問題,它與您的類的定義無關。

第二個錯誤是在該行sc.broadcast(file),其中的細節可以在這裏找到:Spark: Broadcast variables: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion

編輯1

我將重新定義你的類結構如下。您基本上需要通過調用self.data = data來創建實例self.data,然後才能使用它。另外,在init方法之前寫入的任何內容都將被執行,而不管您是否調用該類。所以移出文件解析部分不會有任何影響。

# list of available public suffixes for extracting top level domains 
file = open("public_suffix_list.txt", 'r') 

data = [] 
for line in file: 
    if line.startswith("//") or line == '\n': 
     pass 
    else: 
     data.append(line.strip('\n')) 

class Parser: 
    def __init__(self, url): 
     self.url = url 
     self.data = data 

     #the code here extracts port,protocol,query etc. 

     #I think this bit below is causing the error 
     matches = [r for r in self.data if r in self.hostname] 

     #extra functionality in my actual class 

     i = matches.index(self.string) 

     try: 
      self.tld = matches[i] 

     # logic to find tld if no match 
+0

我已經添加了一些澄清到我原來的問題。 我正在使用類,因爲我正在解析一個url,我認爲將url作爲一個對象並將url的功能作爲實例變量來處理是有意義的。 – johnaphun

+0

檢查編輯是否適用於您。 –

+0

沒有,仍然產生相同的錯誤。通過使用sc.textFile讀取我的數據然後播放,我在Zeppelin確實有一些運氣。 但是,當提交我的應用程序時,導入我的類時遇到了麻煩,因爲我無法導入已經定義了一個spark的上下文。 @拉夫 - 居所 – johnaphun