2015-04-20 34 views
6

I have a dataset and I want to extract the (review/text) entries whose (review/time) falls between x and y, for example (1183334400 < time < 1185926400), using an RDD filter in Scala on Spark.

Here is a sample of my data:

product/productId: B000278ADA 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: A17KXW1PCUAIIN 
review/profileName: Mark Anthony "Mark" 
review/helpfulness: 4/4 
review/score: 5.0 
review/time: 1174435200 
review/summary: Jobst UltraSheer Knee High Stockings 
review/text: Does a very good job of relieving fatigue. 

product/productId: B000278ADB 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: A9Q3932GX4FX8 
review/profileName: Trina Wehle 
review/helpfulness: 1/1 
review/score: 3.0 
review/time: 1352505600 
review/summary: Delivery was very long wait..... 
review/text: It took almost 3 weeks to recieve the two pairs of stockings . 

product/productId: B000278ADB 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: AUIZ1GNBTG5OB 
review/profileName: dgodoy 
review/helpfulness: 1/1 
review/score: 2.0 
review/time: 1287014400 
review/summary: sizes recomended in the size chart are not real 
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!. 

My Spark Scala code:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 
import org.apache.spark.{SparkConf, SparkContext} 

object test1 { 
    def main(args: Array[String]): Unit = { 
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local") 
    val sc = new SparkContext(conf1) 
    val conf: Configuration = new Configuration 
    conf.set("textinputformat.record.delimiter", "product/title:") 
    val input1=sc.newAPIHadoopFile("data/Electronics.txt",  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
    val lines = input1.map { text => text._2} 
    val filt = lines.filter(text=>(text.toString.contains(tt => tt in (startdate until enddate)))) 
    filt.saveAsTextFile("data/filter1") 
    } 
} 

but my code does not work properly.

How can I filter these lines?

+1

I don't see the delimiter string "product/productId:" in the input file. – ipoteka

+1

What output do you expect, and what problem are you facing? – maasg

Answer

10

It's much simpler than that. Try this:

import org.apache.spark.{SparkConf, SparkContext} 

object test1 
{ 
    def main(args: Array[String]): Unit = 
    { 
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local") 
    val sc = new SparkContext(conf1) 

    // example bounds taken from the question; define your own as needed 
    val startDate = 1183334400L 
    val endDate = 1185926400L 

    def extractDateAndCompare(line: String): Boolean = 
    { 
     val from = line.indexOf("/time: ") + 7 
     val to = line.indexOf("review/text: ") - 1 
     val date = line.substring(from, to).toLong 
     date > startDate && date < endDate 
    } 

    sc.textFile("data/Electronics.txt") 
     .filter(extractDateAndCompare) 
     .saveAsTextFile("data/filter1") 
    } 
} 

I usually find that these intermediate helper methods make things much clearer. Of course, this assumes the boundary dates are defined somewhere and that the input file contains no format problems. I did that on purpose to keep it simple, but adding a try that returns an Option and using flatMap() can help you avoid errors, if there are any. Also, your raw text is a bit cumbersome; you may want to explore JSON, TSV files, or some other simpler format.
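A minimal sketch of that try/Option/flatMap idea, in plain Scala with no Spark dependency (the object name `SafeExtract` and the regex are illustrative assumptions, not part of the answer):

```scala
object SafeExtract {
  // Matches "review/time: <digits>" anywhere in a record.
  private val TimePattern = """review/time:\s*(\d+)""".r

  // Returns Some(unix timestamp) when the record has a parseable
  // review/time field, and None otherwise, so malformed records can
  // be dropped with flatMap instead of throwing an exception.
  def extractDate(record: String): Option[Long] =
    TimePattern.findFirstMatchIn(record).map(_.group(1).toLong)

  // True when the record's timestamp lies strictly between start and end.
  def inRange(record: String, start: Long, end: Long): Boolean =
    extractDate(record).exists(t => t > start && t < end)
}
```

With the question's delimiter-based RDD this could be used as `lines.filter(r => SafeExtract.inRange(r.toString, startDate, endDate))`, and records with a missing or garbled time field are silently skipped rather than crashing the job.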

+0

Note that I coded it from scratch, so there may be small details on the indexes and so on, but I hope you get the idea. –

+0

Dear Daniel, I have a 1-gigabyte dataset of reviews (text). Here is a sample of my dataset: product/productId: B000278ADA product/title: Jobst Ultr product/price: 46.34 review/userId: A1ZJAH4 review/helpfulness: 0/0 review/score: 5.0 review/time: 1359936000 review/summary: One-stop shopping review/text: Glad to find what you are looking for. I want to extract the review/text within a time period, for example I want to extract the review/text from 2002. For this job I wrote the code above, treating the complete data of one review as a record of the RDD. –

+0

Oh, I see you updated the sample text. So that means each "record" spans multiple lines? –
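Since each record spans multiple lines, `sc.textFile` (one element per line) will never see `review/time` and `review/text` in the same element; that is what the question's `textinputformat.record.delimiter` setting works around. A plain-Scala sketch of that record-level view (no Spark needed; the object `RecordFilter` and its helpers are illustrative names, not from the thread):

```scala
object RecordFilter {
  private val TimePattern = """review/time:\s*(\d+)""".r

  // Splits the raw text into whole reviews, using the field that starts
  // every record as the delimiter, mirroring what
  // textinputformat.record.delimiter does for newAPIHadoopFile.
  def splitRecords(text: String): Seq[String] =
    text.split("product/productId:").toSeq.map(_.trim).filter(_.nonEmpty)

  // Keeps only the records whose review/time lies strictly inside (start, end).
  def filterByTime(text: String, start: Long, end: Long): Seq[String] =
    splitRecords(text).filter { rec =>
      TimePattern.findFirstMatchIn(rec)
        .map(_.group(1).toLong)
        .exists(t => t > start && t < end)
    }
}
```

In the Spark job the splitting is done by the input format itself, so each RDD element is already one whole review and only the time check in the filter body is needed.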