2017-08-30 108 views
1

我有一個使用PHP 7.0.22的pthreads v3.1.6的代碼。我遇到的問題是線程不返回數組值。我的代碼如下pthreads破壞數組結果

$threadCount = 1; 

$url_idList = range(1,2000); 
$url_idListChunked = array_chunk($url_idList, $threadCount); 

class WorkerThreads extends Thread { 
    private $threadName, $url_id; 

    public function __construct($threadName,$url_id) { 
     $this->threadName = $threadName; 
     $this->url_id = $url_id; 
     $this->result = []; 
    } 

    public function run() { 
     if ($this->threadName && $this->url_id) { 
      printf('%sLoading URL #: %s' . "\n", $this->threadName, $this->url_id); 
      $this->result = send_request('GET',$this->url_id,NULL,$this->threadName); 
     } 
    } 
} 

while(count($url_idListChunked)){ 
    $url_idListChunk = array_shift($url_idListChunked); 
    $workers = []; 
    foreach (range(0,count($url_idListChunk)-1) as $i) { 
     $threadName = "Thread #".$i.": "; 
     $workers[$i] = new WorkerThreads($threadName,$url_idListChunk[$i]); 
     $workers[$i]->start(); 
    } 

    foreach (range(0,count($url_idListChunk)-1) as $i) { 
     $workers[$i]->join(); 
     print_r($workers[$i]); 
     exit(); 
     echo $workers[$i]['threadName']."Result for URL #: ".$workers[$i]['url_id']."\n"; 
    } 

} 

function send_request($method,$url_id,$data,$threadName=NULL){ 

    $url = 'https://www.example.tld/?id='.$url_id; 

    $ch = curl_init(); 
    curl_setopt($ch, CURLOPT_URL, $url); 
    curl_setopt($ch, CURLOPT_HEADER, TRUE); 
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE); 
    curl_setopt($ch, CURLOPT_FOLLOWLOCATION, TRUE); 
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE); 
    curl_setopt($ch, CURLOPT_TIMEOUT, 3); 
    if(!$data && $method=='POST'){ 
     $data = generate_post_data(); 
     curl_setopt($ch, CURLOPT_POST, 1); 
     curl_setopt($ch, CURLOPT_POSTFIELDS, $data); 
    } 
    $response = curl_exec($ch); 
    while((curl_errno($ch) == 6 OR curl_errno($ch) == 28)){ 
     $response = curl_exec($ch); 
     echo $threadName.'Curl error #'.curl_errno($ch).' - ' . curl_error($ch)." Retrying.\n"; 
     sleep(2);  
    } 

    curl_close($ch); 

    $result['data'] = $response; 

    return $result; 

} 

當我嘗試print_t($workers)我收到以下錯誤消息Uncaught RuntimeException: pthreads detected an attempt to connect to an object which has already been destroyed。爲什麼我會失去數組結果?看起來線程沒有問題將字符串傳回。

回答

0

你好,我不完全確定使用start-> join方法是你想要實現的首選方法嗎?我認爲你需要使用pthreads中的pool collectable方法。

這是一個例子,可能會激發你的工作,如果你想從分塊批次收集結果。我親自使用它,這是推動pthreads進入極限的最快方法。小心不要通過CPU線程推送池號(在這個例子中它是10個核心)。

如果我可以說關於你的代碼,不要試圖從pthreads worker輸出屏幕上的東西,它肯定會亂七八糟。返回對象並在集合上回顯它。確保你的結果在你的課堂上已經公開允許返回的對象。

更不用說多捲曲,它可能是最快和最恰當的方式。

/* pthreads batches */ 
$batches = array(); 

$nbpool = 20; // cpu 10 cores 

/* job 1 */ 
$list = [/* data1 */]; 
$url_idList[] = array_chunk($list, 5000); 

/* job 2 */ 
$list2 = [/* data2 */]; 
$url_idList[] = array_chunk($list, 10000); 

/* final collected results */ 
$resultFinal = []; 

/* loop across batches */ 
foreach ($url_idList as $key => $url_idListChunked) { 

    $url_idListChunk = array_shift($url_idListChunked); 

    /* for intermediate collection */ 
    $data[$key] = []; 

    /* how many workers */ 
    $workCount = count($url_idListChunk); 

    /* set pool job up to max cpu capabilities */ 
    $pool = new Pool($nbpool, Worker::class); 

    /* pool cycling submit */ 
    foreach (range(1, $workCount) as $i) { 
     $chunck = $url_idListChunk[$i - 1]; 
     $pool->submit(new WorkerThreads(($i - 1), $chunck)); 
    } 

    /* on collection cycling */ 
    $collector = function (\Collectable $work) use (&$data) { 

     /* is worker complete ? */ 
     $isGarbage = $work->isGarbage(); 

     /* worker complete */ 
     if ($isGarbage) { 
      $result = $work->result; 
      $info = $work->info; 
      $data[$key] = $result; 

      /* echo result info outside worker */ 
      echo($info); 
     } 
     return $isGarbage; 
    }; 
    do { 
     /* collection on pool stack */ 
     $count = $pool->collect($collector); 
     $isComplete = count($data) === $workCount; 
    } while (!$isComplete); 

    /* push stack results */ 
    array_push($resultFinal, $data); 

    /* close pool */ 
    $pool->shutdown(); 
} 

class WorkerThreads extends \Threaded implements \Collectable { 

    private $url_id; 
    private $isGarbage; 
    private $threadName; 
    public $result; 
    public $info; 

    public function __construct($i, $url_id) { 
     $this->threadName = "Thread #" . $i . ": "; 
     $this->url_id = $url_id; 
    } 

    public function run() { 
     if ($this->threadName && $this->url_id) { 
      $this->result = send_request('GET', $this->url_id, NULL, $this->threadName); 
     } 
     $this->info = $this->threadName . " Result for URL #: " . $this->url_id; 
     $this->isGarbage = true; // yeah, it s done 
    } 

    public function isGarbage(): bool { 
     return $this->isGarbage; 
    } 

} 
+0

首先,感謝您的回覆。即使您提供了評論,我仍然無法理解代碼。我嘗試運行它,並行'$池 - >提交(新的WorkerThreads(($ i - 1),$ chunck));'是拋出和錯誤,說'Class'WorkerThreads'找不到'。另外,我還沒有達到在$ list數組中有兩個作業的目的。 –

+0

您首先需要將pthreads dll添加到php.ini以使pthreads正常工作。它只能在cli模式下工作,因爲你可能知道,所以你需要禁用DLL的xampp工作。 Vis等等......你可以添加任意數量的$ list_array,甚至可以爲每個批次添加池的數量。這是一個簡單的方法來跨批次同步wotk,並避免使用棘手的pthread syncro方法。 – tryHarder

+0

pthreads.dll安裝並正常工作。它找不到WorkerThreads類,而不是實際的Threaded類。 –