Compare commits

...

3 Commits

Author SHA1 Message Date
Megan Marsh 77137ef541 make progress bar POC for pause 2021-01-26 14:02:47 -08:00
Megan Marsh 703b17d47e tinkering 2021-01-26 11:32:51 -08:00
teddylear 32cc0fb222 Adding update to pause provisioner every 10 seconds if pause is greater
than 10 seconds
2021-01-24 21:10:51 -05:00
3 changed files with 134 additions and 10 deletions

View File

@ -5,7 +5,9 @@ package packer
import (
"io"
"path/filepath"
"strings"
"sync"
"time"
pb "github.com/cheggaaa/pb"
)
@ -25,11 +27,66 @@ type UiProgressBar struct {
func (p *UiProgressBar) TrackProgress(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser {
if p == nil {
return stream
}
p.lock.Lock()
defer p.lock.Unlock()
// custom prefix used to track configured waits rather than file downloads
// TODO: next time we feel we can justify a breaking interface change, this
// deserves its own method on the progress bar rather than this hacked
// workaround.
if strings.Contains(src, "-packerwaiter-") {
realPrefix := strings.Replace(src, "-packerwaiter-", "", -1)
return p.TrackProgressWait(realPrefix, currentSize, totalSize, stream)
}
return p.TrackProgressFile(src, currentSize, totalSize, stream)
}
func (p *UiProgressBar) TrackProgressWait(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser {
newPb := pb.New64(totalSize)
newPb.SetUnits(pb.U_DURATION)
newPb.ShowPercent = false
newPb.Prefix(src)
if p.pool == nil {
pool := pb.NewPool()
err := pool.Start()
if err != nil {
// here, we probably cannot lock
// stdout, so let's just return
// stream to avoid any error.
return stream
}
p.pool = pool
}
p.pool.Add(newPb)
p.pbs++
return &readCloser{
Reader: nil,
close: func() error {
for i := currentSize; i < totalSize; i++ {
newPb.Increment()
time.Sleep(time.Second)
}
newPb.Finish()
p.lock.Lock()
defer p.lock.Unlock()
p.pbs--
if p.pbs <= 0 {
p.pool.Stop()
p.pool = nil
}
return nil
},
}
}
func (p *UiProgressBar) TrackProgressFile(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser {
newPb := pb.New64(totalSize)
newPb.Set64(currentSize)
ProgressBarConfig(newPb, filepath.Base(src))

View File

@ -3,7 +3,9 @@ package packer
import (
"context"
"fmt"
"io/ioutil"
"log"
"math"
"sync"
"time"
@ -145,17 +147,33 @@ func (p *PausedProvisioner) Prepare(raws ...interface{}) error {
func (p *PausedProvisioner) Provision(ctx context.Context, ui packersdk.Ui, comm packersdk.Communicator, generatedData map[string]interface{}) error {
// Use a select to determine if we get cancelled during the wait
ui.Say(fmt.Sprintf("Pausing %s before the next provisioner...", p.PauseBefore))
select {
case <-time.After(p.PauseBefore):
case <-ctx.Done():
return ctx.Err()
minimumTimeForPauseUpdate := float64(10)
if p.PauseBefore.Seconds() < minimumTimeForPauseUpdate {
// Use a select to determine if we get cancelled during the wait
select {
case <-time.After(p.PauseBefore):
case <-ctx.Done():
return ctx.Err()
}
} else {
err := p.updatesWhilePausing(ctx, ui)
if err != nil {
return err
}
}
return p.Provisioner.Provision(ctx, ui, comm, generatedData)
}
func (p *PausedProvisioner) updatesWhilePausing(ctx context.Context, ui packersdk.Ui) error {
TotalTime := int64(math.Round(p.PauseBefore.Seconds()))
ElapsedTime := int64(0)
pbCloser := ui.TrackProgress("-packerwaiter-Pausing...", ElapsedTime, TotalTime, ioutil.NopCloser(nil))
pbCloser.Close() // this is blocking
return nil
}
// RetriedProvisioner is a Provisioner implementation that retries
// the provisioner whenever there's an error.
type RetriedProvisioner struct {

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"testing"
"time"
@ -129,11 +130,8 @@ func TestPausedProvisionerProvision(t *testing.T) {
}
}
func TestPausedProvisionerProvision_waits(t *testing.T) {
startTime := time.Now()
waitTime := 50 * time.Millisecond
prov := &PausedProvisioner{
func pausedTestProvisionor(startTime time.Time, waitTime time.Duration) *PausedProvisioner {
return &PausedProvisioner{
PauseBefore: waitTime,
Provisioner: &packersdk.MockProvisioner{
ProvFunc: func(context.Context) error {
@ -145,7 +143,13 @@ func TestPausedProvisionerProvision_waits(t *testing.T) {
},
},
}
}
func TestPausedProvisionerProvision_waits(t *testing.T) {
startTime := time.Now()
waitTime := 50 * time.Millisecond
prov := pausedTestProvisionor(startTime, waitTime)
err := prov.Provision(context.Background(), testUi(), new(packersdk.MockCommunicator), make(map[string]interface{}))
if err != nil {
@ -153,6 +157,51 @@ func TestPausedProvisionerProvision_waits(t *testing.T) {
}
}
func TestPausedProvisionerProvision_waits_with_updates(t *testing.T) {
startTime := time.Now()
waitTime := 30 * time.Second
prov := pausedTestProvisionor(startTime, waitTime)
ui := new(packersdk.MockUi)
currentTime := time.Now()
err := prov.Provision(context.Background(), ui, new(packersdk.MockCommunicator), make(map[string]interface{}))
if err != nil {
t.Fatalf("prov failed: %v", err)
}
// TODO have to put check to get timestamp of when these messages were posted and verify that this is working as intended
expectedMessages := []string{
fmt.Sprintf("Pausing %s before the next provisioner...", waitTime),
"20 seconds left until the next provisioner",
"10 seconds left until the next provisioner",
}
if ui.SayMessages[0].Message != expectedMessages[0] {
t.Fatalf("expected: %s, got: %s", expectedMessages[0], ui.SayMessages[0].Message)
}
lastTime := currentTime
for index, message := range expectedMessages {
// Skiping first message as this has already been verified
if index == 0 {
continue
}
if ui.SayMessages[index].Message != message {
t.Fatalf("expected: %s, got: %s", message, ui.SayMessages[index].Message)
}
waitTimeBetweenMessages := math.Round(ui.SayMessages[index].SayTime.Sub(lastTime).Seconds())
if waitTimeBetweenMessages != 10 {
t.Fatalf("Did not wait the appropriate amount of time message: %v", ui.SayMessages[index].Message)
}
// setting last time to current SayTime
lastTime = ui.SayMessages[index].SayTime
}
}
func TestPausedProvisionerCancel(t *testing.T) {
topCtx, cancelTopCtx := context.WithCancel(context.Background())