從該命令的標準輸出接收數據,我有以下程序:正確傳遞數據的標準輸入一個命令,並在golang
package main
import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"
func main() {
runCatFromStdinWorks(populateStdin("aaa\n"))
runCatFromStdinWorks(populateStdin("bbb\n"))
}
func populateStdin(str string) func(io.WriteCloser) {
return func(stdin io.WriteCloser) {
defer stdin.Close()
io.Copy(stdin, bytes.NewBufferString(str))
}
}
func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
if err != nil {
log.Panic(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Panic(err)
}
err = cmd.Start()
if err != nil {
log.Panic(err)
}
go populate_stdin_func(stdin)
go func() {
// Removing the following lines allow some output
// to be fetched from cat's stdout sometimes
time.Sleep(5 * time.Second)
io.Copy(os.Stdout, stdout)
}()
err = cmd.Wait()
if err != nil {
log.Panic(err)
}
}
當在一個循環中運行,我沒有得到任何結果,就像這樣:
$ while true; do go run cat_thingy.go; echo ; done
^C
這個結果來自在虛擬機中安裝golang-go在apt 12.04上的golang-go(go版本go1)。我一直無法在Macbook Air上進行復制安裝(版本go1.0.3)。這似乎是某種競爭條件。事實上,如果我把睡眠(1 * time.Second),我從來沒有看到這個問題在代碼中隨機睡眠的代價。
有什麼我在代碼中做錯了,或者這是一個錯誤?如果它是一個錯誤,它是否被修復?
UPDATE:可能的線索
我發現Command.Wait將關閉管道從即使他們仍然有未讀數據的貓子進程/通信。我不確定如何處理這個問題。我想我可以創建一個通道來通知stdin寫入完成的時間,但是我仍然需要知道cat進程是否已經結束,以確保沒有其他東西會寫入到它的stdout管道。我知道我可以使用cmd.Process.Wait來確定進程何時結束,但是可以安全地調用cmd.Wait嗎?
UPDATE:越來越近
下面是在代碼的新晉級。我相信,只要寫入標準輸入並從標準輸出中讀取,就可以工作。我認爲如果我從stdout處理goroutine中替換io.Copy而沒有流,那麼我可以正確地將數據流化(而不是緩衝所有數據)。
package main
import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"
const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6
func main() {
runtime.GOMAXPROCS(5)
runCatFromStdin(populateStdin(numInputBlocks))
}
func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
return func(stdin io.WriteCloser) {
defer stdin.Close()
repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
for i := 0; i < numInputBlocks; i++ {
repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
fmt.Printf("%s\n", repeatedBytes)
io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
}
}
}
func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
if err != nil {
log.Panic(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Panic(err)
}
err = cmd.Start()
if err != nil {
log.Panic(err)
}
go populate_stdin_func(stdin)
output_done_channel := make(chan bool)
go func() {
out_bytes := new(bytes.Buffer)
io.Copy(out_bytes, stdout)
fmt.Printf("%s\n", out_bytes)
fmt.Println(out_bytes.Len())
fmt.Println(inputBufferBlockLength*numInputBlocks)
output_done_channel <- true
}()
<-output_done_channel
err = cmd.Wait()
if err != nil {
log.Panic(err)
}
}
你的代碼的工作,因爲在我的例子管道緩衝區,永不滿足的只是另一種方式。將goroutine更改爲函數調用通常不起作用。在一般情況下,cat進程用於通信的管道將具有一定大小的緩衝區。例如,stdin管道有一個特定的緩衝區。一旦緩衝區滿了,寫入管道就會被阻塞。在Linux上,我相信緩衝區大小是64KiB。在stdout的cat管道上也會出現類似的問題。在主代碼中執行阻塞I/O意味着阻塞調用將阻止主代碼。 – 2013-02-11 06:52:20