2012-03-08 66 views
3

我遇到的問題是,在PHP應用程序中,Gearman作業有時會傳遞給多個工作人員。我可以減少代碼以將其重現爲一個文件。現在我不確定這是Gearman中的錯誤還是pecl庫中的錯誤或者我的代碼中的錯誤。Gearman作業被傳遞給多個工作人員(PHP)

下面是重現該錯誤代碼:

#!/usr/bin/php 
<?php 

// Try 'standard', 'exception' or 'exception-sleep'. 
$sWorker = 'exception'; 



// Detect run mode "client" or "worker". 
if (!isset($argv[1])) 
    $sMode = 'client'; 
else 
    $sMode = 'worker-' . $sWorker; 

$sLogFilePath = __DIR__ . '/log.txt'; 

switch ($sMode) { 

    case 'client': 

     // Remove all queued test jobs and quit if there are test workers running. 
     prepare(); 

     // Init the greaman client. 
     $Client= new GearmanClient; 
     $Client->addServer(); 

     // Empty the log file. 
     file_put_contents($sLogFilePath, ''); 

     // Start some worker processes. 
     $aPids = array();  
     for ($i = 0; $i < 100; $i++) 
      $aPids[] = exec('php ' . __FILE__ . ' worker > /dev/null 2>&1 & echo $!'); 

     // Start some jobs. Also try doHigh(), doBackground() and 
     // doBackgroundHigh(); 
     for ($i = 0; $i < 50; $i++) 
      $Client->doNormal('test', $i); 

     // Wait a second (when running jobs in background). 
     // sleep(1); 

     // Prepare the log file entries. 
     $aJobs = array(); 
     $aLines = file($sLogFilePath); 
     foreach ($aLines as $sLine) { 
      list($sTime, $sPid, $sHandle, $sWorkload) = $aAttributes = explode("\t", $sLine); 
      $sWorkload = trim($sWorkload); 
      if (!isset($aJobs[$sWorkload])) 
       $aJobs[$sWorkload] = array(); 
      $aJobs[$sWorkload][] = $aAttributes; 
     } 

     // Remove all jobs that have been passed to only one worker as expected. 
     foreach ($aJobs as $sWorkload => $aJob) { 
      if (count($aJob) === 1) 
       unset($aJobs[$sWorkload]); 
     } 

     echo "\n\n"; 

     if (empty($aJobs)) 
      echo "No job has been passed to more than one worker."; 
     else { 
      echo "Those jobs has been passed more than one times to a worker:\n"; 
      foreach ($aJobs as $sWorload => $aJob) { 

       echo "\nJob #" . $sWorload . ":\n"; 
       foreach ($aJob as $aAttributes) 
        echo " $aAttributes[2] (Worker PID: $aAttributes[1])\n"; 
      } 
     } 

     echo "\n\n"; 

     // Kill all started workers. 
     foreach ($aPids as $sPid) 
      exec('kill ' . $sPid . ' > /dev/null 2>&1'); 

    break; 

    case 'worker-standard': 
     $Worker = new GearmanWorker; 
     $Worker->addServer(); 
     $Worker->addFunction('test', 'logJob'); 
        $bShutdown = false; 
     while ($Worker->work()) 
      if ($bShutdown) 
       continue; 
     break; 

    case 'worker-exception': 
     try { 
      $Worker = new GearmanWorker; 
      $Worker->addServer(); 
      $Worker->addFunction('test', 'logJob'); 
      $bShutdown = false; 
      while ($Worker->work()) 
       if ($bShutdown) 
        throw new \Exception; 

     } catch (\Exception $E) { 
     } 
    break; 

    case 'worker-exception-sleep': 
     try { 
      $Worker = new GearmanWorker; 
      $Worker->addServer(); 
      $Worker->addFunction('test', 'logJob'); 
      $bShutdown = false; 
      while ($Worker->work()) 
      { 
       if ($bShutdown) { 
        sleep(1); 
        throw new \Exception; 
       } 
      } 
     } catch (\Exception $E) { 
     } 
    break; 
} 

function logJob(\GearmanJob $Job) 
{ 
    global $bShutdown, $sLogFilePath; 
    $sLine = microtime(true) . "\t" . getmypid() . "\t" . $Job->handle() . "\t" . $Job->workload() . "\n"; 
    file_put_contents($sLogFilePath, $sLine, FILE_APPEND); 
    $bShutdown = true; 
} 


function prepare() 
{ 
    $rGearman = fsockopen('127.0.0.1', '4730', $iErrno, $sErrstr, 3); 
    $aBuffer = array(); 
    fputs ($rGearman, 'STATUS' . PHP_EOL); 
    stream_set_timeout($rGearman, 1); 
    while (!feof($rGearman)) 
     if ('.' . PHP_EOL !== $sLine = fgets($rGearman, 128)) 
      $aBuffer[] = $sLine; 
     else 
      break; 
    fclose($rGearman); 

    $bJobsInQueue = false; 
    $bWorkersRunning = false; 
    foreach ($aBuffer as $sFunctionLine) { 
     list($sFunctionName, $iQueuedJobs, $iRunningJobs, $iWorkers) = explode("\t", $sFunctionLine); 
     if ('test' === $sFunctionName) { 
      if (0 != $iQueuedJobs) 
       $bJobsInQueue = true; 
      if (0 != $iWorkers) 
       $bWorkersRunning = true;; 
     } 
    } 

    // Exit if there are workers running. 
    if ($bWorkersRunning) 
     die("There are some Gearman workers running that have registered a 'test' function. Please stop these workers and run again.\n\n"); 

    // If there are test jobs in the queue start a worker that eat up the jobs. 
    if ($bJobsInQueue) { 
     $sPid = exec('gearman -n -w -f test > /dev/null 2>&1 & echo $!'); 
     sleep(1); 
     exec ("kill $sPid > /dev/null 2>&1"); 
     // Repeat this method to make sure all jobs are removed. 
     prepare(); 
    } 
} 

當你運行它應該輸出"No job has been passed to more than one worker."在命令行上此代碼,但insted的它送花兒給人輸出已經通過一些任務的列表不止一名工人。如果您設置了$sWorker = 'standard';'exception-sleep',則不會出現該錯誤。

如果你可以運行代碼並告訴我你是否能夠重現錯誤代碼中的錯誤,它將會幫助我很多。

+0

當我運行你的代碼時,我得到了預期的輸出。你正在使用哪個Gearman版本? – Aurimas 2012-03-28 21:07:05

+1

謝謝你測試它。我已經在0.28和0.27上測試了它,在Linux上使用PECL lib版本1.0.1和1.0.2。你使用的是什麼版本和操作系統? – stofl 2012-04-11 10:34:35

+1

我在0.29上運行,但我只是再試一次,也能夠重現錯誤(它真的只發生在第一次測試時,我至少沒有得到3-4次錯誤)。 – Aurimas 2012-04-18 08:13:43

回答

3

與Gearman 0.24,PECL lib 1.0.2完全相同的問題。每次都能夠用腳本重現錯誤。

舊版本的Gearman(0.14我認爲)用於正常工作。

升級Gearman爲0.33修復了這個問題。