From a8b056e93936827778ebeb91cf765c8624f7fb45 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 10 Dec 2013 13:18:48 -0800 Subject: [PATCH] packer/rpc: builds --- packer/rpc/build.go | 49 ++++++++++++++++++++-------------------- packer/rpc/build_test.go | 49 ++++++++++------------------------------ packer/rpc/client.go | 7 ++++++ packer/rpc/server.go | 2 +- packer/rpc/server_new.go | 8 +++++++ 5 files changed, 53 insertions(+), 62 deletions(-) diff --git a/packer/rpc/build.go b/packer/rpc/build.go index f2f7d48ab..91988fae1 100644 --- a/packer/rpc/build.go +++ b/packer/rpc/build.go @@ -9,16 +9,14 @@ import ( // over an RPC connection. type build struct { client *rpc.Client + mux *MuxConn } // BuildServer wraps a packer.Build implementation and makes it exportable // as part of a Golang RPC server. type BuildServer struct { build packer.Build -} - -type BuildRunArgs struct { - UiRPCAddress string + mux *MuxConn } type BuildPrepareResponse struct { @@ -27,7 +25,7 @@ type BuildPrepareResponse struct { } func Build(client *rpc.Client) *build { - return &build{client} + return &build{client: client} } func (b *build) Name() (result string) { @@ -45,25 +43,25 @@ func (b *build) Prepare(v map[string]string) ([]string, error) { } func (b *build) Run(ui packer.Ui, cache packer.Cache) ([]packer.Artifact, error) { - // Create and start the server for the UI - server := rpc.NewServer() - RegisterCache(server, cache) - RegisterUi(server, ui) - args := &BuildRunArgs{serveSingleConn(server)} + nextId := b.mux.NextId() + server := NewServerWithMux(b.mux, nextId) + server.RegisterCache(cache) + server.RegisterUi(ui) + go server.Serve() - var result []string - if err := b.client.Call("Build.Run", args, &result); err != nil { + var result []uint32 + if err := b.client.Call("Build.Run", nextId, &result); err != nil { return nil, err } artifacts := make([]packer.Artifact, len(result)) - for i, addr := range result { - client, err := rpcDial(addr) + for i, streamId := range result { + client, err := NewClientWithMux(b.mux, streamId) if err != nil { return nil, err } - artifacts[i] = Artifact(client) + artifacts[i] = client.Artifact() } return artifacts, nil @@ -101,23 +99,26 @@ func (b *BuildServer) Prepare(v map[string]string, resp *BuildPrepareResponse) e return nil } -func (b *BuildServer) Run(args *BuildRunArgs, reply *[]string) error { - client, err := rpcDial(args.UiRPCAddress) +func (b *BuildServer) Run(streamId uint32, reply *[]uint32) error { + client, err := NewClientWithMux(b.mux, streamId) if err != nil { - return err + return NewBasicError(err) } + defer client.Close() - ui := &Ui{client: client} - artifacts, err := b.build.Run(ui, Cache(client)) + artifacts, err := b.build.Run(client.Ui(), client.Cache()) if err != nil { return NewBasicError(err) } - *reply = make([]string, len(artifacts)) + *reply = make([]uint32, len(artifacts)) for i, artifact := range artifacts { - server := rpc.NewServer() - RegisterArtifact(server, artifact) - (*reply)[i] = serveSingleConn(server) + streamId := b.mux.NextId() + server := NewServerWithMux(b.mux, streamId) + server.RegisterArtifact(artifact) + go server.Serve() + + (*reply)[i] = streamId } return nil diff --git a/packer/rpc/build_test.go b/packer/rpc/build_test.go index 60c2c0111..ff79ee49e 100644 --- a/packer/rpc/build_test.go +++ b/packer/rpc/build_test.go @@ -3,7 +3,6 @@ package rpc import ( "errors" "github.com/mitchellh/packer/packer" - "net/rpc" "reflect" "testing" ) @@ -60,25 +59,13 @@ func (b *testBuild) Cancel() { b.cancelCalled = true } -func buildRPCClient(t *testing.T) (*testBuild, packer.Build) { - // Create the interface to test - b := new(testBuild) - - // Start the server - server := rpc.NewServer() - RegisterBuild(server, b) - address := serveSingleConn(server) - - // Create the client over RPC and run some methods to verify it works - client, err := rpc.Dial("tcp", address) - if err != nil { - t.Fatalf("err: %s", err) - } - return b, Build(client) -} - func TestBuild(t *testing.T) { - b, bClient := buildRPCClient(t) + b := new(testBuild) + client, server := testClientServer(t) + defer client.Close() + defer server.Close() + server.RegisterBuild(b) + bClient := client.Build() // Test Name bClient.Name() @@ -120,23 +107,6 @@ func TestBuild(t *testing.T) { t.Fatalf("bad: %#v", artifacts) } - // Test the UI given to run, which should be fully functional - if b.runCalled { - b.runCache.Lock("foo") - if !cache.lockCalled { - t.Fatal("lock shuld be called") - } - - b.runUi.Say("format") - if !ui.sayCalled { - t.Fatal("say should be called") - } - - if ui.sayMessage != "format" { - t.Fatalf("bad: %#v", ui.sayMessage) - } - } - // Test run with an error b.errRunResult = true _, err = bClient.Run(ui, cache) @@ -164,7 +134,12 @@ func TestBuild(t *testing.T) { } func TestBuildPrepare_Warnings(t *testing.T) { - b, bClient := buildRPCClient(t) + b := new(testBuild) + client, server := testClientServer(t) + defer client.Close() + defer server.Close() + server.RegisterBuild(b) + bClient := client.Build() expected := []string{"foo"} b.prepareWarnings = expected diff --git a/packer/rpc/client.go b/packer/rpc/client.go index 237cb0c88..c9ddc5f57 100644 --- a/packer/rpc/client.go +++ b/packer/rpc/client.go @@ -45,6 +45,13 @@ func (c *Client) Artifact() packer.Artifact { } } +func (c *Client) Build() packer.Build { + return &build{ + client: c.client, + mux: c.mux, + } +} + func (c *Client) Builder() packer.Builder { return &builder{ client: c.client, diff --git a/packer/rpc/server.go b/packer/rpc/server.go index 838405dd2..5a08f6a73 100644 --- a/packer/rpc/server.go +++ b/packer/rpc/server.go @@ -14,7 +14,7 @@ func RegisterArtifact(s *rpc.Server, a packer.Artifact) { // Registers the appropriate endpoint on an RPC server to serve a // Packer Build. func RegisterBuild(s *rpc.Server, b packer.Build) { - registerComponent(s, "Build", &BuildServer{b}, false) + registerComponent(s, "Build", &BuildServer{build: b}, false) } // Registers the appropriate endpoint on an RPC server to serve a diff --git a/packer/rpc/server_new.go b/packer/rpc/server_new.go index ac4c6b0d1..bf194bd6c 100644 --- a/packer/rpc/server_new.go +++ b/packer/rpc/server_new.go @@ -13,6 +13,7 @@ var endpointId uint64 const ( DefaultArtifactEndpoint string = "Artifact" + DefaultBuildEndpoint = "Build" DefaultBuilderEndpoint = "Builder" DefaultCacheEndpoint = "Cache" DefaultCommandEndpoint = "Command" @@ -55,6 +56,13 @@ func (s *Server) RegisterArtifact(a packer.Artifact) { }) } +func (s *Server) RegisterBuild(b packer.Build) { + s.server.RegisterName(DefaultBuildEndpoint, &BuildServer{ + build: b, + mux: s.mux, + }) +} + func (s *Server) RegisterBuilder(b packer.Builder) { s.server.RegisterName(DefaultBuilderEndpoint, &BuilderServer{ builder: b,