diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index 0e7c08ca0..13e864b5f 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -175,16 +175,29 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) { // Create the stream object and channel where data will be sent to dataR, dataW := io.Pipe() + writeCh := make(chan []byte, 10) // Set the data channel so we can write to it. stream := &Stream{ - id: id, - mux: m, - reader: dataR, - writer: dataW, + id: id, + mux: m, + reader: dataR, + writer: dataW, + writeCh: writeCh, } stream.setState(streamStateClosed) + // Start the goroutine that will read from the queue and write + // data out. + go func() { + for { + data := <-writeCh + if _, err := dataW.Write(data); err != nil { + return + } + } + }() + m.streams[id] = stream return m.streams[id], nil } @@ -256,7 +269,11 @@ func (m *MuxConn) loop() { case muxPacketData: stream.mu.Lock() if stream.state == streamStateEstablished { - stream.writer.Write(data) + select { + case stream.writeCh <- data: + default: + log.Printf("[ERR] Failed to write data, buffer full: %d", id) + } } else { log.Printf("[ERR] Data received for stream in state: %d", stream.state) } @@ -293,6 +310,7 @@ type Stream struct { state streamState stateUpdated time.Time mu sync.Mutex + writeCh chan<- []byte } type streamState byte