packer-cn/packer/communicator.go

169 lines
4.5 KiB
Go
Raw Normal View History

2013-05-12 17:47:55 -04:00
package packer
import (
"bufio"
"io"
"log"
2013-06-01 20:45:57 -04:00
"sync"
"time"
)
2013-05-12 17:47:55 -04:00
// A Communicator is the interface used to communicate with the machine
// that exists that will eventually be packaged into an image. Communicators
// allow you to execute remote commands, upload files, etc.
//
// Communicators must be safe for concurrency, meaning multiple calls to
// Start or any other method may be called at the same time.
type Communicator interface {
Start(string) (*RemoteCommand, error)
Upload(string, io.Reader) error
Download(string, io.Writer) error
2013-05-12 17:47:55 -04:00
}
// This struct contains some information about the remote command being
// executed and can be used to wait for it to complete.
//
// Stdin, Stdout, Stderr are readers and writers to varios IO streams for
// the remote command.
//
// Exited is false until Wait is called. It can be used to check if Wait
// has already been called.
//
// ExitStatus is the exit code of the remote process. It is only available
// once Wait is called.
2013-05-12 17:47:55 -04:00
type RemoteCommand struct {
2013-05-20 19:50:35 -04:00
Stdin io.Writer
Stdout io.Reader
Stderr io.Reader
Exited bool
2013-05-12 17:47:55 -04:00
ExitStatus int
2013-06-01 20:45:57 -04:00
exitChans []chan<- int
2013-06-01 20:45:57 -04:00
exitChanLock sync.Mutex
2013-06-01 21:49:49 -04:00
errChans []chan<- string
errChanLock sync.Mutex
outChans []chan<- string
outChanLock sync.Mutex
2013-05-12 17:47:55 -04:00
}
2013-06-01 21:49:49 -04:00
// StderrStream returns a channel that will be sent all the
func (r *RemoteCommand) StderrChan() <-chan string {
r.errChanLock.Lock()
defer r.errChanLock.Unlock()
// If no channels have been made, make that slice and start
// the goroutine to read and send to them
if r.errChans == nil {
r.errChans = make([]chan<- string, 0, 5)
go r.channelReader(r.Stderr, &r.errChans, &r.errChanLock)
}
// Create the channel, append it to the channels we care about
errChan := make(chan string, 10)
r.errChans = append(r.errChans, errChan)
return errChan
}
// StdoutStream returns a channel that will be sent all the output
// of stdout as it comes. The output isn't guaranteed to be a full line.
// When the channel is closed, the process is exited.
func (r *RemoteCommand) StdoutChan() <-chan string {
r.outChanLock.Lock()
defer r.outChanLock.Unlock()
// If no output channels have been made yet, then make that slice
// and start the goroutine to read and send to them.
if r.outChans == nil {
r.outChans = make([]chan<- string, 0, 5)
go r.channelReader(r.Stdout, &r.outChans, &r.outChanLock)
}
// Create the channel, append it to the channels we care about
outChan := make(chan string, 10)
r.outChans = append(r.outChans, outChan)
return outChan
}
// ExitChan returns a channel that will be sent the exit status once
// the process exits. This can be used in cases such a select statement
// waiting on the process to end.
func (r *RemoteCommand) ExitChan() <-chan int {
2013-06-01 20:45:57 -04:00
r.exitChanLock.Lock()
defer r.exitChanLock.Unlock()
// If we haven't made any channels yet, make that slice
if r.exitChans == nil {
r.exitChans = make([]chan<- int, 0, 5)
go func() {
// Wait for the command to finish
r.Wait()
// Grab the exit chan lock so we can iterate over it and
// message to each channel.
r.exitChanLock.Lock()
defer r.exitChanLock.Unlock()
for _, ch := range r.exitChans {
// Use a select so the send never blocks
select {
case ch <- r.ExitStatus:
default:
log.Println("remote command exit channel wouldn't blocked. Weird.")
}
close(ch)
}
r.exitChans = nil
}()
}
// Append our new channel onto it and return it
exitChan := make(chan int, 1)
r.exitChans = append(r.exitChans, exitChan)
return exitChan
}
// Wait waits for the command to exit.
func (r *RemoteCommand) Wait() {
// Busy wait on being exited. We put a sleep to be kind to the
// Go scheduler, and because we don't really need smaller granularity.
for !r.Exited {
time.Sleep(10 * time.Millisecond)
}
}
// Takes an io.Reader and then writes its data to a slice of channels.
// The channel slice is expected to be protected by the given mutex, and
// after the io.Reader is over, all channels will be closed and the slice
// set to nil.
func (*RemoteCommand) channelReader(r io.Reader, chans *[]chan<- string, chanLock *sync.Mutex) {
buf := bufio.NewReader(r)
var err error
for err != io.EOF {
var data []byte
data, err = buf.ReadSlice('\n')
if len(data) > 0 {
for _, ch := range *chans {
// Note: this blocks if the channel is full (they
// are buffered by default). What to do?
ch <- string(data)
}
}
}
// Clean up the channels by closing them and setting the
// list to nil.
chanLock.Lock()
defer chanLock.Unlock()
for _, ch := range *chans {
close(ch)
}
*chans = nil
}