2017-06-03

Yesterday I asked a similar question - Matrix Multiplication between two RDD[Array[Double]] in Spark - but I have decided to switch to PySpark for this. I have made some progress loading and reformatting the data - Pyspark map from RDD of strings to RDD of list of doubles - but the matrix multiplication is proving difficult. Let me first share my progress on computing the matrix multiplication A^T * A in PySpark:

matrix1.txt 
1.2 3.4 2.3 
2.3 1.1 1.5 
3.3 1.8 4.5 
5.3 2.2 4.5 
9.3 8.1 0.3 
4.5 4.3 2.1 

It is hard to share files here, but this is what my matrix1.txt file looks like: a space-delimited text file containing the values of the matrix. Next comes the code:

# do the imports for pyspark and numpy 
from pyspark import SparkConf, SparkContext 
import numpy as np 

# loadmatrix is a helper function used to read matrix1.txt and format 
# from RDD of strings to RDD of list of floats 
def loadmatrix(sc): 
    data = sc.textFile("matrix1.txt").map(lambda line: line.split(' ')).map(lambda line: [float(x) for x in line]) 
    return(data) 

# this is the function I am struggling with, it should take a line of the 
# matrix (formatted as list of floats), compute an outer product with itself 
def AtransposeA(line): 
    # pseudocode for this would be... 
    # outerprod = compute line * line^transpose  
    # return(outerprod) 

# here is the main body of my file  
if __name__ == "__main__": 
    # create the conf, sc objects, then use loadmatrix to read data 
    conf = SparkConf().setAppName('SVD').setMaster('local') 
    sc = SparkContext(conf = conf) 
    mymatrix = loadmatrix(sc) 

    # this is pseudocode for calling AtransposeA 
    ATA = mymatrix.map(lambda line: AtransposeA(line)).reduce(elementwise add all the outerproducts) 

    # the SVD of ATA is computed below 
    U, S, V = np.linalg.svd(ATA) 

    # ... 

My approach is as follows: to do the matrix multiplication A^T * A, I create a function that computes the outer product of each row of A with itself; the elementwise sum of all of those outer products is the product I want. I then call AtransposeA() inside a map, so it is applied to every row of the matrix, and finally I use reduce() to add up the resulting matrices.
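
To make what I mean by the outer product of a row with itself concrete, here it is for the first row in plain numpy (just a sketch of the per-row math, not the distributed version I am stuck on):

import numpy as np

row = [1.2, 3.4, 2.3]           # first row of matrix1.txt
outerprod = np.outer(row, row)  # 3x3 matrix with entries row[i] * row[j]
# elementwise-summing these 3x3 matrices over all rows of A gives A^T * A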

I am stuck on what the AtransposeA function should look like. How can I do an outer product like this in PySpark? Thanks in advance for your help!

Answer


First of all, consider why you want to use Spark for this. It sounds like all of your data fits in memory, in which case you can use numpy and pandas in a very straightforward way.
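
For instance, a minimal in-memory sketch with pandas and numpy (assuming the same whitespace-separated matrix1.txt from the question) would be:

import numpy as np
import pandas as pd

# load the whitespace-separated matrix and compute A^T * A directly
A = pd.read_csv("matrix1.txt", sep=r"\s+", header=None).values
ATA = A.T.dot(A)

# the SVD from the question's main body then comes straight from numpy
U, S, V = np.linalg.svd(ATA)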

If your data is not structured so that the rows are independent, then it probably cannot be parallelized by sending groups of rows to different nodes, which is the whole point of using Spark.

Having said that... here is some pyspark (2.1.1) code that I think does what you want.

# read the matrix file
# (assumes a SparkSession is available as `spark`, e.g. in the pyspark shell)
df = spark.read.csv("matrix1.txt", sep=" ", inferSchema=True)
df.show()
+---+---+---+ 
|_c0|_c1|_c2| 
+---+---+---+ 
|1.2|3.4|2.3| 
|2.3|1.1|1.5| 
|3.3|1.8|4.5| 
|5.3|2.2|4.5| 
|9.3|8.1|0.3| 
|4.5|4.3|2.1| 
+---+---+---+ 
# do the sum of the multiplications that we want, and get
# one single-row data frame for each column
from functools import reduce           # reduce is not a builtin on Python 3
from pyspark.sql import functions as F

colDFs = []
for c2 in df.columns:
    colDFs.append(df.select([ F.sum(df[c1]*df[c2]).alias("op_{0}".format(i)) for i,c1 in enumerate(df.columns) ]))

# now union those separate data frames to build the "matrix"
mtxDF = reduce(lambda a,b: a.select(a.columns).union(b.select(a.columns)), colDFs)
mtxDF.show()
+------------------+------------------+------------------+
|              op_0|              op_1|              op_2|
+------------------+------------------+------------------+
|            152.45|118.88999999999999|             57.15|
|118.88999999999999|104.94999999999999|             38.93|
|             57.15|             38.93|52.540000000000006|
+------------------+------------------+------------------+
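
If you then want that small result back on the driver as a numpy array (for example to feed np.linalg.svd as in your main body), one possible sketch, assuming the 3x3 result easily fits in memory, is:

import numpy as np

# collect the 3x3 result to the driver and hand it to numpy
ATA = np.array(mtxDF.collect())
U, S, V = np.linalg.svd(ATA)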

Which seems to be the same result that you get with numpy:

import numpy

a = numpy.genfromtxt("matrix1.txt")
numpy.dot(a.T, a)
array([[ 152.45,  118.89,   57.15],
       [ 118.89,  104.95,   38.93],
       [  57.15,   38.93,   52.54]])
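
And if you would rather keep the RDD formulation from your question, the same computation can be sketched as a map over rows followed by a reduce (assuming mymatrix is the RDD of float lists returned by your loadmatrix helper; this is only a sketch I have not run on a real cluster):

import numpy as np

# each row of A becomes its 3x3 outer product; elementwise-summing those
# matrices across all rows gives A^T * A
ATA = mymatrix.map(lambda row: np.outer(row, row)).reduce(lambda x, y: x + y)
U, S, V = np.linalg.svd(ATA)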