From e4e372046b9368bc070926a13e96920318c6fcc5 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 21 Aug 2013 10:49:57 -0700 Subject: [PATCH] packer/plugin: fix data race reading stdout in Client --- packer/plugin/client.go | 53 ++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/packer/plugin/client.go b/packer/plugin/client.go index b8292c40e..e65d152d6 100644 --- a/packer/plugin/client.go +++ b/packer/plugin/client.go @@ -2,7 +2,6 @@ package plugin import ( "bufio" - "bytes" "errors" "fmt" "github.com/mitchellh/packer/packer" @@ -222,7 +221,7 @@ func (c *Client) Start() (address string, err error) { fmt.Sprintf("PACKER_PLUGIN_MAX_PORT=%d", c.config.MaxPort), } - stdout := new(bytes.Buffer) + stdout_r, stdout_w := io.Pipe() stderr_r, stderr_w := io.Pipe() cmd := c.config.Cmd @@ -230,7 +229,7 @@ func (c *Client) Start() (address string, err error) { cmd.Env = append(cmd.Env, env...) cmd.Stdin = os.Stdin cmd.Stderr = stderr_w - cmd.Stdout = stdout + cmd.Stdout = stdout_w log.Printf("Starting plugin: %s %#v", cmd.Path, cmd.Args) err = cmd.Start() @@ -253,9 +252,10 @@ func (c *Client) Start() (address string, err error) { // Start goroutine to wait for process to exit go func() { - // Make sure we close the write end of our stderr listener so - // that the log goroutine ends properly. + // Make sure we close the write end of our stderr/stdout so + // that the readers send EOF properly. defer stderr_w.Close() + defer stdout_w.Close() // Wait for the command to end. cmd.Wait() @@ -271,6 +271,34 @@ func (c *Client) Start() (address string, err error) { // Start goroutine that logs the stderr go c.logStderr(stderr_r) + // Start a goroutine that is going to be reading the lines + // out of stdout + linesCh := make(chan []byte) + go func() { + defer close(linesCh) + + buf := bufio.NewReader(stdout_r) + for { + line, err := buf.ReadBytes('\n') + if line != nil { + linesCh <- line + } + + if err == io.EOF { + return + } + } + }() + + // Make sure after we exit we read the lines from stdout forever + // so they dont' block since it is an io.Pipe + defer func() { + go func() { + for _ = range linesCh { + } + }() + }() + // Some channels for the next step timeout := time.After(c.config.StartTimeout) @@ -281,6 +309,12 @@ func (c *Client) Start() (address string, err error) { case <-timeout: err = errors.New("timeout while waiting for plugin to start") done = true + case line := <-linesCh: + // Trim the address and reset the err since we were able + // to read some sort of address. + c.address = strings.TrimSpace(string(line)) + address = c.address + return default: } @@ -289,15 +323,6 @@ func (c *Client) Start() (address string, err error) { done = true } - if line, lerr := stdout.ReadBytes('\n'); lerr == nil { - // Trim the address and reset the err since we were able - // to read some sort of address. - c.address = strings.TrimSpace(string(line)) - address = c.address - err = nil - break - } - // If error is nil from previously, return now if err != nil { return