packer-cn/packer/rpc/server.go

170 lines
4.0 KiB
Go
Raw Normal View History

package rpc
import (
2013-12-10 16:26:07 -05:00
"fmt"
"github.com/mitchellh/packer/packer"
2014-04-26 16:31:22 -04:00
"github.com/ugorji/go/codec"
2013-12-10 16:26:07 -05:00
"io"
"log"
"net/rpc"
2013-12-10 16:26:07 -05:00
"sync/atomic"
)
2013-12-10 16:26:07 -05:00
// endpointId is a package-wide counter. registerComponent advances it
// with atomic.AddUint64 whenever it is asked to generate a unique ID
// for an endpoint name.
var endpointId uint64
// Default endpoint names. Each Server.Register* method registers its
// component on the RPC server under the matching name below.
const (
	// DefaultArtifactEndpoint is the only name declared with an
	// explicit string type; the rest are untyped string constants.
	DefaultArtifactEndpoint string = "Artifact"

	DefaultBuildEndpoint         = "Build"
	DefaultBuilderEndpoint       = "Builder"
	DefaultCacheEndpoint         = "Cache"
	DefaultCommandEndpoint       = "Command"
	DefaultCommunicatorEndpoint  = "Communicator"
	DefaultEnvironmentEndpoint   = "Environment"
	DefaultHookEndpoint          = "Hook"
	DefaultPostProcessorEndpoint = "PostProcessor"
	DefaultProvisionerEndpoint   = "Provisioner"
	DefaultUiEndpoint            = "Ui"
)
// Server represents an RPC server for Packer. This must be paired on
// the other side with a Client.
type Server struct {
2014-09-02 17:23:06 -04:00
mux *muxBroker
2013-12-10 16:26:07 -05:00
streamId uint32
server *rpc.Server
closeMux bool
2013-05-22 01:10:21 -04:00
}
2013-12-10 16:26:07 -05:00
// NewServer returns a new Packer RPC server.
func NewServer(conn io.ReadWriteCloser) *Server {
2014-09-02 17:23:06 -04:00
mux, _ := newMuxBrokerServer(conn)
result := newServerWithMux(mux, 0)
result.closeMux = true
2014-09-02 17:23:06 -04:00
go mux.Run()
return result
}
2014-09-02 17:23:06 -04:00
func newServerWithMux(mux *muxBroker, streamId uint32) *Server {
2013-12-10 16:26:07 -05:00
return &Server{
mux: mux,
streamId: streamId,
server: rpc.NewServer(),
closeMux: false,
2013-12-10 16:26:07 -05:00
}
2013-05-04 16:47:11 -04:00
}
2013-12-10 16:26:07 -05:00
func (s *Server) Close() error {
if s.closeMux {
log.Printf("[WARN] Shutting down mux conn in Server")
return s.mux.Close()
}
return nil
2013-06-09 22:25:48 -04:00
}
2013-12-10 16:26:07 -05:00
// RegisterArtifact wraps a in an ArtifactServer and registers it on the
// RPC server under DefaultArtifactEndpoint. The error returned by
// RegisterName is discarded.
func (s *Server) RegisterArtifact(a packer.Artifact) {
	handler := &ArtifactServer{artifact: a}
	s.server.RegisterName(DefaultArtifactEndpoint, handler)
}
2013-12-10 16:26:07 -05:00
func (s *Server) RegisterBuild(b packer.Build) {
s.server.RegisterName(DefaultBuildEndpoint, &BuildServer{
build: b,
mux: s.mux,
})
2013-05-12 17:47:55 -04:00
}
2013-12-10 16:26:07 -05:00
// RegisterBuilder wraps b in a BuilderServer that shares this server's
// mux and registers it on the RPC server under DefaultBuilderEndpoint.
// The error returned by RegisterName is discarded.
func (s *Server) RegisterBuilder(b packer.Builder) {
	handler := &BuilderServer{builder: b, mux: s.mux}
	s.server.RegisterName(DefaultBuilderEndpoint, handler)
}
2013-12-10 16:26:07 -05:00
func (s *Server) RegisterCache(c packer.Cache) {
s.server.RegisterName(DefaultCacheEndpoint, &CacheServer{
cache: c,
})
2013-05-11 12:51:49 -04:00
}
2013-12-10 16:26:07 -05:00
func (s *Server) RegisterCommand(c packer.Command) {
s.server.RegisterName(DefaultCommandEndpoint, &CommandServer{
command: c,
mux: s.mux,
})
2013-06-18 16:44:57 -04:00
}
2013-12-10 16:26:07 -05:00
func (s *Server) RegisterCommunicator(c packer.Communicator) {
s.server.RegisterName(DefaultCommunicatorEndpoint, &CommunicatorServer{
c: c,
mux: s.mux,
})
2013-05-22 18:35:52 -04:00
}
2013-12-10 16:26:07 -05:00
// RegisterEnvironment wraps b in an EnvironmentServer that shares this
// server's mux and registers it on the RPC server under
// DefaultEnvironmentEndpoint. The error returned by RegisterName is
// discarded.
func (s *Server) RegisterEnvironment(b packer.Environment) {
	handler := &EnvironmentServer{env: b, mux: s.mux}
	s.server.RegisterName(DefaultEnvironmentEndpoint, handler)
}
2013-12-10 16:26:07 -05:00
// RegisterHook wraps h in a HookServer that shares this server's mux and
// registers it on the RPC server under DefaultHookEndpoint. The error
// returned by RegisterName is discarded.
func (s *Server) RegisterHook(h packer.Hook) {
	handler := &HookServer{hook: h, mux: s.mux}
	s.server.RegisterName(DefaultHookEndpoint, handler)
}
2013-12-10 16:26:07 -05:00
// RegisterPostProcessor wraps p in a PostProcessorServer that shares
// this server's mux and registers it on the RPC server under
// DefaultPostProcessorEndpoint. The error returned by RegisterName is
// discarded.
func (s *Server) RegisterPostProcessor(p packer.PostProcessor) {
	handler := &PostProcessorServer{mux: s.mux, p: p}
	s.server.RegisterName(DefaultPostProcessorEndpoint, handler)
}
2013-12-10 16:26:07 -05:00
// RegisterProvisioner wraps p in a ProvisionerServer that shares this
// server's mux and registers it on the RPC server under
// DefaultProvisionerEndpoint. The error returned by RegisterName is
// discarded.
func (s *Server) RegisterProvisioner(p packer.Provisioner) {
	handler := &ProvisionerServer{mux: s.mux, p: p}
	s.server.RegisterName(DefaultProvisionerEndpoint, handler)
}
// RegisterUi wraps ui in a UiServer and registers it on the RPC server
// under DefaultUiEndpoint. The error returned by RegisterName is
// discarded.
func (s *Server) RegisterUi(ui packer.Ui) {
	handler := &UiServer{ui: ui}
	s.server.RegisterName(DefaultUiEndpoint, handler)
}
// ServeConn serves a single connection over the RPC server. It is up
// to the caller to obtain a proper io.ReadWriteCloser.
func (s *Server) Serve() {
// Accept a connection on stream ID 0, which is always used for
// normal client to server connections.
stream, err := s.mux.Accept(s.streamId)
if err != nil {
log.Printf("[ERR] Error retrieving stream for serving: %s", err)
return
}
2014-09-02 17:23:06 -04:00
defer stream.Close()
2013-12-10 16:26:07 -05:00
2014-04-26 16:31:22 -04:00
var h codec.MsgpackHandle
rpcCodec := codec.GoRpc.ServerCodec(stream, &h)
s.server.ServeCodec(rpcCodec)
2013-12-10 16:26:07 -05:00
}
2013-12-10 16:26:07 -05:00
// registerComponent registers a single Packer RPC component onto
// the RPC server. If id is true, a unique ID number is appended to the
// endpoint name in the form "name.N", where N comes from atomically
// incrementing the package-level endpointId counter.
//
// The endpoint name that was actually used for registration is returned.
// The error returned by RegisterName is discarded.
func registerComponent(server *rpc.Server, name string, rcvr interface{}, id bool) string {
	endpoint := name
	if id {
		// The formatted name must be assigned back: previously the
		// Sprintf result was thrown away, so every component was
		// registered under the bare name despite id being true.
		endpoint = fmt.Sprintf("%s.%d", endpoint, atomic.AddUint64(&endpointId, 1))
	}

	server.RegisterName(endpoint, rcvr)
	return endpoint
}