diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index 34b5988c3..8ff8b4aec 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -84,23 +84,25 @@ func (m *MuxConn) Accept(id uint32) (io.ReadWriteCloser, error) { if stream.state != streamStateEstablished { // Go into the listening state stream.setState(streamStateListen) + + // Register a state change listener to wait for changes + stateCh := make(chan streamState, 10) + stream.registerStateListener(stateCh) + defer func() { + stream.mu.Lock() + defer stream.mu.Unlock() + stream.deregisterStateListener(stateCh) + }() + stream.mu.Unlock() // Wait for the connection to establish ACCEPT_ESTABLISH_LOOP: for { - time.Sleep(50 * time.Millisecond) - stream.mu.Lock() - switch stream.state { + state := <-stateCh + switch state { case streamStateListen: - stream.mu.Unlock() - case streamStateClosed: - // This can happen if it becomes established, some data is sent, - // and it closed all within the time period we wait above. - // This case will be fixed when we have edge-triggered checks. - fallthrough case streamStateEstablished: - stream.mu.Unlock() break ACCEPT_ESTABLISH_LOOP default: defer stream.mu.Unlock() @@ -137,23 +139,23 @@ func (m *MuxConn) Dial(id uint32) (io.ReadWriteCloser, error) { return nil, err } stream.setState(streamStateSynSent) + + // Register a state change listener to wait for changes + stateCh := make(chan streamState, 10) + stream.registerStateListener(stateCh) + defer func() { + stream.mu.Lock() + defer stream.mu.Unlock() + stream.deregisterStateListener(stateCh) + }() + stream.mu.Unlock() for { - time.Sleep(50 * time.Millisecond) - stream.mu.Lock() - switch stream.state { + state := <-stateCh + switch state { case streamStateSynSent: - stream.mu.Unlock() - case streamStateClosed: - // This can happen if it becomes established, some data is sent, - // and it closed all within the time period we wait above. - // This case will be fixed when we have edge-triggered checks. - fallthrough - case streamStateCloseWait: - fallthrough case streamStateEstablished: - stream.mu.Unlock() return stream, nil default: defer stream.mu.Unlock() @@ -203,10 +205,11 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) { // Set the data channel so we can write to it. stream := &Stream{ - id: id, - mux: m, - reader: dataR, - writeCh: writeCh, + id: id, + mux: m, + reader: dataR, + writeCh: writeCh, + stateChange: make(map[chan<- streamState]struct{}), } stream.setState(streamStateClosed) @@ -364,6 +367,7 @@ type Stream struct { mux *MuxConn reader io.Reader state streamState + stateChange map[chan<- streamState]struct{} stateUpdated time.Time mu sync.Mutex writeCh chan<- []byte @@ -421,7 +425,21 @@ func (s *Stream) remoteClose() { s.writeCh <- nil } +func (s *Stream) registerStateListener(ch chan<- streamState) { + s.stateChange[ch] = struct{}{} +} + +func (s *Stream) deregisterStateListener(ch chan<- streamState) { + delete(s.stateChange, ch) +} + func (s *Stream) setState(state streamState) { s.state = state s.stateUpdated = time.Now().UTC() + for ch, _ := range s.stateChange { + select { + case ch <- state: + default: + } + } }