0
我正在編寫一個使用MPI將未排序數組發送給工作人員的Python腳本,該工作人員將對所述數組進行排序並將其歸類爲主數據。當使用超過5名工人時,MPI顯然死鎖
運行它與mpirun -n 2 python mpi_sort.py
高達mpirun -n 5 python mpi_sort.py
按預期工作,但DIE消息似乎會丟失時,陣列數量太大,工作人員永遠不會停止。
運行超過5名工人時,腳本在執行過程中很早停止。通常情況下,工作人員將獲得第一批數組,返回主人,並且永遠不會再有任何工作。我很難理解爲什麼會發生這種情況。
更糟糕的是,如果我減少陣列的大小或數量,更多的工作人員似乎能夠很好地完成這項工作。
的代碼如下:
#!/usr/bin/ENV python
import numpy
from mpi4py import MPI
NUMARRAYS = 1000
ARRAYSIZE = 10000
ASK_FOR_WORK_TAG = 1
WORK_TAG = 2
DIE_TAG = 3
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
status = MPI.Status()
# Master
if rank == 0:
data = numpy.empty(ARRAYSIZE, dtype=numpy.int32)
sorted_data = numpy.empty([NUMARRAYS, ARRAYSIZE], dtype=numpy.int32)
sorted_arrays = 0
while sorted_arrays < NUMARRAYS:
print "[Master] Probing"
comm.Recv(data, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
print "[Master] Probed"
dest = status.Get_source()
print "[Master] got request for work from worker %d" % dest
data = numpy.random.random_integers(0, ARRAYSIZE, ARRAYSIZE).astype(numpy.int32)
print "[Master] sending work to Worker %d" % dest
comm.Send([data, ARRAYSIZE, MPI.INT], dest=dest, tag=WORK_TAG)
print "[Master] sent work to Worker %d" % dest
print "[Master] waiting for complete work from someone"
comm.Recv([data, ARRAYSIZE, MPI.INT], source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
print "[Master] got results from Worker %d. Storing in line %d" % (status.Get_source(), sorted_arrays)
sorted_data[sorted_arrays] = numpy.copy(data)
numpy.savetxt("sample", data, newline=" ", fmt="%d")
sorted_arrays += 1
for dest in range(1, size):
print "[Master] Telling Worker %d to DIE DIE DIE" % dest
comm.Send(data, dest=dest, tag=DIE_TAG)
# Slave
else:
# Ask for work
data = numpy.empty(ARRAYSIZE, dtype=numpy.int32)
while True:
print "[Worker %d] asking for work" % rank
comm.Send(data, dest=0, tag=ASK_FOR_WORK_TAG)
print "[Worker %d] sent request for work" % rank
comm.Recv(data, source=0, tag=MPI.ANY_TAG, status=status)
if status.Get_tag() == WORK_TAG:
print "[Worker %d] got work" % rank
print "[Worker %d] is sorting the array" % rank
data.sort()
print "[Worker %d] finished work. Sending it back" % rank
comm.Send([data, ARRAYSIZE, MPI.INT], dest=0, tag=ASK_FOR_WORK_TAG)
else:
print "[Worker %d] DIE DIE DIE" % rank
break
我不是超級熟悉mpi4py,但是這看起來像一個典型的MPI僵局 - 您發佈臨危發送消息之前,所以MPI等待收到,但沒有收到任何東西,因爲你還沒有發送任何東西。我認爲mpi4py也提供了MPI_Irecv的包裝器呢?如果是這樣,你可能要考慮使用它,而不是... – mgilson 2014-09-25 17:59:31
它看起來對我來說,但爲什麼它與6名工人死鎖,但不與5? – 2014-09-25 18:15:29
爲了提高效率,一些MPI實現以有趣的方式緩存操作。也許緩存取決於工作者的數量(儘管通常我會期望隱式地緩存「發送」,而不是「recv」...) – mgilson 2014-09-25 18:17:59