packer/rpc: environment

This commit is contained in:
Mitchell Hashimoto 2013-12-10 12:23:42 -08:00
parent 2ba713d705
commit bd6fbc05eb
6 changed files with 95 additions and 90 deletions

View File

@ -72,6 +72,13 @@ func (c *Client) Communicator() packer.Communicator {
} }
} }
func (c *Client) Environment() packer.Environment {
return &Environment{
client: c.client,
mux: c.mux,
}
}
func (c *Client) Hook() packer.Hook { func (c *Client) Hook() packer.Hook {
return &hook{ return &hook{
client: c.client, client: c.client,

View File

@ -73,7 +73,7 @@ func (c *CommandServer) Run(args *CommandRunArgs, reply *int) error {
return err return err
} }
env := &Environment{client} env := &Environment{client: client}
*reply = c.command.Run(env, args.Args) *reply = c.command.Run(env, args.Args)
return nil return nil

View File

@ -2,6 +2,7 @@ package rpc
import ( import (
"github.com/mitchellh/packer/packer" "github.com/mitchellh/packer/packer"
"log"
"net/rpc" "net/rpc"
) )
@ -9,12 +10,14 @@ import (
// where the actual environment is executed over an RPC connection. // where the actual environment is executed over an RPC connection.
type Environment struct { type Environment struct {
client *rpc.Client client *rpc.Client
mux *MuxConn
} }
// A EnvironmentServer wraps a packer.Environment and makes it exportable // A EnvironmentServer wraps a packer.Environment and makes it exportable
// as part of a Golang RPC server. // as part of a Golang RPC server.
type EnvironmentServer struct { type EnvironmentServer struct {
env packer.Environment env packer.Environment
mux *MuxConn
} }
type EnvironmentCliArgs struct { type EnvironmentCliArgs struct {
@ -22,33 +25,32 @@ type EnvironmentCliArgs struct {
} }
func (e *Environment) Builder(name string) (b packer.Builder, err error) { func (e *Environment) Builder(name string) (b packer.Builder, err error) {
var reply string var streamId uint32
err = e.client.Call("Environment.Builder", name, &reply) err = e.client.Call("Environment.Builder", name, &streamId)
if err != nil { if err != nil {
return return
} }
client, err := rpcDial(reply) client, err := NewClientWithMux(e.mux, streamId)
if err != nil { if err != nil {
return return nil, err
} }
b = client.Builder()
b = Builder(client)
return return
} }
func (e *Environment) Cache() packer.Cache { func (e *Environment) Cache() packer.Cache {
var reply string var streamId uint32
if err := e.client.Call("Environment.Cache", new(interface{}), &reply); err != nil { if err := e.client.Call("Environment.Cache", new(interface{}), &streamId); err != nil {
panic(err) panic(err)
} }
client, err := rpcDial(reply) client, err := NewClientWithMux(e.mux, streamId)
if err != nil { if err != nil {
panic(err) log.Printf("[ERR] Error getting cache client: %s", err)
return nil
} }
return client.Cache()
return Cache(client)
} }
func (e *Environment) Cli(args []string) (result int, err error) { func (e *Environment) Cli(args []string) (result int, err error) {
@ -58,85 +60,81 @@ func (e *Environment) Cli(args []string) (result int, err error) {
} }
func (e *Environment) Hook(name string) (h packer.Hook, err error) { func (e *Environment) Hook(name string) (h packer.Hook, err error) {
var reply string var streamId uint32
err = e.client.Call("Environment.Hook", name, &reply) err = e.client.Call("Environment.Hook", name, &streamId)
if err != nil { if err != nil {
return return
} }
client, err := rpcDial(reply) client, err := NewClientWithMux(e.mux, streamId)
if err != nil { if err != nil {
return return nil, err
} }
return client.Hook(), nil
h = Hook(client)
return
} }
func (e *Environment) PostProcessor(name string) (p packer.PostProcessor, err error) { func (e *Environment) PostProcessor(name string) (p packer.PostProcessor, err error) {
var reply string var streamId uint32
err = e.client.Call("Environment.PostProcessor", name, &reply) err = e.client.Call("Environment.PostProcessor", name, &streamId)
if err != nil { if err != nil {
return return
} }
client, err := rpcDial(reply) client, err := NewClientWithMux(e.mux, streamId)
if err != nil { if err != nil {
return return nil, err
} }
p = client.PostProcessor()
p = PostProcessor(client)
return return
} }
func (e *Environment) Provisioner(name string) (p packer.Provisioner, err error) { func (e *Environment) Provisioner(name string) (p packer.Provisioner, err error) {
var reply string var streamId uint32
err = e.client.Call("Environment.Provisioner", name, &reply) err = e.client.Call("Environment.Provisioner", name, &streamId)
if err != nil { if err != nil {
return return
} }
client, err := rpcDial(reply) client, err := NewClientWithMux(e.mux, streamId)
if err != nil { if err != nil {
return return nil, err
} }
p = client.Provisioner()
p = Provisioner(client)
return return
} }
func (e *Environment) Ui() packer.Ui { func (e *Environment) Ui() packer.Ui {
var reply string var streamId uint32
e.client.Call("Environment.Ui", new(interface{}), &reply) e.client.Call("Environment.Ui", new(interface{}), &streamId)
client, err := rpcDial(reply) client, err := NewClientWithMux(e.mux, streamId)
if err != nil { if err != nil {
panic(err) log.Printf("[ERR] Error connecting to Ui: %s", err)
return nil
} }
return client.Ui()
return &Ui{client: client}
} }
func (e *EnvironmentServer) Builder(name *string, reply *string) error { func (e *EnvironmentServer) Builder(name string, reply *uint32) error {
builder, err := e.env.Builder(*name) builder, err := e.env.Builder(name)
if err != nil { if err != nil {
return err return NewBasicError(err)
} }
// Wrap it *reply = e.mux.NextId()
server := rpc.NewServer() server := NewServerWithMux(e.mux, *reply)
RegisterBuilder(server, builder) server.RegisterBuilder(builder)
go server.Serve()
*reply = serveSingleConn(server)
return nil return nil
} }
func (e *EnvironmentServer) Cache(args *interface{}, reply *string) error { func (e *EnvironmentServer) Cache(args *interface{}, reply *uint32) error {
cache := e.env.Cache() cache := e.env.Cache()
server := rpc.NewServer() *reply = e.mux.NextId()
RegisterCache(server, cache) server := NewServerWithMux(e.mux, *reply)
*reply = serveSingleConn(server) server.RegisterCache(cache)
go server.Serve()
return nil return nil
} }
@ -145,53 +143,51 @@ func (e *EnvironmentServer) Cli(args *EnvironmentCliArgs, reply *int) (err error
return return
} }
func (e *EnvironmentServer) Hook(name *string, reply *string) error { func (e *EnvironmentServer) Hook(name string, reply *uint32) error {
hook, err := e.env.Hook(*name) hook, err := e.env.Hook(name)
if err != nil { if err != nil {
return err return NewBasicError(err)
} }
// Wrap it *reply = e.mux.NextId()
server := rpc.NewServer() server := NewServerWithMux(e.mux, *reply)
RegisterHook(server, hook) server.RegisterHook(hook)
go server.Serve()
*reply = serveSingleConn(server)
return nil return nil
} }
func (e *EnvironmentServer) PostProcessor(name *string, reply *string) error { func (e *EnvironmentServer) PostProcessor(name string, reply *uint32) error {
pp, err := e.env.PostProcessor(*name) pp, err := e.env.PostProcessor(name)
if err != nil { if err != nil {
return err return NewBasicError(err)
} }
server := rpc.NewServer() *reply = e.mux.NextId()
RegisterPostProcessor(server, pp) server := NewServerWithMux(e.mux, *reply)
server.RegisterPostProcessor(pp)
*reply = serveSingleConn(server) go server.Serve()
return nil return nil
} }
func (e *EnvironmentServer) Provisioner(name *string, reply *string) error { func (e *EnvironmentServer) Provisioner(name string, reply *uint32) error {
prov, err := e.env.Provisioner(*name) prov, err := e.env.Provisioner(name)
if err != nil { if err != nil {
return err return NewBasicError(err)
} }
server := rpc.NewServer() *reply = e.mux.NextId()
RegisterProvisioner(server, prov) server := NewServerWithMux(e.mux, *reply)
server.RegisterProvisioner(prov)
*reply = serveSingleConn(server) go server.Serve()
return nil return nil
} }
func (e *EnvironmentServer) Ui(args *interface{}, reply *string) error { func (e *EnvironmentServer) Ui(args *interface{}, reply *uint32) error {
ui := e.env.Ui() ui := e.env.Ui()
// Wrap it *reply = e.mux.NextId()
server := rpc.NewServer() server := NewServerWithMux(e.mux, *reply)
RegisterUi(server, ui) server.RegisterUi(ui)
go server.Serve()
*reply = serveSingleConn(server)
return nil return nil
} }

View File

@ -2,7 +2,6 @@ package rpc
import ( import (
"github.com/mitchellh/packer/packer" "github.com/mitchellh/packer/packer"
"net/rpc"
"reflect" "reflect"
"testing" "testing"
) )
@ -69,16 +68,11 @@ func TestEnvironmentRPC(t *testing.T) {
e := &testEnvironment{} e := &testEnvironment{}
// Start the server // Start the server
server := rpc.NewServer() client, server := testClientServer(t)
RegisterEnvironment(server, e) defer client.Close()
address := serveSingleConn(server) defer server.Close()
server.RegisterEnvironment(e)
// Create the client over RPC and run some methods to verify it works eClient := client.Environment()
client, err := rpc.Dial("tcp", address)
if err != nil {
t.Fatalf("err: %s", err)
}
eClient := &Environment{client}
// Test Builder // Test Builder
builder, _ := eClient.Builder("foo") builder, _ := eClient.Builder("foo")

View File

@ -44,7 +44,7 @@ func RegisterCommunicator(s *rpc.Server, c packer.Communicator) {
// Registers the appropriate endpoint on an RPC server to serve a // Registers the appropriate endpoint on an RPC server to serve a
// Packer Environment // Packer Environment
func RegisterEnvironment(s *rpc.Server, e packer.Environment) { func RegisterEnvironment(s *rpc.Server, e packer.Environment) {
registerComponent(s, "Environment", &EnvironmentServer{e}, false) registerComponent(s, "Environment", &EnvironmentServer{env: e}, false)
} }
// Registers the appropriate endpoint on an RPC server to serve a // Registers the appropriate endpoint on an RPC server to serve a

View File

@ -17,6 +17,7 @@ const (
DefaultCacheEndpoint = "Cache" DefaultCacheEndpoint = "Cache"
DefaultCommandEndpoint = "Command" DefaultCommandEndpoint = "Command"
DefaultCommunicatorEndpoint = "Communicator" DefaultCommunicatorEndpoint = "Communicator"
DefaultEnvironmentEndpoint = "Environment"
DefaultHookEndpoint = "Hook" DefaultHookEndpoint = "Hook"
DefaultPostProcessorEndpoint = "PostProcessor" DefaultPostProcessorEndpoint = "PostProcessor"
DefaultProvisionerEndpoint = "Provisioner" DefaultProvisionerEndpoint = "Provisioner"
@ -81,6 +82,13 @@ func (s *Server) RegisterCommunicator(c packer.Communicator) {
}) })
} }
func (s *Server) RegisterEnvironment(b packer.Environment) {
s.server.RegisterName(DefaultEnvironmentEndpoint, &EnvironmentServer{
env: b,
mux: s.mux,
})
}
func (s *Server) RegisterHook(h packer.Hook) { func (s *Server) RegisterHook(h packer.Hook) {
s.server.RegisterName(DefaultHookEndpoint, &HookServer{ s.server.RegisterName(DefaultHookEndpoint, &HookServer{
hook: h, hook: h,