2017-01-10 42 views
4

我從gz壓縮的json文件創建edgevertices類型的圖形。帶Spark的Graphx中的最短路徑性能

我已經把文件收存箱文件夾here

加載和映射這些json記錄,以創建由graphx這樣所需的verticesedge類型:

val vertices_raw = sqlContext.read.json("path/vertices.json.gz") 
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index"))) 
val verticesRDD: RDD[(VertexId, Long)] = vertices 
val edges_raw = sqlContext.read.json("path/edges.json.gz") 
val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length")))) 
val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut) 

然後我用這個dijkstra我發現計算兩個頂點之間的最短路徑:

def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = { 
      var g2 = g.mapVertices(
     (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]()) 
     ) 
      for (i <- 1L to g.vertices.count - 1) { 
      val currentVertexId: VertexId = g2.vertices.filter(!_._2._1) 
       .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
       (a, b) => if (a._2._2 < b._2._2) a else b) 
       ._1 

      val newDistances: VertexRDD[(Double, List[VertexId])] = 
       g2.aggregateMessages[(Double, List[VertexId])](
      ctx => if (ctx.srcId == currentVertexId) { 
       ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId)) 
      }, 
      (a, b) => if (a._1 < b._1) a else b 
     ) 
     g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => { 
      val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]())) 
      (
      vd._1 || vid == currentVertexId, 
      math.min(vd._2, newSumVal._1), 
      if (vd._2 < newSumVal._1) vd._3 else newSumVal._2 
      ) 
     }) 
     } 

      g.outerJoinVertices(g2.vertices)((vid, vd, dist) => 
     (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]())) 
      .productIterator.toList.tail 
     )) 
     } 

我拿兩個隨機頂點的id:

val v1 = 4000000028222916L 
val v2 = 4000000031019012L 

,並計算出它們之間的路徑:

val results = dijkstra(my_graph, v1).vertices.map(_._2).collect 

我無法在本地計算這個在我的筆記本電腦沒有得到一個計算器錯誤。我可以看到它正在使用4個可用內核中的3個。我可以加載這張圖,並使用Python中的igraph庫,每秒計算最短的10條路徑,完全相同。這是計算路徑的低效率手段嗎?在規模上,在多個節點上,路徑將進行計算(無堆棧溢出錯誤),但每個路徑計算仍爲30/40秒。

回答

0

正如你可以在python-igraph github

讀「它的目的是一樣強大(即快),儘可能使大圖的 分析。」

爲了解釋爲什麼它正在4000X更多的時間在Apache的火花比Python的,你可以看看here(深入瞭解與星火PMC成員凱Ousterhout的性能瓶頸。)看到,這可能是由於到了一個瓶頸:

...的想法,網絡和磁盤I/O開始是主要的瓶頸...... 你可能不需要存儲在內存中的數據,因爲作業可能沒有那麼快。這是說,如果你從移動磁盤上的序列化壓縮數據到內存...

,你還可以看到here & here一些信息,但最好的最後一種方法是基準你的代碼就知道瓶頸是