packer/rpc: edge-triggerd state changes for faster dial/accept

This commit is contained in:
Mitchell Hashimoto 2013-12-10 17:01:02 -08:00
parent d9f79b0ecc
commit f79daa0b1b
1 changed files with 44 additions and 26 deletions

View File

@ -84,23 +84,25 @@ func (m *MuxConn) Accept(id uint32) (io.ReadWriteCloser, error) {
if stream.state != streamStateEstablished {
// Go into the listening state
stream.setState(streamStateListen)
// Register a state change listener to wait for changes
stateCh := make(chan streamState, 10)
stream.registerStateListener(stateCh)
defer func() {
stream.mu.Lock()
defer stream.mu.Unlock()
stream.deregisterStateListener(stateCh)
}()
stream.mu.Unlock()
// Wait for the connection to establish
ACCEPT_ESTABLISH_LOOP:
for {
time.Sleep(50 * time.Millisecond)
stream.mu.Lock()
switch stream.state {
state := <-stateCh
switch state {
case streamStateListen:
stream.mu.Unlock()
case streamStateClosed:
// This can happen if it becomes established, some data is sent,
// and it closed all within the time period we wait above.
// This case will be fixed when we have edge-triggered checks.
fallthrough
case streamStateEstablished:
stream.mu.Unlock()
break ACCEPT_ESTABLISH_LOOP
default:
defer stream.mu.Unlock()
@ -137,23 +139,23 @@ func (m *MuxConn) Dial(id uint32) (io.ReadWriteCloser, error) {
return nil, err
}
stream.setState(streamStateSynSent)
// Register a state change listener to wait for changes
stateCh := make(chan streamState, 10)
stream.registerStateListener(stateCh)
defer func() {
stream.mu.Lock()
defer stream.mu.Unlock()
stream.deregisterStateListener(stateCh)
}()
stream.mu.Unlock()
for {
time.Sleep(50 * time.Millisecond)
stream.mu.Lock()
switch stream.state {
state := <-stateCh
switch state {
case streamStateSynSent:
stream.mu.Unlock()
case streamStateClosed:
// This can happen if it becomes established, some data is sent,
// and it closed all within the time period we wait above.
// This case will be fixed when we have edge-triggered checks.
fallthrough
case streamStateCloseWait:
fallthrough
case streamStateEstablished:
stream.mu.Unlock()
return stream, nil
default:
defer stream.mu.Unlock()
@ -203,10 +205,11 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) {
// Set the data channel so we can write to it.
stream := &Stream{
id: id,
mux: m,
reader: dataR,
writeCh: writeCh,
id: id,
mux: m,
reader: dataR,
writeCh: writeCh,
stateChange: make(map[chan<- streamState]struct{}),
}
stream.setState(streamStateClosed)
@ -364,6 +367,7 @@ type Stream struct {
mux *MuxConn
reader io.Reader
state streamState
stateChange map[chan<- streamState]struct{}
stateUpdated time.Time
mu sync.Mutex
writeCh chan<- []byte
@ -421,7 +425,21 @@ func (s *Stream) remoteClose() {
s.writeCh <- nil
}
func (s *Stream) registerStateListener(ch chan<- streamState) {
s.stateChange[ch] = struct{}{}
}
func (s *Stream) deregisterStateListener(ch chan<- streamState) {
delete(s.stateChange, ch)
}
func (s *Stream) setState(state streamState) {
s.state = state
s.stateUpdated = time.Now().UTC()
for ch, _ := range s.stateChange {
select {
case ch <- state:
default:
}
}
}