packer/plugin: client to encapsulate logic for starting plugins

This commit is contained in:
Mitchell Hashimoto 2013-05-08 11:14:21 -07:00
parent 4d9b5fa86d
commit e1785e424e
2 changed files with 126 additions and 94 deletions

View File

@ -1,13 +1,128 @@
package plugin package plugin
import ( import (
"bufio"
"bytes"
"errors"
"io"
"log"
"os/exec" "os/exec"
"strings"
"time"
) )
type Client struct { type client struct {
cmd *exec.Cmd cmd *exec.Cmd
exited bool
} }
func NewClient(cmd *exec.Cmd) *Client { func NewClient(cmd *exec.Cmd) *client {
return &Client{cmd} return &client{
cmd,
false,
}
}
func (c *client) Exited() bool {
return c.exited
}
func (c *client) Start() (address string, err error) {
env := []string{
"PACKER_PLUGIN_MIN_PORT=10000",
"PACKER_PLUGIN_MAX_PORT=25000",
}
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)
c.cmd.Env = append(c.cmd.Env, env...)
c.cmd.Stderr = stderr
c.cmd.Stdout = stdout
err = c.cmd.Start()
if err != nil {
return
}
// Make sure the command is properly cleaned up if there is an error
defer func() {
r := recover()
if err != nil || r != nil {
c.cmd.Process.Kill()
}
if r != nil {
panic(r)
}
}()
// Start goroutine to wait for process to exit
go func() {
c.cmd.Wait()
c.exited = true
}()
// Start goroutine that logs the stderr
go c.logStderr(stderr)
// Some channels for the next step
timeout := time.After(1 * time.Minute)
// Start looking for the address
for done := false; !done; {
select {
case <-timeout:
err = errors.New("timeout while waiting for plugin to start")
done = true
default:
}
if err == nil && c.Exited() {
err = errors.New("plugin exited before we could connect")
done = true
}
if line, lerr := stdout.ReadBytes('\n'); lerr == nil {
// Trim the address and reset the err since we were able
// to read some sort of address.
address = strings.TrimSpace(string(line))
err = nil
break
}
// If error is nil from previously, return now
if err != nil {
return
}
// Wait a bit
time.Sleep(10 * time.Millisecond)
}
return
}
func (c *client) Kill() {
c.cmd.Process.Kill()
}
func (c *client) logStderr(r io.Reader) {
buf := bufio.NewReader(r)
for done := false; !done; {
if c.Exited() {
done = true
}
var err error
for err == nil {
var line string
line, err = buf.ReadString('\n')
if line != "" {
log.Print(line)
}
}
time.Sleep(10 * time.Millisecond)
}
} }

View File

@ -1,21 +1,16 @@
package plugin package plugin
import ( import (
"bufio"
"bytes"
"errors"
"github.com/mitchellh/packer/packer" "github.com/mitchellh/packer/packer"
"log" "log"
"net/rpc" "net/rpc"
"os/exec" "os/exec"
packrpc "github.com/mitchellh/packer/packer/rpc" packrpc "github.com/mitchellh/packer/packer/rpc"
"strings"
"time"
) )
type cmdCommand struct { type cmdCommand struct {
command packer.Command command packer.Command
exited <-chan bool client *client
} }
func (c *cmdCommand) Run(e packer.Environment, args []string) (exitCode int) { func (c *cmdCommand) Run(e packer.Environment, args []string) (exitCode int) {
@ -41,14 +36,11 @@ func (c *cmdCommand) Synopsis() (result string) {
} }
func (c *cmdCommand) checkExit(p interface{}, cb func()) { func (c *cmdCommand) checkExit(p interface{}, cb func()) {
select { if c.client.Exited() {
case <-c.exited:
cb() cb()
default: } else if p != nil {
if p != nil {
log.Panic(p) log.Panic(p)
} }
}
} }
// Returns a valid packer.Command where the command is executed via RPC // Returns a valid packer.Command where the command is executed via RPC
@ -60,17 +52,8 @@ func (c *cmdCommand) checkExit(p interface{}, cb func()) {
// //
// This function guarantees the subprocess will end in a timely manner. // This function guarantees the subprocess will end in a timely manner.
func Command(cmd *exec.Cmd) (result packer.Command, err error) { func Command(cmd *exec.Cmd) (result packer.Command, err error) {
env := []string{ cmdClient := NewClient(cmd)
"PACKER_PLUGIN_MIN_PORT=10000", address, err := cmdClient.Start()
"PACKER_PLUGIN_MAX_PORT=25000",
}
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)
cmd.Env = append(cmd.Env, env...)
cmd.Stderr = stderr
cmd.Stdout = stdout
err = cmd.Start()
if err != nil { if err != nil {
return return
} }
@ -78,76 +61,10 @@ func Command(cmd *exec.Cmd) (result packer.Command, err error) {
defer func() { defer func() {
// Make sure the command is properly killed in the case of an error // Make sure the command is properly killed in the case of an error
if err != nil { if err != nil {
cmd.Process.Kill() cmdClient.Kill()
} }
}() }()
// Goroutine + channel to signal that the process exited
cmdExited := make(chan bool)
go func() {
cmd.Wait()
cmdExited <- true
}()
// Goroutine to log out the output from the command
// TODO: All sorts of things wrong with this. First, we're reading from
// a channel that can get consumed elsewhere. Second, the app can end
// without properly flushing all the log data. BLah.
go func() {
buf := bufio.NewReader(stderr)
for done := false; !done; {
select {
case <-cmdExited:
done = true
default:
}
var err error
for err == nil {
var line string
line, err = buf.ReadString('\n')
if line != "" {
log.Print(line)
}
}
time.Sleep(10 * time.Millisecond)
}
}()
// Timer for a timeout
cmdTimeout := time.After(1 * time.Minute)
var address string
for done := false; !done; {
select {
case <-cmdExited:
err = errors.New("plugin exited before we could connect")
done = true
case <-cmdTimeout:
err = errors.New("timeout while waiting for plugin to start")
done = true
default:
}
if line, lerr := stdout.ReadBytes('\n'); lerr == nil {
// Trim the address and reset the err since we were able
// to read some sort of address.
address = strings.TrimSpace(string(line))
err = nil
break
}
// If error is nil from previously, return now
if err != nil {
return
}
// Wait a bit
time.Sleep(10 * time.Millisecond)
}
client, err := rpc.Dial("tcp", address) client, err := rpc.Dial("tcp", address)
if err != nil { if err != nil {
return return
@ -155,7 +72,7 @@ func Command(cmd *exec.Cmd) (result packer.Command, err error) {
result = &cmdCommand{ result = &cmdCommand{
packrpc.Command(client), packrpc.Command(client),
cmdExited, cmdClient,
} }
return return