packer-cn/packer/rpc/communicator.go

344 lines
7.6 KiB
Go
Raw Normal View History

2013-05-12 17:47:55 -04:00
package rpc
import (
"context"
"encoding/gob"
2013-05-12 17:47:55 -04:00
"io"
"log"
"net/rpc"
"os"
"sync"
2017-04-04 16:39:01 -04:00
"github.com/hashicorp/packer/packer"
2013-05-12 17:47:55 -04:00
)
// An implementation of packer.Communicator where the communicator is actually
// executed over an RPC connection.
type communicator struct {
client *rpc.Client
2014-09-02 17:23:06 -04:00
mux *muxBroker
2013-05-12 17:47:55 -04:00
}
// CommunicatorServer wraps a packer.Communicator implementation and makes
// it exportable as part of a Golang RPC server.
type CommunicatorServer struct {
2013-12-10 14:43:02 -05:00
c packer.Communicator
2014-09-02 17:23:06 -04:00
mux *muxBroker
2013-05-12 17:47:55 -04:00
}
// CommandFinished is the message gob-encoded onto the response stream once
// a remote command has exited.
type CommandFinished struct {
	// ExitStatus is the exit status reported by the finished command.
	ExitStatus int
}
// CommunicatorStartArgs is the argument payload of the "Communicator.Start"
// RPC call.
//
// A stdin/stdout/stderr stream ID of zero means the corresponding stream
// was not requested; the server only dials IDs greater than zero.
type CommunicatorStartArgs struct {
	// Command is the command line to execute.
	Command string
	// StdinStreamId identifies the stream supplying the command's stdin.
	StdinStreamId uint32
	// StdoutStreamId identifies the stream receiving the command's stdout.
	StdoutStreamId uint32
	// StderrStreamId identifies the stream receiving the command's stderr.
	StderrStreamId uint32
	// ResponseStreamId identifies the stream on which the server writes
	// the CommandFinished message.
	ResponseStreamId uint32
}
// CommunicatorDownloadArgs is the argument payload of the
// "Communicator.Download" RPC call.
type CommunicatorDownloadArgs struct {
	// Path is the remote path to download.
	Path string
	// WriterStreamId identifies the stream the server writes the
	// downloaded data to.
	WriterStreamId uint32
}
2013-05-12 19:48:46 -04:00
type CommunicatorUploadArgs struct {
2013-12-10 14:43:02 -05:00
Path string
ReaderStreamId uint32
FileInfo *fileInfo
2013-05-12 19:48:46 -04:00
}
2013-08-23 22:18:15 -04:00
// CommunicatorUploadDirArgs is the argument payload of the
// "Communicator.UploadDir" RPC call.
type CommunicatorUploadDirArgs struct {
	// Dst and Src are the destination and source directories handed to
	// the wrapped communicator's UploadDir.
	Dst, Src string
	// Exclude is forwarded unchanged as the exclusion list.
	Exclude []string
}
// CommunicatorDownloadDirArgs is the argument payload the client sends with
// the "Communicator.DownloadDir" RPC call.
type CommunicatorDownloadDirArgs struct {
	// Dst and Src are the destination and source directories of the
	// directory download.
	Dst, Src string
	// Exclude is forwarded unchanged as the exclusion list.
	Exclude []string
}
2013-05-12 17:47:55 -04:00
func Communicator(client *rpc.Client) *communicator {
2013-12-10 14:43:02 -05:00
return &communicator{client: client}
2013-05-12 17:47:55 -04:00
}
func (c *communicator) Start(ctx context.Context, cmd *packer.RemoteCmd) (err error) {
var args CommunicatorStartArgs
args.Command = cmd.Command
2013-05-12 17:47:55 -04:00
var wg sync.WaitGroup
if cmd.Stdin != nil {
2013-12-10 14:43:02 -05:00
args.StdinStreamId = c.mux.NextId()
go func() {
serveSingleCopy("stdin", c.mux, args.StdinStreamId, nil, cmd.Stdin)
}()
2013-05-12 17:47:55 -04:00
}
if cmd.Stdout != nil {
wg.Add(1)
2013-12-10 14:43:02 -05:00
args.StdoutStreamId = c.mux.NextId()
go func() {
defer wg.Done()
serveSingleCopy("stdout", c.mux, args.StdoutStreamId, cmd.Stdout, nil)
}()
}
if cmd.Stderr != nil {
wg.Add(1)
2013-12-10 14:43:02 -05:00
args.StderrStreamId = c.mux.NextId()
go func() {
defer wg.Done()
serveSingleCopy("stderr", c.mux, args.StderrStreamId, cmd.Stderr, nil)
}()
2013-05-12 17:47:55 -04:00
}
2013-12-10 14:43:02 -05:00
responseStreamId := c.mux.NextId()
args.ResponseStreamId = responseStreamId
go func() {
2013-12-10 14:43:02 -05:00
conn, err := c.mux.Accept(responseStreamId)
2017-07-05 15:27:59 -04:00
wg.Wait()
if err != nil {
2013-12-10 14:43:02 -05:00
log.Printf("[ERR] Error accepting response stream %d: %s",
responseStreamId, err)
cmd.SetExited(123)
return
}
defer conn.Close()
var finished CommandFinished
2013-12-10 14:43:02 -05:00
decoder := gob.NewDecoder(conn)
if err := decoder.Decode(&finished); err != nil {
2013-12-10 14:43:02 -05:00
log.Printf("[ERR] Error decoding response stream %d: %s",
responseStreamId, err)
cmd.SetExited(123)
return
}
log.Printf("[INFO] RPC client: Communicator ended with: %d", finished.ExitStatus)
cmd.SetExited(finished.ExitStatus)
}()
err = c.client.Call("Communicator.Start", &args, new(interface{}))
2013-05-12 17:47:55 -04:00
return
}
func (c *communicator) Upload(path string, r io.Reader, fi *os.FileInfo) (err error) {
2013-05-12 19:48:46 -04:00
// Pipe the reader through to the connection
2013-12-10 14:43:02 -05:00
streamId := c.mux.NextId()
go serveSingleCopy("uploadData", c.mux, streamId, nil, r)
2013-05-12 19:48:46 -04:00
args := CommunicatorUploadArgs{
2013-12-10 14:43:02 -05:00
Path: path,
ReaderStreamId: streamId,
2013-05-12 19:48:46 -04:00
}
if fi != nil {
args.FileInfo = NewFileInfo(*fi)
}
err = c.client.Call("Communicator.Upload", &args, new(interface{}))
return
}
func (c *communicator) UploadDir(dst string, src string, exclude []string) error {
2013-08-23 22:18:15 -04:00
args := &CommunicatorUploadDirArgs{
Dst: dst,
Src: src,
Exclude: exclude,
}
var reply error
err := c.client.Call("Communicator.UploadDir", args, &reply)
2013-08-23 22:18:15 -04:00
if err == nil {
err = reply
}
return err
}
// DownloadDir forwards a directory download to the RPC server via the
// "Communicator.DownloadDir" call.
//
// If the call itself fails, that error is returned; otherwise the value the
// server placed in the reply is returned.
func (c *communicator) DownloadDir(src string, dst string, exclude []string) error {
	var remoteErr error

	request := &CommunicatorDownloadDirArgs{Src: src, Dst: dst, Exclude: exclude}
	if callErr := c.client.Call("Communicator.DownloadDir", request, &remoteErr); callErr != nil {
		return callErr
	}

	return remoteErr
}
func (c *communicator) Download(path string, w io.Writer) (err error) {
// Serve a single connection and a single copy
2013-12-10 14:43:02 -05:00
streamId := c.mux.NextId()
waitServer := make(chan struct{})
go func() {
serveSingleCopy("downloadWriter", c.mux, streamId, w, nil)
close(waitServer)
}()
args := CommunicatorDownloadArgs{
2013-12-10 14:43:02 -05:00
Path: path,
WriterStreamId: streamId,
}
// Start sending data to the RPC server
err = c.client.Call("Communicator.Download", &args, new(interface{}))
// Wait for the RPC server to finish receiving the data before we return
<-waitServer
return
}
2013-12-11 14:19:36 -05:00
func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) error {
ctx := context.TODO()
// Build the RemoteCmd on this side so that it all pipes over
// to the remote side.
var cmd packer.RemoteCmd
cmd.Command = args.Command
// Create a channel to signal we're done so that we can close
// our stdin/stdout/stderr streams
2013-12-10 14:43:02 -05:00
toClose := make([]io.Closer, 0)
doneCh := make(chan struct{})
go func() {
<-doneCh
for _, conn := range toClose {
defer conn.Close()
}
}()
if args.StdinStreamId > 0 {
2013-12-10 14:43:02 -05:00
conn, err := c.mux.Dial(args.StdinStreamId)
if err != nil {
close(doneCh)
return NewBasicError(err)
}
2013-12-10 14:43:02 -05:00
toClose = append(toClose, conn)
cmd.Stdin = conn
2013-05-12 17:47:55 -04:00
}
if args.StdoutStreamId > 0 {
2013-12-10 14:43:02 -05:00
conn, err := c.mux.Dial(args.StdoutStreamId)
if err != nil {
close(doneCh)
return NewBasicError(err)
}
2013-12-10 14:43:02 -05:00
toClose = append(toClose, conn)
cmd.Stdout = conn
2013-05-12 17:47:55 -04:00
}
if args.StderrStreamId > 0 {
2013-12-10 14:43:02 -05:00
conn, err := c.mux.Dial(args.StderrStreamId)
if err != nil {
close(doneCh)
return NewBasicError(err)
}
2013-12-10 14:43:02 -05:00
toClose = append(toClose, conn)
cmd.Stderr = conn
2013-05-12 17:47:55 -04:00
}
// Connect to the response address so we can write our result to it
// when ready.
2013-12-10 14:43:02 -05:00
responseC, err := c.mux.Dial(args.ResponseStreamId)
if err != nil {
close(doneCh)
return NewBasicError(err)
}
responseWriter := gob.NewEncoder(responseC)
// Start the actual command
err = c.c.Start(ctx, &cmd)
if err != nil {
close(doneCh)
return NewBasicError(err)
}
// Start a goroutine to spin and wait for the process to actual
// exit. When it does, report it back to caller...
go func() {
defer close(doneCh)
defer responseC.Close()
2013-07-29 15:12:42 -04:00
cmd.Wait()
log.Printf("[INFO] RPC endpoint: Communicator ended with: %d", cmd.ExitStatus())
responseWriter.Encode(&CommandFinished{cmd.ExitStatus()})
}()
return nil
2013-05-12 17:47:55 -04:00
}
2013-05-12 19:48:46 -04:00
func (c *CommunicatorServer) Upload(args *CommunicatorUploadArgs, reply *interface{}) (err error) {
2013-12-10 14:43:02 -05:00
readerC, err := c.mux.Dial(args.ReaderStreamId)
2013-05-12 19:48:46 -04:00
if err != nil {
return
}
defer readerC.Close()
var fi *os.FileInfo
if args.FileInfo != nil {
fi = new(os.FileInfo)
*fi = *args.FileInfo
}
err = c.c.Upload(args.Path, readerC, fi)
2013-05-12 19:48:46 -04:00
return
}
2013-08-23 22:18:15 -04:00
// UploadDir is the RPC endpoint for "Communicator.UploadDir". It passes the
// destination, source and exclusion list straight to the wrapped
// communicator and returns its result. reply is never written.
func (c *CommunicatorServer) UploadDir(args *CommunicatorUploadDirArgs, reply *error) error {
	dst, src, exclude := args.Dst, args.Src, args.Exclude
	return c.c.UploadDir(dst, src, exclude)
}
// DownloadDir is the RPC endpoint for "Communicator.DownloadDir". It passes
// the source, destination and exclusion list straight to the wrapped
// communicator and returns its result. reply is never written.
//
// NOTE(review): the parameter type is CommunicatorUploadDirArgs although the
// client sends CommunicatorDownloadDirArgs; both declare the same Dst, Src
// and Exclude fields.
func (c *CommunicatorServer) DownloadDir(args *CommunicatorUploadDirArgs, reply *error) error {
	src, dst, exclude := args.Src, args.Dst, args.Exclude
	return c.c.DownloadDir(src, dst, exclude)
}
func (c *CommunicatorServer) Download(args *CommunicatorDownloadArgs, reply *interface{}) (err error) {
2013-12-10 14:43:02 -05:00
writerC, err := c.mux.Dial(args.WriterStreamId)
if err != nil {
return
}
defer writerC.Close()
err = c.c.Download(args.Path, writerC)
return
}
2014-09-02 17:23:06 -04:00
func serveSingleCopy(name string, mux *muxBroker, id uint32, dst io.Writer, src io.Reader) {
2013-12-10 14:43:02 -05:00
conn, err := mux.Accept(id)
2013-05-12 17:47:55 -04:00
if err != nil {
log.Printf("[ERR] '%s' accept error: %s", name, err)
2013-05-12 17:47:55 -04:00
return
}
// Be sure to close the connection after we're done copying so
// that an EOF will successfully be sent to the remote side
defer conn.Close()
2013-05-12 17:47:55 -04:00
// The connection is the destination/source that is nil
if dst == nil {
dst = conn
} else {
src = conn
}
written, err := io.Copy(dst, src)
log.Printf("[INFO] %d bytes written for '%s'", written, name)
2013-05-12 17:47:55 -04:00
if err != nil {
log.Printf("[ERR] '%s' copy error: %s", name, err)
2013-05-12 17:47:55 -04:00
}
}