2017-02-15 72 views
1

我學習的Apache星火使用Scala,並想用它來處理跨多行這樣的DNA數據集:多行星火滑動窗口

ATGTAT 
ACATAT 
ATATAT 

我想這個映射到一個固定的羣體大小k並計數組。因此,對於k = 3,我們會得到每個字符組與後面的兩個字符:

ATG TGT GTA TAT ATA TAC 
ACA CAT ATA TAT ATA TAT 
ATA TAT ATA TAT 

...再算上團體(如字數):

(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1) 

的問題是, 「字」跨越多行,正如上面的示例中的TAC一樣。它跨越了換行。我不想只計算每行中的組,但是在整個文件中忽略行結束。

換句話說,我想在整個文件上處理整個序列作爲寬度爲k的滑動窗口,就好像沒有換行符一樣。問題是向前看(或後退)到下一個RDD行,以在到達行尾時完成一個窗口。

兩個想法,我所做的是:

  1. 追加K-1從下一行大字:
ATATATAC 
ACATATAT 
ATATAT 

我試圖與星火SQL鉛()函數,但是當我嘗試執行flatMap時,我得到了WindowSpec的NotSerializableException。有沒有其他方法可以引用下一行?我需要編寫自定義輸入格式嗎?

  • 閱讀整個序列中作爲一個單一的線(或讀取後加入行):
  • ATATATACATATATATAT 
    

    有一種讀取多個線,所以他們可以作爲一個處理?如果是這樣,它是否都需要適應單個機器的內存?

    我意識到這些都可以作爲預處理步驟完成。我想知道最好的方法是在Spark內部完成。一旦我有了這些格式,我就知道如何去做其餘的事情,但我被困在這裏。

    回答

    1

    可以使單個字符串的RDD,而不是加入他們爲一條線,因爲這將會使結果不能被分配的字符串:

    val rdd = sc.textFile("gene.txt") 
    // rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24 
    

    所以簡單地用flatMap分割線成字符的列表:

    rdd.flatMap(_.split("")).collect 
    // res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T) 
    

    一個更完整的解決方案,從this answer借:

    val rdd = sc.textFile("gene.txt") 
    
    // create the sliding 3 grams for each partition and record the edges 
    val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => { 
        val slideList = iter.toList.sliding(3).toList 
        Iterator((slideList, (slideList.head, slideList.last))) 
    }) 
    
    // collect the edge values, concatenate edges from adjacent partitions and broadcast it 
    val edgeValues = rdd1.values.collect 
    
    val sewedEdges = edgeValues zip edgeValues.tail map { case (x, y) => { 
        (x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList 
    }} 
    
    val sewedEdgesMap = sc.broadcast(
        (0 until rdd1.partitions.size) zip sewedEdges toMap 
    ) 
    
    // sew the edge values back to the result 
    rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))). 
        flatMap(_.map(_ mkString "")).collect 
    
    // res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT) 
    
    +0

    我認爲問題依然存在:我如何在當前元素之前訪問元素的兩個位置。所以如果我在第一個元素'A'上,我如何展望未來兩個組合:'ATG'?我知道什麼時候它在一個字符串或數組中,我可以向前看並根據索引進行連接,但RDD行又如何呢? – jcadcell

    +0

    你可以參考[這個答案](http://stackoverflow.com/questions/35154267/how-to-compute-cumulative-sum-using-spark) – Psidom

    +0

    謝謝,這個工程。我必須通過它來了解發生了什麼。 – jcadcell