From a036bec96e69f3ebe21f13251a7b57453f006161 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 10 Dec 2013 11:50:30 -0800 Subject: [PATCH] packer/rpc: Hook --- packer/rpc/client.go | 7 +++++++ packer/rpc/hook.go | 35 ++++++++++++++++++++++------------- packer/rpc/hook_test.go | 33 ++++++++++----------------------- packer/rpc/server.go | 2 +- packer/rpc/server_new.go | 8 ++++++++ 5 files changed, 48 insertions(+), 37 deletions(-) diff --git a/packer/rpc/client.go b/packer/rpc/client.go index cc657741e..c201ebe19 100644 --- a/packer/rpc/client.go +++ b/packer/rpc/client.go @@ -58,6 +58,13 @@ func (c *Client) Communicator() packer.Communicator { } } +func (c *Client) Hook() packer.Hook { + return &hook{ + client: c.client, + mux: c.mux, + } +} + func (c *Client) PostProcessor() packer.PostProcessor { return &postProcessor{ client: c.client, diff --git a/packer/rpc/hook.go b/packer/rpc/hook.go index cede08302..8162dd4f2 100644 --- a/packer/rpc/hook.go +++ b/packer/rpc/hook.go @@ -10,32 +10,40 @@ import ( // over an RPC connection. type hook struct { client *rpc.Client + mux *MuxConn } // HookServer wraps a packer.Hook implementation and makes it exportable // as part of a Golang RPC server. type HookServer struct { hook packer.Hook + mux *MuxConn } type HookRunArgs struct { - Name string - Data interface{} - RPCAddress string + Name string + Data interface{} + StreamId uint32 } func Hook(client *rpc.Client) *hook { - return &hook{client} + return &hook{client: client} } func (h *hook) Run(name string, ui packer.Ui, comm packer.Communicator, data interface{}) error { - server := rpc.NewServer() - RegisterCommunicator(server, comm) - RegisterUi(server, ui) - address := serveSingleConn(server) + nextId := h.mux.NextId() + server := NewServerWithMux(h.mux, nextId) + server.RegisterCommunicator(comm) + server.RegisterUi(ui) + go server.Serve() - args := &HookRunArgs{name, data, address} - return h.client.Call("Hook.Run", args, new(interface{})) + args := HookRunArgs{ + Name: name, + Data: data, + StreamId: nextId, + } + + return h.client.Call("Hook.Run", &args, new(interface{})) } func (h *hook) Cancel() { @@ -46,12 +54,13 @@ func (h *hook) Cancel() { } func (h *HookServer) Run(args *HookRunArgs, reply *interface{}) error { - client, err := rpcDial(args.RPCAddress) + client, err := NewClientWithMux(h.mux, args.StreamId) if err != nil { - return err + return NewBasicError(err) } + defer client.Close() - if err := h.hook.Run(args.Name, &Ui{client: client}, Communicator(client), args.Data); err != nil { + if err := h.hook.Run(args.Name, client.Ui(), client.Communicator(), args.Data); err != nil { return NewBasicError(err) } diff --git a/packer/rpc/hook_test.go b/packer/rpc/hook_test.go index c7ffc7258..b3f4a420c 100644 --- a/packer/rpc/hook_test.go +++ b/packer/rpc/hook_test.go @@ -2,7 +2,6 @@ package rpc import ( "github.com/mitchellh/packer/packer" - "net/rpc" "reflect" "sync" "testing" @@ -14,17 +13,11 @@ func TestHookRPC(t *testing.T) { h := new(packer.MockHook) // Serve - server := rpc.NewServer() - RegisterHook(server, h) - address := serveSingleConn(server) - - // Create the client over RPC and run some methods to verify it works - client, err := rpc.Dial("tcp", address) - if err != nil { - t.Fatalf("err: %s", err) - } - - hClient := Hook(client) + client, server := testClientServer(t) + defer client.Close() + defer server.Close() + server.RegisterHook(h) + hClient := client.Hook() // Test Run ui := &testUi{} @@ -60,17 +53,11 @@ func TestHook_cancelWhileRun(t *testing.T) { } // Serve - server := rpc.NewServer() - RegisterHook(server, h) - address := serveSingleConn(server) - - // Create the client over RPC and run some methods to verify it works - client, err := rpc.Dial("tcp", address) - if err != nil { - t.Fatalf("err: %s", err) - } - - hClient := Hook(client) + client, server := testClientServer(t) + defer client.Close() + defer server.Close() + server.RegisterHook(h) + hClient := client.Hook() // Start the run finished := make(chan struct{}) diff --git a/packer/rpc/server.go b/packer/rpc/server.go index 0d6cacf62..fc29c7e91 100644 --- a/packer/rpc/server.go +++ b/packer/rpc/server.go @@ -50,7 +50,7 @@ func RegisterEnvironment(s *rpc.Server, e packer.Environment) { // Registers the appropriate endpoint on an RPC server to serve a // Hook. func RegisterHook(s *rpc.Server, h packer.Hook) { - registerComponent(s, "Hook", &HookServer{h}, false) + registerComponent(s, "Hook", &HookServer{hook: h}, false) } // Registers the appropriate endpoing on an RPC server to serve a diff --git a/packer/rpc/server_new.go b/packer/rpc/server_new.go index 8a1d0a128..b8cb9fffa 100644 --- a/packer/rpc/server_new.go +++ b/packer/rpc/server_new.go @@ -15,6 +15,7 @@ const ( DefaultArtifactEndpoint string = "Artifact" DefaultCacheEndpoint = "Cache" DefaultCommunicatorEndpoint = "Communicator" + DefaultHookEndpoint = "Hook" DefaultPostProcessorEndpoint = "PostProcessor" DefaultUiEndpoint = "Ui" ) @@ -63,6 +64,13 @@ func (s *Server) RegisterCommunicator(c packer.Communicator) { }) } +func (s *Server) RegisterHook(h packer.Hook) { + s.server.RegisterName(DefaultHookEndpoint, &HookServer{ + hook: h, + mux: s.mux, + }) +} + func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{ mux: s.mux,