fix race
This commit is contained in:
parent
781aa2ccae
commit
b84b665ba3
|
@ -4,11 +4,11 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"unicode"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/hashicorp/packer/common/iochan"
|
||||
"github.com/mitchellh/iochan"
|
||||
)
|
||||
|
||||
// CmdDisconnect is a sentinel value to indicate a RemoteCmd
|
||||
|
@ -115,30 +115,52 @@ func (r *RemoteCmd) RunWithUi(ctx context.Context, c Communicator, ui Ui) error
|
|||
r.Stderr = io.MultiWriter(r.Stderr, stderr_w)
|
||||
}
|
||||
|
||||
// Loop and get all our output until done.
|
||||
printFn := func(in io.Reader, out func(string)) error {
|
||||
for output := range iochan.LineReader(in) {
|
||||
if output != "" {
|
||||
out(output)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
wg.Go(func() error { return printFn(stdout_r, ui.Message) })
|
||||
wg.Go(func() error { return printFn(stderr_r, ui.Error) })
|
||||
|
||||
// Start the command
|
||||
if err := c.Start(ctx, r); err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-r.exitCh:
|
||||
return nil
|
||||
|
||||
// Create the channels we'll use for data
|
||||
stdoutCh := iochan.DelimReader(stdout_r, '\n')
|
||||
stderrCh := iochan.DelimReader(stderr_r, '\n')
|
||||
|
||||
// Start the goroutine to watch for the exit
|
||||
go func() {
|
||||
defer stdout_w.Close()
|
||||
defer stderr_w.Close()
|
||||
r.Wait()
|
||||
}()
|
||||
|
||||
// Loop and get all our output
|
||||
OutputLoop:
|
||||
for {
|
||||
select {
|
||||
case output := <-stderrCh:
|
||||
if output != "" {
|
||||
ui.Error(r.cleanOutputLine(output))
|
||||
}
|
||||
case output := <-stdoutCh:
|
||||
if output != "" {
|
||||
ui.Message(r.cleanOutputLine(output))
|
||||
}
|
||||
case <-r.exitCh:
|
||||
break OutputLoop
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we finish off stdout/stderr because we may have gotten
|
||||
// a message from the exit channel before finishing these first.
|
||||
for output := range stdoutCh {
|
||||
ui.Message(r.cleanOutputLine(output))
|
||||
}
|
||||
|
||||
for output := range stderrCh {
|
||||
ui.Error(r.cleanOutputLine(output))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetExited is a helper for setting that this process is exited. This
|
||||
|
@ -174,3 +196,19 @@ func (r *RemoteCmd) initchan() {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
// cleanOutputLine cleans up a line so that '\r' don't muck up the
|
||||
// UI output when we're reading from a remote command.
|
||||
func (r *RemoteCmd) cleanOutputLine(line string) string {
|
||||
// Trim surrounding whitespace
|
||||
line = strings.TrimRightFunc(line, unicode.IsSpace)
|
||||
|
||||
// Trim up to the first carriage return, since that text would be
|
||||
// lost anyways.
|
||||
idx := strings.LastIndex(line, "\r")
|
||||
if idx > -1 {
|
||||
line = line[idx+1:]
|
||||
}
|
||||
|
||||
return line
|
||||
}
|
||||
|
|
|
@ -101,10 +101,13 @@ func (c *communicator) Start(ctx context.Context, cmd *packer.RemoteCmd) (err er
|
|||
|
||||
go func() {
|
||||
conn, err := c.mux.Accept(responseStreamId)
|
||||
log.Printf("Megan waiting...")
|
||||
wg.Wait()
|
||||
log.Printf("Megan done waiting...")
|
||||
if err != nil {
|
||||
log.Printf("[ERR] Error accepting response stream %d: %s",
|
||||
responseStreamId, err)
|
||||
log.Printf("Megan SetExited 1...")
|
||||
cmd.SetExited(123)
|
||||
return
|
||||
}
|
||||
|
@ -115,15 +118,19 @@ func (c *communicator) Start(ctx context.Context, cmd *packer.RemoteCmd) (err er
|
|||
if err := decoder.Decode(&finished); err != nil {
|
||||
log.Printf("[ERR] Error decoding response stream %d: %s",
|
||||
responseStreamId, err)
|
||||
log.Printf("Megan SetExited 2...")
|
||||
cmd.SetExited(123)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[INFO] RPC client: Communicator ended with: %d", finished.ExitStatus)
|
||||
log.Printf("Megan SetExited 3...")
|
||||
cmd.SetExited(finished.ExitStatus)
|
||||
}()
|
||||
|
||||
log.Printf("Megan, calling communicator using c.client.call")
|
||||
err = c.client.Call("Communicator.Start", &args, new(interface{}))
|
||||
log.Printf("Megan, returned from communicator.Start c.client.call")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -202,6 +209,7 @@ func (c *communicator) Download(path string, w io.Writer) (err error) {
|
|||
}
|
||||
|
||||
func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) error {
|
||||
log.Println("Megan inside communicatorServer")
|
||||
ctx := context.TODO()
|
||||
|
||||
// Build the RemoteCmd on this side so that it all pipes over
|
||||
|
|
Loading…
Reference in New Issue