From e1785e424ecd8b39677062082b34675b8662a358 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 8 May 2013 11:14:21 -0700 Subject: [PATCH] packer/plugin: client to encapsulate logic for starting plugins --- packer/plugin/client.go | 121 ++++++++++++++++++++++++++++++++++++++- packer/plugin/command.go | 99 +++----------------------------- 2 files changed, 126 insertions(+), 94 deletions(-) diff --git a/packer/plugin/client.go b/packer/plugin/client.go index d833c0ccf..6f3d93185 100644 --- a/packer/plugin/client.go +++ b/packer/plugin/client.go @@ -1,13 +1,128 @@ package plugin import ( + "bufio" + "bytes" + "errors" + "io" + "log" "os/exec" + "strings" + "time" ) -type Client struct { +type client struct { cmd *exec.Cmd + exited bool } -func NewClient(cmd *exec.Cmd) *Client { - return &Client{cmd} +func NewClient(cmd *exec.Cmd) *client { + return &client{ + cmd, + false, + } +} + +func (c *client) Exited() bool { + return c.exited +} + +func (c *client) Start() (address string, err error) { + env := []string{ + "PACKER_PLUGIN_MIN_PORT=10000", + "PACKER_PLUGIN_MAX_PORT=25000", + } + + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + c.cmd.Env = append(c.cmd.Env, env...) + c.cmd.Stderr = stderr + c.cmd.Stdout = stdout + err = c.cmd.Start() + if err != nil { + return + } + + // Make sure the command is properly cleaned up if there is an error + defer func() { + r := recover() + + if err != nil || r != nil { + c.cmd.Process.Kill() + } + + if r != nil { + panic(r) + } + }() + + // Start goroutine to wait for process to exit + go func() { + c.cmd.Wait() + c.exited = true + }() + + // Start goroutine that logs the stderr + go c.logStderr(stderr) + + // Some channels for the next step + timeout := time.After(1 * time.Minute) + + // Start looking for the address + for done := false; !done; { + select { + case <-timeout: + err = errors.New("timeout while waiting for plugin to start") + done = true + default: + } + + if err == nil && c.Exited() { + err = errors.New("plugin exited before we could connect") + done = true + } + + if line, lerr := stdout.ReadBytes('\n'); lerr == nil { + // Trim the address and reset the err since we were able + // to read some sort of address. + address = strings.TrimSpace(string(line)) + err = nil + break + } + + // If error is nil from previously, return now + if err != nil { + return + } + + // Wait a bit + time.Sleep(10 * time.Millisecond) + } + + return +} + +func (c *client) Kill() { + c.cmd.Process.Kill() +} + +func (c *client) logStderr(r io.Reader) { + buf := bufio.NewReader(r) + + for done := false; !done; { + if c.Exited() { + done = true + } + + var err error + for err == nil { + var line string + line, err = buf.ReadString('\n') + if line != "" { + log.Print(line) + } + } + + time.Sleep(10 * time.Millisecond) + } } diff --git a/packer/plugin/command.go b/packer/plugin/command.go index 6c86321f8..55539f585 100644 --- a/packer/plugin/command.go +++ b/packer/plugin/command.go @@ -1,21 +1,16 @@ package plugin import ( - "bufio" - "bytes" - "errors" "github.com/mitchellh/packer/packer" "log" "net/rpc" "os/exec" packrpc "github.com/mitchellh/packer/packer/rpc" - "strings" - "time" ) type cmdCommand struct { command packer.Command - exited <-chan bool + client *client } func (c *cmdCommand) Run(e packer.Environment, args []string) (exitCode int) { @@ -41,13 +36,10 @@ func (c *cmdCommand) Synopsis() (result string) { } func (c *cmdCommand) checkExit(p interface{}, cb func()) { - select { - case <-c.exited: + if c.client.Exited() { cb() - default: - if p != nil { - log.Panic(p) - } + } else if p != nil { + log.Panic(p) } } @@ -60,17 +52,8 @@ func (c *cmdCommand) checkExit(p interface{}, cb func()) { // // This function guarantees the subprocess will end in a timely manner. func Command(cmd *exec.Cmd) (result packer.Command, err error) { - env := []string{ - "PACKER_PLUGIN_MIN_PORT=10000", - "PACKER_PLUGIN_MAX_PORT=25000", - } - - stdout := new(bytes.Buffer) - stderr := new(bytes.Buffer) - cmd.Env = append(cmd.Env, env...) - cmd.Stderr = stderr - cmd.Stdout = stdout - err = cmd.Start() + cmdClient := NewClient(cmd) + address, err := cmdClient.Start() if err != nil { return } @@ -78,76 +61,10 @@ func Command(cmd *exec.Cmd) (result packer.Command, err error) { defer func() { // Make sure the command is properly killed in the case of an error if err != nil { - cmd.Process.Kill() + cmdClient.Kill() } }() - // Goroutine + channel to signal that the process exited - cmdExited := make(chan bool) - go func() { - cmd.Wait() - cmdExited <- true - }() - - // Goroutine to log out the output from the command - // TODO: All sorts of things wrong with this. First, we're reading from - // a channel that can get consumed elsewhere. Second, the app can end - // without properly flushing all the log data. BLah. - go func() { - buf := bufio.NewReader(stderr) - - for done := false; !done; { - select { - case <-cmdExited: - done = true - default: - } - - var err error - for err == nil { - var line string - line, err = buf.ReadString('\n') - if line != "" { - log.Print(line) - } - } - - time.Sleep(10 * time.Millisecond) - } - }() - - // Timer for a timeout - cmdTimeout := time.After(1 * time.Minute) - - var address string - for done := false; !done; { - select { - case <-cmdExited: - err = errors.New("plugin exited before we could connect") - done = true - case <-cmdTimeout: - err = errors.New("timeout while waiting for plugin to start") - done = true - default: - } - - if line, lerr := stdout.ReadBytes('\n'); lerr == nil { - // Trim the address and reset the err since we were able - // to read some sort of address. - address = strings.TrimSpace(string(line)) - err = nil - break - } - - // If error is nil from previously, return now - if err != nil { - return - } - - // Wait a bit - time.Sleep(10 * time.Millisecond) - } - client, err := rpc.Dial("tcp", address) if err != nil { return @@ -155,7 +72,7 @@ func Command(cmd *exec.Cmd) (result packer.Command, err error) { result = &cmdCommand{ packrpc.Command(client), - cmdExited, + cmdClient, } return