From 88a018bf5d8012221cb5140bd7ab4db25feb8794 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 12 May 2013 14:47:55 -0700 Subject: [PATCH] packer: Work on communicators... WIP --- builder/amazonebs/builder.go | 3 - packer/communicator.go | 24 ++++++ packer/rpc/communicator.go | 134 ++++++++++++++++++++++++++++++++ packer/rpc/communicator_test.go | 95 ++++++++++++++++++++++ packer/rpc/server.go | 6 ++ 5 files changed, 259 insertions(+), 3 deletions(-) create mode 100644 packer/communicator.go create mode 100644 packer/rpc/communicator.go create mode 100644 packer/rpc/communicator_test.go diff --git a/builder/amazonebs/builder.go b/builder/amazonebs/builder.go index ea62703e6..7305dfe84 100644 --- a/builder/amazonebs/builder.go +++ b/builder/amazonebs/builder.go @@ -60,9 +60,6 @@ func (b *Builder) Run(ui packer.Ui, hook packer.Hook) { MaxCount: 0, } - hook.Run(HookPreLaunch, nil, ui) - return - ui.Say("Launching a source AWS instance...\n") runResp, err := ec2conn.RunInstances(runOpts) if err != nil { diff --git a/packer/communicator.go b/packer/communicator.go new file mode 100644 index 000000000..97cf3d768 --- /dev/null +++ b/packer/communicator.go @@ -0,0 +1,24 @@ +package packer + +import "io" + +// A Communicator is the interface used to communicate with the machine +// that exists that will eventually be packaged into an image. Communicators +// allow you to execute remote commands, upload files, etc. +// +// Communicators must be safe for concurrency, meaning multiple calls to +// Start or any other method may be called at the same time. +type Communicator interface { + Start(string) (*RemoteCommand, error) + Upload(string, io.Reader) + Download(string, io.Writer) +} + +// This struct contains some information about the remote command being +// executed and can be used to wait for it to complete. +type RemoteCommand struct { + Stdin io.Writer + Stdout io.Reader + Stderr io.Reader + ExitStatus int +} diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go new file mode 100644 index 000000000..202c19512 --- /dev/null +++ b/packer/rpc/communicator.go @@ -0,0 +1,134 @@ +package rpc + +import ( + "errors" + "github.com/mitchellh/packer/packer" + "io" + "log" + "net" + "net/rpc" +) + +// An implementation of packer.Communicator where the communicator is actually +// executed over an RPC connection. +type communicator struct { + client *rpc.Client +} + +// CommunicatorServer wraps a packer.Communicator implementation and makes +// it exportable as part of a Golang RPC server. +type CommunicatorServer struct { + c packer.Communicator +} + +// RemoteCommandServer wraps a packer.RemoteCommand struct and makes it +// exportable as part of a Golang RPC server. +type RemoteCommandServer struct { + rc *packer.RemoteCommand +} + +type CommunicatorStartResponse struct { + StdinAddress string + StdoutAddress string + StderrAddress string + ExitStatusAddress string +} + +func Communicator(client *rpc.Client) *communicator { + return &communicator{client} +} + +func (c *communicator) Start(cmd string) (rc *packer.RemoteCommand, err error) { + var response CommunicatorStartResponse + err = c.client.Call("Communicator.Start", &cmd, &response) + if err != nil { + return + } + + // Connect to the three streams that will handle stdin, stdout, + // and stderr and get net.Conns for them. + stdinC, err := net.Dial("tcp", response.StdinAddress) + if err != nil { + return + } + + stdoutC, err := net.Dial("tcp", response.StdoutAddress) + if err != nil { + return + } + + stderrC, err := net.Dial("tcp", response.StderrAddress) + if err != nil { + return + } + + // Build the response object using the streams we created + rc = &packer.RemoteCommand{ + stdinC, + stdoutC, + stderrC, + 0, + } + + return +} + +func (c *CommunicatorServer) Start(cmd *string, reply *CommunicatorStartResponse) (err error) { + // Start executing the command. + command, err := c.c.Start(*cmd) + if err != nil { + return + } + + // If we didn't get a proper command... that's not right. + if command == nil { + return errors.New("communicator returned nil remote command") + } + + // Next, we need to take the stdin/stdout and start a listener + // for each because the client will connect to us via TCP and use + // that connection as the io.Reader or io.Writer. These exist for + // only a single connection that is persistent. + stdinL := netListenerInRange(portRangeMin, portRangeMax) + stdoutL := netListenerInRange(portRangeMin, portRangeMax) + stderrL := netListenerInRange(portRangeMin, portRangeMax) + go serveSingleCopy("stdin", stdinL, command.Stdin, nil) + go serveSingleCopy("stdout", stdoutL, nil, command.Stdout) + go serveSingleCopy("stderr", stderrL, nil, command.Stderr) + + // For the exit status, we use a simple RPC Server that serves + // some of the RemoteComand methods. + server := rpc.NewServer() + //server.RegisterName("RemoteCommand", &RemoteCommandServer{command}) + + *reply = CommunicatorStartResponse{ + stdinL.Addr().String(), + stdoutL.Addr().String(), + stderrL.Addr().String(), + serveSingleConn(server), + } + + return +} + +func serveSingleCopy(name string, l net.Listener, dst io.Writer, src io.Reader) { + defer l.Close() + + conn, err := l.Accept() + if err != nil { + return + } + + // The connection is the destination/source that is nil + if dst == nil { + dst = conn + } else { + src = conn + } + + written, err := io.Copy(dst, src) + log.Printf("%d bytes written for '%s'", written, name) + if err != nil { + log.Printf("'%s' copy error: %s", name, err.Error()) + } +} diff --git a/packer/rpc/communicator_test.go b/packer/rpc/communicator_test.go new file mode 100644 index 000000000..58c52be65 --- /dev/null +++ b/packer/rpc/communicator_test.go @@ -0,0 +1,95 @@ +package rpc + +import ( + "bufio" + "cgl.tideland.biz/asserts" + "github.com/mitchellh/packer/packer" + "io" + "net/rpc" + "testing" +) + +type testCommunicator struct { + startCalled bool + startCmd string + + startIn *io.PipeReader + startOut *io.PipeWriter + startErr *io.PipeWriter +} + +func (t *testCommunicator) Start(cmd string) (*packer.RemoteCommand, error) { + t.startCalled = true + t.startCmd = cmd + + var stdin *io.PipeWriter + var stdout, stderr *io.PipeReader + + t.startIn, stdin = io.Pipe() + stdout, t.startOut = io.Pipe() + stderr, t.startErr = io.Pipe() + + rc := &packer.RemoteCommand{ + stdin, + stdout, + stderr, + 0, + } + + return rc, nil +} + +func (t *testCommunicator) Upload(string, io.Reader) {} + +func (t *testCommunicator) Download(string, io.Writer) {} + +func TestCommunicatorRPC(t *testing.T) { + assert := asserts.NewTestingAsserts(t, true) + + // Create the interface to test + c := new(testCommunicator) + + // Start the server + server := rpc.NewServer() + RegisterCommunicator(server, c) + address := serveSingleConn(server) + + // Create the client over RPC and run some methods to verify it works + client, err := rpc.Dial("tcp", address) + assert.Nil(err, "should be able to connect") + + // Test Start + remote := Communicator(client) + rc, err := remote.Start("foo") + assert.Nil(err, "should not have an error") + + // Test that we can read from stdout + bufOut := bufio.NewReader(rc.Stdout) + c.startOut.Write([]byte("outfoo\n")) + data, err := bufOut.ReadString('\n') + assert.Nil(err, "should have no problem reading stdout") + assert.Equal(data, "outfoo\n", "should be correct stdout") + + // Test that we can read from stderr + bufErr := bufio.NewReader(rc.Stderr) + c.startErr.Write([]byte("errfoo\n")) + data, err = bufErr.ReadString('\n') + assert.Nil(err, "should have no problem reading stdout") + assert.Equal(data, "errfoo\n", "should be correct stdout") + + // Test that we can write to stdin + bufIn := bufio.NewReader(c.startIn) + rc.Stdin.Write([]byte("infoo\n")) + data, err = bufIn.ReadString('\n') + assert.Nil(err, "should have no problem reading stdin") + assert.Equal(data, "infoo\n", "should be correct stdin") +} + +func TestCommunicator_ImplementsCommunicator(t *testing.T) { + //assert := asserts.NewTestingAsserts(t, true) + + //var r packer.Communicator + //c := Communicator(nil) + + //assert.Implementor(c, &r, "should be a Communicator") +} diff --git a/packer/rpc/server.go b/packer/rpc/server.go index e9059bef8..c050ccb9e 100644 --- a/packer/rpc/server.go +++ b/packer/rpc/server.go @@ -23,6 +23,12 @@ func RegisterCommand(s *rpc.Server, c packer.Command) { s.RegisterName("Command", &CommandServer{c}) } +// Registers the appropriate endpoint on an RPC server to serve a +// Packer Communicator. +func RegisterCommunicator(s *rpc.Server, c packer.Communicator) { + s.RegisterName("Communicator", &CommunicatorServer{c}) +} + // Registers the appropriate endpoint on an RPC server to serve a // Packer Environment func RegisterEnvironment(s *rpc.Server, e packer.Environment) {