From d8637751a346e613d238daa33cf0c4242a1c54ad Mon Sep 17 00:00:00 2001 From: Megan Marsh Date: Mon, 3 Jul 2017 12:18:10 -0700 Subject: [PATCH 1/3] rpc/communicator fix race condition that causes stdout from ssh provisioner to be truncated --- packer/rpc/communicator.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index e1aab41e0..c47036399 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -6,6 +6,7 @@ import ( "log" "net/rpc" "os" + "sync" "github.com/hashicorp/packer/packer" ) @@ -67,19 +68,33 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { var args CommunicatorStartArgs args.Command = cmd.Command + var wg sync.WaitGroup + if cmd.Stdin != nil { + wg.Add(1) args.StdinStreamId = c.mux.NextId() - go serveSingleCopy("stdin", c.mux, args.StdinStreamId, nil, cmd.Stdin) + go func() { + defer wg.Done() + serveSingleCopy("stdin", c.mux, args.StdinStreamId, nil, cmd.Stdin) + }() } if cmd.Stdout != nil { + wg.Add(1) args.StdoutStreamId = c.mux.NextId() - go serveSingleCopy("stdout", c.mux, args.StdoutStreamId, cmd.Stdout, nil) + go func() { + defer wg.Done() + serveSingleCopy("stdout", c.mux, args.StdoutStreamId, cmd.Stdout, nil) + }() } if cmd.Stderr != nil { + wg.Add(1) args.StderrStreamId = c.mux.NextId() - go serveSingleCopy("stderr", c.mux, args.StderrStreamId, cmd.Stderr, nil) + go func() { + defer wg.Done() + serveSingleCopy("stderr", c.mux, args.StderrStreamId, cmd.Stderr, nil) + }() } responseStreamId := c.mux.NextId() @@ -97,7 +112,9 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { var finished CommandFinished decoder := gob.NewDecoder(conn) - if err := decoder.Decode(&finished); err != nil { + err = decoder.Decode(&finished) + wg.Wait() + if err != nil { log.Printf("[ERR] Error decoding response stream %d: %s", responseStreamId, err) cmd.SetExited(123) From 9ee97aaa2a26e1401a59829b5104012aeac65bbe Mon Sep 17 00:00:00 2001 From: Megan Marsh Date: Mon, 3 Jul 2017 13:30:11 -0700 Subject: [PATCH 2/3] while I'm at it, kill this race condition in uploads, too --- packer/rpc/communicator.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index c47036399..d4823a49a 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -132,7 +132,12 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { func (c *communicator) Upload(path string, r io.Reader, fi *os.FileInfo) (err error) { // Pipe the reader through to the connection streamId := c.mux.NextId() - go serveSingleCopy("uploadData", c.mux, streamId, nil, r) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + serveSingleCopy("uploadData", c.mux, streamId, nil, r) + }() args := CommunicatorUploadArgs{ Path: path, @@ -144,6 +149,7 @@ func (c *communicator) Upload(path string, r io.Reader, fi *os.FileInfo) (err er } err = c.client.Call("Communicator.Upload", &args, new(interface{})) + wg.Wait() return } From b2d5fcd48aa03ca0f11ac9cab09a9d2c3cea3b59 Mon Sep 17 00:00:00 2001 From: Megan Marsh Date: Wed, 5 Jul 2017 12:27:59 -0700 Subject: [PATCH 3/3] move wait earlier --- packer/rpc/communicator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index d4823a49a..816e2c7d8 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -102,6 +102,7 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { go func() { conn, err := c.mux.Accept(responseStreamId) + wg.Wait() if err != nil { log.Printf("[ERR] Error accepting response stream %d: %s", responseStreamId, err) @@ -113,7 +114,6 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { var finished CommandFinished decoder := gob.NewDecoder(conn) err = decoder.Decode(&finished) - wg.Wait() if err != nil { log.Printf("[ERR] Error decoding response stream %d: %s", responseStreamId, err)