1

問一個要求熊貓的this question的變化,我有類似的情況,除了我正在與spark-shellpyspark工作。如何用另一個DataFrame中的匹配ID替換單詞(在一個DataFrame中)?

我有一個包含域的列表中的數據幀(頂點):

index   domain 
0   airbnb.com 
1   facebook.com 
2    st.org 
3    index.co 
4  crunchbase.com 
5    avc.com 
6  techcrunch.com 
7   google.com 

我有一個包含這些結構域之間(邊緣)連接另一個數據幀:

  source_domain destination_domain 
       airbnb.com   google.com 
      facebook.com   google.com 
        st.org   facebook.com 
        st.org   airbnb.com 
        st.org  crunchbase.com 
       index.co  techcrunch.com 
      crunchbase.com  techcrunch.com 
      crunchbase.com   airbnb.com 
       avc.com  techcrunch.com 
      techcrunch.com    st.org 
      techcrunch.com   google.com 
      techcrunch.com   facebook.com 

哪能用邊的數據框中的相應索引替換邊數據框中的每個單元格?因此,在邊緣數據幀中的第一行最終可能會看起來像:

###### Before: ##################### 
      facebook.com google.com 
###### After: ##################### 
      1   7 

數據幀是要增長到至少幾百千兆字節。

我該如何着手在Spark中做這件事?

回答

2

TL; DR保存數據集爲CSV文件,vertices.csvedges.csv,分別readjoin

// load the datasets 
val vertices = spark.read.option("header", true).csv("vertices.csv") 
val edges = spark.read.option("header", true).csv("edges.csv") 

// indexify the source_domain 
val sources = edges. 
    join(vertices). 
    where(edges("source_domain") === vertices("domain")). 
    withColumnRenamed("index", "source_index") 

// indexify the destination_domain 
val destinations = edges. 
    join(vertices). 
    where(edges("destination_domain") === vertices("domain")). 
    withColumnRenamed("index", "destination_index") 

val result = sources. 
    join(destinations, Seq("source_domain", "destination_domain")). 
    select("source_index", "destination_index") 
scala> result.show 
+------------+-----------------+ 
|source_index|destination_index| 
+------------+-----------------+ 
|   0|    7| 
|   1|    7| 
|   2|    1| 
|   2|    0| 
|   2|    4| 
|   3|    6| 
|   4|    6| 
|   4|    0| 
|   5|    6| 
|   6|    2| 
|   6|    7| 
|   6|    1| 
+------------+-----------------+ 
相關問題