2016-11-23 76 views

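How to compute the average degree of neighbors with GraphX

I want to compute the average neighbor degree for each node in a graph. Say we have a graph like this: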

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Create an RDD for vertices
val users: RDD[(VertexId, String)] =
  sc.parallelize(Array((3L, "rxin"),
                       (7L, "jgonzal"),
                       (5L, "franklin"),
                       (2L, "istoica")))
// Create an RDD for edges
val relationships: RDD[Edge[Int]] =
  sc.parallelize(Array(Edge(3L, 7L, 12),
                       Edge(5L, 3L, 1),
                       Edge(2L, 5L, 3),
                       Edge(5L, 7L, 5)))
// Build the initial Graph
val graph = Graph(users, relationships)

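EDIT: To give an idea of the expected result, take node 5 and its neighbors:

  • node 3 has degree 2
  • node 7 has degree 2
  • node 2 has degree 1

The output of the measure is just the mean degree of node 5's neighbors: (2 + 2 + 1) / 3 = 1.666

Ideally, you would want to exclude the links to node 5 from this computation, but that doesn't really matter to me right now...

END EDIT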
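
The arithmetic in the edit can be checked without Spark. What follows is only a minimal sketch on plain Scala collections, not a GraphX solution; the names `AvgNeighbourDegree`, `avgNeighbourDegree` and `avgNeighbourDegreeExcludingLink` are made up for illustration. The second method assumes that "excluding the links to node 5" means subtracting one from each neighbor's degree.

```scala
object AvgNeighbourDegree {
  // (src, dst) pairs of the example graph, treated as undirected.
  val edges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L))

  // Adjacency list: every edge contributes a neighbour to both endpoints.
  val neighbours: Map[Long, Seq[Long]] =
    edges.flatMap { case (s, d) => Seq(s -> d, d -> s) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2) }

  // Degree of a vertex = number of incident edges.
  val degree: Map[Long, Int] = neighbours.map { case (v, ns) => v -> ns.size }

  // Plain average of the neighbours' degrees.
  def avgNeighbourDegree(v: Long): Double = {
    val ns = neighbours(v)
    ns.map(degree).sum.toDouble / ns.size
  }

  // Variant (assumption): do not count the edge that leads back to v.
  def avgNeighbourDegreeExcludingLink(v: Long): Double = {
    val ns = neighbours(v)
    ns.map(n => degree(n) - 1).sum.toDouble / ns.size
  }
}
```

For node 5, `AvgNeighbourDegree.avgNeighbourDegree(5L)` evaluates (2 + 2 + 1) / 3, and the variant evaluates (1 + 1 + 0) / 3.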

I would like to use aggregateMessages, but I don't know how to retrieve the degree of each node from inside the aggregateMessages call:

val neideg = g.aggregateMessages[(Long, Double)](
  triplet => {
    val comparedAttrs = compareAttrs(triplet.dstAttr, triplet.srcAttr) // BUT HERE I SHOULD GIVE ALSO THE DEGREE
    triplet.sendToDst(1L, comparedAttrs)
    triplet.sendToSrc(1L, comparedAttrs)
  },
  { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) })

val aveneideg = neideg.mapValues(kv => kv._2/kv._1.toDouble).toDF("id", "aveneideg") 

Then I have the function that does the sum:

def compareAttrs(xs: (Int, String), ys: (Int, String)): Double = { 
    xs._1.toDouble + ys._1.toDouble 
} 

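How can I also pass the degree of those nodes to compareAttrs?

Of course, I would be more than happy to see a simpler and smarter solution to this task than the one I am trying to craft...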

+0

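I would do a DFS for each connected component and keep track of the neighbors along the way. Then divide that number by the number of nodes. – Xenwar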

+0

Thanks for your time, but this reply doesn't really help – user299791

Answers

1

I'm not sure if this is what you are after, but this is something you could go with:

val degrees = graph.degrees
// now we have a graph where the attribute is the degree of a vertex;
// vertices missing from `degrees` have no edges, so their degree is 0
val graphWithDegrees = graph.outerJoinVertices(degrees) { (_, _, optDegree) =>
  optDegree.getOrElse(0)
}

// now each vertex sends its degree to its neighbours;
// we aggregate them in a list (a set would drop duplicate degrees)
// so that each vertex gets the degrees of all its neighbours
val neighboursDegreeAndCount = graphWithDegrees.aggregateMessages[List[Long]](
  sendMsg = triplet => {
    val srcDegree = triplet.srcAttr
    val dstDegree = triplet.dstAttr
    triplet.sendToDst(List(srcDegree))
    triplet.sendToSrc(List(dstDegree))
  },
  mergeMsg = (x, y) => x ++ y
).mapValues(neighbourDegrees => neighbourDegrees.sum / neighbourDegrees.size.toDouble)

// now if you want it in the original graph you can do
// outerJoinVertices again; the attr of each vertex in the resulting
// graph is the avg degree of its neighbours (0.0 if it has none)
graph.outerJoinVertices(neighboursDegreeAndCount) { (_, _, optAvgDegree) =>
  optAvgDegree.getOrElse(0.0)
}

So, for your example the output is: Array((5,1.6666666666666667), (2,3.0), (3,2.5), (7,2.5))
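
If collecting every neighbour degree into a `List` is a concern on high-degree vertices, one possible variation is to send a running (sum, count) pair instead, which is close to what the question originally sketched. This is only a sketch under assumptions: it expects a `SparkContext` named `sc` as in spark-shell, it rebuilds the example graph with `Graph.fromEdges` so that it is self-contained, and the names `withDegrees`, `sumAndCount` and `avgNeighbourDegree` are made up for illustration.

```scala
import org.apache.spark.graphx._

// Rebuild the example graph; the vertex attribute is unused, so a dummy "" is fine.
// Assumes `sc` is an existing SparkContext (as in spark-shell).
val exampleGraph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(3L, 7L, 12), Edge(5L, 3L, 1), Edge(2L, 5L, 3), Edge(5L, 7L, 5))),
  "")

// Attach each vertex's degree as its attribute; vertices without edges get 0.
val withDegrees: Graph[Int, Int] =
  exampleGraph.outerJoinVertices(exampleGraph.degrees) { (_, _, deg) => deg.getOrElse(0) }

// Every edge sends (degree of the other endpoint, 1) in both directions;
// messages arriving at the same vertex are summed component-wise.
val sumAndCount: VertexRDD[(Long, Int)] =
  withDegrees.aggregateMessages[(Long, Int)](
    ctx => {
      ctx.sendToDst((ctx.srcAttr.toLong, 1))
      ctx.sendToSrc((ctx.dstAttr.toLong, 1))
    },
    (a, b) => (a._1 + b._1, a._2 + b._2)
  )

// Average = sum of neighbour degrees / number of neighbours.
val avgNeighbourDegree: VertexRDD[Double] =
  sumAndCount.mapValues(p => p._1.toDouble / p._2)
```

The merged message stays a fixed-size pair however many neighbours a vertex has, whereas the list-based version grows with the degree. Vertices without any edge receive no message and are therefore absent from `avgNeighbourDegree`.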

+0

I don't understand the result for node 5: why is the average degree 1.5 and not 1.666? Thanks! – user299791

+0

@Ipiepiora I edited the question to better explain what I am trying to accomplish – user299791

+1

@user299791 Sorry, that was a silly mistake in my code. I used a `Set` instead of a `List`. Check the code again. – lpiepiora