From bd6fbc05ebd7d60f94f855589a8d52c78d95a58c Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 10 Dec 2013 12:23:42 -0800 Subject: [PATCH] packer/rpc: environment --- packer/rpc/client.go | 7 ++ packer/rpc/command.go | 2 +- packer/rpc/environment.go | 150 ++++++++++++++++----------------- packer/rpc/environment_test.go | 16 ++-- packer/rpc/server.go | 2 +- packer/rpc/server_new.go | 8 ++ 6 files changed, 95 insertions(+), 90 deletions(-) diff --git a/packer/rpc/client.go b/packer/rpc/client.go index d82a20c7a..237cb0c88 100644 --- a/packer/rpc/client.go +++ b/packer/rpc/client.go @@ -72,6 +72,13 @@ func (c *Client) Communicator() packer.Communicator { } } +func (c *Client) Environment() packer.Environment { + return &Environment{ + client: c.client, + mux: c.mux, + } +} + func (c *Client) Hook() packer.Hook { return &hook{ client: c.client, diff --git a/packer/rpc/command.go b/packer/rpc/command.go index 4d45a2ec1..945b8d58b 100644 --- a/packer/rpc/command.go +++ b/packer/rpc/command.go @@ -73,7 +73,7 @@ func (c *CommandServer) Run(args *CommandRunArgs, reply *int) error { return err } - env := &Environment{client} + env := &Environment{client: client} *reply = c.command.Run(env, args.Args) return nil diff --git a/packer/rpc/environment.go b/packer/rpc/environment.go index aaf0db412..1aa3b9abb 100644 --- a/packer/rpc/environment.go +++ b/packer/rpc/environment.go @@ -2,6 +2,7 @@ package rpc import ( "github.com/mitchellh/packer/packer" + "log" "net/rpc" ) @@ -9,12 +10,14 @@ import ( // where the actual environment is executed over an RPC connection. type Environment struct { client *rpc.Client + mux *MuxConn } // A EnvironmentServer wraps a packer.Environment and makes it exportable // as part of a Golang RPC server. type EnvironmentServer struct { env packer.Environment + mux *MuxConn } type EnvironmentCliArgs struct { @@ -22,33 +25,32 @@ type EnvironmentCliArgs struct { } func (e *Environment) Builder(name string) (b packer.Builder, err error) { - var reply string - err = e.client.Call("Environment.Builder", name, &reply) + var streamId uint32 + err = e.client.Call("Environment.Builder", name, &streamId) if err != nil { return } - client, err := rpcDial(reply) + client, err := NewClientWithMux(e.mux, streamId) if err != nil { - return + return nil, err } - - b = Builder(client) + b = client.Builder() return } func (e *Environment) Cache() packer.Cache { - var reply string - if err := e.client.Call("Environment.Cache", new(interface{}), &reply); err != nil { + var streamId uint32 + if err := e.client.Call("Environment.Cache", new(interface{}), &streamId); err != nil { panic(err) } - client, err := rpcDial(reply) + client, err := NewClientWithMux(e.mux, streamId) if err != nil { - panic(err) + log.Printf("[ERR] Error getting cache client: %s", err) + return nil } - - return Cache(client) + return client.Cache() } func (e *Environment) Cli(args []string) (result int, err error) { @@ -58,85 +60,81 @@ func (e *Environment) Cli(args []string) (result int, err error) { } func (e *Environment) Hook(name string) (h packer.Hook, err error) { - var reply string - err = e.client.Call("Environment.Hook", name, &reply) + var streamId uint32 + err = e.client.Call("Environment.Hook", name, &streamId) if err != nil { return } - client, err := rpcDial(reply) + client, err := NewClientWithMux(e.mux, streamId) if err != nil { - return + return nil, err } - - h = Hook(client) - return + return client.Hook(), nil } func (e *Environment) PostProcessor(name string) (p packer.PostProcessor, err error) { - var reply string - err = e.client.Call("Environment.PostProcessor", name, &reply) + var streamId uint32 + err = e.client.Call("Environment.PostProcessor", name, &streamId) if err != nil { return } - client, err := rpcDial(reply) + client, err := NewClientWithMux(e.mux, streamId) if err != nil { - return + return nil, err } - - p = PostProcessor(client) + p = client.PostProcessor() return } func (e *Environment) Provisioner(name string) (p packer.Provisioner, err error) { - var reply string - err = e.client.Call("Environment.Provisioner", name, &reply) + var streamId uint32 + err = e.client.Call("Environment.Provisioner", name, &streamId) if err != nil { return } - client, err := rpcDial(reply) + client, err := NewClientWithMux(e.mux, streamId) if err != nil { - return + return nil, err } - - p = Provisioner(client) + p = client.Provisioner() return } func (e *Environment) Ui() packer.Ui { - var reply string - e.client.Call("Environment.Ui", new(interface{}), &reply) + var streamId uint32 + e.client.Call("Environment.Ui", new(interface{}), &streamId) - client, err := rpcDial(reply) + client, err := NewClientWithMux(e.mux, streamId) if err != nil { - panic(err) + log.Printf("[ERR] Error connecting to Ui: %s", err) + return nil } - - return &Ui{client: client} + return client.Ui() } -func (e *EnvironmentServer) Builder(name *string, reply *string) error { - builder, err := e.env.Builder(*name) +func (e *EnvironmentServer) Builder(name string, reply *uint32) error { + builder, err := e.env.Builder(name) if err != nil { - return err + return NewBasicError(err) } - // Wrap it - server := rpc.NewServer() - RegisterBuilder(server, builder) - - *reply = serveSingleConn(server) + *reply = e.mux.NextId() + server := NewServerWithMux(e.mux, *reply) + server.RegisterBuilder(builder) + go server.Serve() return nil } -func (e *EnvironmentServer) Cache(args *interface{}, reply *string) error { +func (e *EnvironmentServer) Cache(args *interface{}, reply *uint32) error { cache := e.env.Cache() - server := rpc.NewServer() - RegisterCache(server, cache) - *reply = serveSingleConn(server) + *reply = e.mux.NextId() + server := NewServerWithMux(e.mux, *reply) + server.RegisterCache(cache) + go server.Serve() return nil } @@ -145,53 +143,51 @@ func (e *EnvironmentServer) Cli(args *EnvironmentCliArgs, reply *int) (err error return } -func (e *EnvironmentServer) Hook(name *string, reply *string) error { - hook, err := e.env.Hook(*name) +func (e *EnvironmentServer) Hook(name string, reply *uint32) error { + hook, err := e.env.Hook(name) if err != nil { - return err + return NewBasicError(err) } - // Wrap it - server := rpc.NewServer() - RegisterHook(server, hook) - - *reply = serveSingleConn(server) + *reply = e.mux.NextId() + server := NewServerWithMux(e.mux, *reply) + server.RegisterHook(hook) + go server.Serve() return nil } -func (e *EnvironmentServer) PostProcessor(name *string, reply *string) error { - pp, err := e.env.PostProcessor(*name) +func (e *EnvironmentServer) PostProcessor(name string, reply *uint32) error { + pp, err := e.env.PostProcessor(name) if err != nil { - return err + return NewBasicError(err) } - server := rpc.NewServer() - RegisterPostProcessor(server, pp) - - *reply = serveSingleConn(server) + *reply = e.mux.NextId() + server := NewServerWithMux(e.mux, *reply) + server.RegisterPostProcessor(pp) + go server.Serve() return nil } -func (e *EnvironmentServer) Provisioner(name *string, reply *string) error { - prov, err := e.env.Provisioner(*name) +func (e *EnvironmentServer) Provisioner(name string, reply *uint32) error { + prov, err := e.env.Provisioner(name) if err != nil { - return err + return NewBasicError(err) } - server := rpc.NewServer() - RegisterProvisioner(server, prov) - - *reply = serveSingleConn(server) + *reply = e.mux.NextId() + server := NewServerWithMux(e.mux, *reply) + server.RegisterProvisioner(prov) + go server.Serve() return nil } -func (e *EnvironmentServer) Ui(args *interface{}, reply *string) error { +func (e *EnvironmentServer) Ui(args *interface{}, reply *uint32) error { ui := e.env.Ui() - // Wrap it - server := rpc.NewServer() - RegisterUi(server, ui) - - *reply = serveSingleConn(server) + *reply = e.mux.NextId() + server := NewServerWithMux(e.mux, *reply) + server.RegisterUi(ui) + go server.Serve() return nil } diff --git a/packer/rpc/environment_test.go b/packer/rpc/environment_test.go index cb80929ec..bd0a6784a 100644 --- a/packer/rpc/environment_test.go +++ b/packer/rpc/environment_test.go @@ -2,7 +2,6 @@ package rpc import ( "github.com/mitchellh/packer/packer" - "net/rpc" "reflect" "testing" ) @@ -69,16 +68,11 @@ func TestEnvironmentRPC(t *testing.T) { e := &testEnvironment{} // Start the server - server := rpc.NewServer() - RegisterEnvironment(server, e) - address := serveSingleConn(server) - - // Create the client over RPC and run some methods to verify it works - client, err := rpc.Dial("tcp", address) - if err != nil { - t.Fatalf("err: %s", err) - } - eClient := &Environment{client} + client, server := testClientServer(t) + defer client.Close() + defer server.Close() + server.RegisterEnvironment(e) + eClient := client.Environment() // Test Builder builder, _ := eClient.Builder("foo") diff --git a/packer/rpc/server.go b/packer/rpc/server.go index 32190a2fe..838405dd2 100644 --- a/packer/rpc/server.go +++ b/packer/rpc/server.go @@ -44,7 +44,7 @@ func RegisterCommunicator(s *rpc.Server, c packer.Communicator) { // Registers the appropriate endpoint on an RPC server to serve a // Packer Environment func RegisterEnvironment(s *rpc.Server, e packer.Environment) { - registerComponent(s, "Environment", &EnvironmentServer{e}, false) + registerComponent(s, "Environment", &EnvironmentServer{env: e}, false) } // Registers the appropriate endpoint on an RPC server to serve a diff --git a/packer/rpc/server_new.go b/packer/rpc/server_new.go index 53e4d0ddd..ac4c6b0d1 100644 --- a/packer/rpc/server_new.go +++ b/packer/rpc/server_new.go @@ -17,6 +17,7 @@ const ( DefaultCacheEndpoint = "Cache" DefaultCommandEndpoint = "Command" DefaultCommunicatorEndpoint = "Communicator" + DefaultEnvironmentEndpoint = "Environment" DefaultHookEndpoint = "Hook" DefaultPostProcessorEndpoint = "PostProcessor" DefaultProvisionerEndpoint = "Provisioner" @@ -81,6 +82,13 @@ func (s *Server) RegisterCommunicator(c packer.Communicator) { }) } +func (s *Server) RegisterEnvironment(b packer.Environment) { + s.server.RegisterName(DefaultEnvironmentEndpoint, &EnvironmentServer{ + env: b, + mux: s.mux, + }) +} + func (s *Server) RegisterHook(h packer.Hook) { s.server.RegisterName(DefaultHookEndpoint, &HookServer{ hook: h,