2013-02-10 47 views
11

從該命令的標準輸出接收數據,我有以下程序:正確傳遞數據的標準輸入一個命令,並在golang

package main 

import "bytes" 
import "io" 
import "log" 
import "os" 
import "os/exec" 
import "time" 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    go populate_stdin_func(stdin) 
    go func() { 
      // Removing the following lines allow some output 
      // to be fetched from cat's stdout sometimes 
      time.Sleep(5 * time.Second) 
      io.Copy(os.Stdout, stdout) 
    }() 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

當在一個循環中運行,我沒有得到任何結果,就像這樣:

$ while true; do go run cat_thingy.go; echo ; done 



^C 

這個結果來自在虛擬機中安裝golang-go在apt 12.04上的golang-go(go版本go1)。我一直無法在Macbook Air上進行復制安裝(版本go1.0.3)。這似乎是某種競爭條件。事實上,如果我把睡眠(1 * time.Second),我從來沒有看到這個問題在代碼中隨機睡眠的代價。

有什麼我在代碼中做錯了,或者這是一個錯誤?如果它是一個錯誤,它是否被修復?

UPDATE:可能的線索

我發現Command.Wait將關閉管道從即使他們仍然有未讀數據的貓子進程/通信。我不確定如何處理這個問題。我想我可以創建一個通道來通知stdin寫入完成的時間,但是我仍然需要知道cat進程是否已經結束,以確保沒有其他東西會寫入到它的stdout管道。我知道我可以使用cmd.Process.Wait來確定進程何時結束,但是可以安全地調用cmd.Wait嗎?

UPDATE:越來越近

下面是在代碼的新晉級。我相信,只要寫入標準輸入並從標準輸出中讀取,就可以工作。我認爲如果我從stdout處理goroutine中替換io.Copy而沒有流,那麼我可以正確地將數據流化(而不是緩衝所有數據)。

package main 

import "bytes" 
import "fmt" 
import "io" 
import "log" 
import "os/exec" 
import "runtime" 

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB 
const numInputBlocks = 6 

func main() { 
    runtime.GOMAXPROCS(5) 
    runCatFromStdin(populateStdin(numInputBlocks)) 
} 

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"} 
     for i := 0; i < numInputBlocks; i++ { 
      repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes() 
      fmt.Printf("%s\n", repeatedBytes) 
      io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength))) 
     } 
    } 
} 

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    go populate_stdin_func(stdin) 
    output_done_channel := make(chan bool) 
    go func() { 
     out_bytes := new(bytes.Buffer) 
     io.Copy(out_bytes, stdout) 
     fmt.Printf("%s\n", out_bytes) 
     fmt.Println(out_bytes.Len()) 
     fmt.Println(inputBufferBlockLength*numInputBlocks) 
     output_done_channel <- true 
    }() 
    <-output_done_channel 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

回答

0

Go statements

A「走出去」的語句開頭的函數或方法調用作爲 控制,或者夠程的獨立併發線程的執行過程中, 相同的地址空間內。

GoStmt =「go」表達式。

表達式必須是一個調用。函數值和參數在調用goroutine中照常進行評估,但與常規調用 不同,程序執行不會等待被調用的函數完成 。相反,函數在 新的goroutine中開始獨立執行。當函數終止時,它的goroutine也會終止。如果該函數有任何返回值,則在函數完成時將被丟棄 。

將函數調用轉換爲免費goroutines。

package main 

import (
    "bytes" 
    "io" 
    "log" 
    "os" 
    "os/exec" 
) 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    populate_stdin_func(stdin) 
    io.Copy(os.Stdout, stdout) 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 
+2

你的代碼的工作,因爲在我的例子管道緩衝區,永不滿足的只是另一種方式。將goroutine更改爲函數調用通常不起作用。在一般情況下,cat進程用於通信的管道將具有一定大小的緩衝區。例如,stdin管道有一個特定的緩衝區。一旦緩衝區滿了,寫入管道就會被阻塞。在Linux上,我相信緩衝區大小是64KiB。在stdout的cat管道上也會出現類似的問題。在主代碼中執行阻塞I/O意味着阻塞調用將阻止主代碼。 – 2013-02-11 06:52:20

4

這是您的第一個代碼的工作版本。注意添加sync.WaitGroup以確保在關閉命令之前完成發送和接收去程序。

package main 

import (
    "bytes" 
    "io" 
    "log" 
    "os" 
    "os/exec" 
    "sync" 
    "time" 
) 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    var wg sync.WaitGroup 
    wg.Add(2) 
    go func() { 
     defer wg.Done() 
     populate_stdin_func(stdin) 
    }() 
    go func() { 
     defer wg.Done() 
     time.Sleep(5 * time.Second) 
     io.Copy(os.Stdout, stdout) 
    }() 
    wg.Wait() 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

(這是說什麼@peterSO說,雖然;-)

+0

這不僅僅是說@佩特爾所說的另一種說法。它實際上正確地處理管道緩衝區,因爲cat的輸入是在輸出的一個單獨的goroutine中處理的。我也認爲WaitGroups比我用來做同步的頻道好一點。我仍然覺得管道作爲cmd.Wait()的副作用而關閉是相當困惑的。它真的讓人困惑,因爲直到過程結束後纔會發生。 – 2013-02-13 18:51:14

相關問題