trying to add context to state bag

This commit is contained in:
Matthew Hooker 2018-01-17 22:49:03 -08:00
parent 4c5a7e08b5
commit 807e88245b
No known key found for this signature in database
GPG Key ID: 7B5F933D9CE8C6A1
12 changed files with 875 additions and 1 deletions

View File

@ -1,6 +1,7 @@
package common
import (
"context"
"encoding/base64"
"fmt"
"io/ioutil"
@ -178,7 +179,17 @@ func (s *StepRunSourceInstance) Run(state multistep.StateBag) multistep.StepActi
describeInstance := &ec2.DescribeInstancesInput{
InstanceIds: []*string{aws.String(instanceId)},
}
if err := ec2conn.WaitUntilInstanceRunning(describeInstance); err != nil {
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
if _, ok := state.GetOk(multistep.StateCancelled); ok {
cancel()
}
}
}()
if err := ec2conn.WaitUntilInstanceRunningWithContext(ctx, describeInstance); err != nil {
err := fmt.Errorf("Error waiting for instance (%s) to become ready: %s", instanceId, err)
state.Put("error", err)
ui.Error(err.Error())

View File

@ -138,9 +138,11 @@ func (c BuildCommand) Run(args []string) int {
m map[string][]packer.Artifact
}{m: make(map[string][]packer.Artifact)}
errors := make(map[string]error)
// ctx := context.Background()
for _, b := range builds {
// Increment the waitgroup so we wait for this item to finish properly
wg.Add(1)
// buildCtx, cancelCtx := ctx.WithCancel()
// Handle interrupts for this build
sigCh := make(chan os.Signal, 1)
@ -154,6 +156,7 @@ func (c BuildCommand) Run(args []string) int {
log.Printf("Stopping build: %s", b.Name())
b.Cancel()
//cancelCtx()
log.Printf("Build cancelled: %s", b.Name())
}(b)

View File

@ -0,0 +1,22 @@
Copyright (c) 2013 Mitchell Hashimoto
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,105 @@
package multistep
import (
"sync"
"sync/atomic"
"golang.org/x/net/context"
)
type runState int32
const (
stateIdle runState = iota
stateRunning
stateCancelling
)
// BasicRunner is a Runner that just runs the given slice of steps.
type BasicRunner struct {
// Steps is a slice of steps to run. Once set, this should _not_ be
// modified.
Steps []Step
cancel context.CancelFunc
doneCh chan struct{}
state runState
l sync.Mutex
}
func (b *BasicRunner) Run(state StateBag) {
ctx, cancel := context.WithCancel(state.Context())
b.l.Lock()
if b.state != stateIdle {
panic("already running")
}
doneCh := make(chan struct{})
b.cancel = cancel
b.doneCh = doneCh
b.state = stateRunning
b.l.Unlock()
defer func() {
b.l.Lock()
b.cancel = nil
b.doneCh = nil
b.state = stateIdle
close(doneCh)
b.l.Unlock()
}()
// This goroutine listens for cancels and puts the StateCancelled key
// as quickly as possible into the state bag to mark it.
go func() {
select {
case <-ctx.Done():
// Flag cancel and wait for finish
state.Put(StateCancelled, true)
<-doneCh
case <-doneCh:
}
}()
for _, step := range b.Steps {
// We also check for cancellation here since we can't be sure
// the goroutine that is running to set it actually ran.
if runState(atomic.LoadInt32((*int32)(&b.state))) == stateCancelling {
state.Put(StateCancelled, true)
break
}
action := step.Run(state)
defer step.Cleanup(state)
if _, ok := state.GetOk(StateCancelled); ok {
break
}
if action == ActionHalt {
state.Put(StateHalted, true)
break
}
}
}
func (b *BasicRunner) Cancel() {
b.l.Lock()
switch b.state {
case stateIdle:
// Not running, so Cancel is... done.
b.l.Unlock()
return
case stateRunning:
// Running, so mark that we cancelled and set the state
b.cancel()
b.state = stateCancelling
fallthrough
case stateCancelling:
// Already cancelling, so just wait until we're done
ch := b.doneCh
b.l.Unlock()
<-ch
}
}

View File

@ -0,0 +1,172 @@
package multistep
import (
"reflect"
"testing"
"time"
"golang.org/x/net/context"
)
func TestBasicRunner_ImplRunner(t *testing.T) {
var raw interface{}
raw = &BasicRunner{}
if _, ok := raw.(Runner); !ok {
t.Fatalf("BasicRunner must be a Runner")
}
}
func TestBasicRunner_Run(t *testing.T) {
data := new(BasicStateBag)
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b"}
r := &BasicRunner{Steps: []Step{stepA, stepB}}
r.Run(context.Background(), data)
// Test run data
expected := []string{"a", "b"}
results := data.Get("data").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test cleanup data
expected = []string{"b", "a"}
results = data.Get("cleanup").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test no halted or cancelled
if _, ok := data.GetOk(StateCancelled); ok {
t.Errorf("cancelled should not be in state bag")
}
if _, ok := data.GetOk(StateHalted); ok {
t.Errorf("halted should not be in state bag")
}
}
func TestBasicRunner_Run_Halt(t *testing.T) {
data := new(BasicStateBag)
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b", Halt: true}
stepC := &TestStepAcc{Data: "c"}
r := &BasicRunner{Steps: []Step{stepA, stepB, stepC}}
r.Run(context.Background(), data)
// Test run data
expected := []string{"a", "b"}
results := data.Get("data").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test cleanup data
expected = []string{"b", "a"}
results = data.Get("cleanup").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test that it says it is halted
halted := data.Get(StateHalted).(bool)
if !halted {
t.Errorf("not halted")
}
}
// confirm that can't run twice
func TestBasicRunner_Run_Run(t *testing.T) {
defer func() {
recover()
}()
ch := make(chan chan bool)
stepInt := &TestStepSync{ch}
stepWait := &TestStepWaitForever{}
r := &BasicRunner{Steps: []Step{stepInt, stepWait}}
go r.Run(context.Background(), new(BasicStateBag))
// wait until really running
<-ch
// now try to run aain
r.Run(context.Background(), new(BasicStateBag))
// should not get here in nominal codepath
t.Errorf("Was able to run an already running BasicRunner")
}
func TestBasicRunner_Cancel(t *testing.T) {
ch := make(chan chan bool)
data := new(BasicStateBag)
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b"}
stepInt := &TestStepSync{ch}
stepC := &TestStepAcc{Data: "c"}
r := &BasicRunner{Steps: []Step{stepA, stepB, stepInt, stepC}}
// cancelling an idle Runner is a no-op
r.Cancel()
go r.Run(context.Background(), data)
// Wait until we reach the sync point
responseCh := <-ch
// Cancel then continue chain
cancelCh := make(chan bool)
go func() {
r.Cancel()
cancelCh <- true
}()
for {
if _, ok := data.GetOk(StateCancelled); ok {
responseCh <- true
break
}
time.Sleep(10 * time.Millisecond)
}
<-cancelCh
// Test run data
expected := []string{"a", "b"}
results := data.Get("data").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test cleanup data
expected = []string{"b", "a"}
results = data.Get("cleanup").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test that it says it is cancelled
cancelled := data.Get(StateCancelled).(bool)
if !cancelled {
t.Errorf("not cancelled")
}
}
func TestBasicRunner_Cancel_Special(t *testing.T) {
stepOne := &TestStepInjectCancel{}
stepTwo := &TestStepInjectCancel{}
r := &BasicRunner{Steps: []Step{stepOne, stepTwo}}
state := new(BasicStateBag)
state.Put("runner", r)
r.Run(context.Background(), state)
// test that state contains cancelled
if _, ok := state.GetOk(StateCancelled); !ok {
t.Errorf("cancelled should be in state bag")
}
}

View File

@ -0,0 +1,123 @@
package multistep
import (
"fmt"
"reflect"
"sync"
)
// DebugLocation is the location where the pause is occuring when debugging
// a step sequence. "DebugLocationAfterRun" is after the run of the named
// step. "DebugLocationBeforeCleanup" is before the cleanup of the named
// step.
type DebugLocation uint
const (
DebugLocationAfterRun DebugLocation = iota
DebugLocationBeforeCleanup
)
// StepWrapper is an interface that wrapped steps can implement to expose their
// inner step names to the debug runner.
type StepWrapper interface {
// InnerStepName should return the human readable name of the wrapped step.
InnerStepName() string
}
// DebugPauseFn is the type signature for the function that is called
// whenever the DebugRunner pauses. It allows the caller time to
// inspect the state of the multi-step sequence at a given step.
type DebugPauseFn func(DebugLocation, string, StateBag)
// DebugRunner is a Runner that runs the given set of steps in order,
// but pauses between each step until it is told to continue.
type DebugRunner struct {
// Steps is the steps to run. These will be run in order.
Steps []Step
// PauseFn is the function that is called whenever the debug runner
// pauses. The debug runner continues when this function returns.
// The function is given the state so that the state can be inspected.
PauseFn DebugPauseFn
l sync.Mutex
runner *BasicRunner
}
func (r *DebugRunner) Run(state StateBag) {
r.l.Lock()
if r.runner != nil {
panic("already running")
}
r.runner = new(BasicRunner)
r.l.Unlock()
pauseFn := r.PauseFn
// If no PauseFn is specified, use the default
if pauseFn == nil {
pauseFn = DebugPauseDefault
}
// Rebuild the steps so that we insert the pause step after each
steps := make([]Step, len(r.Steps)*2)
for i, step := range r.Steps {
steps[i*2] = step
name := ""
if wrapped, ok := step.(StepWrapper); ok {
name = wrapped.InnerStepName()
} else {
name = reflect.Indirect(reflect.ValueOf(step)).Type().Name()
}
steps[(i*2)+1] = &debugStepPause{
name,
pauseFn,
}
}
// Then just use a basic runner to run it
r.runner.Steps = steps
r.runner.Run(state)
}
func (r *DebugRunner) Cancel() {
r.l.Lock()
defer r.l.Unlock()
if r.runner != nil {
r.runner.Cancel()
}
}
// DebugPauseDefault is the default pause function when using the
// DebugRunner if no PauseFn is specified. It outputs some information
// to stderr about the step and waits for keyboard input on stdin before
// continuing.
func DebugPauseDefault(loc DebugLocation, name string, state StateBag) {
var locationString string
switch loc {
case DebugLocationAfterRun:
locationString = "after run of"
case DebugLocationBeforeCleanup:
locationString = "before cleanup of"
}
fmt.Printf("Pausing %s step '%s'. Press any key to continue.\n", locationString, name)
var line string
fmt.Scanln(&line)
}
type debugStepPause struct {
StepName string
PauseFn DebugPauseFn
}
func (s *debugStepPause) Run(state StateBag) StepAction {
s.PauseFn(DebugLocationAfterRun, s.StepName, state)
return ActionContinue
}
func (s *debugStepPause) Cleanup(state StateBag) {
s.PauseFn(DebugLocationBeforeCleanup, s.StepName, state)
}

View File

@ -0,0 +1,176 @@
package multistep
import (
"os"
"reflect"
"testing"
"time"
"golang.org/x/net/context"
)
func TestDebugRunner_Impl(t *testing.T) {
var raw interface{}
raw = &DebugRunner{}
if _, ok := raw.(Runner); !ok {
t.Fatal("DebugRunner must be a runner.")
}
}
func TestDebugRunner_Run(t *testing.T) {
data := new(BasicStateBag)
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b"}
pauseFn := func(loc DebugLocation, name string, state StateBag) {
key := "data"
if loc == DebugLocationBeforeCleanup {
key = "cleanup"
}
if _, ok := state.GetOk(key); !ok {
state.Put(key, make([]string, 0, 5))
}
data := state.Get(key).([]string)
state.Put(key, append(data, name))
}
r := &DebugRunner{
Steps: []Step{stepA, stepB},
PauseFn: pauseFn,
}
r.Run(context.Background(), data)
// Test data
expected := []string{"a", "TestStepAcc", "b", "TestStepAcc"}
results := data.Get("data").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected results: %#v", results)
}
// Test cleanup
expected = []string{"TestStepAcc", "b", "TestStepAcc", "a"}
results = data.Get("cleanup").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected results: %#v", results)
}
}
// confirm that can't run twice
func TestDebugRunner_Run_Run(t *testing.T) {
defer func() {
recover()
}()
ch := make(chan chan bool)
stepInt := &TestStepSync{ch}
stepWait := &TestStepWaitForever{}
r := &DebugRunner{Steps: []Step{stepInt, stepWait}}
go r.Run(context.Background(), new(BasicStateBag))
// wait until really running
<-ch
// now try to run aain
r.Run(context.Background(), new(BasicStateBag))
// should not get here in nominal codepath
t.Errorf("Was able to run an already running DebugRunner")
}
func TestDebugRunner_Cancel(t *testing.T) {
ch := make(chan chan bool)
data := new(BasicStateBag)
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b"}
stepInt := &TestStepSync{ch}
stepC := &TestStepAcc{Data: "c"}
r := &DebugRunner{}
r.Steps = []Step{stepA, stepB, stepInt, stepC}
// cancelling an idle Runner is a no-op
r.Cancel()
go r.Run(context.Background(), data)
// Wait until we reach the sync point
responseCh := <-ch
// Cancel then continue chain
cancelCh := make(chan bool)
go func() {
r.Cancel()
cancelCh <- true
}()
for {
if _, ok := data.GetOk(StateCancelled); ok {
responseCh <- true
break
}
time.Sleep(10 * time.Millisecond)
}
<-cancelCh
// Test run data
expected := []string{"a", "b"}
results := data.Get("data").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test cleanup data
expected = []string{"b", "a"}
results = data.Get("cleanup").([]string)
if !reflect.DeepEqual(results, expected) {
t.Errorf("unexpected result: %#v", results)
}
// Test that it says it is cancelled
cancelled := data.Get(StateCancelled).(bool)
if !cancelled {
t.Errorf("not cancelled")
}
}
func TestDebugPauseDefault(t *testing.T) {
// Create a pipe pair so that writes/reads are blocked until we do it
r, w, err := os.Pipe()
if err != nil {
t.Fatalf("err: %s", err)
}
// Set stdin so we can control it
oldStdin := os.Stdin
os.Stdin = r
defer func() { os.Stdin = oldStdin }()
// Start pausing
complete := make(chan bool, 1)
go func() {
dr := &DebugRunner{Steps: []Step{
&TestStepAcc{Data: "a"},
}}
dr.Run(context.Background(), new(BasicStateBag))
complete <- true
}()
select {
case <-complete:
t.Fatal("shouldn't have completed")
case <-time.After(100 * time.Millisecond):
}
w.Write([]byte("\n\n"))
select {
case <-complete:
case <-time.After(100 * time.Millisecond):
t.Fatal("didn't complete")
}
}

61
helper/multistep/doc.go Normal file
View File

@ -0,0 +1,61 @@
/*
multistep is a Go library for building up complex actions using discrete,
individual "steps." These steps are strung together and run in sequence
to achieve a more complex goal. The runner handles cleanup, cancelling, etc.
if necessary.
## Basic Example
Make a step to perform some action. The step can access your "state",
which is passed between steps by the runner.
```go
type stepAdd struct{}
func (s *stepAdd) Run(ctx context.Context, state multistep.StateBag) multistep.StepAction {
// Read our value and assert that it is they type we want
value := state.Get("value").(int)
fmt.Printf("Value is %d\n", value)
// Store some state back
state.Put("value", value + 1)
return multistep.ActionContinue
}
func (s *stepAdd) Cleanup(multistep.StateBag) {
// This is called after all the steps have run or if the runner is
// cancelled so that cleanup can be performed.
}
```
Make a runner and call your array of Steps.
```go
func main() {
// Our "bag of state" that we read the value from
state := new(multistep.BasicStateBag)
state.Put("value", 0)
steps := []multistep.Step{
&stepAdd{},
&stepAdd{},
&stepAdd{},
}
runner := &multistep.BasicRunner{Steps: steps}
// Executes the steps
runner.Run(context.Background(), state)
}
```
This will produce:
```
Value is 0
Value is 1
Value is 2
```
*/
package multistep

View File

@ -0,0 +1,48 @@
// multistep is a library for building up complex actions using individual,
// discrete steps.
package multistep
// A StepAction determines the next step to take regarding multi-step actions.
type StepAction uint
const (
ActionContinue StepAction = iota
ActionHalt
)
// This is the key set in the state bag when using the basic runner to
// signal that the step sequence was cancelled.
const StateCancelled = "cancelled"
// This is the key set in the state bag when a step halted the sequence.
const StateHalted = "halted"
// Step is a single step that is part of a potentially large sequence
// of other steps, responsible for performing some specific action.
type Step interface {
// Run is called to perform the action. The parameter is a "state bag"
// of untyped things. Please be very careful about type-checking the
// items in this bag.
//
// The return value determines whether multi-step sequences continue
// or should halt.
Run(StateBag) StepAction
// Cleanup is called in reverse order of the steps that have run
// and allow steps to clean up after themselves. Do not assume if this
// ran that the entire multi-step sequence completed successfully. This
// method can be ran in the face of errors and cancellations as well.
//
// The parameter is the same "state bag" as Run, and represents the
// state at the latest possible time prior to calling Cleanup.
Cleanup(StateBag)
}
// Runner is a thing that runs one or more steps.
type Runner interface {
// Run runs the steps with the given initial state.
Run(StateBag)
// Cancel cancels a potentially running stack of steps.
Cancel()
}

View File

@ -0,0 +1,75 @@
package multistep
import "golang.org/x/net/context"
// A step for testing that accumuluates data into a string slice in the
// the state bag. It always uses the "data" key in the state bag, and will
// initialize it.
type TestStepAcc struct {
// The data inserted into the state bag.
Data string
// If true, it will halt at the step when it is run
Halt bool
}
// A step that syncs by sending a channel and expecting a response.
type TestStepSync struct {
Ch chan chan bool
}
// A step that sleeps forever
type TestStepWaitForever struct {
}
// A step that manually flips state to cancelling in run
type TestStepInjectCancel struct {
}
func (s TestStepAcc) Run(_ context.Context, state StateBag) StepAction {
s.insertData(state, "data")
if s.Halt {
return ActionHalt
}
return ActionContinue
}
func (s TestStepAcc) Cleanup(state StateBag) {
s.insertData(state, "cleanup")
}
func (s TestStepAcc) insertData(state StateBag, key string) {
if _, ok := state.GetOk(key); !ok {
state.Put(key, make([]string, 0, 5))
}
data := state.Get(key).([]string)
data = append(data, s.Data)
state.Put(key, data)
}
func (s TestStepSync) Run(context.Context, StateBag) StepAction {
ch := make(chan bool)
s.Ch <- ch
<-ch
return ActionContinue
}
func (s TestStepSync) Cleanup(StateBag) {}
func (s TestStepWaitForever) Run(context.Context, StateBag) StepAction {
select {}
}
func (s TestStepWaitForever) Cleanup(StateBag) {}
func (s TestStepInjectCancel) Run(_ context.Context, state StateBag) StepAction {
r := state.Get("runner").(*BasicRunner)
r.state = stateCancelling
return ActionContinue
}
func (s TestStepInjectCancel) Cleanup(StateBag) {}

View File

@ -0,0 +1,48 @@
package multistep
import (
"context"
"sync"
)
// StateBag implements StateBag by using a normal map underneath
// protected by a RWMutex.
type StateBag struct {
data map[string]interface{}
l sync.RWMutex
once sync.Once
ctx context.Context
}
func (b *StateBag) Context() context.Context {
if b.ctx != nil {
return b.ctx
}
return context.Background()
}
func (b *StateBag) Get(k string) interface{} {
result, _ := b.GetOk(k)
return result
}
func (b *StateBag) GetOk(k string) (interface{}, bool) {
b.l.RLock()
defer b.l.RUnlock()
result, ok := b.data[k]
return result, ok
}
func (b *StateBag) Put(k string, v interface{}) {
b.l.Lock()
defer b.l.Unlock()
// Make sure the map is initialized one time, on write
b.once.Do(func() {
b.data = make(map[string]interface{})
})
// Write the data
b.data[k] = v
}

View File

@ -0,0 +1,30 @@
package multistep
import (
"testing"
)
func TestBasicStateBag_ImplRunner(t *testing.T) {
var raw interface{}
raw = &BasicStateBag{}
if _, ok := raw.(StateBag); !ok {
t.Fatalf("must be a StateBag")
}
}
func TestBasicStateBag(t *testing.T) {
b := new(BasicStateBag)
if b.Get("foo") != nil {
t.Fatalf("bad: %#v", b.Get("foo"))
}
if _, ok := b.GetOk("foo"); ok {
t.Fatal("should not have foo")
}
b.Put("foo", "bar")
if b.Get("foo").(string) != "bar" {
t.Fatalf("bad")
}
}