2017-07-14 53 views
0

我有一個RDD的字符串。每行對應各種日誌。映射函數寫在全局火花rdd

我有一個單一函數中的多個正則表達式匹配RDD的行以應用適應的正則表達式。

我想在RDD上映射這個獨特的函數,因此它可以快速處理每一行,並將每行處理存儲在另一個全局rdd中。

問題是,因爲我希望這項任務能夠並行化,所以我的全局RDD必須可以同時訪問以添加每條處理過的行。

我想知道是否有其他方式來做到這一點或任何事情!我期待着提高我的火花技能。

例如,這就是我想做的事:

我有這樣一個txt:

錯誤:Hahhaha PARAM_ERROR = 8 param_err2 = HTTPS

警告:HUHUHUHUH param_warn = tchu param_warn2 = wifi

我的正則表達式函數會將包含「ERROR」的行與數組匹配,例如Array("Error","8","https")

而另一個正則表達式函數將匹配包含「警告」與陣列例如Array("Warning","tchu","wifi")

行最後,我想獲得用於處理每一個線條RDD[Array[String]]

如何讓它與Spark並行?

+0

「我有一個單一功能的多個正則表達式匹配/情況下,RDD的線條應用適應正則表達式」 - 可以你編輯你的文章以包含這個函數的_signature_? –

回答

2

首先,理解在Spark中沒有什麼像「全局RDD」,也沒有理由需要類似的東西。在使用Spark時,您應該考慮根據轉換成另一個RDD,而不是根據更新 RDD(這是不可能的 - RDD是不可變的)。每個這樣的轉換將由Spark分佈式執行(並行)。

在這種情況下,如果我正確理解你的要求,你會想map每個記錄到以下結果之一:

  • Array[String],其中第一項是"ERROR",或:
  • 一個Array[String]其中第一項是"WARNING",或:
  • 如果沒有模式匹配的記錄,刪除

要做到這一點,你可以使用RDDmap(f)collect(f)方法:

// Sample data: 
val rdd = sc.parallelize(Seq(
    "ERROR : Hahhaha param_error=8 param_err2=https", 
    "WARNING : HUHUHUHUH param_warn=tchu param_warn2=wifi", 
    "Garbage - not matching anything" 
)) 

// First we can split in " : " to easily identify ERROR vs. WARNING 
val splitPrefix = rdd.map(line => line.split(" : ")) 

// Implement these parsing functions as you see fit; 
// The input would be the part following the " : ", 
// and the output should be a list of the values (not including the ERROR/WARNING) 
def parseError(v: String): List[String] = ??? // example input: "Hahhaha param_error=8 param_err2=https" 
def parseWarning(v: String): List[String] = ??? // example input: "HUHUHUHUH param_warn=tchu param_warn2=wifi" 

// Now we can use these functions in a pattern-matching function passed to RDD.collect, 
// which will transform each value that matches one of the cases, and will filter out 
// values that don't match anything 
val result: RDD[List[String]] = splitPrefix.collect { 
    case Array(l @ "ERROR", v) => l :: parseError(v) 
    case Array(l @ "WARNING", v) => l :: parseWarning(v) 
    // NOT adding a default case, so records that didn't match will be removed 
}  

// If you really want Array[String] and not List[String]:  
val arraysRdd: RDD[Array[String]] = result.map(_.toArray) 
+0

哦,是的該死的,這正是我想要的!非常感謝,我不知道我們可以使用像這樣的收集功能。每天都在Spark上進步,謝天謝地! :p – tricky