2012-02-13 134 views
7

我試圖完成以下任務:Perl的隊列和線程

  1. 有一個線程,從一個非常大的文件中讀取數據說一下 10GB,並將其推入隊列。 (我不希望爲隊列 變得非常大任)

  2. 雖然buildQueue線程在同一時間將數據推送到隊列中有 約5個工作線程去排隊和處理數據。

我做了一個嘗試,但我的其他線程無法訪問,因爲我的buildQueue線程連續循環。

我的方法可能完全錯誤。感謝您的幫助,非常感謝。

下面的代碼爲buildQueue

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

正如我所預料,當此線程被什麼別的執行後會被執行,因爲該線程將無法完成。

my $builder = new Thread(&buildQueue); 

由於構建器線程將運行很長時間,所以我永遠不會創建工作線程。

這裏是整個代碼:

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

幾個問題:你提到你的隊列,構造器線程將無法完成,但它做了什麼呢?隊列大小是否低於100或高於0?另外,[我不確定你是否正確創建你的線程](http://perldoc.perl.org/perlthrtut.html)。不應該是'我的$ builder = threads-> create(\&buildQueue);'? – 2012-02-13 11:57:11

+0

隊列生成器生成正常,但由於工作線程並沒有達到要創建這樣的隊列是停留在100,而構建的隊列仍然是因爲連續循環的運行不能從隊列中刪除任何東西。 – Sinista 2012-02-13 12:19:14

+0

嗯,我需要看到更多的代碼,以建立背景下,特別是在創建線程。在創建工作線程之前,您不是「連接」或「分離」隊列構建器,對吧? – 2012-02-13 12:20:02

回答

10

你需要標記,當你想你的線程退出(通過任何joinor detach)。事實上,你有沒有last聲明的無限循環突破它們也是一個問題。

編輯:我也忘了一個非常重要的部分! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue。因此,爲什麼我們在隊列構建器完成後爲每個線程特別排隊undef

嘗試:

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

看來,問題是我棄用線程模塊,而是切換到線程模塊,而我的代碼正如現在應該的那樣工作。謝謝傑克多給我指出了正確的方向。 – Sinista 2012-02-13 13:06:40

1

這聽起來像這種情況可能與Parallel::ForkManager模塊做。

+0

如果可能的話,很樂意看到ForkManager解決方案。 – Sinista 2012-02-14 07:14:56

0

一種不同的方法:您還可以在MCE 1.2+使用user_tasks並創建兩個多工人tasks,用於讀取(因爲它是一個很大的文件,你也可以從並行讀取中獲益,同時保留文件讀取搜索)一個任務和一個處理任務等。

下面的代碼仍然使用Thread::Queue來管理您的緩衝區隊列。

buildQueue sub有你的隊列大小控制,它直接將數據推送到管理進程$ R_QUEUE,因爲我們已經使用了線程,所以它可以訪問父進程的內存空間。如果您想要使用分叉,您仍然可以通過回調函數訪問隊列。但在這裏,我選擇只是推到隊列。

processQueue子只會去排隊無論是在隊列中,直到沒有什麼更多的正在申請中。

每個任務中的task_end子只在每個任務結束時由管理進程運行一次,因此我們用它來向我們的工作進程發出停止信號。

顯然,有一個在你想怎麼塊數據的工人,這樣你就可以在塊的大小,甚至是如何發出聲音在你的數據決定有很大的自由度。

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

的Perl的MCE模塊喜歡大文件。使用MCE,可以同時分割許多行,作爲標量字符串啜一大塊,或一次讀取1行。分割許多行可以減少IPC的開銷。

MCE 1.504現在已經出來了。它提供MCE :: Queue支持包括線程在內的子進程。此外,1.5版本帶有5種型號(MCE ::流量,MCE :: grep的,MCE ::環,MCE ::地圖,以及MCE ::流)的照顧實例化MCE實例以及自動調整max_workers和chunk_size。人們可以重寫這些選項順便說一句。

下面,MCE ::循環用於演示。

use MCE::Loop; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
    my ($mce, $chunk_ref, $chunk_id) = @_; 

    foreach my $line (@$chunk_ref) { 
     chomp $line; 
     ## add your code here to process $line 
    } 

} $dict_path; 

如果要指定人員和/或CHUNK_SIZE的數量,然後有2種方式來做到這一點。

use MCE::Loop max_workers => 5, chunk_size => 300000; 

或者......

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

雖然首選大文件分塊,一個可以比較在一個時間分塊一條線的時間。可以省略塊內的第一行(註釋掉)。注意如何不需要內部for循環。 $ chunk_ref仍然是一個包含1行的數組ref。輸入標量$ _包含chunk_size等於1時的行,否則指向$ chunk_ref。

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

我希望這個演示是爲希望並行處理文件鄉親的幫助。

:)馬里奧