2016-02-12 48 views
2

我想從所有處理器中將numpy數組內容收集到一個。如果所有數組的大小相同,則可以使用。但是我沒有看到爲proc相關大小的數組執行相同任務的自然方式。請考慮下面的代碼:發送/接收奇數大小的numpy數組

from mpi4py import MPI 
import numpy 

comm = MPI.COMM_WORLD 
rank = comm.rank 
size = comm.size 

if rank >= size/2: 
    nb_elts = 5 
else: 
    nb_elts = 2 

# create data 
lst = [] 
for i in xrange(nb_elts): 
    lst.append(rank*3+i) 
array_lst = numpy.array(lst, dtype=int) 

# communicate array 
result = [] 
if rank == 0: 
    result = array_lst 
    for p in xrange(1, size): 
     received = numpy.empty(nb_elts, dtype=numpy.int) 
     comm.Recv(received, p, tag=13) 
     result = numpy.concatenate([result, received]) 
else: 
    comm.Send(array_lst, 0, tag=13) 

我的問題是在「收到」的分配。我怎麼知道什麼是分配的大小?我必須首先發送/接收每個數組的大小嗎?

基於下面的建議,我會

data_array = numpy.ones(rank + 3, dtype=int) 
data_array *= rank + 5 
print '[{}] data: {} ({})'.format(rank, data_array, type(data_array)) 

# make all processors aware of data array sizes 
all_sizes = {rank: data_array.size} 
gathered_all_sizes = comm_py.allgather(all_sizes) 
for d in gathered_all_sizes: 
    all_sizes.update(d) 

# prepare Gatherv as described by @francis 
nbsum = 0 
sendcounts = [] 
displacements = [] 
for p in xrange(size): 
    n = all_sizes[p] 
    displacements.append(nbsum) 
    sendcounts.append(n) 
    nbsum += n 

if rank==0: 
    result = numpy.empty(nbsum, dtype=numpy.int) 
else: 
    result = None 

comm_py.Gatherv(data_array,[result, tuple(sendcounts), tuple(displacements), MPI.INT64_T], root=0) 

print '[{}] gathered data: {}'.format(rank, result) 

回答

2

走在你粘貼代碼,二者Send()Recv()發送nb_elts元素。問題是,nb_elts不是每個過程一樣...因此,收到不匹配已發送元素的數量和計劃項目的數量抱怨:

mpi4py.MPI.Exception:MPI_ERR_TRUNCATE :消息被截斷

爲防止出現這種情況,根進程必須計算其他進程發送的項目數。因此,在循環for p in xrange(1, size)中,必須根據p而不是rank來計算nb_elts

以下基於您的代碼已得到更正。我想補充一點,執行這種採集操作的自然方式是使用Gatherv()。例如,參見http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html和。我添加了相應的示例代碼。唯一難點是numpy.int是64位長。因此,Gatherv()使用MPI類型MPI_DOUBLE

from mpi4py import MPI 
import numpy 

comm = MPI.COMM_WORLD 
rank = comm.rank 
size = comm.size 

if rank >= size/2: 
    nb_elts = 5 
else: 
    nb_elts = 2 

# create data 
lst = [] 
for i in xrange(nb_elts): 
    lst.append(rank*3+i) 
array_lst = numpy.array(lst, dtype=int) 

# communicate array 
result = [] 
if rank == 0: 
    result = array_lst 
    for p in xrange(1, size): 

     if p >= size/2: 
      nb_elts = 5 
     else: 
      nb_elts = 2 

     received = numpy.empty(nb_elts, dtype=numpy.int) 
     comm.Recv(received, p, tag=13) 
     result = numpy.concatenate([result, received]) 
else: 
    comm.Send(array_lst, 0, tag=13) 

if rank==0: 
    print "Send Recv, result= "+str(result) 

#How to use Gatherv: 
nbsum=0 
sendcounts=[] 
displacements=[] 

for p in xrange(0,size): 
    displacements.append(nbsum) 
    if p >= size/2: 
      nbsum+= 5 
      sendcounts.append(5) 
    else: 
      nbsum+= 2 
      sendcounts.append(2) 

if rank==0: 
    print "nbsum "+str(nbsum) 
    print "sendcounts "+str(tuple(sendcounts)) 
    print "displacements "+str(tuple(displacements)) 
print "rank "+str(rank)+" array_lst "+str(array_lst) 
print "numpy.int "+str(numpy.dtype(numpy.int))+" "+str(numpy.dtype(numpy.int).itemsize)+" "+str(numpy.dtype(numpy.int).name) 

if rank==0: 
    result2=numpy.empty(nbsum, dtype=numpy.int) 
else: 
    result2=None 

comm.Gatherv(array_lst,[result2,tuple(sendcounts),tuple(displacements),MPI.DOUBLE],root=0) 

if rank==0: 
    print "Gatherv, result2= "+str(result2) 
+0

謝謝弗朗西斯。我不能直接使用你的代碼,因爲在我的情況下,每個處理器只知道它自己的數組大小。我已經添加了一個更新的代碼構建您的Gatherv建議對我的問題。 –

+0

不客氣!請注意,'recvcounts','displs'和'recvtype'這兩個參數只在根目錄中有意義。請參閱https://www.open-mpi.org/doc/v1.8/man3/MPI_Gatherv.3.php。因此,很可能發送的項目數量的gather()'是足夠的,'all_gather()'是過量的。嘗試將一個空列表'[]'傳遞給'gatherv()'中的其他進程:它似乎工作。 – francis