diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index 97d82360f..526f34f2f 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -34,6 +34,11 @@ type CommunicatorStartResponse struct { RemoteCommandAddress string } +type CommunicatorUploadArgs struct { + Path string + ReaderAddress string +} + func Communicator(client *rpc.Client) *communicator { return &communicator{client} } @@ -87,8 +92,30 @@ func (c *communicator) Start(cmd string) (rc *packer.RemoteCommand, err error) { return } -func (c *communicator) Upload(string, io.Reader) error { - return nil +func (c *communicator) Upload(path string, r io.Reader) (err error) { + readerL := netListenerInRange(portRangeMin, portRangeMax) + if readerL == nil { + err = errors.New("couldn't allocate listener for upload reader") + return + } + + // Make sure at the end of this call, we close the listener + defer readerL.Close() + + // Pipe the reader through to the connection + go serveSingleCopy("uploadReader", readerL, nil, r) + + args := CommunicatorUploadArgs{ + path, + readerL.Addr().String(), + } + + cerr := c.client.Call("Communicator.Upload", &args, &err) + if cerr != nil { + err = cerr + } + + return } func (c *communicator) Download(string, io.Writer) error { @@ -133,6 +160,18 @@ func (c *CommunicatorServer) Start(cmd *string, reply *CommunicatorStartResponse return } +func (c *CommunicatorServer) Upload(args *CommunicatorUploadArgs, reply *interface{}) (err error) { + readerC, err := net.Dial("tcp", args.ReaderAddress) + if err != nil { + return + } + + defer readerC.Close() + + err = c.c.Upload(args.Path, readerC) + return +} + func (rc *RemoteCommandServer) Wait(args *interface{}, reply *int) error { rc.rc.Wait() *reply = rc.rc.ExitStatus diff --git a/packer/rpc/communicator_test.go b/packer/rpc/communicator_test.go index adcfbf2e6..fb783c803 100644 --- a/packer/rpc/communicator_test.go +++ b/packer/rpc/communicator_test.go @@ -18,6 +18,10 @@ type testCommunicator struct { startErr *io.PipeWriter startExited *bool startExitStatus *int + + uploadCalled bool + uploadPath string + uploadReader io.Reader } func (t *testCommunicator) Start(cmd string) (*packer.RemoteCommand, error) { @@ -45,7 +49,11 @@ func (t *testCommunicator) Start(cmd string) (*packer.RemoteCommand, error) { return rc, nil } -func (t *testCommunicator) Upload(string, io.Reader) error { +func (t *testCommunicator) Upload(path string, reader io.Reader) error { + t.uploadCalled = true + t.uploadPath = path + t.uploadReader = reader + return nil } @@ -99,6 +107,21 @@ func TestCommunicatorRPC(t *testing.T) { *c.startExited = true rc.Wait() assert.Equal(rc.ExitStatus, 42, "should have proper exit status") + + // Test that we can upload things + uploadR, uploadW := io.Pipe() + err = remote.Upload("foo", uploadR) + assert.Nil(err, "should not error") + assert.True(c.uploadCalled, "should be called") + assert.Equal(c.uploadPath, "foo", "should be correct path") + return + + // Test the upload reader + uploadW.Write([]byte("uploadfoo\n")) + bufUpR := bufio.NewReader(c.uploadReader) + data, err = bufUpR.ReadString('\n') + assert.Nil(err, "should not error") + assert.Equal(data, "uploadfoo\n", "should have the proper data") } func TestCommunicator_ImplementsCommunicator(t *testing.T) {