我正在使用斯卡拉在Spark中完成ETL過程。 原始日誌文件包含兩列名稱和年齡。我的ETL進程讀取並驗證原始日誌並生成兩個列,即標誌和驗證消息。 標誌列指定行是否有效不是(如果行有效= 1 else 0) 驗證列指定行爲無效的原因。斯卡拉Spark中的ETL過程
Ex。 原始日誌文件
RAM,35
SAM,45
JAM,ww
這裏最後一行是無效的,所以我最終的輸出是
RAM,35,1,""
SAM,45,1,""
JAM,ww,0,"invalid age"
我的Scala代碼是
import sqlContext._
val people = sc.textFile("hdfs://..../rawT.csv").map(_.split(","))
val base_people = people.map{r => val formatted_people = if(r(1).matches("^\\d*$")) ("1","") else ("0","Invalid Age"); (r(0), r(1), formatted_people)}
但是在這個代碼RDD是越來越爲 產生base_people: org.apache.spark.rdd.RDD[(String, String, (String, String))]
我想要RDD無字符串陣列, base_people: org.apache.spark.rdd.RDD[(String, String, String, String)]
我的邏輯是正確的還是我需要關注一些其他代碼的東西。
感謝您的幫助。這是很好的工作。 –