2016-11-16 299 views
1

引用this post後,我可以讀取駐留在* .tar.gz文件中的多個* .txt文件。但現在,我需要讀取* .tar.gz文件中的HDF5文件。樣本文件可以從million songs dataset下載here下載。任何人都可以告訴我如何更改以下代碼以便將HDF5文件讀入RDD中?謝謝!從Spark中的* .tar.gz壓縮文件中讀取HDF5文件

package a.b.c 

import org.apache.spark._ 
import org.apache.spark.sql.{SQLContext, DataFrame} 
import org.apache.spark.ml.tuning.CrossValidatorModel 
import org.apache.spark.ml.regression.LinearRegressionModel 
import org.apache.spark.ml.{Pipeline, PipelineModel} 
import org.apache.spark.ml.regression.LinearRegression 
import org.apache.spark.input.PortableDataStream 
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream 
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream 
import scala.util.Try 
import java.nio.charset._ 

object Main { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("lab1").setMaster("local") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    import sqlContext.implicits._ 
    import sqlContext._ 

    val inputpath = "path/to/millionsong.tar.gz" 
    val rawDF = sc.binaryFiles(inputpath, 2) 
       .flatMapValues(x => extractFiles(x).toOption) 
       .mapValues(_.map(decode())) 
       .map(_._2) 
       .flatMap(x => x) 
       .flatMap { x => x.split("\n") } 
       .toDF() 
    } 

    def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try { 
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open)) 
    Stream.continually(Option(tar.getNextTarEntry)) 
     // Read until next exntry is null 
     .takeWhile(_.isDefined) 
     // flatten 
     .flatMap(x => x) 
     // Drop directories 
     .filter(!_.isDirectory) 
     .map(e => { 
     Stream.continually { 
      // Read n bytes 
      val buffer = Array.fill[Byte](n)(-1) 
      val i = tar.read(buffer, 0, n) 
      (i, buffer.take(i))} 
     // Take as long as we've read something 
     .takeWhile(_._1 > 0) 
     .map(_._2) 
     .flatten 
     .toArray}) 
     .toArray 
    } 

    def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8) 
} 
+0

你有沒有找到一種方法來做到這一點? – Reginbald

回答

0

我設法通過寫字節流轉換爲本地文件,然後打開此文件爲H5,使用this提取特徵讀取壓縮包內的文件HDF5。這裏是我的代碼:

var tarFiles: Array[String] = Array() 
val tar_path = path + "millionsongsubset.tar.gz" 

//TODO: add all your tar.gz files in main folder path to tarFiles array 
//should add here as many tar.gz files as wanted containing the 
//hdf5 files for the songs 
tarFiles = tarFiles :+ tar_path 
//tarFiles = tarFiles :+ (path+"A.tar.gz") 
//tarFiles = tarFiles :+ (path+"B.tar.gz") 
//tarFiles = tarFiles :+ (path+"C.tar.gz") 

//This reads all tar.gz files in tarFiles list, and for each .h5 
//file within, it extracts each song's list of features. 
//Thus, it gets a list of features for all songs in the files. 
var allHDF5 = sc.parallelize(tarFiles).flatMap(path => { 
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(new FileInputStream(path))) 
    var entry: TarArchiveEntry = tar.getNextEntry().asInstanceOf[TarArchiveEntry] 
    var res: List[Array[Byte]] = List() 
    var i = 0 
    while (entry != null) { 
     var outputFile:File = new File(entry.getName()); 
     if (!entry.isDirectory() && entry.getName.contains(".h5")) { 
      var byteFile = Array.ofDim[Byte](entry.getSize.toInt) 
      tar.read(byteFile); 
      res = byteFile :: res 
      if(i%100==0) { 
       println("Read " + i + " files") 
      } 
      i = i+1 

     } 
     entry = tar.getNextEntry().asInstanceOf[TarArchiveEntry] 
    } 
    //All files are turned into byte arrays 
    res 

    }).map(bytes => { 
    // The toString method is used as a UUID for the file 
    val name = bytes.toString() 
    FileUtils.writeByteArrayToFile(new File(name), bytes) 
    val reader = HDF5Factory.openForReading(name) 
    val features = getFeatures(reader) 
    reader.close() 
    features 
    }) 

    println("Extracted songs from tar.gz, showing 5 examples") 
    allHDF5.take(5).foreach(x => { x.foreach(y => print(y+" ")) 
         println()}) 

的幾點意見:

  1. getFeatures方法:這種方法是代碼的here一個很簡單的適應,提取了一些功能,並將其送回的數組。請注意,爲了運行此特徵提取代碼,您將需要this library,它具有很好的javadoc
  2. 請注意,如果此代碼在具有多個執行程序的羣集中運行,則執行程序會在本地寫入.h5文件,因此如果它們圍繞羣集移動,則在某些時候您可能會嘗試讀取不存在的文件在本地執行。