packer/rpc: fix crashes with big file uploads [GH-897]

This commit is contained in:
Mitchell Hashimoto 2014-02-21 14:24:22 -08:00
parent c67094fbb6
commit eacac89a24
3 changed files with 64 additions and 7 deletions

View File

@ -14,6 +14,7 @@ IMPROVEMENTS:
BUG FIXES: BUG FIXES:
* core: Fix crash case if blank parameters are given to Packer. [GH-832] * core: Fix crash case if blank parameters are given to Packer. [GH-832]
* core: Fix crash if big file uploads are done. [GH-897]
* builders/docker: user variables work properly. [GH-777] * builders/docker: user variables work properly. [GH-777]
* builder/virtualbox,vmware: iso\_checksum is not required if the * builder/virtualbox,vmware: iso\_checksum is not required if the
checksum type is "none" checksum type is "none"

View File

@ -400,6 +400,8 @@ func (m *MuxConn) loop() {
stream.mu.Unlock() stream.mu.Unlock()
case muxPacketData: case muxPacketData:
unlocked := false
stream.mu.Lock() stream.mu.Lock()
switch stream.state { switch stream.state {
case streamStateFinWait1: case streamStateFinWait1:
@ -408,19 +410,27 @@ func (m *MuxConn) loop() {
fallthrough fallthrough
case streamStateEstablished: case streamStateEstablished:
if len(data) > 0 { if len(data) > 0 {
select { // Get a reference to the write channel while we have
case stream.writeCh <- data: // the lock because otherwise the field might change.
default: // We unlock early here because the write might block
panic(fmt.Sprintf( // for a long time.
"Failed to write data, buffer full for stream %d", id)) writeCh := stream.writeCh
} stream.mu.Unlock()
unlocked = true
// Blocked write, this provides some backpressure on
// the connection if there is a lot of data incoming.
writeCh <- data
} }
default: default:
log.Printf("[ERR] Data received for stream in state: %d", stream.state) log.Printf("[ERR] Data received for stream in state: %d", stream.state)
} }
if !unlocked {
stream.mu.Unlock() stream.mu.Unlock()
} }
} }
}
} }
func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p []byte) (int, error) { func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p []byte) (int, error) {

View File

@ -114,6 +114,52 @@ func TestMuxConn(t *testing.T) {
<-doneCh <-doneCh
} }
func TestMuxConn_lotsOfData(t *testing.T) {
client, server := testMux(t)
defer client.Close()
defer server.Close()
// When the server is done
doneCh := make(chan struct{})
// The server side
go func() {
defer close(doneCh)
s0, err := server.Accept(0)
if err != nil {
t.Fatalf("err: %s", err)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
data := readStream(t, s0)
if data != "hello" {
t.Fatalf("bad: %#v", data)
}
}()
wg.Wait()
}()
s0, err := client.Dial(0)
if err != nil {
t.Fatalf("err: %s", err)
}
for i := 0; i < 4096 * 4; i++ {
if _, err := s0.Write([]byte("hello")); err != nil {
t.Fatalf("err: %s", err)
}
}
// Wait for the server to be done
<-doneCh
}
// This tests that even when the client end is closed, data can be // This tests that even when the client end is closed, data can be
// read from the server. // read from the server.
func TestMuxConn_clientCloseRead(t *testing.T) { func TestMuxConn_clientCloseRead(t *testing.T) {