
Answer

You can achieve this with a bit of SQL.

Let's assume you have the following graph:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for the vertices (cars and persons)
val v: RDD[(VertexId, String)] =
    sc.parallelize(Array((1L, "car1"), (2L, "car2"),
         (3L, "car3"), (4L, "person1"), (5L, "person2")))
// Create an RDD for the edges (person -> car)
val e: RDD[Edge[Int]] =
    sc.parallelize(Array(Edge(4L, 1L, 1), Edge(4L, 2L, 1),
         Edge(5L, 1L, 1)))

val graph = Graph(v, e)
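
As a quick sanity check, you can list each person-to-car edge by name through the graph's triplets (a minimal sketch, not part of the original answer):

// Each triplet exposes the source and destination vertex attributes of an edge.
graph.triplets
    .map(t => s"${t.srcAttr} -> ${t.dstAttr}")
    .collect()
    .foreach(println)
// prints person1 -> car1, person1 -> car2, person2 -> car1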

Now extract the edges and vertices into DataFrames:

// In spark-shell, spark.implicits._ is already in scope, which enables toDF on RDDs
val vDf = graph.vertices.toDF("vId", "vName")
val eDf = graph.edges.toDF("person", "car", "attr")

Then transform the data into the desired output:

eDf.drop("attr").join(vDf,'person === 'vId).drop("vId","person").withColumnRenamed("vName","person") 
.join(vDf,'car === 'vId).drop("car","vId") 
.groupBy("person") 
.agg(collect_set('vName)).toDF("person","car") 
.show() 


+-------+------------+ 
| person|   car| 
+-------+------------+ 
|person2|  [car1]| 
|person1|[car2, car1]| 
+-------+------------+ 
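
Since the idea is to solve this with a bit of SQL, the same transformation can also be written as a literal SQL query over temporary views. This is a sketch that assumes a Spark 2.x SparkSession named spark (as provided by spark-shell); the view names vertices and edges are arbitrary:

// Register the DataFrames as temporary views so they can be queried with SQL.
vDf.createOrReplaceTempView("vertices")
eDf.createOrReplaceTempView("edges")

spark.sql("""
  SELECT p.vName AS person, collect_set(c.vName) AS car
  FROM edges e
  JOIN vertices p ON e.person = p.vId
  JOIN vertices c ON e.car = c.vId
  GROUP BY p.vName
""").show()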