packer-cn/packer/rpc/communicator.go

267 lines
6.0 KiB
Go

package rpc
import (
"encoding/gob"
"errors"
"github.com/mitchellh/packer/packer"
"io"
"log"
"net"
"net/rpc"
"time"
)
// An implementation of packer.Communicator where the communicator is actually
// executed over an RPC connection.
type communicator struct {
client *rpc.Client
}
// CommunicatorServer wraps a packer.Communicator implementation and makes
// it exportable as part of a Golang RPC server.
type CommunicatorServer struct {
c packer.Communicator
}
type CommandFinished struct {
ExitStatus int
}
type CommunicatorStartArgs struct {
Command string
StdinAddress string
StdoutAddress string
StderrAddress string
ResponseAddress string
}
type CommunicatorDownloadArgs struct {
Path string
WriterAddress string
}
type CommunicatorUploadArgs struct {
Path string
ReaderAddress string
}
func Communicator(client *rpc.Client) *communicator {
return &communicator{client}
}
func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) {
var args CommunicatorStartArgs
args.Command = cmd.Command
if cmd.Stdin != nil {
stdinL := netListenerInRange(portRangeMin, portRangeMax)
args.StdinAddress = stdinL.Addr().String()
go serveSingleCopy("stdin", stdinL, nil, cmd.Stdin)
}
if cmd.Stdout != nil {
stdoutL := netListenerInRange(portRangeMin, portRangeMax)
args.StdoutAddress = stdoutL.Addr().String()
go serveSingleCopy("stdout", stdoutL, cmd.Stdout, nil)
}
if cmd.Stderr != nil {
stderrL := netListenerInRange(portRangeMin, portRangeMax)
args.StderrAddress = stderrL.Addr().String()
go serveSingleCopy("stderr", stderrL, cmd.Stderr, nil)
}
responseL := netListenerInRange(portRangeMin, portRangeMax)
args.ResponseAddress = responseL.Addr().String()
go func() {
defer responseL.Close()
conn, err := responseL.Accept()
if err != nil {
log.Panic(err)
}
defer conn.Close()
decoder := gob.NewDecoder(conn)
var finished CommandFinished
if err := decoder.Decode(&finished); err != nil {
log.Panic(err)
}
cmd.ExitStatus = finished.ExitStatus
cmd.Exited = true
}()
err = c.client.Call("Communicator.Start", &args, new(interface{}))
return
}
func (c *communicator) Upload(path string, r io.Reader) (err error) {
// We need to create a server that can proxy the reader data
// over because we can't simply gob encode an io.Reader
readerL := netListenerInRange(portRangeMin, portRangeMax)
if readerL == nil {
err = errors.New("couldn't allocate listener for upload reader")
return
}
// Make sure at the end of this call, we close the listener
defer readerL.Close()
// Pipe the reader through to the connection
go serveSingleCopy("uploadReader", readerL, nil, r)
args := CommunicatorUploadArgs{
path,
readerL.Addr().String(),
}
err = c.client.Call("Communicator.Upload", &args, new(interface{}))
return
}
func (c *communicator) Download(path string, w io.Writer) (err error) {
// We need to create a server that can proxy that data downloaded
// into the writer because we can't gob encode a writer directly.
writerL := netListenerInRange(portRangeMin, portRangeMax)
if writerL == nil {
err = errors.New("couldn't allocate listener for download writer")
return
}
// Make sure we close the listener once we're done because we'll be done
defer writerL.Close()
// Serve a single connection and a single copy
go serveSingleCopy("downloadWriter", writerL, w, nil)
args := CommunicatorDownloadArgs{
path,
writerL.Addr().String(),
}
err = c.client.Call("Communicator.Download", &args, new(interface{}))
return
}
func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) (err error) {
// Build the RemoteCmd on this side so that it all pipes over
// to the remote side.
var cmd packer.RemoteCmd
cmd.Command = args.Command
toClose := make([]net.Conn, 0)
if args.StdinAddress != "" {
stdinC, err := net.Dial("tcp", args.StdinAddress)
if err != nil {
return err
}
toClose = append(toClose, stdinC)
cmd.Stdin = stdinC
}
if args.StdoutAddress != "" {
stdoutC, err := net.Dial("tcp", args.StdoutAddress)
if err != nil {
return err
}
toClose = append(toClose, stdoutC)
cmd.Stdout = stdoutC
}
if args.StderrAddress != "" {
stderrC, err := net.Dial("tcp", args.StderrAddress)
if err != nil {
return err
}
toClose = append(toClose, stderrC)
cmd.Stderr = stderrC
}
// Connect to the response address so we can write our result to it
// when ready.
responseC, err := net.Dial("tcp", args.ResponseAddress)
if err != nil {
return err
}
responseWriter := gob.NewEncoder(responseC)
// Start the actual command
err = c.c.Start(&cmd)
// Start a goroutine to spin and wait for the process to actual
// exit. When it does, report it back to caller...
go func() {
defer responseC.Close()
for _, conn := range toClose {
defer conn.Close()
}
for !cmd.Exited {
time.Sleep(50 * time.Millisecond)
}
responseWriter.Encode(&CommandFinished{cmd.ExitStatus})
}()
return
}
func (c *CommunicatorServer) Upload(args *CommunicatorUploadArgs, reply *interface{}) (err error) {
readerC, err := net.Dial("tcp", args.ReaderAddress)
if err != nil {
return
}
defer readerC.Close()
err = c.c.Upload(args.Path, readerC)
return
}
func (c *CommunicatorServer) Download(args *CommunicatorDownloadArgs, reply *interface{}) (err error) {
writerC, err := net.Dial("tcp", args.WriterAddress)
if err != nil {
return
}
defer writerC.Close()
err = c.c.Download(args.Path, writerC)
return
}
func serveSingleCopy(name string, l net.Listener, dst io.Writer, src io.Reader) {
defer l.Close()
conn, err := l.Accept()
if err != nil {
log.Printf("'%s' accept error: %s", name, err)
return
}
// Be sure to close the connection after we're done copying so
// that an EOF will successfully be sent to the remote side
defer conn.Close()
// The connection is the destination/source that is nil
if dst == nil {
dst = conn
} else {
src = conn
}
written, err := io.Copy(dst, src)
log.Printf("%d bytes written for '%s'", written, name)
if err != nil {
log.Printf("'%s' copy error: %s", name, err)
}
}