2016-12-16 77 views
1

我想讀取Scala中的一個輸入文件,我知道該結構,但是我只需要每隔9個條目。到目前爲止,我已經成功地讀取使用整個事情:Scala只讀取文件的某些部分

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val fields = lines.map(line => line.split(",")) 

的問題,這給我留下了一個數組,它是巨大(我們談論的數據20GB)。我不僅看到自己爲了在RDD [數組[String]]和數組[String]之間進行轉換而被迫編寫一些非常難看的代碼,但它實質上使我的代碼無用。

我試着用

.map() 
.flatMap() and 
.reduceByKey() 

但是沒什麼居然把我收集的「細胞」,到我需要他們的格式之間的不同的方法和混音。

下面是應該發生的:從我們的服務器讀取的文本文件的文件夾,該代碼應閱讀每個格式文本的「線」:

*---------* 
| NASDAQ: | 
*---------* 
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close 

,並只保留STOCK_SYMBOL的保持因爲這是我正在計算的標識符。到目前爲止,我的嘗試是將整個事物轉換爲一個數組,只將第一個索引從第一個索引中收集到collect_cells var。問題是,根據我的計算和實際生活結果,代碼需要335天才能運行(不是玩笑)。

這裏是我當前的代碼以供參考:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object SparkNum { 


    def main(args: Array[String]) { 

    // Do some Scala voodoo 
    val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical")) 

    // Set input file as per HDFS structure + input args 
    val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
    val fields = lines.map(line => line.split(",")) 
    var collected_cells:Array[String] = new Array[String](0) 

    //println("[MESSAGE] Length of CC: " + collected_cells.length) 

    val divider:Long = 9 
    val array_length = fields.count/divider 
    val casted_length = array_length.toInt 

    val indexedFields = fields.zipWithIndex 
    val indexKey = indexedFields.map{case (k,v) => (v,k)} 

    println("[MESSAGE] Number of lines: " + array_length) 
    println("[MESSAGE] Casted lenght of: " + casted_length) 



    for(i <- 1 to casted_length) { 

     println("[URGENT DEBUG] Processin line " + i + " of " + casted_length) 

     var index = 9 * i - 8 

     println("[URGENT DEBUG] Index defined to be " + index) 

     collected_cells :+ indexKey.lookup(index) 

    } 



    println("[MESSAGE] collected_cells size: " + collected_cells.length) 



    val single_cells = collected_cells.flatMap(collected_cells => collected_cells); 
    val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y}) 
    // val result = counted_cells.reduceByKey((a,b) => (a+b)) 

    // val inmem = counted_cells.persist() 
    // 
    // // Collect driver into file to be put into user archive 
    // inmem.saveAsTextFile("path to server location") 

    // ==> Not necessary to save the result as processing time is recorded, not output 


    } 

} 

的底部,目前註釋掉,因爲我試圖調試它,但它作爲僞代碼,我知道我需要做。我可能想指出,我與斯卡拉完全不熟悉,因此諸如_符號之類的東西混淆了我的生活。

謝謝你的時間。

+0

預處理外星火文件只保留每9號線。對於那部分使用Spark只會讓生活變得困難。 –

+0

「納斯達克」橫幅是文件中的頭文件嗎? – maasg

+0

讓我明白:您想要以CSV格式讀取包含股票報價的文件(或多個文件),並且您想要提取它包含的(唯一的?)'stock_symbol'的數量? – maasg

回答

2

有一些需要澄清的問題概念:

當我們執行這段代碼:

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val fields = lines.map(line => line.split(",")) 

這並不造成巨大的數據大小的數組。該表達式代表基礎數據的轉換。它可以進一步轉化,直到我們將數據減少到我們期望的信息集。

在這種情況下,我們希望有一個記錄的stock_symbol場編碼的CSV:

exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close 

我也要去假設數據文件包含這樣的橫幅:

*---------* 
| NASDAQ: | 
*---------* 

我們要做的第一件事就是刪除看起來像這個橫幅的東西。實際上,我將假設第一個字段是以字母數字字符開頭的證券交易所的名稱。我們將做到這一點,我們做任何分裂之前,導致:

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val validLines = lines.filter(line => !line.isEmpty && line.head.isLetter) 
val fields = validLines.map(line => line.split(",")) 

它有助於寫類型的變量,有胸懷,我們有我們所期望的數據類型的和平。隨着我們Scala技能的進步,可能變得不那麼重要。讓我們改寫以上類型的表達式:

val lines: RDD[String] = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val validLines: RDD[String] = lines.filter(line => !line.isEmpty && line.head.isLetter) 
val fields: RDD[Array[String]] = validLines.map(line => line.split(",")) 

我們感興趣的是stock_symbol領域,這位置上是元素#1基於0陣列:

val stockSymbols:RDD[String] = fields.map(record => record(1)) 

如果我們要算符號,所有剩下的就是發出計數:

val totalSymbolCount = stockSymbols.count() 

因爲我們有所有記錄一個條目這不是非常有幫助。稍微有趣的問題是:

我們有多少個不同的股票代碼?

val uniqueStockSymbols = stockSymbols.distinct.count() 

我們每個符號有多少條記錄?

val countBySymbol = stockSymbols.map(s => (s,1)).reduceByKey(_+_) 

在星火2.0,爲Dataframes和數據集CSV支持可出鑑於盒 的,我們的數據不具有與字段名稱的標題行(什麼大的數據集通常的),我們將需要提供的列名:

val stockDF = sparkSession.read.csv("/tmp/quotes_clean.csv").toDF("exchange", "symbol", "date", "open", "close", "volume", "price") 

現在我們可以回答我們的問題很簡單:

val uniqueSymbols = stockDF.select("symbol").distinct().count 
val recordsPerSymbol = stockDF.groupBy($"symbol").agg(count($"symbol")) 
+0

嗨maasg,非常感謝您的回答!我在適用的地方更改了我的代碼,但是當我在.reduceByKey()中使用_時,Maven抱怨「擴展函數缺少參數類型」。現在我會嘗試創建一個新行,或者我會嘗試使用舊的佈局.reduceByKey((a,b)=>(a + b))。乾杯。 – Synaxr

+1

@Synaxr查看Spark Notebook中的代碼:https://gist.github.com/maasg/7b8a4991ba9e2c236ddd8dfd823352cc – maasg