Merge pull request #7501 from wandel/limit-parallel

limit number of builds running in parallel & test BuildCommand more
This commit is contained in:
Megan Marsh 2019-05-07 10:09:07 -07:00 committed by GitHub
commit 7e91a3c8ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 509 additions and 66 deletions

View File

@ -27,7 +27,7 @@ build_script:
- git rev-parse HEAD
# go test $(go list ./... | grep -v vendor)
- ps: |
go.exe test -v -timeout=2m (go.exe list ./... `
go.exe test -timeout=2m (go.exe list ./... `
|? { -not $_.Contains('/vendor/') } `
|? { $_ -ne 'github.com/hashicorp/packer/builder/parallels/common' } `
|? { $_ -ne 'github.com/hashicorp/packer/provisioner/ansible' })

View File

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log"
"math"
"os"
"os/signal"
"strconv"
@ -15,6 +16,7 @@ import (
"github.com/hashicorp/packer/helper/enumflag"
"github.com/hashicorp/packer/packer"
"github.com/hashicorp/packer/template"
"golang.org/x/sync/semaphore"
"github.com/posener/complete"
)
@ -24,31 +26,82 @@ type BuildCommand struct {
}
func (c *BuildCommand) Run(args []string) int {
var cfgColor, cfgDebug, cfgForce, cfgTimestamp, cfgParallel bool
var cfgOnError string
buildCtx, cancelBuildCtx := context.WithCancel(context.Background())
// Handle interrupts for this build
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
defer func() {
cancelBuildCtx()
signal.Stop(sigCh)
close(sigCh)
}()
go func() {
select {
case sig := <-sigCh:
if sig == nil {
// context got cancelled and this closed chan probably
// triggered first
return
}
c.Ui.Error(fmt.Sprintf("Cancelling build after receiving %s", sig))
cancelBuildCtx()
case <-buildCtx.Done():
}
}()
return c.RunContext(buildCtx, args)
}
type Config struct {
Color, Debug, Force, Timestamp bool
ParallelBuilds int64
OnError string
Path string
}
func (c *BuildCommand) ParseArgs(args []string) (Config, int) {
var cfg Config
var parallel bool
flags := c.Meta.FlagSet("build", FlagSetBuildFilter|FlagSetVars)
flags.Usage = func() { c.Ui.Say(c.Help()) }
flags.BoolVar(&cfgColor, "color", true, "")
flags.BoolVar(&cfgDebug, "debug", false, "")
flags.BoolVar(&cfgForce, "force", false, "")
flags.BoolVar(&cfgTimestamp, "timestamp-ui", false, "")
flagOnError := enumflag.New(&cfgOnError, "cleanup", "abort", "ask")
flags.BoolVar(&cfg.Color, "color", true, "")
flags.BoolVar(&cfg.Debug, "debug", false, "")
flags.BoolVar(&cfg.Force, "force", false, "")
flags.BoolVar(&cfg.Timestamp, "timestamp-ui", false, "")
flagOnError := enumflag.New(&cfg.OnError, "cleanup", "abort", "ask")
flags.Var(flagOnError, "on-error", "")
flags.BoolVar(&cfgParallel, "parallel", true, "")
flags.BoolVar(&parallel, "parallel", true, "")
flags.Int64Var(&cfg.ParallelBuilds, "parallel-builds", 0, "")
if err := flags.Parse(args); err != nil {
return 1
return cfg, 1
}
if parallel == false && cfg.ParallelBuilds == 0 {
cfg.ParallelBuilds = 1
}
if cfg.ParallelBuilds < 1 {
cfg.ParallelBuilds = math.MaxInt64
}
args = flags.Args()
if len(args) != 1 {
flags.Usage()
return 1
return cfg, 1
}
cfg.Path = args[0]
return cfg, 0
}
func (c *BuildCommand) RunContext(buildCtx context.Context, args []string) int {
cfg, ret := c.ParseArgs(args)
if ret != 0 {
return ret
}
// Parse the template
var tpl *template.Template
var err error
tpl, err = template.ParseFile(args[0])
tpl, err = template.ParseFile(cfg.Path)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse template: %s", err))
return 1
@ -76,7 +129,7 @@ func (c *BuildCommand) Run(args []string) int {
builds = append(builds, b)
}
if cfgDebug {
if cfg.Debug {
c.Ui.Say("Debug mode enabled. Builds will not be parallelized.")
}
@ -92,7 +145,7 @@ func (c *BuildCommand) Run(args []string) int {
for i, b := range buildNames {
var ui packer.Ui
ui = c.Ui
if cfgColor {
if cfg.Color {
ui = &packer.ColoredUi{
Color: colors[i%len(colors)],
Ui: ui,
@ -104,7 +157,7 @@ func (c *BuildCommand) Run(args []string) int {
c.Ui.Say("")
}
// Now add timestamps if requested
if cfgTimestamp {
if cfg.Timestamp {
ui = &packer.TimestampedUi{
Ui: ui,
}
@ -115,16 +168,16 @@ func (c *BuildCommand) Run(args []string) int {
buildUis[b] = ui
}
log.Printf("Build debug mode: %v", cfgDebug)
log.Printf("Force build: %v", cfgForce)
log.Printf("On error: %v", cfgOnError)
log.Printf("Build debug mode: %v", cfg.Debug)
log.Printf("Force build: %v", cfg.Force)
log.Printf("On error: %v", cfg.OnError)
// Set the debug and force mode and prepare all the builds
for _, b := range builds {
log.Printf("Preparing build: %s", b.Name())
b.SetDebug(cfgDebug)
b.SetForce(cfgForce)
b.SetOnError(cfgOnError)
b.SetDebug(cfg.Debug)
b.SetForce(cfg.Force)
b.SetOnError(cfg.OnError)
warnings, err := b.Prepare()
if err != nil {
@ -142,68 +195,68 @@ func (c *BuildCommand) Run(args []string) int {
}
// Run all the builds in parallel and wait for them to complete
var interruptWg, wg sync.WaitGroup
interrupted := false
var wg sync.WaitGroup
var artifacts = struct {
sync.RWMutex
m map[string][]packer.Artifact
}{m: make(map[string][]packer.Artifact)}
errors := make(map[string]error)
// ctx := context.Background()
for _, b := range builds {
var errors = struct {
sync.RWMutex
m map[string]error
}{m: make(map[string]error)}
limitParallel := semaphore.NewWeighted(cfg.ParallelBuilds)
for i := range builds {
if err := buildCtx.Err(); err != nil {
log.Println("Interrupted, not going to start any more builds.")
break
}
b := builds[i]
name := b.Name()
ui := buildUis[name]
if err := limitParallel.Acquire(buildCtx, 1); err != nil {
ui.Error(fmt.Sprintf("Build '%s' failed to acquire semaphore: %s", name, err))
errors.Lock()
errors.m[name] = err
errors.Unlock()
break
}
// Increment the waitgroup so we wait for this item to finish properly
wg.Add(1)
buildCtx, cancelCtx := context.WithCancel(context.Background())
// Handle interrupts for this build
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(sigCh)
go func(b packer.Build) {
sig := <-sigCh
interruptWg.Add(1)
defer interruptWg.Done()
interrupted = true
log.Printf("Stopping build: %s after receiving %s", b.Name(), sig)
cancelCtx()
log.Printf("Build cancelled: %s", b.Name())
}(b)
// Run the build in a goroutine
go func(b packer.Build) {
go func() {
defer wg.Done()
name := b.Name()
defer limitParallel.Release(1)
log.Printf("Starting build run: %s", name)
ui := buildUis[name]
runArtifacts, err := b.Run(buildCtx, ui)
if err != nil {
ui.Error(fmt.Sprintf("Build '%s' errored: %s", name, err))
errors[name] = err
errors.Lock()
errors.m[name] = err
errors.Unlock()
} else {
ui.Say(fmt.Sprintf("Build '%s' finished.", name))
artifacts.Lock()
artifacts.m[name] = runArtifacts
artifacts.Unlock()
}
}(b)
}()
if cfgDebug {
if cfg.Debug {
log.Printf("Debug enabled, so waiting for build to finish: %s", b.Name())
wg.Wait()
}
if !cfgParallel {
if cfg.ParallelBuilds == 1 {
log.Printf("Parallelization disabled, waiting for build to finish: %s", b.Name())
wg.Wait()
}
if interrupted {
log.Println("Interrupted, not going to start any more builds.")
break
}
}
// Wait for both the builds to complete and the interrupt handler,
@ -211,19 +264,16 @@ func (c *BuildCommand) Run(args []string) int {
log.Printf("Waiting on builds to complete...")
wg.Wait()
log.Printf("Builds completed. Waiting on interrupt barrier...")
interruptWg.Wait()
if interrupted {
if err := buildCtx.Err(); err != nil {
c.Ui.Say("Cleanly cancelled builds after being interrupted.")
return 1
}
if len(errors) > 0 {
c.Ui.Machine("error-count", strconv.FormatInt(int64(len(errors)), 10))
if len(errors.m) > 0 {
c.Ui.Machine("error-count", strconv.FormatInt(int64(len(errors.m)), 10))
c.Ui.Error("\n==> Some builds didn't complete successfully and had errors:")
for name, err := range errors {
for name, err := range errors.m {
// Create a UI for the machine readable stuff to be targeted
ui := &packer.TargetedUI{
Target: name,
@ -284,7 +334,7 @@ func (c *BuildCommand) Run(args []string) int {
c.Ui.Say("\n==> Builds finished but no artifacts were created.")
}
if len(errors) > 0 {
if len(errors.m) > 0 {
// If any errors occurred, exit with a non-zero exit status
return 1
}
@ -308,7 +358,8 @@ Options:
-force Force a build to continue if artifacts exist, deletes existing artifacts.
-machine-readable Produce machine-readable output.
-on-error=[cleanup|abort|ask] If the build fails do: clean up (default), abort, or ask.
-parallel=false Disable parallelization. (Default: parallel)
-parallel=false Disable parallelization. (Default: true)
-parallel-builds=1 Number of builds to run in parallel. 0 means no limit (Default: 0)
-timestamp-ui Enable prefixing of each ui output with an RFC3339 timestamp.
-var 'key=value' Variable for templates, can be used multiple times.
-var-file=path JSON file containing user variables.

View File

@ -0,0 +1,83 @@
package command
import (
"context"
"path/filepath"
"testing"
"time"
"github.com/google/go-cmp/cmp"
)
func TestBuildCommand_RunContext_CtxCancel(t *testing.T) {
tests := []struct {
name string
args []string
parallelPassingTests int
expected int
}{
{"cancel 1 pending build - parallel=true",
[]string{"-parallel=true", filepath.Join(testFixture("parallel"), "1lock-5wg.json")},
5,
1,
},
{"cancel in the middle with 2 pending builds - parallel=true",
[]string{"-parallel=true", filepath.Join(testFixture("parallel"), "2lock-4wg.json")},
4,
1,
},
{"cancel 1 locked build - debug - parallel=true",
[]string{"-parallel=true", "-debug=true", filepath.Join(testFixture("parallel"), "1lock.json")},
0,
1,
},
{"cancel 2 locked builds - debug - parallel=true",
[]string{"-parallel=true", "-debug=true", filepath.Join(testFixture("parallel"), "2lock.json")},
0,
1,
},
{"cancel 1 locked build - debug - parallel=false",
[]string{"-parallel=false", "-debug=true", filepath.Join(testFixture("parallel"), "1lock.json")},
0,
1,
},
{"cancel 2 locked builds - debug - parallel=false",
[]string{"-parallel=false", "-debug=true", filepath.Join(testFixture("parallel"), "2lock.json")},
0,
1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
b := NewParallelTestBuilder(tt.parallelPassingTests)
locked := &LockedBuilder{unlock: make(chan interface{})}
c := &BuildCommand{
Meta: testMetaParallel(t, b, locked),
}
ctx, cancelCtx := context.WithCancel(context.Background())
codeC := make(chan int)
go func() {
defer close(codeC)
codeC <- c.RunContext(ctx, tt.args)
}()
t.Logf("waiting for passing tests if any")
b.wg.Wait() // ran `tt.parallelPassingTests` times
t.Logf("cancelling context")
cancelCtx()
select {
case code := <-codeC:
if code != tt.expected {
t.Logf("wrong code: %s", cmp.Diff(code, tt.expected))
fatalCommand(t, c.Meta)
}
case <-time.After(15 * time.Second):
t.Fatal("deadlock")
}
})
}
}

View File

@ -0,0 +1,174 @@
package command
import (
"bytes"
"context"
"fmt"
"path/filepath"
"sync"
"testing"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/packer/builder/file"
"github.com/hashicorp/packer/packer"
"github.com/hashicorp/packer/provisioner/sleep"
)
// NewParallelTestBuilder will return a New ParallelTestBuilder that will
// unlock after `runs` builds
func NewParallelTestBuilder(runs int) *ParallelTestBuilder {
pb := &ParallelTestBuilder{}
pb.wg.Add(runs)
return pb
}
// The ParallelTestBuilder's first run will lock
type ParallelTestBuilder struct {
wg sync.WaitGroup
}
func (b *ParallelTestBuilder) Prepare(raws ...interface{}) ([]string, error) { return nil, nil }
func (b *ParallelTestBuilder) Run(ctx context.Context, ui packer.Ui, hook packer.Hook) (packer.Artifact, error) {
ui.Say("building")
b.wg.Done()
return nil, nil
}
// LockedBuilder wont run until unlock is called
type LockedBuilder struct{ unlock chan interface{} }
func (b *LockedBuilder) Prepare(raws ...interface{}) ([]string, error) { return nil, nil }
func (b *LockedBuilder) Run(ctx context.Context, ui packer.Ui, hook packer.Hook) (packer.Artifact, error) {
ui.Say("locking build")
select {
case <-b.unlock:
case <-ctx.Done():
return nil, ctx.Err()
}
return nil, nil
}
// testMetaFile creates a Meta object that includes a file builder
func testMetaParallel(t *testing.T, builder *ParallelTestBuilder, locked *LockedBuilder) Meta {
var out, err bytes.Buffer
return Meta{
CoreConfig: &packer.CoreConfig{
Components: packer.ComponentFinder{
Builder: func(n string) (packer.Builder, error) {
switch n {
case "parallel-test":
return builder, nil
case "file":
return &file.Builder{}, nil
case "lock":
return locked, nil
default:
panic(n)
}
},
Provisioner: func(n string) (packer.Provisioner, error) {
switch n {
case "sleep":
return &sleep.Provisioner{}, nil
default:
panic(n)
}
},
},
},
Ui: &packer.BasicUi{
Writer: &out,
ErrorWriter: &err,
},
}
}
func TestBuildParallel_1(t *testing.T) {
// testfile has 6 builds, with first one locks 'forever', other builds
// should go through.
b := NewParallelTestBuilder(5)
locked := &LockedBuilder{unlock: make(chan interface{})}
c := &BuildCommand{
Meta: testMetaParallel(t, b, locked),
}
args := []string{
fmt.Sprintf("-parallel=true"),
filepath.Join(testFixture("parallel"), "1lock-5wg.json"),
}
wg := errgroup.Group{}
wg.Go(func() error {
if code := c.Run(args); code != 0 {
fatalCommand(t, c.Meta)
}
return nil
})
b.wg.Wait() // ran 5 times
close(locked.unlock) // unlock locking one
wg.Wait() // wait for termination
}
func TestBuildParallel_2(t *testing.T) {
// testfile has 6 builds, 2 of them lock 'forever', other builds
// should go through.
b := NewParallelTestBuilder(4)
locked := &LockedBuilder{unlock: make(chan interface{})}
c := &BuildCommand{
Meta: testMetaParallel(t, b, locked),
}
args := []string{
fmt.Sprintf("-parallel-builds=3"),
filepath.Join(testFixture("parallel"), "2lock-4wg.json"),
}
wg := errgroup.Group{}
wg.Go(func() error {
if code := c.Run(args); code != 0 {
fatalCommand(t, c.Meta)
}
return nil
})
b.wg.Wait() // ran 4 times
close(locked.unlock) // unlock locking one
wg.Wait() // wait for termination
}
func TestBuildParallel_Timeout(t *testing.T) {
// testfile has 6 builds, 1 of them locks 'forever', one locks and times
// out other builds should go through.
b := NewParallelTestBuilder(4)
locked := &LockedBuilder{unlock: make(chan interface{})}
c := &BuildCommand{
Meta: testMetaParallel(t, b, locked),
}
args := []string{
fmt.Sprintf("-parallel-builds=3"),
filepath.Join(testFixture("parallel"), "2lock-timeout.json"),
}
wg := errgroup.Group{}
wg.Go(func() error {
if code := c.Run(args); code == 0 {
fatalCommand(t, c.Meta)
}
return nil
})
b.wg.Wait() // ran 4 times
close(locked.unlock) // unlock locking one
wg.Wait() // wait for termination
}

View File

@ -2,10 +2,13 @@ package command
import (
"bytes"
"fmt"
"math"
"os"
"path/filepath"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/packer/builder/file"
"github.com/hashicorp/packer/packer"
shell_local "github.com/hashicorp/packer/post-processor/shell-local"
@ -210,3 +213,79 @@ func cleanup() {
os.RemoveAll("lilas.txt")
os.RemoveAll("campanules.txt")
}
func TestBuildCommand_ParseArgs(t *testing.T) {
defaultMeta := testMetaFile(t)
type fields struct {
Meta Meta
}
type args struct {
args []string
}
tests := []struct {
fields fields
args args
wantCfg Config
wantExitCode int
}{
{fields{defaultMeta},
args{[]string{"file.json"}},
Config{
Path: "file.json",
ParallelBuilds: math.MaxInt64,
Color: true,
},
0,
},
{fields{defaultMeta},
args{[]string{"-parallel=true", "file.json"}},
Config{
Path: "file.json",
ParallelBuilds: math.MaxInt64,
Color: true,
},
0,
},
{fields{defaultMeta},
args{[]string{"-parallel=false", "file.json"}},
Config{
Path: "file.json",
ParallelBuilds: 1,
Color: true,
},
0,
},
{fields{defaultMeta},
args{[]string{"-parallel-builds=5", "file.json"}},
Config{
Path: "file.json",
ParallelBuilds: 5,
Color: true,
},
0,
},
{fields{defaultMeta},
args{[]string{"-parallel=false", "-parallel-builds=5", "otherfile.json"}},
Config{
Path: "otherfile.json",
ParallelBuilds: 5,
Color: true,
},
0,
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s", tt.args.args), func(t *testing.T) {
c := &BuildCommand{
Meta: tt.fields.Meta,
}
gotCfg, gotExitCode := c.ParseArgs(tt.args.args)
if diff := cmp.Diff(gotCfg, tt.wantCfg); diff != "" {
t.Fatalf("BuildCommand.ParseArgs() unexpected cfg %s", diff)
}
if gotExitCode != tt.wantExitCode {
t.Fatalf("BuildCommand.ParseArgs() gotExitCode = %v, want %v", gotExitCode, tt.wantExitCode)
}
})
}
}

View File

@ -0,0 +1,10 @@
{
"builders": [
{"type": "lock", "name": "build0"},
{"type": "parallel-test", "name": "build1"},
{"type": "parallel-test", "name": "build2"},
{"type": "parallel-test", "name": "build3"},
{"type": "parallel-test", "name": "build4"},
{"type": "parallel-test", "name": "build5"}
]
}

View File

@ -0,0 +1,5 @@
{
"builders": [
{"type": "lock", "name": "build0"}
]
}

View File

@ -0,0 +1,10 @@
{
"builders": [
{"type": "lock", "name": "build0"},
{"type": "parallel-test", "name": "build1"},
{"type": "parallel-test", "name": "build2"},
{"type": "lock", "name": "build3"},
{"type": "parallel-test", "name": "build4"},
{"type": "parallel-test", "name": "build5"}
]
}

View File

@ -0,0 +1,19 @@
{
"builders": [
{"type": "lock", "name": "build0"},
{"type": "parallel-test", "name": "build1"},
{"type": "parallel-test", "name": "build2"},
{"type": "file", "name": "timeout-build", "target": "roses.txt"},
{"type": "parallel-test", "name": "build4"},
{"type": "parallel-test", "name": "build5"}
],
"provisioners": [
{
"only": ["timeout-build"],
"type": "sleep",
"duration": "2m",
"timeout": "1ns"
}
]
}

View File

@ -0,0 +1,6 @@
{
"builders": [
{"type": "lock", "name": "build0"},
{"type": "lock", "name": "build1"}
]
}

View File

@ -17,7 +17,8 @@ _packer () {
'-except=[(foo,bar,baz) Run all builds and post-procesors other than these.]'
'-on-error=[(cleanup,abort,ask) If the build fails do: clean up (default), abort, or ask.]'
'-only=[(foo,bar,baz) Only build the given builds by name.]'
'-parallel=[(false) Disable parallelization. (Default: parallel)]'
'-parallel=[(false) Disable parallelization. (Default: false)]'
'-parallel-builds=[(0) Number of builds to run in parallel. (Defaults to infinite: 0)]'
'-var[("key=value") Variable for templates, can be used multiple times.]'
'-var-file=[(path) JSON file containing user variables.]'
'(-)*:files:_files -g "*.json"'

2
vendor/modules.txt vendored
View File

@ -526,8 +526,8 @@ golang.org/x/oauth2/jwt
golang.org/x/oauth2/internal
golang.org/x/oauth2/jws
# golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
golang.org/x/sync/errgroup
# golang.org/x/sys v0.0.0-20190425145619-16072639606e
golang.org/x/sys/unix
golang.org/x/sys/cpu

View File

@ -52,8 +52,13 @@ artifacts that are created will be outputted at the end of the build.
attribute is specified within the configuration. `-only` does not apply to
post-processors.
- `-parallel=false` - Disable parallelization of multiple builders (on by
default).
- `-parallel=false` - /!\ Deprecated, use `-parallel-builds=1` instead,
setting `-parallel-builds=N` to more that 0 will ignore the `-parallel`
setting. Set `-parallel=false` to disable parallelization of multiple
builders (on by default).
- `-parallel-builds=N` - Limit the number of builds to run in parallel, 0
means no limit (defaults to 0).
- `-timestamp-ui` - Enable prefixing of each ui output with an RFC3339
timestamp.