2010-11-27 106 views
14

在線搜索了一些內容,找到了簡單的'tutorials'來使用命名管道。但是,當我做任何背景工作時,我似乎失去了大量的數據。在bash中使用命名管道 - 數據丟失問題

[[編輯:找到一個更簡單的解決方案,請參閱回覆帖子。所以我提出的問題現在是學術 - 如果有人可能需要一個工作服務器]]

使用Ubuntu 10.04與Linux 2.6.32-25-generic#45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64的GNU/Linux的

GNU的bash,版本4.1.5(1)-release下(x86_64-PC-Linux的GNU)。

我的bash的功能是:

function jqs 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read txt <"$pipe" 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     fi 
    fi 
    done 
} 

我在後臺運行,這樣的:

> jqs& 
[1] 5336 

現在我餵它:

for i in 1 2 3 4 5 6 7 8 
do 
    (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &) 
done 

輸出是不一致的。 我經常沒有得到所有成功的回聲。 我最多可以獲得與成功回聲一樣多的新文本回聲,有時甚至更少。

如果我從「飼料」去除「&」,似乎工作,但我阻止,直到輸出被讀取。因此,我想讓子流程被阻止,但不是主流程。

其目的是寫一個簡單的作業控制腳本,這樣我可以運行在說最並聯10個就業機會,並排隊等待稍後處理的休息,但可靠地知道,他們運行。

全部工作如下經理:

function jq_manage 
{ 
    export __gn__="$1" 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 
    trap "rm -f $pipe" EXIT 
    trap "break"  SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    date 
    jobs 
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__)) 
    then 
     echo "Waiting for new job" 
     if read new_job <"$pipe" 
     then 
    echo "new job is [[$new_job]]" 

    if [[ "$new_job" == 'quit' ]] 
    then 
     break 
    fi 

    echo "In group $__gn__, starting job $new_job" 
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &" 
     fi 
    else 
     sleep 3 
    fi 
    done 
} 

function jq 
{ 
    # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs) 
    # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently 

    export __gn__="$1" 
    shift 
    export __jN__="$1" 
    shift 

    export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l) 
    if (($__jq__ '<' 1)) 
    then 
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &" 
    fi 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 

    echo [email protected] >$pipe 
} 

調用

jq <name> <max processes> <command> 
jq abc 2 sleep 20 

將啓動一個進程。 這部分工作正常。開始第二個,很好。手工一個接一個似乎很好。 但是,在循環中啓動10似乎會丟失系統,如上面更簡單的示例。

任何提示,以我能做些什麼來解決IPC數據的這種明顯的損失,將不勝感激。

問候, 阿蘭。

+0

查看[second 2018編輯爲*如何設置一個變量來自命令輸出*](https://stackoverflow.com/a/41236640/1765658)或[GitHub.com:Connector-的bash](https://github.com/F-Hauri/Connector-bash)。我*連接* subprocess hoding *工具*到我目前的shell會話。 – 2018-02-12 15:59:24

回答

26

你的問題是下面if聲明:

while true 
do 
    if read txt <"$pipe" 
    .... 
done 

正在發生的事情是,你作業隊列服務器開啓,每次關閉管道周圍循環。這意味着某些客戶在嘗試寫入管道時會出現「斷開管道」的錯誤 - 也就是說,在編寫者打開管道後,管道的讀者會消失。

爲了解決這個問題,改變你在服務器循環打開管道一旦整個循環:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 

使用這種方式時,管道被打開一次,並保持開放。

您將需要小心在循環中運行的內容,因爲循環內的所有處理都將stdin附加到命名管道。你會希望確保你從其他地方重定向循環內部的所有進程的stdin,否則它們可能會從管道中消耗數據。

編輯:現在的問題是,當最後一個客戶端關閉管道時,讀取EOF時,可以使用jilles方法來複制文件描述符,或者您也可以確保您是客戶端,保持管道的開啓的側寫:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 3> "$pipe" 

這將持有的管道寫側FD 3.打開相同的原則同樣適用與此文件描述符:標準輸入。您將需要關閉它,以便任何子進程不會繼承它。這可能比stdin更不重要,但它會更乾淨。

+0

哇,很好的答案。說得通。謝謝。馬上試試。 – asoundmove 2010-11-27 17:16:42

0

一方面這個問題比我想象的要糟糕: 現在我的更復雜的例子(jq_manage)似乎有一個例子,其中相同的數據從管道一遍又一遍地讀取(即使沒有新的數據正在寫入它)。

在另一方面,我發現了一個簡單的解決方案(編輯以下丹尼斯的評論):

function jqn # compute the number of jobs running in that group 
{ 
    __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l) 
} 

function jq 
{ 
    __groupn__="$1"; shift # job group name (the pool within which to allocate $__jmax__ jobs) 
    __jmax__="$1"; shift # maximum of job numbers to run concurrently 

    jqn 
    while (($__jqty__ '>=' $__jmax__)) 
    do 
    sleep 1 
    jqn 
    done 

    eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; [email protected]) &" 
} 

就像一個魅力。 不涉及插座或管道。 簡單。

+1

沒有理由導出`__jqty__`(或任何原始導出)。你爲什麼直接回應`/ dev/null`?爲什麼使用`eval`?爲什麼不只是做`$ @&`?沒有必要引用`> =`。我同意camh的回答。 – 2010-11-27 15:32:24

+0

這一切歸結爲讀取和過濾ps的輸出。回聲/ dev/null,因爲我實際上不需要輸出,我只想在'ps'的輸出中輸入正確的字符串。同eval一樣,否則ps顯示變量名稱,而不是擴展變量,eval執行擴展。我以前從未使用((...)),所以感謝您指出我不需要引號,我只是在某個地方閱讀了一個示例,並且也感謝導出,這是以前的遺留問題更復雜的腳本具有子流程並且需要導出。 – asoundmove 2010-11-27 17:14:13

+0

對不起,我的意思是'工作',而不是'ps' – asoundmove 2010-11-27 17:45:48

1

像camh &丹尼斯威廉姆森說不要打破管道。

現在我有更小的例子,直接在命令行上:

服務器:

(
    for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9}; 
    do 
    if read s; 
     then echo ">>$i--$s//"; 
    else 
     echo "<<$i"; 
    fi; 
    done < tst-fifo 
)& 

客戶:

(echo "Test-$i" > tst-fifo&); 

(
    for i in {%a,#b}{1,2}{0,1}; 
    do 
    echo "Test-$i" > tst-fifo; 
    done 
)& 

可以用替換鍵行

所有客戶發送到管道的數據會被讀取,但使用客戶端的選項2可能需要在讀取所有數據之前啓動服務器幾次。

雖然讀取等待數據在管道中開始,一旦數據被壓入,它將永遠讀取空字符串。

任何方法來阻止它?

再次感謝您的任何見解。

6

正如其他答案中所說的,您需要始終保持fifo打開狀態以避免丟失數據。

但是,一旦所有編寫者在fifo打開後(所以有一個編寫者)都離開了,讀取立即返回(並且poll()返回POLLHUP)。清除這個狀態的唯一方法是重新開放fifo。

POSIX並沒有提供解決方案,但至少Linux和FreeBSD是這樣做的:如果讀取失敗,再次打開fifo,同時保持原始描述符處於打開狀態。這是有效的,因爲在Linux和FreeBSD中,「掛斷」狀態對於特定的打開文件描述是本地的,而在POSIX中對於fifo是全局的。

這可以在一個shell腳本來完成這樣的:

while :; do 
    exec 3<tmp/testfifo 
    exec 4<&- 
    while read x; do 
     echo "input: $x" 
    done <&3 
    exec 4<&3 
    exec 3<&- 
done 
1

只是對於那些可能會感興趣,[重新編輯]由CAMH和jilles以下意見,這裏有兩個新版本測試服務器腳本。

這兩個版本現在的工作原理完全一樣。

CAMH的版本管道管理:

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    fi 
    done 3< "$pipe" 4> "$pipe" # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF 
} 

JILLE的版本管道管理:

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    exec 3< "$pipe" 
    exec 4<&- 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    else 
     # Close the pipe and reconnect it so that the next read does not end up returning EOF 
     exec 4<&3 
     exec 3<&- 
     exec 3< "$pipe" 
     exec 4<&- 
    fi 
    done 
} 

感謝所有您的幫助。

0

運行說最多並聯10個就業機會,並排隊等待稍後處理的休息,但可靠地知道,他們跑

你可以用GNU並行做到這一點。你不需要這個腳本。

http://www.gnu.org/software/parallel/man.html#options

您可以設置最大,特效「jobslots的數量。並行運行多達N個就業機會。」有一個選項可以設置要使用的CPU內核數量。您可以將已執行作業的列表保存到日誌文件中,但這是一個測試版功能。