diff --git a/packer/rpc/client.go b/packer/rpc/client.go index 99f0b296d..4dd9deb47 100644 --- a/packer/rpc/client.go +++ b/packer/rpc/client.go @@ -15,14 +15,12 @@ type Client struct { } func NewClient(rwc io.ReadWriteCloser) (*Client, error) { - // Create the MuxConn around the RWC and get the client to server stream. - // This is the primary stream that we use to communicate with the - // remote RPC server. On the remote side Server.ServeConn also listens - // on this stream ID. - mux := NewMuxConn(rwc) - clientConn, err := mux.Dial(0) + return NewClientWithMux(NewMuxConn(rwc), 0) +} + +func NewClientWithMux(mux *MuxConn, streamId uint32) (*Client, error) { + clientConn, err := mux.Dial(streamId) if err != nil { - mux.Close() return nil, err } @@ -37,7 +35,7 @@ func (c *Client) Close() error { return err } - return c.mux.Close() + return nil } func (c *Client) Artifact() packer.Artifact { @@ -56,12 +54,13 @@ func (c *Client) Cache() packer.Cache { func (c *Client) PostProcessor() packer.PostProcessor { return &postProcessor{ client: c.client, + mux: c.mux, } } func (c *Client) Ui() packer.Ui { return &Ui{ - client: c.client, + client: c.client, endpoint: DefaultUiEndpoint, } } diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index 7247d4ed1..24e57fd63 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -21,7 +21,7 @@ type MuxConn struct { curId uint32 rwc io.ReadWriteCloser streams map[uint32]*Stream - mu sync.RWMutex + mu sync.Mutex wlock sync.Mutex } diff --git a/packer/rpc/post_processor.go b/packer/rpc/post_processor.go index 20e4f9b70..2a0dfe62a 100644 --- a/packer/rpc/post_processor.go +++ b/packer/rpc/post_processor.go @@ -9,14 +9,14 @@ import ( // executed over an RPC connection. type postProcessor struct { client *rpc.Client - server *rpc.Server + mux *MuxConn } // PostProcessorServer wraps a packer.PostProcessor implementation and makes it // exportable as part of a Golang RPC server. type PostProcessorServer struct { client *rpc.Client - server *rpc.Server + mux *MuxConn p packer.PostProcessor } @@ -24,15 +24,10 @@ type PostProcessorConfigureArgs struct { Configs []interface{} } -type PostProcessorPostProcessArgs struct { - ArtifactEndpoint string - UiEndpoint string -} - type PostProcessorProcessResponse struct { - Err error - Keep bool - ArtifactEndpoint string + Err error + Keep bool + StreamId uint32 } func PostProcessor(client *rpc.Client) *postProcessor { @@ -49,21 +44,14 @@ func (p *postProcessor) Configure(raw ...interface{}) (err error) { } func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Artifact, bool, error) { - artifactEndpoint := registerComponent(p.server, "Artifact", &ArtifactServer{ - artifact: a, - }, true) - - uiEndpoint := registerComponent(p.server, "Ui", &UiServer{ - ui: ui, - }, true) - - args := PostProcessorPostProcessArgs{ - ArtifactEndpoint: artifactEndpoint, - UiEndpoint: uiEndpoint, - } + nextId := p.mux.NextId() + server := NewServerWithMux(p.mux, nextId) + server.RegisterArtifact(a) + server.RegisterUi(ui) + go server.Serve() var response PostProcessorProcessResponse - if err := p.client.Call("PostProcessor.PostProcess", &args, &response); err != nil { + if err := p.client.Call("PostProcessor.PostProcess", nextId, &response); err != nil { return nil, false, err } @@ -71,14 +59,16 @@ func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Art return nil, false, response.Err } - if response.ArtifactEndpoint == "" { + if response.StreamId == 0 { return nil, false, nil } - return &artifact{ - client: p.client, - endpoint: response.ArtifactEndpoint, - }, response.Keep, nil + client, err := NewClientWithMux(p.mux, response.StreamId) + if err != nil { + return nil, false, err + } + + return client.Artifact(), response.Keep, nil } func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply *error) error { @@ -90,23 +80,20 @@ func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply return nil } -func (p *PostProcessorServer) PostProcess(args *PostProcessorPostProcessArgs, reply *PostProcessorProcessResponse) error { - artifact := &artifact{ - client: p.client, - endpoint: args.ArtifactEndpoint, +func (p *PostProcessorServer) PostProcess(streamId uint32, reply *PostProcessorProcessResponse) error { + client, err := NewClientWithMux(p.mux, streamId) + if err != nil { + return NewBasicError(err) } + defer client.Close() - ui := &Ui{ - client: p.client, - endpoint: args.UiEndpoint, - } - - var artifactEndpoint string - artifactResult, keep, err := p.p.PostProcess(ui, artifact) + streamId = 0 + artifactResult, keep, err := p.p.PostProcess(client.Ui(), client.Artifact()) if err == nil && artifactResult != nil { - artifactEndpoint = registerComponent(p.server, "Artifact", &ArtifactServer{ - artifact: artifactResult, - }, true) + streamId = p.mux.NextId() + server := NewServerWithMux(p.mux, streamId) + server.RegisterArtifact(artifactResult) + go server.Serve() } if err != nil { @@ -114,9 +101,9 @@ func (p *PostProcessorServer) PostProcess(args *PostProcessorPostProcessArgs, re } *reply = PostProcessorProcessResponse{ - Err: err, - Keep: keep, - ArtifactEndpoint: artifactEndpoint, + Err: err, + Keep: keep, + StreamId: streamId, } return nil diff --git a/packer/rpc/post_processor_test.go b/packer/rpc/post_processor_test.go index 418889ac5..d469ee4e0 100644 --- a/packer/rpc/post_processor_test.go +++ b/packer/rpc/post_processor_test.go @@ -13,6 +13,7 @@ type TestPostProcessor struct { configVal []interface{} ppCalled bool ppArtifact packer.Artifact + ppArtifactId string ppUi packer.Ui } @@ -25,6 +26,7 @@ func (pp *TestPostProcessor) Configure(v ...interface{}) error { func (pp *TestPostProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Artifact, bool, error) { pp.ppCalled = true pp.ppArtifact = a + pp.ppArtifactId = a.Id() pp.ppUi = ui return testPostProcessorArtifact, false, nil } @@ -70,8 +72,8 @@ func TestPostProcessorRPC(t *testing.T) { t.Fatal("postprocess should be called") } - if p.ppArtifact.Id() != "ppTestId" { - t.Fatal("unknown artifact") + if p.ppArtifactId != "ppTestId" { + t.Fatalf("unknown artifact: %s", p.ppArtifact.Id()) } if artifact.Id() != "id" { diff --git a/packer/rpc/server_new.go b/packer/rpc/server_new.go index afed8fe58..4f531f187 100644 --- a/packer/rpc/server_new.go +++ b/packer/rpc/server_new.go @@ -21,15 +21,21 @@ const ( // Server represents an RPC server for Packer. This must be paired on // the other side with a Client. type Server struct { - mux *MuxConn - server *rpc.Server + mux *MuxConn + streamId uint32 + server *rpc.Server } // NewServer returns a new Packer RPC server. func NewServer(conn io.ReadWriteCloser) *Server { + return NewServerWithMux(NewMuxConn(conn), 0) +} + +func NewServerWithMux(mux *MuxConn, streamId uint32) *Server { return &Server{ - mux: NewMuxConn(conn), - server: rpc.NewServer(), + mux: mux, + streamId: streamId, + server: rpc.NewServer(), } } @@ -51,7 +57,8 @@ func (s *Server) RegisterCache(c packer.Cache) { func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{ - p: p, + mux: s.mux, + p: p, }) } @@ -66,7 +73,7 @@ func (s *Server) RegisterUi(ui packer.Ui) { func (s *Server) Serve() { // Accept a connection on stream ID 0, which is always used for // normal client to server connections. - stream, err := s.mux.Accept(0) + stream, err := s.mux.Accept(s.streamId) defer stream.Close() if err != nil { log.Printf("[ERR] Error retrieving stream for serving: %s", err)