packer-cn/packer/plugin/client.go

365 lines
8.9 KiB
Go
Raw Normal View History

2013-05-05 00:26:30 -04:00
package plugin
import (
"bufio"
"errors"
"fmt"
"github.com/mitchellh/packer/packer"
packrpc "github.com/mitchellh/packer/packer/rpc"
"io"
2013-07-30 13:41:02 -04:00
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
2013-05-05 00:26:30 -04:00
"os/exec"
"strings"
"sync"
"time"
"unicode"
2013-05-05 00:26:30 -04:00
)
var (
	// Killed, when true, keeps the "unexpected EOF" panic from being
	// raised throughout the clients. CleanupClients switches it on.
	Killed bool

	// managedClients is the slice of "managed" clients which are cleaned
	// up when calling CleanupClients.
	managedClients = make([]*Client, 0, 5)
)
// Client handles the lifecycle of a plugin application, determining its
// RPC address, and returning various types of packer interface implementations
// across the multi-process communication layer.
type Client struct {
2013-06-11 17:09:31 -04:00
config *ClientConfig
2013-05-10 20:01:24 -04:00
exited bool
doneLogging chan struct{}
l sync.Mutex
address string
2013-05-05 00:26:30 -04:00
}
// ClientConfig is the configuration used to initialize a new
// plugin client. After being used to initialize a plugin client,
// that configuration must not be modified again.
type ClientConfig struct {
	// The unstarted subprocess for starting the plugin.
	Cmd *exec.Cmd

	// Managed represents if the client should be managed by the
	// plugin package or not. If true, then by calling CleanupClients,
	// it will automatically be cleaned up. Otherwise, the client
	// user is fully responsible for making sure to Kill all plugin
	// clients. By default the client is _not_ managed.
	Managed bool

	// The minimum and maximum port to use for communicating with
	// the subprocess. If both are left at zero, NewClient defaults them
	// to 10,000 and 25,000 respectively.
	MinPort uint
	MaxPort uint

	// StartTimeout is the timeout to wait for the plugin to say it
	// has started successfully. If zero, NewClient sets it to one minute.
	StartTimeout time.Duration

	// If non-nil, then the stderr of the client will be written to here
	// (as well as the log). If nil, NewClient replaces it with a writer
	// that discards everything.
	Stderr io.Writer
}
// This makes sure all the managed subprocesses are killed and properly
// logged. This should be called before the parent process running the
// plugins exits.
2013-05-09 17:27:20 -04:00
//
// This must only be called _once_.
func CleanupClients() {
// Set the killed to true so that we don't get unexpected panics
Killed = true
// Kill all the managed clients in parallel and use a WaitGroup
// to wait for them all to finish up.
var wg sync.WaitGroup
for _, client := range managedClients {
wg.Add(1)
go func(client *Client) {
client.Kill()
wg.Done()
}(client)
}
log.Println("waiting for all plugin processes to complete...")
wg.Wait()
}
2013-05-09 17:27:20 -04:00
// Creates a new plugin client which manages the lifecycle of an external
// plugin and gets the address for the RPC connection.
//
// The client must be cleaned up at some point by calling Kill(). If
// the client is a managed client (created with NewManagedClient) you
// can just call CleanupClients at the end of your program and they will
// be properly cleaned.
func NewClient(config *ClientConfig) (c *Client) {
if config.MinPort == 0 && config.MaxPort == 0 {
config.MinPort = 10000
config.MaxPort = 25000
}
if config.StartTimeout == 0 {
config.StartTimeout = 1 * time.Minute
}
2013-07-30 13:41:02 -04:00
if config.Stderr == nil {
config.Stderr = ioutil.Discard
}
c = &Client{config: config}
if config.Managed {
managedClients = append(managedClients, c)
}
return
}
2013-05-09 17:27:20 -04:00
// Tells whether or not the underlying process has exited.
func (c *Client) Exited() bool {
return c.exited
}
// Builder returns a builder implementation that talks to the plugin over
// this client. The plugin is started first if that has not happened yet.
func (c *Client) Builder() (packer.Builder, error) {
	rpcConn, err := c.rpcClient()
	if err != nil {
		return nil, err
	}

	builder := &cmdBuilder{packrpc.Builder(rpcConn), c}
	return builder, nil
}
// Command returns a command implementation that talks to the plugin over
// this client. The plugin is started first if that has not happened yet.
func (c *Client) Command() (packer.Command, error) {
	rpcConn, err := c.rpcClient()
	if err != nil {
		return nil, err
	}

	command := &cmdCommand{packrpc.Command(rpcConn), c}
	return command, nil
}
// Hook returns a hook implementation that talks to the plugin over this
// client. The plugin is started first if that has not happened yet.
func (c *Client) Hook() (packer.Hook, error) {
	rpcConn, err := c.rpcClient()
	if err != nil {
		return nil, err
	}

	hook := &cmdHook{packrpc.Hook(rpcConn), c}
	return hook, nil
}
// Returns a post-processor implementation that is communicating over
// this client. If the client hasn't been started, this will start it.
func (c *Client) PostProcessor() (packer.PostProcessor, error) {
2013-06-18 16:49:07 -04:00
client, err := c.rpcClient()
if err != nil {
return nil, err
}
2013-06-18 16:49:07 -04:00
return &cmdPostProcessor{packrpc.PostProcessor(client), c}, nil
}
// Provisioner returns a provisioner implementation that talks to the plugin
// over this client. The plugin is started first if that has not happened yet.
func (c *Client) Provisioner() (packer.Provisioner, error) {
	rpcConn, err := c.rpcClient()
	if err != nil {
		return nil, err
	}

	provisioner := &cmdProvisioner{packrpc.Provisioner(rpcConn), c}
	return provisioner, nil
}
// Kill ends the executing subprocess (if it is running) and performs any
// cleanup tasks necessary such as capturing any remaining logs.
//
// This method blocks until the stderr logger has signalled that it is done.
//
// This method can safely be called multiple times.
func (c *Client) Kill() {
	proc := c.config.Cmd.Process
	if proc == nil {
		// No process was ever started, so there is nothing to kill or
		// to wait for.
		return
	}

	proc.Kill()

	// Block until logging has finished so we have a complete log
	<-c.doneLogging
}
2013-05-09 17:27:20 -04:00
// Starts the underlying subprocess, communicating with it to negotiate
// a port for RPC connections, and returning the address to connect via RPC.
//
// This method is safe to call multiple times. Subsequent calls have no effect.
// Once a client has been started once, it cannot be started again, even if
// it was killed.
func (c *Client) Start() (address string, err error) {
c.l.Lock()
defer c.l.Unlock()
if c.address != "" {
return c.address, nil
}
2013-05-09 17:27:20 -04:00
c.doneLogging = make(chan struct{})
env := []string{
fmt.Sprintf("%s=%s", MagicCookieKey, MagicCookieValue),
fmt.Sprintf("PACKER_PLUGIN_MIN_PORT=%d", c.config.MinPort),
fmt.Sprintf("PACKER_PLUGIN_MAX_PORT=%d", c.config.MaxPort),
}
stdout_r, stdout_w := io.Pipe()
stderr_r, stderr_w := io.Pipe()
cmd := c.config.Cmd
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)
cmd.Stdin = os.Stdin
cmd.Stderr = stderr_w
cmd.Stdout = stdout_w
log.Printf("Starting plugin: %s %#v", cmd.Path, cmd.Args)
err = cmd.Start()
if err != nil {
return
}
// Make sure the command is properly cleaned up if there is an error
defer func() {
r := recover()
if err != nil || r != nil {
cmd.Process.Kill()
}
if r != nil {
panic(r)
}
}()
// Start goroutine to wait for process to exit
exitCh := make(chan struct{})
go func() {
// Make sure we close the write end of our stderr/stdout so
// that the readers send EOF properly.
defer stderr_w.Close()
defer stdout_w.Close()
// Wait for the command to end.
cmd.Wait()
// Log and make sure to flush the logs write away
log.Printf("%s: plugin process exited\n", cmd.Path)
os.Stderr.Sync()
// Mark that we exited
close(exitCh)
// Set that we exited, which takes a lock
c.l.Lock()
defer c.l.Unlock()
c.exited = true
}()
// Start goroutine that logs the stderr
go c.logStderr(stderr_r)
// Start a goroutine that is going to be reading the lines
// out of stdout
linesCh := make(chan []byte)
go func() {
defer close(linesCh)
buf := bufio.NewReader(stdout_r)
for {
line, err := buf.ReadBytes('\n')
if line != nil {
linesCh <- line
}
if err == io.EOF {
return
}
}
}()
// Make sure after we exit we read the lines from stdout forever
// so they dont' block since it is an io.Pipe
defer func() {
go func() {
for _ = range linesCh {
}
}()
}()
// Some channels for the next step
timeout := time.After(c.config.StartTimeout)
// Start looking for the address
log.Printf("Waiting for RPC address for: %s", cmd.Path)
select {
case <-timeout:
err = errors.New("timeout while waiting for plugin to start")
case <-exitCh:
err = errors.New("plugin exited before we could connect")
case line := <-linesCh:
// Trim the address and reset the err since we were able
// to read some sort of address.
c.address = strings.TrimSpace(string(line))
address = c.address
}
return
}
func (c *Client) logStderr(r io.Reader) {
bufR := bufio.NewReader(r)
for {
line, err := bufR.ReadString('\n')
if line != "" {
2013-07-30 13:41:02 -04:00
c.config.Stderr.Write([]byte(line))
line = strings.TrimRightFunc(line, unicode.IsSpace)
log.Printf("%s: %s", c.config.Cmd.Path, line)
}
if err == io.EOF {
break
}
}
// Flag that we've completed logging for others
close(c.doneLogging)
2013-05-05 00:26:30 -04:00
}
// rpcClient starts the plugin if needed, dials the address returned by Start
// over TCP and wraps the connection in an RPC client. Each call dials a new
// connection.
func (c *Client) rpcClient() (*rpc.Client, error) {
	addr, startErr := c.Start()
	if startErr != nil {
		return nil, startErr
	}

	rawConn, dialErr := net.Dial("tcp", addr)
	if dialErr != nil {
		return nil, dialErr
	}

	// Turn on keep alive so that the connection doesn't die
	tcp := rawConn.(*net.TCPConn)
	tcp.SetKeepAlive(true)

	return rpc.NewClient(tcp), nil
}