
I'm writing a Python script that uses MPI to send unsorted arrays to workers; each worker sorts its array and returns it to the master. With more than 5 workers, MPI apparently deadlocks.

Running it as mpirun -n 2 python mpi_sort.py up to mpirun -n 5 python mpi_sort.py works as expected, but the DIE messages seem to get lost when the number of arrays is too large, and the workers never stop.

When running with more than 5 workers, the script stops very early in the run. Usually the workers get the first batch of arrays, return them to the master, and never get any more work. I'm having a hard time understanding why this happens.

To make matters worse, if I reduce the size or the number of the arrays, more workers seem to handle the job just fine.

The code is as follows:

#!/usr/bin/env python 
import numpy 
from mpi4py import MPI 

NUMARRAYS = 1000 
ARRAYSIZE = 10000 

ASK_FOR_WORK_TAG = 1 
WORK_TAG = 2 
DIE_TAG = 3 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 
status = MPI.Status() 

# Master 
if rank == 0: 
    data = numpy.empty(ARRAYSIZE, dtype=numpy.int32) 
    sorted_data = numpy.empty([NUMARRAYS, ARRAYSIZE], dtype=numpy.int32) 
    sorted_arrays = 0 

    while sorted_arrays < NUMARRAYS: 
     print "[Master] Probing" 
     comm.Recv(data, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) 
     print "[Master] Probed" 

     dest = status.Get_source() 
     print "[Master] got request for work from worker %d" % dest 

     data = numpy.random.random_integers(0, ARRAYSIZE, ARRAYSIZE).astype(numpy.int32) 
     print "[Master] sending work to Worker %d" % dest 
     comm.Send([data, ARRAYSIZE, MPI.INT], dest=dest, tag=WORK_TAG) 
     print "[Master] sent work to Worker %d" % dest 

     print "[Master] waiting for complete work from someone" 
     comm.Recv([data, ARRAYSIZE, MPI.INT], source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) 
     print "[Master] got results from Worker %d. Storing in line %d" % (status.Get_source(), sorted_arrays) 
     sorted_data[sorted_arrays] = numpy.copy(data) 
     numpy.savetxt("sample", data, newline=" ", fmt="%d") 
     sorted_arrays += 1 

    for dest in range(1, size): 
     print "[Master] Telling Worker %d to DIE DIE DIE" % dest 
     comm.Send(data, dest=dest, tag=DIE_TAG) 

# Slave 
else: 
    # Ask for work 
    data = numpy.empty(ARRAYSIZE, dtype=numpy.int32) 
    while True: 
     print "[Worker %d] asking for work" % rank 
     comm.Send(data, dest=0, tag=ASK_FOR_WORK_TAG) 
     print "[Worker %d] sent request for work" % rank 

     comm.Recv(data, source=0, tag=MPI.ANY_TAG, status=status) 

     if status.Get_tag() == WORK_TAG: 
      print "[Worker %d] got work" % rank 

      print "[Worker %d] is sorting the array" % rank 
      data.sort() 
      print "[Worker %d] finished work. Sending it back" % rank 
      comm.Send([data, ARRAYSIZE, MPI.INT], dest=0, tag=ASK_FOR_WORK_TAG) 
     else: 
      print "[Worker %d] DIE DIE DIE" % rank 
      break 

I'm not super familiar with mpi4py, but this looks like a typical MPI deadlock -- you post the Recv before the matching message has been sent, so MPI waits on the receive but never gets anything, because nothing has been sent yet. I think mpi4py also provides a wrapper for MPI_Irecv? If so, you may want to consider using that instead... – mgilson 2014-09-25 17:59:31
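
(For reference: mpi4py does wrap MPI_Irecv as comm.Irecv, which returns a request object that can be tested or waited on later. A minimal sketch of posting a non-blocking receive and polling it; the buffer size and the busy-wait loop are illustrative only, not taken from the question's code:)

# Sketch: post a non-blocking receive, then poll it while staying free to do other work.
import numpy
from mpi4py import MPI

comm = MPI.COMM_WORLD
status = MPI.Status()

buf = numpy.empty(10000, dtype=numpy.int32)   # illustrative size
request = comm.Irecv([buf, MPI.INT], source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)

while not request.Test(status=status):
    pass  # instead of blocking, other work could be done here

print "got a message from rank %d with tag %d" % (status.Get_source(), status.Get_tag())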


It looks that way to me too, but why does it deadlock with 6 workers and not with 5? – 2014-09-25 18:15:29


For efficiency, some MPI implementations buffer operations in interesting ways. Maybe the buffering depends on the number of workers (although usually I would expect the Send to be buffered implicitly, not the Recv...) – mgilson 2014-09-25 18:17:59
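
(A way to probe that hypothesis: a standard Send is allowed to complete as soon as the implementation has buffered the message, so small messages can appear to work even when no matching receive has been posted. mpi4py exposes the synchronous-mode send as comm.Ssend, which only returns once the receive has actually been matched, so a deadlock that buffering is masking shows up regardless of worker count. A diagnostic sketch, using the same buffer and tag as the worker in the question:)

# Diagnostic sketch: a synchronous send cannot complete out of the MPI buffer,
# so it hangs immediately if the master never posts a matching Recv.
comm.Ssend([data, ARRAYSIZE, MPI.INT], dest=0, tag=ASK_FOR_WORK_TAG)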

Answer


I found the problem.

There were indeed some deadlocks, as @mgilson suggested.

First, a worker would send its finished work back, but the master would interpret it as a request for more work and reply with an array the worker wasn't expecting.

Then there was a similar problem with the DIE messages: they would be sent to workers that weren't expecting them.

The final solution looks like this:

#!/usr/bin/env python 
import numpy 
from mpi4py import MPI 

NUMARRAYS = 100 
ARRAYSIZE = 10000 

ASK_FOR_WORK_TAG = 1 
WORK_TAG = 2 
WORK_DONE_TAG = 3 
DIE_TAG = 4 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 
status = MPI.Status() 

# Master 
if rank == 0: 
    data = numpy.empty(ARRAYSIZE, dtype=numpy.int32) 
    sorted_data = numpy.empty([NUMARRAYS, ARRAYSIZE], dtype=numpy.int32) 
    sorted_arrays = 0 
    dead_workers = 0 

    while dead_workers < size - 1: 
     print "[Master] Probing" 
     comm.Recv([data, ARRAYSIZE, MPI.INT], source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) 
     print "[Master] Probed" 

     dest = status.Get_source() 
     if status.Get_tag() == ASK_FOR_WORK_TAG: 
      if sorted_arrays <= NUMARRAYS - 1: 
       print "[Master] got request for work from worker %d" % dest 

       data = numpy.random.random_integers(0, ARRAYSIZE, ARRAYSIZE).astype(numpy.int32) 
       print "[Master] sending work to Worker %d" % dest 
       comm.Send([data, ARRAYSIZE, MPI.INT], dest=dest, tag=WORK_TAG) 
       print "[Master] sent work to Worker %d" % dest 
      else: 
       # Someone did more work than they should have 
       print "[Master] Telling worker %d to DIE DIE DIE" % dest 
       comm.Send([data, ARRAYSIZE, MPI.INT], dest=dest, tag=DIE_TAG) 
       dead_workers += 1 
       print "[Master] Already killed %d workers" % dead_workers 

     elif status.Get_tag() == WORK_DONE_TAG: 
      if sorted_arrays <= NUMARRAYS - 1: 
       print "[Master] got results from Worker %d. Storing in line %d" % (status.Get_source(), sorted_arrays) 
       sorted_data[sorted_arrays] = numpy.copy(data) 
       numpy.savetxt("sample", data, newline=" ", fmt="%d") 
       sorted_arrays += 1 

# Slave 
else: 
    # Ask for work 
    data = numpy.empty(ARRAYSIZE, dtype=numpy.int32) 
    while True: 
     print "[Worker %d] asking for work" % rank 
     comm.Send([data, ARRAYSIZE, MPI.INT], dest=0, tag=ASK_FOR_WORK_TAG) 
     print "[Worker %d] sent request for work" % rank 

     comm.Recv([data, ARRAYSIZE, MPI.INT], source=0, tag=MPI.ANY_TAG, status=status) 

     if status.Get_tag() == WORK_TAG: 
      print "[Worker %d] got work" % rank 

      print "[Worker %d] is sorting the array" % rank 
      data.sort() 
      print "[Worker %d] finished work. Sending it back" % rank 
      comm.Send([data, ARRAYSIZE, MPI.INT], dest=0, tag=WORK_DONE_TAG) 
     elif status.Get_tag() == DIE_TAG: 
      print "[Worker %d] DIE DIE DIE" % rank 
      break 
     else: 
      print "[Worker %d] Doesn't know what to to with tag %d right now" % (rank, status.Get_tag())