2016-09-26

How to read and process a file chunk by chunk with Play Iteratees

I'm using Play Framework Iteratees to read a file. I would like to process this file chunk by chunk (at each step).

I compose the following steps:

  • groupByLines: Enumeratee[Array[Byte], List[String]]
  • turnIntoLines: Enumeratee[List[String], List[Line]] (I defined case class Line(number: Int, value: String))
  • parseChunk: Iteratee[List[Line], Try[List[T]]] (for example, CSV parsing)

To define groupByLines, I need to use Iteratee.fold to concatenate the last line of the previous chunk with the first line of the current chunk.

The problem is that this creates a single chunk containing all the lines of the file.
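A plain-Scala sketch of the carry-over idea, with no Play dependency: it uses scanLeft instead of a fold, so each incoming chunk yields the lines it completes as soon as it arrives, instead of one list at the very end. The object name CarryOverSplit is hypothetical, and the sketch assumes lines end with a bare '\n'.

```scala
object CarryOverSplit {
  /** For each incoming chunk, returns the lines completed by that chunk.
    * The unfinished tail of a chunk is carried over and prepended to the next one.
    */
  def linesPerChunk(chunks: List[String]): List[List[String]] = {
    // State per step: (partial line carried over, lines completed at this step)
    val steps = chunks.scanLeft(("", List.empty[String])) {
      case ((carry, _), chunk) =>
        // limit -1 keeps a trailing empty string, so a chunk ending with '\n'
        // leaves an empty carry instead of a bogus partial line
        val parts = (carry + chunk).split("\n", -1).toList
        (parts.last, parts.init)
    }
    val completed = steps.tail.map(_._2) // drop the seed emitted by scanLeft
    // Whatever is still carried after the last chunk is the final line
    val lastCarry = steps.last._1
    if (lastCarry.isEmpty) completed else completed :+ List(lastCarry)
  }
}
```

Each element of the result corresponds to one step, which is the per-chunk behaviour the question is after.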

But I want to process the file chunk by chunk. I mean that groupByLines should produce chunks of 200 lines (for example).
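To make the distinction concrete, here is a stdlib-only model (hypothetical names FoldVsGrouped, foldAll, inBatches). It models the desired behaviour and is not the Play Iteratee API: a fold over all lines yields a single result, while Iterator.grouped hands out a new batch every 200 lines and builds each batch only when it is requested.

```scala
object FoldVsGrouped {
  /** Models the question's fold: one accumulator, hence one result,
    * available only after the last line has been read.
    */
  def foldAll(lines: Iterator[String]): List[List[String]] =
    List(lines.foldLeft(List.empty[String])((acc, line) => line :: acc).reverse)

  /** Models the wanted behaviour: a batch of `size` lines each time one is
    * requested; only one batch is being built at a time.
    */
  def inBatches(lines: Iterator[String], size: Int = 200): Iterator[List[String]] =
    lines.grouped(size).map(_.toList)
}
```

With 450 lines, foldAll gives one group of 450 while inBatches gives groups of 200, 200 and 50.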

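The same problem occurs with turnIntoLines. I also use fold to create the lines: I need the accumulator (provided by fold) to zip the line numbers with the line contents.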
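One way to keep numbering across batches while still emitting one result per batch is to carry only the next line number from step to step, not all previous lines. A minimal plain-Scala sketch; the numberBatches helper is hypothetical and Line mirrors the question's case class:

```scala
object NumberedBatches {
  final case class Line(number: Int, value: String)

  /** Numbers the lines of each batch. Only the next line number is carried
    * from one batch to the following one, so every batch is emitted on its own.
    */
  def numberBatches(batches: List[List[String]], first: Int = 0): List[List[Line]] =
    batches
      .scanLeft((first, List.empty[Line])) { case ((next, _), batch) =>
        val numbered = batch.zipWithIndex.map { case (value, i) => Line(next + i, value) }
        (next + batch.length, numbered)
      }
      .tail // drop the seed emitted by scanLeft
      .map(_._2)
}
```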

I'm a beginner with Play Iteratees.

Here is my code:

import play.api.libs.iteratee._
import scala.concurrent.ExecutionContext.Implicits.global

case class Line(number: Int, value: String)

val chunkSize = 1024 * 8

// file: the java.io.File to read, defined elsewhere
val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize)

def isLastChunk(chunk: Array[Byte]): Boolean = {
  chunk.length < chunkSize
}

val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped[Array[Byte]] {
  println("groupByLines")
  // Accumulator: (unfinished last line of the previous chunk, all lines so far)
  Iteratee.fold[Array[Byte], (String, List[String])](("", List.empty)) {
    case ((accLast, accLines), chunk) =>
      println("groupByLines chunk size " + chunk.length)
      new String(chunk)
        .trim
        .split("\n")
        .toList match {
          case h :: tail =>
            // Join the carried-over partial line with the first line of this chunk
            val lineBetween2Chunks: String = accLast + h
            val joined = lineBetween2Chunks :: tail

            val goodLines =
              if (isLastChunk(chunk)) joined
              else joined.init // the last line of a chunk may be incomplete

            // Carry the last (possibly incomplete) line; using `joined` keeps
            // accLast when the chunk contains no '\n' at all
            (joined.last, accLines ++ goodLines)
          case Nil => ("", accLines)
        }
  }.map(_._2)
}

val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped[List[String]] {
  println("turnIntoLines")
  // Accumulator: (next line number, all numbered lines so far)
  Iteratee.fold[List[String], (Int, List[Line])]((0, List.empty)) {
    case ((index, accLines), chunk) =>
      println("turnIntoLines chunk size " + chunk.length)
      val lines =
        ((Stream from index) zip chunk).map {
          case (lineNumber, content) => Line(lineNumber, content)
        }.toList
      (index + chunk.length, accLines ++ lines) // append, to keep file order
  }.map(_._2)
}

Answer

The real question here is how to process a file line by line using Play Iteratees.

First, to read a file as UTF-8, I use:

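import java.io.File
import java.nio.charset.Charset
import scala.concurrent.ExecutionContext
import scala.language.implicitConversions
import play.api.libs.iteratee._

object EnumeratorAdditionalOperators {
  implicit def enumeratorAdditionalOperators(e: Enumerator.type): EnumeratorAdditionalOperators = new EnumeratorAdditionalOperators(e)
}

class EnumeratorAdditionalOperators(e: Enumerator.type) {

  // Reads the file in chunks of chunkSize bytes and decodes each chunk as UTF-8.
  // fromFile and map take an implicit ExecutionContext.
  def fromUTF8File(file: File, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[String] =
    e.fromFile(file, chunkSize)
      .map(bytes => new String(bytes, Charset.forName("UTF-8")))

}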
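One caveat with decoding each chunk separately: a multi-byte UTF-8 character that straddles two chunks is split, and new String(bytes, ...) turns each half into a replacement character. The sketch below is plain JDK/Scala rather than Play: it drives a java.nio.charset.CharsetDecoder with endOfInput = false so that the unfinished bytes stay in the input buffer and can be prepended to the next chunk. Utf8Chunks and decodeChunks are hypothetical names.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CodingErrorAction, StandardCharsets}

object Utf8Chunks {
  /** Decodes a sequence of byte chunks as UTF-8. Bytes of a character cut off
    * at the end of one chunk are kept and prepended to the next chunk.
    */
  def decodeChunks(chunks: List[Array[Byte]]): List[String] = {
    val decoder = StandardCharsets.UTF_8.newDecoder()
      .onMalformedInput(CodingErrorAction.REPLACE)
      .onUnmappableCharacter(CodingErrorAction.REPLACE)

    var pending = Array.empty[Byte] // unfinished multi-byte sequence, if any

    val decoded = chunks.map { chunk =>
      val in = ByteBuffer.wrap(pending ++ chunk)
      // UTF-8 never decodes to more chars than bytes, so this buffer is large enough
      val out = CharBuffer.allocate(in.remaining() + 1)
      decoder.decode(in, out, false) // endOfInput = false: incomplete bytes stay in `in`
      pending = new Array[Byte](in.remaining())
      in.get(pending)
      out.flip()
      out.toString
    }

    // End of input: anything still pending is a truncated character
    val in = ByteBuffer.wrap(pending)
    val out = CharBuffer.allocate(pending.length + 2)
    decoder.decode(in, out, true)
    decoder.flush(out)
    out.flip()
    val rest = out.toString
    if (rest.isEmpty) decoded else decoded :+ rest
  }
}
```

Splitting "h\u00e9llo" right inside the two-byte é shows the difference: decodeChunks reassembles the text, while decoding each half with new String does not.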

Then, to split the input chunks into lines (cutting at '\n'):

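import scala.concurrent.ExecutionContext
import scala.language.implicitConversions
import play.api.libs.iteratee._

object EnumerateeAdditionalOperators {
  implicit def enumerateeAdditionalOperators(e: Enumeratee.type): EnumerateeAdditionalOperators = new EnumerateeAdditionalOperators(e)
}

class EnumerateeAdditionalOperators(e: Enumeratee.type) {

  // splitOnceAt passes characters through until the first '\n',
  // consume() concatenates them into one String, and grouped repeats
  // this inner iteratee until EOF, so each output element is one line.
  def splitToLines(implicit ec: ExecutionContext): Enumeratee[String, String] = e.grouped[String](
    Traversable.splitOnceAt[String, Char](_ != '\n') &>>
      Iteratee.consume()
  )

}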
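For intuition, the cut at '\n' can be modelled on a plain String: keep the prefix up to the first newline, drop the newline, and repeat on the rest. This is a simplified stdlib-only model, not Play's actual implementation, and SplitAtNewline is a hypothetical name.

```scala
object SplitAtNewline {
  /** One cut: the text before the first '\n' and the text after it
    * (the newline itself is dropped).
    */
  def cutOnce(s: String): (String, String) = {
    val (prefix, suffix) = s.span(_ != '\n')
    (prefix, suffix.drop(1))
  }

  /** Repeats the cut until nothing is left, collecting one string per line. */
  @annotation.tailrec
  def allLines(s: String, acc: List[String] = Nil): List[String] =
    if (s.isEmpty) acc.reverse
    else {
      val (line, rest) = cutOnce(s)
      allLines(rest, line :: acc)
    }
}
```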

Third, to add line numbers, I used a piece of code found here: https://github.com/michaelahlers/michaelahlers-playful/blob/master/src/main/scala/ahlers/michael/playful/iteratee/EnumerateeFactoryOps.scala. I defined implicits to "add" these methods to Enumerator and Enumeratee.

// Same class as above: these methods sit alongside splitToLines.
class EnumerateeAdditionalOperators(e: Enumeratee.type) {

  /**
   * As a complement to [[play.api.libs.iteratee.Enumeratee.heading]] and [[play.api.libs.iteratee.Enumeratee.trailing]], allows for inclusion of arbitrary elements between those from the producer.
   */
  def joining[E](separators: Enumerator[E])(implicit ec: ExecutionContext): Enumeratee[E, E] =
    zipWithIndex[E] compose Enumeratee.mapInputFlatten[(E, Int)] {

      case Input.Empty =>
        Enumerator.enumInput[E](Input.Empty)

      case Input.El((element, index)) if 0 < index =>
        separators andThen Enumerator(element)

      case Input.El((element, _)) =>
        Enumerator(element)

      case Input.EOF =>
        Enumerator.enumInput[E](Input.EOF)

    }

  /**
   * Zips elements with an index of the given [[scala.math.Numeric]] type, stepped by the given function.
   *
   * (Special thanks to [[https://github.com/eecolor EECOLOR]] for inspiring this factory with his answer to [[https://stackoverflow.com/a/27589990/700420 a question about enumeratees on Stack Overflow]].)
   */
  def zipWithIndex[E, I](first: I, step: I => I)(implicit ev: Numeric[I], ec: ExecutionContext): Enumeratee[E, (E, I)] =
    // The seed holds a dummy element and the index one step before `first`
    e.scanLeft[E](null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) {
      case ((_, index), value) =>
        value -> step(index)
    }

  /**
   * Zips elements with an incrementing index of the given [[scala.math.Numeric]] type, adding one each time.
   */
  def zipWithIndex[E, I](first: I)(implicit ev: Numeric[I], ec: ExecutionContext): Enumeratee[E, (E, I)] = zipWithIndex(first, ev.plus(_, ev.one))

  /**
   * Zips elements with an incrementing index by the same contract [[scala.collection.GenIterableLike#zipWithIndex zipWithIndex]].
   */
  def zipWithIndex[E](implicit ec: ExecutionContext): Enumeratee[E, (E, Int)] = zipWithIndex(0)

  // ...

}
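The seed passed to scanLeft is the subtle part: it holds a dummy element and an index one step before first, so the first real element receives exactly first. The same trick on an ordinary List, as a plain-Scala sketch (ScanZip is a hypothetical helper; List.scanLeft returns the seed as its first element, so the sketch drops it with .tail):

```scala
object ScanZip {
  /** Zips each element with an index produced by `step`, using the same
    * seed trick: start one step before `first`, then apply `step` per element.
    */
  def zipWithIndex[E, I](xs: List[E], first: I, step: I => I)(implicit ev: Numeric[I]): List[(E, I)] =
    xs.scanLeft(null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) {
      case ((_, index), value) => value -> step(index)
    }.tail // drop the dummy seed

  /** Incrementing index, adding one each time. */
  def zipWithIndex[E, I](xs: List[E], first: I)(implicit ev: Numeric[I]): List[(E, I)] =
    zipWithIndex(xs, first, (i: I) => ev.plus(i, ev.one))
}
```

The trick assumes step adds a constant. With a multiplicative step such as _ * 2, first - step(0) is just first, so the first element would be numbered 2 * first.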

Note: this trick makes it possible to write, for example, Enumerator.fromUTF8File(file).

Putting it all together:

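import play.api.libs.iteratee._
import scala.concurrent.ExecutionContext.Implicits.global
import EnumeratorAdditionalOperators._
import EnumerateeAdditionalOperators._

case class Line(number: Int, value: String)

// file: the java.io.File to read
Enumerator.fromUTF8File(file) &>
  Enumeratee.splitToLines ><>
  Enumeratee.zipWithIndex[String] ><>
  Enumeratee.map[(String, Int)] {
    case (e, idx) => Line(idx, e)
  } // then an Iteratee or another Enumeratee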
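For comparison, the same pipeline without iteratees, using only the standard library: Source.getLines() reads lazily, zipWithIndex numbers lines from 0, and grouped(batchSize) gives the 200-line batches the question asks for. This is a swapped-in technique (blocking I/O, not Play), and PlainPipeline, processFile and its process callback are hypothetical names.

```scala
import java.io.File
import scala.io.{Codec, Source}

object PlainPipeline {
  final case class Line(number: Int, value: String)

  /** Reads `file` as UTF-8, numbers its lines from 0 and passes them to
    * `process` in batches of `batchSize`, collecting one result per batch.
    */
  def processFile[R](file: File, batchSize: Int = 200)(process: List[Line] => R): List[R] = {
    val source = Source.fromFile(file)(Codec.UTF8)
    try {
      source.getLines()
        .zipWithIndex
        .map { case (value, idx) => Line(idx, value) }
        .grouped(batchSize)
        .map(batch => process(batch.toList))
        .toList // force the lazy iterator before the file is closed
    } finally source.close()
  }
}
```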

The new code is more concise and clearer than the code given in the question.