我想讀取Scala中的一個輸入文件,我知道該結構,但是我只需要每隔9個條目。到目前爲止,我已經成功地讀取使用整個事情:Scala只讀取文件的某些部分
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))
的問題,這給我留下了一個數組,它是巨大(我們談論的數據20GB)。我不僅看到自己爲了在RDD [數組[String]]和數組[String]之間進行轉換而被迫編寫一些非常難看的代碼,但它實質上使我的代碼無用。
我試着用
.map()
.flatMap() and
.reduceByKey()
但是沒什麼居然把我收集的「細胞」,到我需要他們的格式之間的不同的方法和混音。
下面是應該發生的:從我們的服務器讀取的文本文件的文件夾,該代碼應閱讀每個格式文本的「線」:
*---------*
| NASDAQ: |
*---------*
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close
,並只保留STOCK_SYMBOL的保持因爲這是我正在計算的標識符。到目前爲止,我的嘗試是將整個事物轉換爲一個數組,只將第一個索引從第一個索引中收集到collect_cells var。問題是,根據我的計算和實際生活結果,代碼需要335天才能運行(不是玩笑)。
這裏是我當前的代碼以供參考:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkNum {
def main(args: Array[String]) {
// Do some Scala voodoo
val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical"))
// Set input file as per HDFS structure + input args
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))
var collected_cells:Array[String] = new Array[String](0)
//println("[MESSAGE] Length of CC: " + collected_cells.length)
val divider:Long = 9
val array_length = fields.count/divider
val casted_length = array_length.toInt
val indexedFields = fields.zipWithIndex
val indexKey = indexedFields.map{case (k,v) => (v,k)}
println("[MESSAGE] Number of lines: " + array_length)
println("[MESSAGE] Casted lenght of: " + casted_length)
for(i <- 1 to casted_length) {
println("[URGENT DEBUG] Processin line " + i + " of " + casted_length)
var index = 9 * i - 8
println("[URGENT DEBUG] Index defined to be " + index)
collected_cells :+ indexKey.lookup(index)
}
println("[MESSAGE] collected_cells size: " + collected_cells.length)
val single_cells = collected_cells.flatMap(collected_cells => collected_cells);
val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y})
// val result = counted_cells.reduceByKey((a,b) => (a+b))
// val inmem = counted_cells.persist()
//
// // Collect driver into file to be put into user archive
// inmem.saveAsTextFile("path to server location")
// ==> Not necessary to save the result as processing time is recorded, not output
}
}
的底部,目前註釋掉,因爲我試圖調試它,但它作爲僞代碼,我知道我需要做。我可能想指出,我與斯卡拉完全不熟悉,因此諸如_符號之類的東西混淆了我的生活。
謝謝你的時間。
預處理外星火文件只保留每9號線。對於那部分使用Spark只會讓生活變得困難。 –
「納斯達克」橫幅是文件中的頭文件嗎? – maasg
讓我明白:您想要以CSV格式讀取包含股票報價的文件(或多個文件),並且您想要提取它包含的(唯一的?)'stock_symbol'的數量? – maasg