From eacac89a246f045689171dbb2d228d677db7fac1 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 21 Feb 2014 14:24:22 -0800 Subject: [PATCH] packer/rpc: fix crashes with big file uploads [GH-897] --- CHANGELOG.md | 1 + packer/rpc/muxconn.go | 24 ++++++++++++++------ packer/rpc/muxconn_test.go | 46 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 935b002ef..1e5eae8f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ IMPROVEMENTS: BUG FIXES: * core: Fix crash case if blank parameters are given to Packer. [GH-832] +* core: Fix crash if big file uploads are done. [GH-897] * builders/docker: user variables work properly. [GH-777] * builder/virtualbox,vmware: iso\_checksum is not required if the checksum type is "none" diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index 17312f367..f449024b1 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -400,6 +400,8 @@ func (m *MuxConn) loop() { stream.mu.Unlock() case muxPacketData: + unlocked := false + stream.mu.Lock() switch stream.state { case streamStateFinWait1: @@ -408,17 +410,25 @@ func (m *MuxConn) loop() { fallthrough case streamStateEstablished: if len(data) > 0 { - select { - case stream.writeCh <- data: - default: - panic(fmt.Sprintf( - "Failed to write data, buffer full for stream %d", id)) - } + // Get a reference to the write channel while we have + // the lock because otherwise the field might change. + // We unlock early here because the write might block + // for a long time. + writeCh := stream.writeCh + stream.mu.Unlock() + unlocked = true + + // Blocked write, this provides some backpressure on + // the connection if there is a lot of data incoming. + writeCh <- data } default: log.Printf("[ERR] Data received for stream in state: %d", stream.state) } - stream.mu.Unlock() + + if !unlocked { + stream.mu.Unlock() + } } } } diff --git a/packer/rpc/muxconn_test.go b/packer/rpc/muxconn_test.go index 9e67454a4..c49786178 100644 --- a/packer/rpc/muxconn_test.go +++ b/packer/rpc/muxconn_test.go @@ -114,6 +114,52 @@ func TestMuxConn(t *testing.T) { <-doneCh } +func TestMuxConn_lotsOfData(t *testing.T) { + client, server := testMux(t) + defer client.Close() + defer server.Close() + + // When the server is done + doneCh := make(chan struct{}) + + // The server side + go func() { + defer close(doneCh) + + s0, err := server.Accept(0) + if err != nil { + t.Fatalf("err: %s", err) + } + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + data := readStream(t, s0) + if data != "hello" { + t.Fatalf("bad: %#v", data) + } + }() + + wg.Wait() + }() + + s0, err := client.Dial(0) + if err != nil { + t.Fatalf("err: %s", err) + } + + for i := 0; i < 4096 * 4; i++ { + if _, err := s0.Write([]byte("hello")); err != nil { + t.Fatalf("err: %s", err) + } + } + + // Wait for the server to be done + <-doneCh +} + // This tests that even when the client end is closed, data can be // read from the server. func TestMuxConn_clientCloseRead(t *testing.T) {