0

我跟着這篇文章並行運行KMeans。我在EMR上使用了Python 2.7和Spark 2.0.2。從不同的工藝提交Pyspark平行ml.KMeans覆蓋對方的K

How to run multiple jobs in one Sparkcontext from separate threads in PySpark?

正如郵報,工作不應該相互影響。

在給定的Spark應用程序(SparkContext實例)中,如果多個並行作業是從單獨的線程提交的,它們可以同時運行。通過「作業」,在本節中,我們的意思是Spark操作(例如保存,收集)以及需要運行以評估該操作的任何任務。斯巴克的調度是線程安全的,並支持這種使用情況,使服務於多個請求(例如,對於多個用戶查詢)的應用程序。」 http://spark.apache.org/docs/latest/job-scheduling.html

然而,得到的模型的簇數K是從什麼是在通過不同。

代碼:

from pyspark.ml.clustering import KMeans 
from sklearn.datasets.samples_generator import make_blobs 
from pyspark.ml.linalg import Vectors 
import random 
random.seed(1) 

group_size = 30 
n_groups = 20 

n_samples= n_groups * group_size 
n_features=2 
n_centers=4 

xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None) 
x_groups = [] 
for i in range(n_groups): 
    x_groups.append(xs[i*group_size: (i+1)*group_size]) 


def do_kmean(xs): 
    data = [] 
    for x in xs: 
     data.append((Vectors.dense(x.tolist()),)) 
    df = spark.createDataFrame(data, ["features"]) 

    num_clusters = random.randint(5,10) 
    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction") 
    model = kmeans.fit(df) 
    return [num_clusters, kmeans.getK()] 

from multiprocessing.pool import ThreadPool 
tpool = ThreadPool(processes=8) 

result = tpool.map(do_kmean, x_groups) 

結果:(將輸入的K VS KMEANS什麼實際使用的)

[[5, 9], 
[8, 9], 
[6, 8], 
[10, 9], 
[7, 9], 
[9, 9], 
[7, 9], 
[9, 9], 
[5, 5], 
[5, 9], 
[9, 7], 
[9, 9], 
[5, 7], 
[10, 5], 
[7, 7], 
[7, 7], 
[6, 6], 
[10, 10], 
[10, 10], 
[5, 5]] 

看來Spark不是線程/進程安全的,並且正在訪問K的其他進程的副本。是否有任何Spark配置導致此問題,或者這是Spark錯誤?

回答