diff --git a/packer/rpc/artifact.go b/packer/rpc/artifact.go index 8ca3ef869..c6e0a208d 100644 --- a/packer/rpc/artifact.go +++ b/packer/rpc/artifact.go @@ -18,10 +18,6 @@ type ArtifactServer struct { artifact packer.Artifact } -func Artifact(client *rpc.Client) *artifact { - return &artifact{client: client} -} - func (a *artifact) BuilderId() (result string) { a.client.Call(a.endpoint+".BuilderId", new(interface{}), &result) return diff --git a/packer/rpc/artifact_test.go b/packer/rpc/artifact_test.go index d5405ab6e..37f4cef9f 100644 --- a/packer/rpc/artifact_test.go +++ b/packer/rpc/artifact_test.go @@ -37,5 +37,5 @@ func TestArtifactRPC(t *testing.T) { } func TestArtifact_Implements(t *testing.T) { - var _ packer.Artifact = Artifact(nil) + var _ packer.Artifact = new(artifact) } diff --git a/packer/rpc/build.go b/packer/rpc/build.go index 91988fae1..f63f35f76 100644 --- a/packer/rpc/build.go +++ b/packer/rpc/build.go @@ -24,10 +24,6 @@ type BuildPrepareResponse struct { Error error } -func Build(client *rpc.Client) *build { - return &build{client: client} -} - func (b *build) Name() (result string) { b.client.Call("Build.Name", new(interface{}), &result) return diff --git a/packer/rpc/build_test.go b/packer/rpc/build_test.go index ff79ee49e..6c18ad384 100644 --- a/packer/rpc/build_test.go +++ b/packer/rpc/build_test.go @@ -154,5 +154,5 @@ func TestBuildPrepare_Warnings(t *testing.T) { } func TestBuild_ImplementsBuild(t *testing.T) { - var _ packer.Build = Build(nil) + var _ packer.Build = new(build) } diff --git a/packer/rpc/builder.go b/packer/rpc/builder.go index 20c166a58..7721414e2 100644 --- a/packer/rpc/builder.go +++ b/packer/rpc/builder.go @@ -29,10 +29,6 @@ type BuilderPrepareResponse struct { Error error } -func Builder(client *rpc.Client) *builder { - return &builder{client: client} -} - func (b *builder) Prepare(config ...interface{}) ([]string, error) { var resp BuilderPrepareResponse cerr := b.client.Call("Builder.Prepare", &BuilderPrepareArgs{config}, &resp) diff --git a/packer/rpc/builder_test.go b/packer/rpc/builder_test.go index 22bbeea06..37e8041bb 100644 --- a/packer/rpc/builder_test.go +++ b/packer/rpc/builder_test.go @@ -141,5 +141,5 @@ func TestBuilderCancel(t *testing.T) { } func TestBuilder_ImplementsBuilder(t *testing.T) { - var _ packer.Builder = Builder(nil) + var _ packer.Builder = new(builder) } diff --git a/packer/rpc/cache.go b/packer/rpc/cache.go index 73b154cb6..184286411 100644 --- a/packer/rpc/cache.go +++ b/packer/rpc/cache.go @@ -17,10 +17,6 @@ type CacheServer struct { cache packer.Cache } -func Cache(client *rpc.Client) *cache { - return &cache{client} -} - type CacheRLockResponse struct { Path string Exists bool diff --git a/packer/rpc/cache_test.go b/packer/rpc/cache_test.go index a9a1f110f..702ca98e4 100644 --- a/packer/rpc/cache_test.go +++ b/packer/rpc/cache_test.go @@ -39,11 +39,7 @@ func (t *testCache) RUnlock(key string) { } func TestCache_Implements(t *testing.T) { - var raw interface{} - raw = Cache(nil) - if _, ok := raw.(packer.Cache); !ok { - t.Fatal("Cache must be a cache.") - } + var _ packer.Cache = new(cache) } func TestCacheRPC(t *testing.T) { diff --git a/packer/rpc/command.go b/packer/rpc/command.go index 6d70fdace..e0243e9bb 100644 --- a/packer/rpc/command.go +++ b/packer/rpc/command.go @@ -26,10 +26,6 @@ type CommandRunArgs struct { type CommandSynopsisArgs byte -func Command(client *rpc.Client) *command { - return &command{client: client} -} - func (c *command) Help() (result string) { err := c.client.Call("Command.Help", new(interface{}), &result) if err != nil { diff --git a/packer/rpc/command_test.go b/packer/rpc/command_test.go index a6c3c9592..7ba433956 100644 --- a/packer/rpc/command_test.go +++ b/packer/rpc/command_test.go @@ -67,5 +67,5 @@ func TestRPCCommand(t *testing.T) { } func TestCommand_Implements(t *testing.T) { - var _ packer.Command = Command(nil) + var _ packer.Command = new(command) } diff --git a/packer/rpc/hook.go b/packer/rpc/hook.go index 8162dd4f2..18682fac6 100644 --- a/packer/rpc/hook.go +++ b/packer/rpc/hook.go @@ -26,10 +26,6 @@ type HookRunArgs struct { StreamId uint32 } -func Hook(client *rpc.Client) *hook { - return &hook{client: client} -} - func (h *hook) Run(name string, ui packer.Ui, comm packer.Communicator, data interface{}) error { nextId := h.mux.NextId() server := NewServerWithMux(h.mux, nextId) diff --git a/packer/rpc/provisioner.go b/packer/rpc/provisioner.go index eafd65756..7ddb32ec3 100644 --- a/packer/rpc/provisioner.go +++ b/packer/rpc/provisioner.go @@ -24,9 +24,6 @@ type ProvisionerPrepareArgs struct { Configs []interface{} } -func Provisioner(client *rpc.Client) *provisioner { - return &provisioner{client: client} -} func (p *provisioner) Prepare(configs ...interface{}) (err error) { args := &ProvisionerPrepareArgs{configs} if cerr := p.client.Call("Provisioner.Prepare", args, &err); cerr != nil { diff --git a/packer/rpc/provisioner_test.go b/packer/rpc/provisioner_test.go index 5d1604b74..a17e0694c 100644 --- a/packer/rpc/provisioner_test.go +++ b/packer/rpc/provisioner_test.go @@ -43,5 +43,5 @@ func TestProvisionerRPC(t *testing.T) { } func TestProvisioner_Implements(t *testing.T) { - var _ packer.Provisioner = Provisioner(nil) + var _ packer.Provisioner = new(provisioner) } diff --git a/packer/rpc/server.go b/packer/rpc/server.go index 5a08f6a73..bf194bd6c 100644 --- a/packer/rpc/server.go +++ b/packer/rpc/server.go @@ -1,88 +1,155 @@ package rpc import ( + "fmt" "github.com/mitchellh/packer/packer" + "io" + "log" "net/rpc" + "sync/atomic" ) -// Registers the appropriate endpoint on an RPC server to serve an -// Artifact. -func RegisterArtifact(s *rpc.Server, a packer.Artifact) { - registerComponent(s, "Artifact", &ArtifactServer{a}, false) +var endpointId uint64 + +const ( + DefaultArtifactEndpoint string = "Artifact" + DefaultBuildEndpoint = "Build" + DefaultBuilderEndpoint = "Builder" + DefaultCacheEndpoint = "Cache" + DefaultCommandEndpoint = "Command" + DefaultCommunicatorEndpoint = "Communicator" + DefaultEnvironmentEndpoint = "Environment" + DefaultHookEndpoint = "Hook" + DefaultPostProcessorEndpoint = "PostProcessor" + DefaultProvisionerEndpoint = "Provisioner" + DefaultUiEndpoint = "Ui" +) + +// Server represents an RPC server for Packer. This must be paired on +// the other side with a Client. +type Server struct { + mux *MuxConn + streamId uint32 + server *rpc.Server } -// Registers the appropriate endpoint on an RPC server to serve a -// Packer Build. -func RegisterBuild(s *rpc.Server, b packer.Build) { - registerComponent(s, "Build", &BuildServer{build: b}, false) +// NewServer returns a new Packer RPC server. +func NewServer(conn io.ReadWriteCloser) *Server { + return NewServerWithMux(NewMuxConn(conn), 0) } -// Registers the appropriate endpoint on an RPC server to serve a -// Packer Builder. -func RegisterBuilder(s *rpc.Server, b packer.Builder) { - registerComponent(s, "Builder", &BuilderServer{builder: b}, false) +func NewServerWithMux(mux *MuxConn, streamId uint32) *Server { + return &Server{ + mux: mux, + streamId: streamId, + server: rpc.NewServer(), + } } -// Registers the appropriate endpoint on an RPC server to serve a -// Packer Cache. -func RegisterCache(s *rpc.Server, c packer.Cache) { - registerComponent(s, "Cache", &CacheServer{c}, false) +func (s *Server) Close() error { + return s.mux.Close() } -// Registers the appropriate endpoint on an RPC server to serve a -// Packer Command. -func RegisterCommand(s *rpc.Server, c packer.Command) { - registerComponent(s, "Command", &CommandServer{command: c}, false) +func (s *Server) RegisterArtifact(a packer.Artifact) { + s.server.RegisterName(DefaultArtifactEndpoint, &ArtifactServer{ + artifact: a, + }) } -// Registers the appropriate endpoint on an RPC server to serve a -// Packer Communicator. -func RegisterCommunicator(s *rpc.Server, c packer.Communicator) { - registerComponent(s, "Communicator", &CommunicatorServer{c: c}, false) +func (s *Server) RegisterBuild(b packer.Build) { + s.server.RegisterName(DefaultBuildEndpoint, &BuildServer{ + build: b, + mux: s.mux, + }) } -// Registers the appropriate endpoint on an RPC server to serve a -// Packer Environment -func RegisterEnvironment(s *rpc.Server, e packer.Environment) { - registerComponent(s, "Environment", &EnvironmentServer{env: e}, false) +func (s *Server) RegisterBuilder(b packer.Builder) { + s.server.RegisterName(DefaultBuilderEndpoint, &BuilderServer{ + builder: b, + mux: s.mux, + }) } -// Registers the appropriate endpoint on an RPC server to serve a -// Hook. -func RegisterHook(s *rpc.Server, h packer.Hook) { - registerComponent(s, "Hook", &HookServer{hook: h}, false) +func (s *Server) RegisterCache(c packer.Cache) { + s.server.RegisterName(DefaultCacheEndpoint, &CacheServer{ + cache: c, + }) } -// Registers the appropriate endpoing on an RPC server to serve a -// PostProcessor. -func RegisterPostProcessor(s *rpc.Server, p packer.PostProcessor) { - registerComponent(s, "PostProcessor", &PostProcessorServer{p: p}, false) +func (s *Server) RegisterCommand(c packer.Command) { + s.server.RegisterName(DefaultCommandEndpoint, &CommandServer{ + command: c, + mux: s.mux, + }) } -// Registers the appropriate endpoint on an RPC server to serve a packer.Provisioner -func RegisterProvisioner(s *rpc.Server, p packer.Provisioner) { - registerComponent(s, "Provisioner", &ProvisionerServer{p: p}, false) +func (s *Server) RegisterCommunicator(c packer.Communicator) { + s.server.RegisterName(DefaultCommunicatorEndpoint, &CommunicatorServer{ + c: c, + mux: s.mux, + }) } -// Registers the appropriate endpoint on an RPC server to serve a -// Packer UI -func RegisterUi(s *rpc.Server, ui packer.Ui) { - registerComponent(s, "Ui", &UiServer{ui}, false) +func (s *Server) RegisterEnvironment(b packer.Environment) { + s.server.RegisterName(DefaultEnvironmentEndpoint, &EnvironmentServer{ + env: b, + mux: s.mux, + }) } -func serveSingleConn(s *rpc.Server) string { - l := netListenerInRange(portRangeMin, portRangeMax) - - // Accept a single connection in a goroutine and then exit - go func() { - defer l.Close() - conn, err := l.Accept() - if err != nil { - panic(err) - } - - s.ServeConn(conn) - }() - - return l.Addr().String() +func (s *Server) RegisterHook(h packer.Hook) { + s.server.RegisterName(DefaultHookEndpoint, &HookServer{ + hook: h, + mux: s.mux, + }) +} + +func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { + s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{ + mux: s.mux, + p: p, + }) +} + +func (s *Server) RegisterProvisioner(p packer.Provisioner) { + s.server.RegisterName(DefaultProvisionerEndpoint, &ProvisionerServer{ + mux: s.mux, + p: p, + }) +} + +func (s *Server) RegisterUi(ui packer.Ui) { + s.server.RegisterName(DefaultUiEndpoint, &UiServer{ + ui: ui, + }) +} + +// ServeConn serves a single connection over the RPC server. It is up +// to the caller to obtain a proper io.ReadWriteCloser. +func (s *Server) Serve() { + // Accept a connection on stream ID 0, which is always used for + // normal client to server connections. + stream, err := s.mux.Accept(s.streamId) + defer stream.Close() + if err != nil { + log.Printf("[ERR] Error retrieving stream for serving: %s", err) + return + } + + s.server.ServeConn(stream) +} + +// registerComponent registers a single Packer RPC component onto +// the RPC server. If id is true, then a unique ID number will be appended +// onto the end of the endpoint. +// +// The endpoint name is returned. +func registerComponent(server *rpc.Server, name string, rcvr interface{}, id bool) string { + endpoint := name + if id { + fmt.Sprintf("%s.%d", endpoint, atomic.AddUint64(&endpointId, 1)) + } + + server.RegisterName(endpoint, rcvr) + return endpoint } diff --git a/packer/rpc/server_new.go b/packer/rpc/server_new.go deleted file mode 100644 index bf194bd6c..000000000 --- a/packer/rpc/server_new.go +++ /dev/null @@ -1,155 +0,0 @@ -package rpc - -import ( - "fmt" - "github.com/mitchellh/packer/packer" - "io" - "log" - "net/rpc" - "sync/atomic" -) - -var endpointId uint64 - -const ( - DefaultArtifactEndpoint string = "Artifact" - DefaultBuildEndpoint = "Build" - DefaultBuilderEndpoint = "Builder" - DefaultCacheEndpoint = "Cache" - DefaultCommandEndpoint = "Command" - DefaultCommunicatorEndpoint = "Communicator" - DefaultEnvironmentEndpoint = "Environment" - DefaultHookEndpoint = "Hook" - DefaultPostProcessorEndpoint = "PostProcessor" - DefaultProvisionerEndpoint = "Provisioner" - DefaultUiEndpoint = "Ui" -) - -// Server represents an RPC server for Packer. This must be paired on -// the other side with a Client. -type Server struct { - mux *MuxConn - streamId uint32 - server *rpc.Server -} - -// NewServer returns a new Packer RPC server. -func NewServer(conn io.ReadWriteCloser) *Server { - return NewServerWithMux(NewMuxConn(conn), 0) -} - -func NewServerWithMux(mux *MuxConn, streamId uint32) *Server { - return &Server{ - mux: mux, - streamId: streamId, - server: rpc.NewServer(), - } -} - -func (s *Server) Close() error { - return s.mux.Close() -} - -func (s *Server) RegisterArtifact(a packer.Artifact) { - s.server.RegisterName(DefaultArtifactEndpoint, &ArtifactServer{ - artifact: a, - }) -} - -func (s *Server) RegisterBuild(b packer.Build) { - s.server.RegisterName(DefaultBuildEndpoint, &BuildServer{ - build: b, - mux: s.mux, - }) -} - -func (s *Server) RegisterBuilder(b packer.Builder) { - s.server.RegisterName(DefaultBuilderEndpoint, &BuilderServer{ - builder: b, - mux: s.mux, - }) -} - -func (s *Server) RegisterCache(c packer.Cache) { - s.server.RegisterName(DefaultCacheEndpoint, &CacheServer{ - cache: c, - }) -} - -func (s *Server) RegisterCommand(c packer.Command) { - s.server.RegisterName(DefaultCommandEndpoint, &CommandServer{ - command: c, - mux: s.mux, - }) -} - -func (s *Server) RegisterCommunicator(c packer.Communicator) { - s.server.RegisterName(DefaultCommunicatorEndpoint, &CommunicatorServer{ - c: c, - mux: s.mux, - }) -} - -func (s *Server) RegisterEnvironment(b packer.Environment) { - s.server.RegisterName(DefaultEnvironmentEndpoint, &EnvironmentServer{ - env: b, - mux: s.mux, - }) -} - -func (s *Server) RegisterHook(h packer.Hook) { - s.server.RegisterName(DefaultHookEndpoint, &HookServer{ - hook: h, - mux: s.mux, - }) -} - -func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { - s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{ - mux: s.mux, - p: p, - }) -} - -func (s *Server) RegisterProvisioner(p packer.Provisioner) { - s.server.RegisterName(DefaultProvisionerEndpoint, &ProvisionerServer{ - mux: s.mux, - p: p, - }) -} - -func (s *Server) RegisterUi(ui packer.Ui) { - s.server.RegisterName(DefaultUiEndpoint, &UiServer{ - ui: ui, - }) -} - -// ServeConn serves a single connection over the RPC server. It is up -// to the caller to obtain a proper io.ReadWriteCloser. -func (s *Server) Serve() { - // Accept a connection on stream ID 0, which is always used for - // normal client to server connections. - stream, err := s.mux.Accept(s.streamId) - defer stream.Close() - if err != nil { - log.Printf("[ERR] Error retrieving stream for serving: %s", err) - return - } - - s.server.ServeConn(stream) -} - -// registerComponent registers a single Packer RPC component onto -// the RPC server. If id is true, then a unique ID number will be appended -// onto the end of the endpoint. -// -// The endpoint name is returned. -func registerComponent(server *rpc.Server, name string, rcvr interface{}, id bool) string { - endpoint := name - if id { - fmt.Sprintf("%s.%d", endpoint, atomic.AddUint64(&endpointId, 1)) - } - - server.RegisterName(endpoint, rcvr) - return endpoint -}