Revert "WIP: go towards context cancelled rpc pkg"

This reverts commit e4609b4a6a4ba2ec76a45ad67d50d180f1a914f9.
This commit is contained in:
Adrien Delorme 2019-03-26 15:21:07 +01:00
parent 9e3a179c0d
commit 14048b1e11
13 changed files with 70 additions and 186 deletions

View File

@ -1,37 +0,0 @@
package rpc
import (
baserpc "net/rpc"
"github.com/keegancsmith/rpc"
)
type Server = rpc.Server
type Client = rpc.Client
type Request = rpc.Request
type Response = rpc.Response
type ClientCodec interface {
WriteRequest(*baserpc.Request, interface{}) error
ReadResponseHeader(*baserpc.Response) error
ReadResponseBody(interface{}) error
Close() error
}
type ServerCodec interface {
ReadRequestHeader(*baserpc.Request) error
ReadRequestBody(interface{}) error
WriteResponse(*baserpc.Response, interface{}) error
// Close can be called multiple times and must be idempotent.
Close() error
}
func NewClientWithCodec(codec rpc.ClientCodec) *Client {
return rpc.NewClientWithCodec(codec)
}
func NewServer() *Server {
return rpc.NewServer()
}

View File

@ -1,51 +0,0 @@
package codec
import (
"io"
baserpc "net/rpc"
"github.com/hashicorp/packer/common/net/rpc"
"github.com/ugorji/go/codec"
)
type Handle = codec.Handle
var msgpackHandle = &codec.MsgpackHandle{
RawToString: true,
WriteExt: true,
}
type serverCodec struct {
baserpc.ServerCodec
}
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
return s.ServerCodec.ReadRequestHeader(r)
}
func (s *serverCodec) WriteResponse(r *baserpc.Response, v interface{}) error {
return s.ServerCodec.WriteResponse(r, v)
}
func MsgpackServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
c := codec.GoRpc.ServerCodec(conn, msgpackHandle)
return &serverCodec{c}
}
type clientCodec struct {
baserpc.ClientCodec
}
func (c *clientCodec) WriteRequest(req *baserpc.Request, v interface{}) error {
return c.ClientCodec.WriteRequest(req, v)
}
func (c *clientCodec) ReadResponseHeader(res *rpc.Response) error {
r := baserpc.Response(*res)
return c.ClientCodec.ReadResponseHeader()
}
func MsgpackClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
c := codec.GoRpc.ClientCodec(conn, msgpackHandle)
return &clientCodec{c}
}

View File

@ -1,9 +1,7 @@
package rpc
import (
"context"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -22,40 +20,33 @@ type ArtifactServer struct {
}
func (a *artifact) BuilderId() (result string) {
ctx := context.TODO()
a.client.Call(ctx, a.endpoint+".BuilderId", new(interface{}), &result)
a.client.Call(a.endpoint+".BuilderId", new(interface{}), &result)
return
}
func (a *artifact) Files() (result []string) {
ctx := context.TODO()
a.client.Call(ctx, a.endpoint+".Files", new(interface{}), &result)
a.client.Call(a.endpoint+".Files", new(interface{}), &result)
return
}
func (a *artifact) Id() (result string) {
ctx := context.TODO()
a.client.Call(ctx, a.endpoint+".Id", new(interface{}), &result)
a.client.Call(a.endpoint+".Id", new(interface{}), &result)
return
}
func (a *artifact) String() (result string) {
ctx := context.TODO()
a.client.Call(ctx, a.endpoint+".String", new(interface{}), &result)
a.client.Call(a.endpoint+".String", new(interface{}), &result)
return
}
func (a *artifact) State(name string) (result interface{}) {
ctx := context.TODO()
a.client.Call(ctx, a.endpoint+".State", name, &result)
a.client.Call(a.endpoint+".State", name, &result)
return
}
func (a *artifact) Destroy() error {
ctx := context.TODO()
var result error
if err := a.client.Call(ctx, a.endpoint+".Destroy", new(interface{}), &result); err != nil {
if err := a.client.Call(a.endpoint+".Destroy", new(interface{}), &result); err != nil {
return err
}

View File

@ -2,8 +2,7 @@ package rpc
import (
"context"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -28,15 +27,13 @@ type BuildPrepareResponse struct {
}
func (b *build) Name() (result string) {
ctx := context.TODO()
b.client.Call(ctx, "Build.Name", new(interface{}), &result)
b.client.Call("Build.Name", new(interface{}), &result)
return
}
func (b *build) Prepare() ([]string, error) {
ctx := context.TODO()
var resp BuildPrepareResponse
if cerr := b.client.Call(ctx, "Build.Prepare", new(interface{}), &resp); cerr != nil {
if cerr := b.client.Call("Build.Prepare", new(interface{}), &resp); cerr != nil {
return nil, cerr
}
var err error = nil
@ -54,7 +51,7 @@ func (b *build) Run(ctx context.Context, ui packer.Ui) ([]packer.Artifact, error
go server.Serve()
var result []uint32
if err := b.client.Call(ctx, "Build.Run", nextId, &result); err != nil {
if err := b.client.Call("Build.Run", nextId, &result); err != nil {
return nil, err
}
@ -72,22 +69,25 @@ func (b *build) Run(ctx context.Context, ui packer.Ui) ([]packer.Artifact, error
}
func (b *build) SetDebug(val bool) {
ctx := context.TODO()
if err := b.client.Call(ctx, "Build.SetDebug", val, new(interface{})); err != nil {
if err := b.client.Call("Build.SetDebug", val, new(interface{})); err != nil {
panic(err)
}
}
func (b *build) SetForce(val bool) {
ctx := context.TODO()
if err := b.client.Call(ctx, "Build.SetForce", val, new(interface{})); err != nil {
if err := b.client.Call("Build.SetForce", val, new(interface{})); err != nil {
panic(err)
}
}
func (b *build) SetOnError(val string) {
ctx := context.TODO()
if err := b.client.Call(ctx, "Build.SetOnError", val, new(interface{})); err != nil {
if err := b.client.Call("Build.SetOnError", val, new(interface{})); err != nil {
panic(err)
}
}
func (b *build) Cancel() {
if err := b.client.Call("Build.Cancel", new(interface{}), new(interface{})); err != nil {
panic(err)
}
}

View File

@ -2,8 +2,7 @@ package rpc
import (
"context"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -33,8 +32,7 @@ type BuilderPrepareResponse struct {
func (b *builder) Prepare(config ...interface{}) ([]string, error) {
var resp BuilderPrepareResponse
ctx := context.TODO()
cerr := b.client.Call(ctx, "Builder.Prepare", &BuilderPrepareArgs{config}, &resp)
cerr := b.client.Call("Builder.Prepare", &BuilderPrepareArgs{config}, &resp)
if cerr != nil {
return nil, cerr
}
@ -54,7 +52,7 @@ func (b *builder) Run(ctx context.Context, ui packer.Ui, hook packer.Hook) (pack
go server.Serve()
var responseId uint32
if err := b.client.Call(ctx, "Builder.Run", nextId, &responseId); err != nil {
if err := b.client.Call("Builder.Run", nextId, &responseId); err != nil {
return nil, err
}

View File

@ -3,12 +3,10 @@ package rpc
import (
"io"
"log"
"github.com/hashicorp/packer/common/net/rpc/codec"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
"github.com/ugorji/go/codec"
)
// Client is the client end that communicates with a Packer RPC server.
@ -43,13 +41,11 @@ func newClientWithMux(mux *muxBroker, streamId uint32) (*Client, error) {
return nil, err
}
// h := &codec.MsgpackHandle{
// RawToString: true,
// WriteExt: true,
// }
var clientCodec rpc.ClientCodec
clientCodec = codec.MsgpackClientCodec(clientConn)
h := &codec.MsgpackHandle{
RawToString: true,
WriteExt: true,
}
clientCodec := codec.GoRpc.ClientCodec(clientConn, h)
return &Client{
mux: mux,

View File

@ -1,15 +1,13 @@
package rpc
import (
"context"
"encoding/gob"
"io"
"log"
"net/rpc"
"os"
"sync"
"github.com/hashicorp/packer/common/net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -124,8 +122,7 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) {
cmd.SetExited(finished.ExitStatus)
}()
ctx := context.TODO()
err = c.client.Call(ctx, "Communicator.Start", &args, new(interface{}))
err = c.client.Call("Communicator.Start", &args, new(interface{}))
return
}
@ -143,8 +140,8 @@ func (c *communicator) Upload(path string, r io.Reader, fi *os.FileInfo) (err er
args.FileInfo = NewFileInfo(*fi)
}
ctx := context.TODO()
return c.client.Call(ctx, "Communicator.Upload", &args, new(interface{}))
err = c.client.Call("Communicator.Upload", &args, new(interface{}))
return
}
func (c *communicator) UploadDir(dst string, src string, exclude []string) error {
@ -155,8 +152,7 @@ func (c *communicator) UploadDir(dst string, src string, exclude []string) error
}
var reply error
ctx := context.TODO()
err := c.client.Call(ctx, "Communicator.UploadDir", args, &reply)
err := c.client.Call("Communicator.UploadDir", args, &reply)
if err == nil {
err = reply
}
@ -172,8 +168,7 @@ func (c *communicator) DownloadDir(src string, dst string, exclude []string) err
}
var reply error
ctx := context.TODO()
err := c.client.Call(ctx, "Communicator.DownloadDir", args, &reply)
err := c.client.Call("Communicator.DownloadDir", args, &reply)
if err == nil {
err = reply
}
@ -197,8 +192,7 @@ func (c *communicator) Download(path string, w io.Writer) (err error) {
}
// Start sending data to the RPC server
ctx := context.TODO()
err = c.client.Call(ctx, "Communicator.Download", &args, new(interface{}))
err = c.client.Call("Communicator.Download", &args, new(interface{}))
// Wait for the RPC server to finish receiving the data before we return
<-waitServer

View File

@ -2,8 +2,8 @@ package rpc
import (
"context"
"github.com/hashicorp/packer/common/net/rpc"
"log"
"net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -41,7 +41,14 @@ func (h *hook) Run(ctx context.Context, name string, ui packer.Ui, comm packer.C
StreamId: nextId,
}
return h.client.Call(ctx, "Hook.Run", &args, new(interface{}))
return h.client.Call("Hook.Run", &args, new(interface{}))
}
func (h *hook) Cancel() {
err := h.client.Call("Hook.Cancel", new(interface{}), new(interface{}))
if err != nil {
log.Printf("Hook.Cancel error: %s", err)
}
}
func (h *HookServer) Run(ctx context.Context, args *HookRunArgs, reply *interface{}) error {

View File

@ -2,8 +2,7 @@ package rpc
import (
"context"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -34,8 +33,7 @@ type PostProcessorProcessResponse struct {
func (p *postProcessor) Configure(raw ...interface{}) (err error) {
args := &PostProcessorConfigureArgs{Configs: raw}
ctx := context.TODO()
if cerr := p.client.Call(ctx, "PostProcessor.Configure", args, new(interface{})); cerr != nil {
if cerr := p.client.Call("PostProcessor.Configure", args, new(interface{})); cerr != nil {
err = cerr
}
@ -50,7 +48,7 @@ func (p *postProcessor) PostProcess(ctx context.Context, ui packer.Ui, a packer.
go server.Serve()
var response PostProcessorProcessResponse
if err := p.client.Call(ctx, "PostProcessor.PostProcess", nextId, &response); err != nil {
if err := p.client.Call("PostProcessor.PostProcess", nextId, &response); err != nil {
return nil, false, err
}

View File

@ -2,8 +2,7 @@ package rpc
import (
"context"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -28,8 +27,7 @@ type ProvisionerPrepareArgs struct {
func (p *provisioner) Prepare(configs ...interface{}) (err error) {
args := &ProvisionerPrepareArgs{configs}
ctx := context.TODO()
if cerr := p.client.Call(ctx, "Provisioner.Prepare", args, new(interface{})); cerr != nil {
if cerr := p.client.Call("Provisioner.Prepare", args, new(interface{})); cerr != nil {
err = cerr
}
@ -43,7 +41,7 @@ func (p *provisioner) Provision(ctx context.Context, ui packer.Ui, comm packer.C
server.RegisterUi(ui)
go server.Serve()
return p.client.Call(ctx, "Provisioner.Provision", nextId, new(interface{}))
return p.client.Call("Provisioner.Provision", nextId, new(interface{}))
}
func (p *ProvisionerServer) Prepare(_ context.Context, args *ProvisionerPrepareArgs, reply *interface{}) error {

View File

@ -3,12 +3,10 @@ package rpc
import (
"io"
"log"
"github.com/hashicorp/packer/common/net/rpc/codec"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
"github.com/ugorji/go/codec"
)
const (
@ -127,6 +125,10 @@ func (s *Server) Serve() {
}
defer stream.Close()
rpcCodec := codec.MsgpackServerCodec(stream)
h := &codec.MsgpackHandle{
RawToString: true,
WriteExt: true,
}
rpcCodec := codec.GoRpc.ServerCodec(stream, h)
s.server.ServeCodec(rpcCodec)
}

View File

@ -1,10 +1,8 @@
package rpc
import (
"context"
"log"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/packer"
)
@ -32,14 +30,12 @@ type UiMachineArgs struct {
}
func (u *Ui) Ask(query string) (result string, err error) {
ctx := context.TODO()
err = u.client.Call(ctx, "Ui.Ask", query, &result)
err = u.client.Call("Ui.Ask", query, &result)
return
}
func (u *Ui) Error(message string) {
ctx := context.TODO()
if err := u.client.Call(ctx, "Ui.Error", message, new(interface{})); err != nil {
if err := u.client.Call("Ui.Error", message, new(interface{})); err != nil {
log.Printf("Error in Ui.Error RPC call: %s", err)
}
}
@ -50,22 +46,19 @@ func (u *Ui) Machine(t string, args ...string) {
Args: args,
}
ctx := context.TODO()
if err := u.client.Call(ctx, "Ui.Machine", rpcArgs, new(interface{})); err != nil {
if err := u.client.Call("Ui.Machine", rpcArgs, new(interface{})); err != nil {
log.Printf("Error in Ui.Machine RPC call: %s", err)
}
}
func (u *Ui) Message(message string) {
ctx := context.TODO()
if err := u.client.Call(ctx, "Ui.Message", message, new(interface{})); err != nil {
if err := u.client.Call("Ui.Message", message, new(interface{})); err != nil {
log.Printf("Error in Ui.Message RPC call: %s", err)
}
}
func (u *Ui) Say(message string) {
ctx := context.TODO()
if err := u.client.Call(ctx, "Ui.Say", message, new(interface{})); err != nil {
if err := u.client.Call("Ui.Say", message, new(interface{})); err != nil {
log.Printf("Error in Ui.Say RPC call: %s", err)
}
}

View File

@ -1,11 +1,9 @@
package rpc
import (
"context"
"io"
"log"
"github.com/hashicorp/packer/common/net/rpc"
"net/rpc"
"github.com/hashicorp/packer/common/random"
)
@ -20,8 +18,7 @@ func (u *Ui) TrackProgress(src string, currentSize, totalSize int64, stream io.R
TotalSize: totalSize,
}
var trackingID string
ctx := context.TODO()
if err := u.client.Call(ctx, "Ui.NewTrackProgress", pl, &trackingID); err != nil {
if err := u.client.Call("Ui.NewTrackProgress", pl, &trackingID); err != nil {
log.Printf("Error in Ui.NewTrackProgress RPC call: %s", err)
return stream
}
@ -42,8 +39,7 @@ type ProgressTrackingClient struct {
// Read will send len(b) over the wire instead of it's content
func (u *ProgressTrackingClient) Read(b []byte) (read int, err error) {
defer func() {
ctx := context.TODO()
if err := u.client.Call(ctx, "Ui"+u.id+".Add", read, new(interface{})); err != nil {
if err := u.client.Call("Ui"+u.id+".Add", read, new(interface{})); err != nil {
log.Printf("Error in ProgressTrackingClient.Read RPC call: %s", err)
}
}()
@ -52,8 +48,7 @@ func (u *ProgressTrackingClient) Read(b []byte) (read int, err error) {
func (u *ProgressTrackingClient) Close() error {
log.Printf("closing")
ctx := context.TODO()
if err := u.client.Call(ctx, "Ui"+u.id+".Close", nil, new(interface{})); err != nil {
if err := u.client.Call("Ui"+u.id+".Close", nil, new(interface{})); err != nil {
log.Printf("Error in ProgressTrackingClient.Close RPC call: %s", err)
}
return u.stream.Close()