packer/rpc: set keep-alive on all RPC connections [GH-416]

This commit is contained in:
Mitchell Hashimoto 2013-09-18 17:15:48 -07:00
parent 877dfb81fe
commit f85c9e4354
10 changed files with 57 additions and 23 deletions

View File

@ -14,6 +14,8 @@ IMPROVEMENTS:
BUG FIXES:
* core: Set TCP KeepAlives on internally created RPC connections so that
they don't die. [GH-416]
* builder/amazon/all: While waiting for AMI, will detect "failed" state.
* builder/amazon/all: Waiting for state will detect if the resource (AMI,
instance, etc.) disappears from under it.

View File

@ -52,7 +52,7 @@ func (b *build) Run(ui packer.Ui, cache packer.Cache) ([]packer.Artifact, error)
artifacts := make([]packer.Artifact, len(result))
for i, addr := range result {
client, err := rpc.Dial("tcp", addr)
client, err := rpcDial(addr)
if err != nil {
return nil, err
}
@ -92,7 +92,7 @@ func (b *BuildServer) Prepare(v map[string]string, reply *error) error {
}
func (b *BuildServer) Run(args *BuildRunArgs, reply *[]string) error {
client, err := rpc.Dial("tcp", args.UiRPCAddress)
client, err := rpcDial(args.UiRPCAddress)
if err != nil {
return err
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/mitchellh/packer/packer"
"log"
"net"
"net/rpc"
)
@ -95,7 +94,7 @@ func (b *builder) Run(ui packer.Ui, hook packer.Hook, cache packer.Cache) (packe
return nil, nil
}
client, err := rpc.Dial("tcp", response.RPCAddress)
client, err := rpcDial(response.RPCAddress)
if err != nil {
return nil, err
}
@ -119,12 +118,12 @@ func (b *BuilderServer) Prepare(args *BuilderPrepareArgs, reply *error) error {
}
func (b *BuilderServer) Run(args *BuilderRunArgs, reply *interface{}) error {
client, err := rpc.Dial("tcp", args.RPCAddress)
client, err := rpcDial(args.RPCAddress)
if err != nil {
return err
}
responseC, err := net.Dial("tcp", args.ResponseAddress)
responseC, err := tcpDial(args.ResponseAddress)
if err != nil {
return err
}

View File

@ -66,7 +66,7 @@ func (c *CommandServer) Help(args *interface{}, reply *string) error {
}
func (c *CommandServer) Run(args *CommandRunArgs, reply *int) error {
client, err := rpc.Dial("tcp", args.RPCAddress)
client, err := rpcDial(args.RPCAddress)
if err != nil {
return err
}

View File

@ -177,7 +177,7 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface
toClose := make([]net.Conn, 0)
if args.StdinAddress != "" {
stdinC, err := net.Dial("tcp", args.StdinAddress)
stdinC, err := tcpDial(args.StdinAddress)
if err != nil {
return err
}
@ -187,7 +187,7 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface
}
if args.StdoutAddress != "" {
stdoutC, err := net.Dial("tcp", args.StdoutAddress)
stdoutC, err := tcpDial(args.StdoutAddress)
if err != nil {
return err
}
@ -197,7 +197,7 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface
}
if args.StderrAddress != "" {
stderrC, err := net.Dial("tcp", args.StderrAddress)
stderrC, err := tcpDial(args.StderrAddress)
if err != nil {
return err
}
@ -208,7 +208,7 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface
// Connect to the response address so we can write our result to it
// when ready.
responseC, err := net.Dial("tcp", args.ResponseAddress)
responseC, err := tcpDial(args.ResponseAddress)
if err != nil {
return err
}
@ -234,7 +234,7 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface
}
func (c *CommunicatorServer) Upload(args *CommunicatorUploadArgs, reply *interface{}) (err error) {
readerC, err := net.Dial("tcp", args.ReaderAddress)
readerC, err := tcpDial(args.ReaderAddress)
if err != nil {
return
}
@ -250,7 +250,7 @@ func (c *CommunicatorServer) UploadDir(args *CommunicatorUploadDirArgs, reply *e
}
func (c *CommunicatorServer) Download(args *CommunicatorDownloadArgs, reply *interface{}) (err error) {
writerC, err := net.Dial("tcp", args.WriterAddress)
writerC, err := tcpDial(args.WriterAddress)
if err != nil {
return
}

33
packer/rpc/dial.go Normal file
View File

@ -0,0 +1,33 @@
package rpc
import (
"net"
"net/rpc"
)
// rpcDial makes a TCP connection to a remote RPC server and returns
// the client. This will set the connection up properly so that keep-alives
// are set and so on and should be used to make all RPC connections within
// this package.
func rpcDial(address string) (*rpc.Client, error) {
tcpConn, err := tcpDial(address)
if err != nil {
return nil, err
}
// Create an RPC client around our connection
return rpc.NewClient(tcpConn), nil
}
// tcpDial connects via TCP to the designated address.
func tcpDial(address string) (*net.TCPConn, error) {
conn, err := net.Dial("tcp", address)
if err != nil {
return nil, err
}
// Set a keep-alive so that the connection stays alive even when idle
tcpConn := conn.(*net.TCPConn)
tcpConn.SetKeepAlive(true)
return tcpConn, nil
}

View File

@ -28,7 +28,7 @@ func (e *Environment) Builder(name string) (b packer.Builder, err error) {
return
}
client, err := rpc.Dial("tcp", reply)
client, err := rpcDial(reply)
if err != nil {
return
}
@ -43,7 +43,7 @@ func (e *Environment) Cache() packer.Cache {
panic(err)
}
client, err := rpc.Dial("tcp", reply)
client, err := rpcDial(reply)
if err != nil {
panic(err)
}
@ -64,7 +64,7 @@ func (e *Environment) Hook(name string) (h packer.Hook, err error) {
return
}
client, err := rpc.Dial("tcp", reply)
client, err := rpcDial(reply)
if err != nil {
return
}
@ -80,7 +80,7 @@ func (e *Environment) PostProcessor(name string) (p packer.PostProcessor, err er
return
}
client, err := rpc.Dial("tcp", reply)
client, err := rpcDial(reply)
if err != nil {
return
}
@ -96,7 +96,7 @@ func (e *Environment) Provisioner(name string) (p packer.Provisioner, err error)
return
}
client, err := rpc.Dial("tcp", reply)
client, err := rpcDial(reply)
if err != nil {
return
}
@ -109,7 +109,7 @@ func (e *Environment) Ui() packer.Ui {
var reply string
e.client.Call("Environment.Ui", new(interface{}), &reply)
client, err := rpc.Dial("tcp", reply)
client, err := rpcDial(reply)
if err != nil {
panic(err)
}

View File

@ -46,7 +46,7 @@ func (h *hook) Cancel() {
}
func (h *HookServer) Run(args *HookRunArgs, reply *interface{}) error {
client, err := rpc.Dial("tcp", args.RPCAddress)
client, err := rpcDial(args.RPCAddress)
if err != nil {
return err
}

View File

@ -57,7 +57,7 @@ func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Art
return nil, false, nil
}
client, err := rpc.Dial("tcp", response.RPCAddress)
client, err := rpcDial(response.RPCAddress)
if err != nil {
return nil, false, err
}
@ -75,7 +75,7 @@ func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply
}
func (p *PostProcessorServer) PostProcess(address string, reply *PostProcessorProcessResponse) error {
client, err := rpc.Dial("tcp", address)
client, err := rpcDial(address)
if err != nil {
return err
}

View File

@ -65,7 +65,7 @@ func (p *ProvisionerServer) Prepare(args *ProvisionerPrepareArgs, reply *error)
}
func (p *ProvisionerServer) Provision(args *ProvisionerProvisionArgs, reply *interface{}) error {
client, err := rpc.Dial("tcp", args.RPCAddress)
client, err := rpcDial(args.RPCAddress)
if err != nil {
return err
}