diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index d814b242c..93387587e 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -339,13 +339,18 @@ func (m *MuxConn) loop() { case muxPacketData: stream.mu.Lock() - if stream.state == streamStateEstablished { + switch stream.state { + case streamStateFinWait1: + fallthrough + case streamStateFinWait2: + fallthrough + case streamStateEstablished: select { case stream.writeCh <- data: default: panic(fmt.Sprintf("Failed to write data, buffer full for stream %d", id)) } - } else { + default: log.Printf("[ERR] Data received for stream in state: %d", stream.state) } stream.mu.Unlock() diff --git a/packer/rpc/muxconn_test.go b/packer/rpc/muxconn_test.go index f4ba59db1..446a7ce63 100644 --- a/packer/rpc/muxconn_test.go +++ b/packer/rpc/muxconn_test.go @@ -114,6 +114,51 @@ func TestMuxConn(t *testing.T) { <-doneCh } +// This tests that even when the client end is closed, data can be +// read from the server. +func TestMuxConn_clientCloseRead(t *testing.T) { + client, server := testMux(t) + defer client.Close() + defer server.Close() + + // This channel will be closed when we close + waitCh := make(chan struct{}) + + go func() { + conn, err := server.Accept(0) + if err != nil { + t.Fatalf("err: %s", err) + } + + <-waitCh + + _, err = conn.Write([]byte("foo")) + if err != nil { + t.Fatalf("err: %s", err) + } + + conn.Close() + }() + + s0, err := client.Dial(0) + if err != nil { + t.Fatalf("err: %s", err) + } + + if err := s0.Close(); err != nil { + t.Fatalf("bad: %s", err) + } + + // Close this to continue on on the server-side + close(waitCh) + + var data [1024]byte + n, err := s0.Read(data[:]) + if string(data[:n]) != "foo" { + t.Fatalf("bad: %#v", string(data[:n])) + } +} + func TestMuxConn_socketClose(t *testing.T) { client, server := testMux(t) defer client.Close()