2017-09-06 377 views
0

我正在嘗試將耗時的任務提取到單獨的進程。不幸的是,多線程似乎並不是PHP的選項,但您可以使用popen創建新的php進程。PHP popen進程限制?

用例如下:每分鐘運行一次cronjob,檢查是否有任何需要發送的電子郵件活動。可能需要同時發送多個廣告系列,但截至目前,它只是每分鐘收到一個廣告系列。我想提取廣告系列的發送到單獨的流程,以便我可以同時發送多個廣告系列。

的代碼看起來是這樣的(注意,這只是一個概念驗證):

的crontab

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1 

maintask.php

for ($i = 0; $i < 4; $i++) { 
    $processName = "Process_{$i}"; 
    echo "Spawn process {$processName}" . PHP_EOL; 

    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r"); 
    stream_set_blocking($process, false); 
} 

子任務.php

$process = $_GET['process_name']; 

echo "Started sleeping process {$process}" . PHP_EOL; 
sleep(rand(10, 40)); 
echo "Stopped sleeping process {$process}" . PHP_EOL; 

現在,我遇到的問題是,popen只會在任何時候產生2個進程,而我試圖產卵4.我找不出原因。似乎沒有任何限制記錄。也許這受到我可用內核數量的限制?

+0

我已經用[proc_open()](http://php.net/manual/en/function.proc-open.php)成功地在過去創建多個子進程。我會看看我是否可以挖掘出代碼段 –

+0

你解決了你的問題嗎?你怎麼知道你的代碼只能同時運行2個進程?你的文章中的代碼不足以測試這個。 –

+0

@WeeZel不幸的是,我還沒有能夠解決這個問題。我通過監視活動進程檢查了僅有2個正在運行的進程,使用「ps -aux」。我可以看到,只要有一個subtask.php進程完成,一個新進程就會啓動。 我想你是對的提供的代碼。這是一個試圖簡單地展示我想要實現的內容,而不會公佈所有會使事情過於複雜的實際代碼。我會看看我是否可以對我的問題進行演示並更新原始帖子。 謝謝! –

回答

1

我修改了subtask.php,以便您可以看到每個任務的開始時間,結束時間和打算等待的時間。現在你可以看到一個進程啓動時/停止可以減少睡眠時間 - 無需使用ps -aux顯示當進程正在運行

subtask.php

<?php 
$process = $argv[1]; 

$sleepTime = rand(1, 10); 
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL; 
sleep($sleepTime); 
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL; 

我添加類到maintask.php代碼,以便您可以測試它...開始從中獲得樂趣,當你queue()更多的條目超過您設置maxProcesses(試行32)
注:結果會回來的順序他們完成

maintask.php

<?php 
class ParallelProcess 
{ 
    private $maxProcesses = 16; // maximum processes 
    private $arrProcessQueue = []; 
    private $arrCommandQueue = []; 

    private function __construct() 
    { 
    } 

    private function __clone() 
    { 
    } 

    /** 
    * 
    * @return \static 
    */ 
    public static function create() 
    { 
     $result = new static(); 
     return $result; 
    } 

    /** 
    * 
    * @param int $maxProcesses 
    * @return \static 
    */ 
    public static function load($maxProcesses = 16) 
    { 
     $result = self::create(); 
     $result->setMaxProcesses($maxProcesses); 
     return $result; 
    } 

    /** 
    * get maximum processes 
    * 
    * @return int 
    */ 
    public function getMaxProcesses() 
    { 
     return $this->maxProcesses; 
    } 

    /** 
    * set maximum processes 
    * 
    * @param int $maxProcesses 
    * @return $this 
    */ 
    public function setMaxProcesses($maxProcesses) 
    { 
     $this->maxProcesses = $maxProcesses; 
     return $this; 
    } 

    /** 
    * number of entries in the process queue 
    * 
    * @return int 
    */ 
    public function processQueueLength() 
    { 
     $result = count($this->arrProcessQueue); 
     return $result; 
    } 

    /** 
    * number of entries in the command queue 
    * 
    * @return int 
    */ 
    public function commandQueueLength() 
    { 
     $result = count($this->arrCommandQueue); 
     return $result; 
    } 


    /** 
    * process open 
    * 
    * @staticvar array $arrDescriptorspec 
    * @param string $strCommand 
    * @return $this 
    * @throws \Exception 
    */ 
    private function p_open($strCommand) 
    { 
     static $arrDescriptorSpec = array(
      0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will reda from 
      1 => array('pipe', 'w'), // stdout is a pipe that the child will write to 
      2 => array('file', '/dev/null', 'w') // stderr is a pipe that the child will write to 
     ); 

     $arrPipes = array(); 
     if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) { 
      throw new \Exception("error: proc_open() failed!"); 
     } 

     $resStream = &$arrPipes[1]; 

     if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) { 
      throw new \Exception("error: stream_set_blocking() failed!"); 
     } 

     $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream); 
     return $this; 
    } 

    /** 
    * execute any queued commands 
    * 
    * @return $this 
    */ 
    private function executeCommand() 
    { 
     while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) { 
      $strCommand = array_shift($this->arrCommandQueue); 
      $this->p_open($strCommand); 
     } 
     return $this; 
    } 

    /** 
    * process close 
    * 
    * @param array $arrQueueEntry 
    * @return $this 
    */ 
    private function p_close(array $arrQueueEntry) 
    { 
     $resProcess = $arrQueueEntry[1]; 
     $resStream = $arrQueueEntry[2]; 

     fclose($resStream); 

     $this->returnValue = proc_close($resProcess); 

     $this->executeCommand(); 
     return $this; 
    } 

    /** 
    * queue command 
    * 
    * @param string $strCommand 
    * @return $this 
    */ 
    public function queue($strCommand) { 
     // put the command on the $arrCommandQueue 
     $this->arrCommandQueue[] = $strCommand; 
     $this->executeCommand(); 
     return $this; 
    } 

    /** 
    * read from stream 
    * 
    * @param resource $resStream 
    * @return string 
    */ 
    private static function readStream($resStream) 
    { 
     $result = ''; 
     while (($line = fgets($resStream)) !== false) { 
      $result .= $line; 
     } 
     return $result; 
    } 

    /** 
    * read a result from the process queue 
    * 
    * @return string|false 
    */ 
    private function readProcessQueue() 
    { 
     $result = false; 
     reset($this->arrProcessQueue); 
     while ($result === false && list($key, $arrQueueEntry) = each($this->arrProcessQueue)) { 
      $arrStatus = proc_get_status($arrQueueEntry[1]); 
      if ($arrStatus['running'] === false) { 
       array_splice($this->arrProcessQueue, $key, 1); 
       $resStream = $arrQueueEntry[2]; 
       $result = self::readStream($resStream); 
       $this->p_close($arrQueueEntry); 
      } 
     } 
     return $result; 
    } 

    /** 
    * get result from process queue 
    * 
    * @return string|false 
    */ 
    public function readNext() 
    { 
     $result = false; 
     if ($this->processQueueLength() === 0) { 
     } else { 
      while ($result === false and $this->processQueueLength() > 0) { 
       $result = $this->readProcessQueue(); 
      } 
     } 
     return $result; 
    } 
} 

set_time_limit(0); // don't timeout 

$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes 

for ($i = 0; $i < 4; $i++) { 
    $processName = "Process_{$i}"; 
    echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL; 
    $objParallelProcess->queue("php subtask.php {$processName}"); // queue process 
} 

// loop through process queue 
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued 
    // process response 
    echo $strResponse; 
} 
+0

這是出色的工作!謝謝一堆! –