Compare commits
3 Commits
master
...
use_progre
Author | SHA1 | Date | |
---|---|---|---|
|
77137ef541 | ||
|
703b17d47e | ||
|
32cc0fb222 |
@ -5,7 +5,9 @@ package packer
|
|||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
pb "github.com/cheggaaa/pb"
|
pb "github.com/cheggaaa/pb"
|
||||||
)
|
)
|
||||||
@ -25,11 +27,66 @@ type UiProgressBar struct {
|
|||||||
|
|
||||||
func (p *UiProgressBar) TrackProgress(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser {
|
func (p *UiProgressBar) TrackProgress(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
|
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
// custom prefix used to track configured waits rather than file downloads
|
||||||
|
// TODO: next time we feel we can justify a breaking interface change, this
|
||||||
|
// deserves its own method on the progress bar rather than this hacked
|
||||||
|
// workaround.
|
||||||
|
if strings.Contains(src, "-packerwaiter-") {
|
||||||
|
realPrefix := strings.Replace(src, "-packerwaiter-", "", -1)
|
||||||
|
return p.TrackProgressWait(realPrefix, currentSize, totalSize, stream)
|
||||||
|
}
|
||||||
|
return p.TrackProgressFile(src, currentSize, totalSize, stream)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *UiProgressBar) TrackProgressWait(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser {
|
||||||
|
newPb := pb.New64(totalSize)
|
||||||
|
newPb.SetUnits(pb.U_DURATION)
|
||||||
|
newPb.ShowPercent = false
|
||||||
|
newPb.Prefix(src)
|
||||||
|
|
||||||
|
if p.pool == nil {
|
||||||
|
|
||||||
|
pool := pb.NewPool()
|
||||||
|
err := pool.Start()
|
||||||
|
if err != nil {
|
||||||
|
// here, we probably cannot lock
|
||||||
|
// stdout, so let's just return
|
||||||
|
// stream to avoid any error.
|
||||||
|
return stream
|
||||||
|
}
|
||||||
|
p.pool = pool
|
||||||
|
}
|
||||||
|
p.pool.Add(newPb)
|
||||||
|
|
||||||
|
p.pbs++
|
||||||
|
return &readCloser{
|
||||||
|
Reader: nil,
|
||||||
|
close: func() error {
|
||||||
|
|
||||||
|
for i := currentSize; i < totalSize; i++ {
|
||||||
|
newPb.Increment()
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
newPb.Finish()
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
p.pbs--
|
||||||
|
if p.pbs <= 0 {
|
||||||
|
p.pool.Stop()
|
||||||
|
p.pool = nil
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *UiProgressBar) TrackProgressFile(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser {
|
||||||
newPb := pb.New64(totalSize)
|
newPb := pb.New64(totalSize)
|
||||||
newPb.Set64(currentSize)
|
newPb.Set64(currentSize)
|
||||||
ProgressBarConfig(newPb, filepath.Base(src))
|
ProgressBarConfig(newPb, filepath.Base(src))
|
||||||
|
@ -3,7 +3,9 @@ package packer
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -145,17 +147,33 @@ func (p *PausedProvisioner) Prepare(raws ...interface{}) error {
|
|||||||
|
|
||||||
func (p *PausedProvisioner) Provision(ctx context.Context, ui packersdk.Ui, comm packersdk.Communicator, generatedData map[string]interface{}) error {
|
func (p *PausedProvisioner) Provision(ctx context.Context, ui packersdk.Ui, comm packersdk.Communicator, generatedData map[string]interface{}) error {
|
||||||
|
|
||||||
// Use a select to determine if we get cancelled during the wait
|
|
||||||
ui.Say(fmt.Sprintf("Pausing %s before the next provisioner...", p.PauseBefore))
|
ui.Say(fmt.Sprintf("Pausing %s before the next provisioner...", p.PauseBefore))
|
||||||
select {
|
minimumTimeForPauseUpdate := float64(10)
|
||||||
case <-time.After(p.PauseBefore):
|
if p.PauseBefore.Seconds() < minimumTimeForPauseUpdate {
|
||||||
case <-ctx.Done():
|
// Use a select to determine if we get cancelled during the wait
|
||||||
return ctx.Err()
|
select {
|
||||||
|
case <-time.After(p.PauseBefore):
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err := p.updatesWhilePausing(ctx, ui)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.Provisioner.Provision(ctx, ui, comm, generatedData)
|
return p.Provisioner.Provision(ctx, ui, comm, generatedData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PausedProvisioner) updatesWhilePausing(ctx context.Context, ui packersdk.Ui) error {
|
||||||
|
TotalTime := int64(math.Round(p.PauseBefore.Seconds()))
|
||||||
|
ElapsedTime := int64(0)
|
||||||
|
pbCloser := ui.TrackProgress("-packerwaiter-Pausing...", ElapsedTime, TotalTime, ioutil.NopCloser(nil))
|
||||||
|
pbCloser.Close() // this is blocking
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// RetriedProvisioner is a Provisioner implementation that retries
|
// RetriedProvisioner is a Provisioner implementation that retries
|
||||||
// the provisioner whenever there's an error.
|
// the provisioner whenever there's an error.
|
||||||
type RetriedProvisioner struct {
|
type RetriedProvisioner struct {
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -129,11 +130,8 @@ func TestPausedProvisionerProvision(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPausedProvisionerProvision_waits(t *testing.T) {
|
func pausedTestProvisionor(startTime time.Time, waitTime time.Duration) *PausedProvisioner {
|
||||||
startTime := time.Now()
|
return &PausedProvisioner{
|
||||||
waitTime := 50 * time.Millisecond
|
|
||||||
|
|
||||||
prov := &PausedProvisioner{
|
|
||||||
PauseBefore: waitTime,
|
PauseBefore: waitTime,
|
||||||
Provisioner: &packersdk.MockProvisioner{
|
Provisioner: &packersdk.MockProvisioner{
|
||||||
ProvFunc: func(context.Context) error {
|
ProvFunc: func(context.Context) error {
|
||||||
@ -145,7 +143,13 @@ func TestPausedProvisionerProvision_waits(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPausedProvisionerProvision_waits(t *testing.T) {
|
||||||
|
startTime := time.Now()
|
||||||
|
waitTime := 50 * time.Millisecond
|
||||||
|
|
||||||
|
prov := pausedTestProvisionor(startTime, waitTime)
|
||||||
err := prov.Provision(context.Background(), testUi(), new(packersdk.MockCommunicator), make(map[string]interface{}))
|
err := prov.Provision(context.Background(), testUi(), new(packersdk.MockCommunicator), make(map[string]interface{}))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -153,6 +157,51 @@ func TestPausedProvisionerProvision_waits(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPausedProvisionerProvision_waits_with_updates(t *testing.T) {
|
||||||
|
startTime := time.Now()
|
||||||
|
waitTime := 30 * time.Second
|
||||||
|
|
||||||
|
prov := pausedTestProvisionor(startTime, waitTime)
|
||||||
|
ui := new(packersdk.MockUi)
|
||||||
|
currentTime := time.Now()
|
||||||
|
err := prov.Provision(context.Background(), ui, new(packersdk.MockCommunicator), make(map[string]interface{}))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("prov failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO have to put check to get timestamp of when these messages were posted and verify that this is working as intended
|
||||||
|
expectedMessages := []string{
|
||||||
|
fmt.Sprintf("Pausing %s before the next provisioner...", waitTime),
|
||||||
|
"20 seconds left until the next provisioner",
|
||||||
|
"10 seconds left until the next provisioner",
|
||||||
|
}
|
||||||
|
|
||||||
|
if ui.SayMessages[0].Message != expectedMessages[0] {
|
||||||
|
t.Fatalf("expected: %s, got: %s", expectedMessages[0], ui.SayMessages[0].Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
lastTime := currentTime
|
||||||
|
for index, message := range expectedMessages {
|
||||||
|
// Skiping first message as this has already been verified
|
||||||
|
if index == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if ui.SayMessages[index].Message != message {
|
||||||
|
t.Fatalf("expected: %s, got: %s", message, ui.SayMessages[index].Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
waitTimeBetweenMessages := math.Round(ui.SayMessages[index].SayTime.Sub(lastTime).Seconds())
|
||||||
|
if waitTimeBetweenMessages != 10 {
|
||||||
|
t.Fatalf("Did not wait the appropriate amount of time message: %v", ui.SayMessages[index].Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
// setting last time to current SayTime
|
||||||
|
lastTime = ui.SayMessages[index].SayTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPausedProvisionerCancel(t *testing.T) {
|
func TestPausedProvisionerCancel(t *testing.T) {
|
||||||
topCtx, cancelTopCtx := context.WithCancel(context.Background())
|
topCtx, cancelTopCtx := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user