2016-02-19

Self-join using the Scala API

Earlier I posted a question about a self-join in Scala. I am trying to implement it in Spark, but I cannot manage the conversion. Here are the problem and my code.

Input dataset:

```
Property_ID, latitude, longitude, Address
123, 33.84, -118.39, null
234, 35.89, -119.48, null
345, 35.34, -119.39, null
```

Output dataset:

```
Property_ID1, Property_ID2, distance
123,123,0
123,234,0.1
123,345,0.6
234,234,0
234,123,0.1
234,345,0.7
345,345,0
345,123,0.6
345,234,0.7
```

Spark code:

```scala
import math._

object Haversine {
  val R = 6372.8 // Earth's radius in km

  def haversine(lat1: Double, lon1: Double, lat2: Double, lon2: Double) = {
    val dLat = (lat2 - lat1).toRadians
    val dLon = (lon2 - lon1).toRadians

    val a = pow(sin(dLat / 2), 2) + pow(sin(dLon / 2), 2) * cos(lat1.toRadians) * cos(lat2.toRadians)
    val c = 2 * asin(sqrt(a))
    R * c
  }

  def main(args: Array[String]): Unit = {
    println(haversine(36.12, -86.67, 33.94, -118.40))
  }
}

class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}

val csv = sc.textFile("geo.csv")                                       // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim))     // split lines into columns
val header = new SimpleCSVHeader(data.take(1)(0))                      // build the header from the first line
val rows = data.filter(line => header(line, "latitude") != "latitude") // filter the header row out

// Each row has four columns; the trailing Address column is ignored with `_`
val typed = rows.map { case Array(id, lat, lon, _) => (id, lat.toDouble, lon.toDouble) }
```

After this, I need to self-join `typed` and pass each pair through the haversine method. I got the Scala code below from the community, and I need to convert it into Spark code that works on RDDs. The code below currently works for Lists.

```scala
val combos = for {
  a <- typed
  b <- typed
} yield (a, b)

combos.map { case ((id1, lat1, lon1), (id2, lat2, lon2)) =>
  id1 + "," + id2 + "," + haversine(lat1, lon1, lat2, lon2)
} foreach println
```
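For reference, the List-based for-comprehension maps directly onto RDDs via `RDD.cartesian`, which produces all pairs of elements from two RDDs. A minimal sketch, assuming a live `SparkContext` named `sc` and the `Haversine` object above; the inlined sample data stands in for the parsed `typed` RDD:

```scala
// Sketch only: requires a running SparkContext (`sc`) and the Haversine object.
// Sample data mirrors the input dataset; in practice use the `typed` RDD parsed from geo.csv.
val typedRdd = sc.parallelize(Seq(
  ("123", 33.84, -118.39),
  ("234", 35.89, -119.48),
  ("345", 35.34, -119.39)
))

// cartesian() is the RDD equivalent of the nested for-comprehension:
// it yields every (a, b) combination, including self-pairs like (123, 123).
val combos = typedRdd.cartesian(typedRdd)

val distances = combos.map { case ((id1, lat1, lon1), (id2, lat2, lon2)) =>
  id1 + "," + id2 + "," + Haversine.haversine(lat1, lon1, lat2, lon2)
}

distances.collect().foreach(println)
```

Note that `cartesian` on an RDD of n elements produces n² pairs, so it can be expensive on large datasets; if only distinct pairs are needed, filter with a condition such as `id1 < id2` after the join.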

Can anyone help? Thanks in advance.

Answer