From 3a4150088839ac06356bbe4e855d94ae42c22580 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 10 Dec 2013 15:12:16 -0800 Subject: [PATCH] packer/rpc: more robust communicator connection cleanup --- packer/rpc/communicator.go | 39 +++++++++++++++++++++++++++----------- packer/rpc/muxconn.go | 4 ++-- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index de624a7af..3ea728bc1 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -95,6 +95,7 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { return } + log.Printf("[INFO] RPC client: Communicator ended with: %d", finished.ExitStatus) cmd.SetExited(finished.ExitStatus) }() @@ -146,17 +147,28 @@ func (c *communicator) Download(path string, w io.Writer) (err error) { return } -func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) (err error) { +func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) (error) { // Build the RemoteCmd on this side so that it all pipes over // to the remote side. var cmd packer.RemoteCmd cmd.Command = args.Command + // Create a channel to signal we're done so that we can close + // our stdin/stdout/stderr streams toClose := make([]io.Closer, 0) + doneCh := make(chan struct{}) + go func() { + <-doneCh + for _, conn := range toClose { + defer conn.Close() + } + }() + if args.StdinStreamId > 0 { conn, err := c.mux.Dial(args.StdinStreamId) if err != nil { - return err + close(doneCh) + return NewBasicError(err) } toClose = append(toClose, conn) @@ -166,7 +178,8 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface if args.StdoutStreamId > 0 { conn, err := c.mux.Dial(args.StdoutStreamId) if err != nil { - return err + close(doneCh) + return NewBasicError(err) } toClose = append(toClose, conn) @@ -176,38 +189,42 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface if args.StderrStreamId > 0 { conn, err := c.mux.Dial(args.StderrStreamId) if err != nil { - return err + close(doneCh) + return NewBasicError(err) } toClose = append(toClose, conn) cmd.Stderr = conn } + // Connect to the response address so we can write our result to it // when ready. responseC, err := c.mux.Dial(args.ResponseStreamId) if err != nil { - return err + close(doneCh) + return NewBasicError(err) } - responseWriter := gob.NewEncoder(responseC) // Start the actual command err = c.c.Start(&cmd) + if err != nil { + close(doneCh) + return NewBasicError(err) + } // Start a goroutine to spin and wait for the process to actual // exit. When it does, report it back to caller... go func() { + defer close(doneCh) defer responseC.Close() - for _, conn := range toClose { - defer conn.Close() - } - cmd.Wait() + log.Printf("[INFO] RPC endpoint: Communicator ended with: %d", cmd.ExitStatus) responseWriter.Encode(&CommandFinished{cmd.ExitStatus}) }() - return + return nil } func (c *CommunicatorServer) Upload(args *CommunicatorUploadArgs, reply *interface{}) (err error) { diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index 0663bc0c7..e29d938bb 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -197,7 +197,7 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) { // Create the stream object and channel where data will be sent to dataR, dataW := io.Pipe() - writeCh := make(chan []byte, 10) + writeCh := make(chan []byte, 256) // Set the data channel so we can write to it. stream := &Stream{ @@ -315,7 +315,7 @@ func (m *MuxConn) loop() { select { case stream.writeCh <- data: default: - log.Printf("[ERR] Failed to write data, buffer full: %d", id) + panic(fmt.Sprintf("Failed to write data, buffer full for stream %d", id)) } } else { log.Printf("[ERR] Data received for stream in state: %d", stream.state)