contextualize basic runner
This commit is contained in:
parent
a4bf94dd3c
commit
cccbd7f316
|
@ -26,8 +26,8 @@ type BasicRunner struct {
|
|||
l sync.Mutex
|
||||
}
|
||||
|
||||
func (b *BasicRunner) Run(state StateBag) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
func (b *BasicRunner) Run(ctx context.Context, state StateBag) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
b.l.Lock()
|
||||
if b.state != stateIdle {
|
||||
|
@ -82,23 +82,3 @@ func (b *BasicRunner) Run(state StateBag) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BasicRunner) Cancel() {
|
||||
b.l.Lock()
|
||||
switch b.state {
|
||||
case stateIdle:
|
||||
// Not running, so Cancel is... done.
|
||||
b.l.Unlock()
|
||||
return
|
||||
case stateRunning:
|
||||
// Running, so mark that we cancelled and set the state
|
||||
b.cancel()
|
||||
b.state = stateCancelling
|
||||
fallthrough
|
||||
case stateCancelling:
|
||||
// Already cancelling, so just wait until we're done
|
||||
ch := b.doneCh
|
||||
b.l.Unlock()
|
||||
<-ch
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package multistep
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -20,7 +21,7 @@ func TestBasicRunner_Run(t *testing.T) {
|
|||
stepB := &TestStepAcc{Data: "b"}
|
||||
|
||||
r := &BasicRunner{Steps: []Step{stepA, stepB}}
|
||||
r.Run(data)
|
||||
r.Run(context.Background(), data)
|
||||
|
||||
// Test run data
|
||||
expected := []string{"a", "b"}
|
||||
|
@ -53,7 +54,7 @@ func TestBasicRunner_Run_Halt(t *testing.T) {
|
|||
stepC := &TestStepAcc{Data: "c"}
|
||||
|
||||
r := &BasicRunner{Steps: []Step{stepA, stepB, stepC}}
|
||||
r.Run(data)
|
||||
r.Run(context.Background(), data)
|
||||
|
||||
// Test run data
|
||||
expected := []string{"a", "b"}
|
||||
|
@ -86,12 +87,12 @@ func TestBasicRunner_Run_Run(t *testing.T) {
|
|||
stepWait := &TestStepWaitForever{}
|
||||
r := &BasicRunner{Steps: []Step{stepInt, stepWait}}
|
||||
|
||||
go r.Run(new(BasicStateBag))
|
||||
go r.Run(context.Background(), new(BasicStateBag))
|
||||
// wait until really running
|
||||
<-ch
|
||||
|
||||
// now try to run aain
|
||||
r.Run(new(BasicStateBag))
|
||||
r.Run(context.Background(), new(BasicStateBag))
|
||||
|
||||
// should not get here in nominal codepath
|
||||
t.Errorf("Was able to run an already running BasicRunner")
|
||||
|
@ -107,10 +108,9 @@ func TestBasicRunner_Cancel(t *testing.T) {
|
|||
|
||||
r := &BasicRunner{Steps: []Step{stepA, stepB, stepInt, stepC}}
|
||||
|
||||
// cancelling an idle Runner is a no-op
|
||||
r.Cancel()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go r.Run(data)
|
||||
go r.Run(ctx, data)
|
||||
|
||||
// Wait until we reach the sync point
|
||||
responseCh := <-ch
|
||||
|
@ -118,7 +118,7 @@ func TestBasicRunner_Cancel(t *testing.T) {
|
|||
// Cancel then continue chain
|
||||
cancelCh := make(chan bool)
|
||||
go func() {
|
||||
r.Cancel()
|
||||
cancel()
|
||||
cancelCh <- true
|
||||
}()
|
||||
|
||||
|
@ -161,7 +161,7 @@ func TestBasicRunner_Cancel_Special(t *testing.T) {
|
|||
|
||||
state := new(BasicStateBag)
|
||||
state.Put("runner", r)
|
||||
r.Run(state)
|
||||
r.Run(context.Background(), state)
|
||||
|
||||
// test that state contains cancelled
|
||||
if _, ok := state.GetOk(StateCancelled); !ok {
|
||||
|
|
|
@ -45,7 +45,7 @@ type DebugRunner struct {
|
|||
runner *BasicRunner
|
||||
}
|
||||
|
||||
func (r *DebugRunner) Run(state StateBag) {
|
||||
func (r *DebugRunner) Run(ctx context.Context, state StateBag) {
|
||||
r.l.Lock()
|
||||
if r.runner != nil {
|
||||
panic("already running")
|
||||
|
@ -78,16 +78,7 @@ func (r *DebugRunner) Run(state StateBag) {
|
|||
|
||||
// Then just use a basic runner to run it
|
||||
r.runner.Steps = steps
|
||||
r.runner.Run(state)
|
||||
}
|
||||
|
||||
func (r *DebugRunner) Cancel() {
|
||||
r.l.Lock()
|
||||
defer r.l.Unlock()
|
||||
|
||||
if r.runner != nil {
|
||||
r.runner.Cancel()
|
||||
}
|
||||
r.runner.Run(ctx, state)
|
||||
}
|
||||
|
||||
// DebugPauseDefault is the default pause function when using the
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package multistep
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
@ -39,7 +40,7 @@ func TestDebugRunner_Run(t *testing.T) {
|
|||
PauseFn: pauseFn,
|
||||
}
|
||||
|
||||
r.Run(data)
|
||||
r.Run(context.Background(), data)
|
||||
|
||||
// Test data
|
||||
expected := []string{"a", "TestStepAcc", "b", "TestStepAcc"}
|
||||
|
@ -66,12 +67,12 @@ func TestDebugRunner_Run_Run(t *testing.T) {
|
|||
stepWait := &TestStepWaitForever{}
|
||||
r := &DebugRunner{Steps: []Step{stepInt, stepWait}}
|
||||
|
||||
go r.Run(new(BasicStateBag))
|
||||
go r.Run(context.Background(), new(BasicStateBag))
|
||||
// wait until really running
|
||||
<-ch
|
||||
|
||||
// now try to run aain
|
||||
r.Run(new(BasicStateBag))
|
||||
r.Run(context.Background(), new(BasicStateBag))
|
||||
|
||||
// should not get here in nominal codepath
|
||||
t.Errorf("Was able to run an already running DebugRunner")
|
||||
|
@ -88,10 +89,9 @@ func TestDebugRunner_Cancel(t *testing.T) {
|
|||
r := &DebugRunner{}
|
||||
r.Steps = []Step{stepA, stepB, stepInt, stepC}
|
||||
|
||||
// cancelling an idle Runner is a no-op
|
||||
r.Cancel()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go r.Run(data)
|
||||
go r.Run(ctx, data)
|
||||
|
||||
// Wait until we reach the sync point
|
||||
responseCh := <-ch
|
||||
|
@ -99,7 +99,7 @@ func TestDebugRunner_Cancel(t *testing.T) {
|
|||
// Cancel then continue chain
|
||||
cancelCh := make(chan bool)
|
||||
go func() {
|
||||
r.Cancel()
|
||||
cancel()
|
||||
cancelCh <- true
|
||||
}()
|
||||
|
||||
|
@ -154,7 +154,7 @@ func TestDebugPauseDefault(t *testing.T) {
|
|||
dr := &DebugRunner{Steps: []Step{
|
||||
&TestStepAcc{Data: "a"},
|
||||
}}
|
||||
dr.Run(new(BasicStateBag))
|
||||
dr.Run(context.Background(), new(BasicStateBag))
|
||||
complete <- true
|
||||
}()
|
||||
|
||||
|
|
|
@ -44,8 +44,5 @@ type Step interface {
|
|||
// Runner is a thing that runs one or more steps.
|
||||
type Runner interface {
|
||||
// Run runs the steps with the given initial state.
|
||||
Run(StateBag)
|
||||
|
||||
// Cancel cancels a potentially running stack of steps.
|
||||
Cancel()
|
||||
Run(context.Context, StateBag)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue