Merge pull request #8131 from hashicorp/fix_8036

Fix 8036
This commit is contained in:
Megan Marsh 2019-10-01 09:40:11 -07:00 committed by GitHub
commit 4644af7b66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 4437 additions and 407 deletions

2
go.mod
View File

@ -22,7 +22,7 @@ require (
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6 // indirect
github.com/approvals/go-approval-tests v0.0.0-20160714161514-ad96e53bea43
github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.22.2
github.com/aws/aws-sdk-go v1.24.1
github.com/biogo/hts v0.0.0-20160420073057-50da7d4131a3
github.com/c2h5oh/datasize v0.0.0-20171227191756-4eba002a5eae
github.com/cheggaaa/pb v1.0.27

2
go.sum
View File

@ -59,6 +59,8 @@ github.com/aws/aws-sdk-go v1.15.78/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3A
github.com/aws/aws-sdk-go v1.16.22/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.22.2 h1:uYP58k2Cd9y1qBy8CxTe5ADmdi4kANm8Ul8ch3kkIcQ=
github.com/aws/aws-sdk-go v1.22.2/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.24.1 h1:B2NRyTV1/+h+Dg8Bh7vnuvW6QZz/NBL+uzgC2uILDMI=
github.com/aws/aws-sdk-go v1.24.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/azr/flock v0.0.0-20190823144736-958d66434653 h1:2H3Cu0cbG8iszfcgnANwC/cm0YkPJIQvaJ9/tSpwh9o=
github.com/azr/flock v0.0.0-20190823144736-958d66434653/go.mod h1:EI7lzWWilX2K3ZMZ7Ta+E4DZtWzMC2tbn3cM3oVPuAU=
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas=

View File

@ -64,7 +64,7 @@ func New(cfg aws.Config, info metadata.ClientInfo, handlers request.Handlers, op
default:
maxRetries := aws.IntValue(cfg.MaxRetries)
if cfg.MaxRetries == nil || maxRetries == aws.UseServiceDefaultRetries {
maxRetries = 3
maxRetries = DefaultRetryerMaxNumRetries
}
svc.Retryer = DefaultRetryer{NumMaxRetries: maxRetries}
}

View File

@ -1,6 +1,7 @@
package client
import (
"math"
"strconv"
"time"
@ -9,53 +10,131 @@ import (
)
// DefaultRetryer implements basic retry logic using exponential backoff for
// most services. If you want to implement custom retry logic, implement the
// request.Retryer interface or create a structure type that composes this
// struct and override the specific methods. For example, to override only
// the MaxRetries method:
// most services. If you want to implement custom retry logic, you can implement the
// request.Retryer interface.
//
// type retryer struct {
// client.DefaultRetryer
// }
//
// // This implementation always has 100 max retries
// func (d retryer) MaxRetries() int { return 100 }
type DefaultRetryer struct {
NumMaxRetries int
// Num max Retries is the number of max retries that will be performed.
// By default, this is zero.
NumMaxRetries int
// MinRetryDelay is the minimum retry delay after which retry will be performed.
// If not set, the value is 0ns.
MinRetryDelay time.Duration
// MinThrottleRetryDelay is the minimum retry delay when throttled.
// If not set, the value is 0ns.
MinThrottleDelay time.Duration
// MaxRetryDelay is the maximum retry delay before which retry must be performed.
// If not set, the value is 0ns.
MaxRetryDelay time.Duration
// MaxThrottleDelay is the maximum retry delay when throttled.
// If not set, the value is 0ns.
MaxThrottleDelay time.Duration
}
const (
// DefaultRetryerMaxNumRetries sets maximum number of retries
DefaultRetryerMaxNumRetries = 3
// DefaultRetryerMinRetryDelay sets minimum retry delay
DefaultRetryerMinRetryDelay = 30 * time.Millisecond
// DefaultRetryerMinThrottleDelay sets minimum delay when throttled
DefaultRetryerMinThrottleDelay = 500 * time.Millisecond
// DefaultRetryerMaxRetryDelay sets maximum retry delay
DefaultRetryerMaxRetryDelay = 300 * time.Second
// DefaultRetryerMaxThrottleDelay sets maximum delay when throttled
DefaultRetryerMaxThrottleDelay = 300 * time.Second
)
// MaxRetries returns the number of maximum returns the service will use to make
// an individual API request.
func (d DefaultRetryer) MaxRetries() int {
return d.NumMaxRetries
}
// setRetryerDefaults sets the default values of the retryer if not set
func (d *DefaultRetryer) setRetryerDefaults() {
if d.MinRetryDelay == 0 {
d.MinRetryDelay = DefaultRetryerMinRetryDelay
}
if d.MaxRetryDelay == 0 {
d.MaxRetryDelay = DefaultRetryerMaxRetryDelay
}
if d.MinThrottleDelay == 0 {
d.MinThrottleDelay = DefaultRetryerMinThrottleDelay
}
if d.MaxThrottleDelay == 0 {
d.MaxThrottleDelay = DefaultRetryerMaxThrottleDelay
}
}
// RetryRules returns the delay duration before retrying this request again
func (d DefaultRetryer) RetryRules(r *request.Request) time.Duration {
// Set the upper limit of delay in retrying at ~five minutes
minTime := 30
throttle := d.shouldThrottle(r)
if throttle {
if delay, ok := getRetryDelay(r); ok {
return delay
}
minTime = 500
// if number of max retries is zero, no retries will be performed.
if d.NumMaxRetries == 0 {
return 0
}
// Sets default value for retryer members
d.setRetryerDefaults()
// minDelay is the minimum retryer delay
minDelay := d.MinRetryDelay
var initialDelay time.Duration
isThrottle := r.IsErrorThrottle()
if isThrottle {
if delay, ok := getRetryAfterDelay(r); ok {
initialDelay = delay
}
minDelay = d.MinThrottleDelay
}
retryCount := r.RetryCount
if throttle && retryCount > 8 {
retryCount = 8
} else if retryCount > 13 {
retryCount = 13
// maxDelay the maximum retryer delay
maxDelay := d.MaxRetryDelay
if isThrottle {
maxDelay = d.MaxThrottleDelay
}
delay := (1 << uint(retryCount)) * (sdkrand.SeededRand.Intn(minTime) + minTime)
return time.Duration(delay) * time.Millisecond
var delay time.Duration
// Logic to cap the retry count based on the minDelay provided
actualRetryCount := int(math.Log2(float64(minDelay))) + 1
if actualRetryCount < 63-retryCount {
delay = time.Duration(1<<uint64(retryCount)) * getJitterDelay(minDelay)
if delay > maxDelay {
delay = getJitterDelay(maxDelay / 2)
}
} else {
delay = getJitterDelay(maxDelay / 2)
}
return delay + initialDelay
}
// getJitterDelay returns a jittered delay for retry
func getJitterDelay(duration time.Duration) time.Duration {
return time.Duration(sdkrand.SeededRand.Int63n(int64(duration)) + int64(duration))
}
// ShouldRetry returns true if the request should be retried.
func (d DefaultRetryer) ShouldRetry(r *request.Request) bool {
// ShouldRetry returns false if number of max retries is 0.
if d.NumMaxRetries == 0 {
return false
}
// If one of the other handlers already set the retry state
// we don't want to override it based on the service's state
if r.Retryable != nil {
@ -65,26 +144,13 @@ func (d DefaultRetryer) ShouldRetry(r *request.Request) bool {
if r.HTTPResponse.StatusCode >= 500 && r.HTTPResponse.StatusCode != 501 {
return true
}
return r.IsErrorRetryable() || d.shouldThrottle(r)
}
// ShouldThrottle returns true if the request should be throttled.
func (d DefaultRetryer) shouldThrottle(r *request.Request) bool {
switch r.HTTPResponse.StatusCode {
case 429:
case 502:
case 503:
case 504:
default:
return r.IsErrorThrottle()
}
return true
return r.IsErrorRetryable() || r.IsErrorThrottle()
}
// This will look in the Retry-After header, RFC 7231, for how long
// it will wait before attempting another request
func getRetryDelay(r *request.Request) (time.Duration, bool) {
func getRetryAfterDelay(r *request.Request) (time.Duration, bool) {
if !canUseRetryAfterHeader(r) {
return 0, false
}

View File

@ -0,0 +1,28 @@
package client
import (
"time"
"github.com/aws/aws-sdk-go/aws/request"
)
// NoOpRetryer provides a retryer that performs no retries.
// It should be used when we do not want retries to be performed.
type NoOpRetryer struct{}
// MaxRetries returns the number of maximum returns the service will use to make
// an individual API; For NoOpRetryer the MaxRetries will always be zero.
func (d NoOpRetryer) MaxRetries() int {
return 0
}
// ShouldRetry will always return false for NoOpRetryer, as it should never retry.
func (d NoOpRetryer) ShouldRetry(_ *request.Request) bool {
return false
}
// RetryRules returns the delay duration before retrying this request again;
// since NoOpRetryer does not retry, RetryRules always returns 0.
func (d NoOpRetryer) RetryRules(_ *request.Request) time.Duration {
return 0
}

View File

@ -20,7 +20,7 @@ type RequestRetryer interface{}
// A Config provides service configuration for service clients. By default,
// all clients will use the defaults.DefaultConfig structure.
//
// // Create Session with MaxRetry configuration to be shared by multiple
// // Create Session with MaxRetries configuration to be shared by multiple
// // service clients.
// sess := session.Must(session.NewSession(&aws.Config{
// MaxRetries: aws.Int(3),
@ -251,7 +251,7 @@ type Config struct {
// NewConfig returns a new Config pointer that can be chained with builder
// methods to set multiple configuration values inline without using pointers.
//
// // Create Session with MaxRetry configuration to be shared by multiple
// // Create Session with MaxRetries configuration to be shared by multiple
// // service clients.
// sess := session.Must(session.NewSession(aws.NewConfig().
// WithMaxRetries(3),

View File

@ -179,6 +179,242 @@ func IntValueMap(src map[string]*int) map[string]int {
return dst
}
// Uint returns a pointer to the uint value passed in.
func Uint(v uint) *uint {
return &v
}
// UintValue returns the value of the uint pointer passed in or
// 0 if the pointer is nil.
func UintValue(v *uint) uint {
if v != nil {
return *v
}
return 0
}
// UintSlice converts a slice of uint values uinto a slice of
// uint pointers
func UintSlice(src []uint) []*uint {
dst := make([]*uint, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// UintValueSlice converts a slice of uint pointers uinto a slice of
// uint values
func UintValueSlice(src []*uint) []uint {
dst := make([]uint, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// UintMap converts a string map of uint values uinto a string
// map of uint pointers
func UintMap(src map[string]uint) map[string]*uint {
dst := make(map[string]*uint)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// UintValueMap converts a string map of uint pointers uinto a string
// map of uint values
func UintValueMap(src map[string]*uint) map[string]uint {
dst := make(map[string]uint)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Int8 returns a pointer to the int8 value passed in.
func Int8(v int8) *int8 {
return &v
}
// Int8Value returns the value of the int8 pointer passed in or
// 0 if the pointer is nil.
func Int8Value(v *int8) int8 {
if v != nil {
return *v
}
return 0
}
// Int8Slice converts a slice of int8 values into a slice of
// int8 pointers
func Int8Slice(src []int8) []*int8 {
dst := make([]*int8, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Int8ValueSlice converts a slice of int8 pointers into a slice of
// int8 values
func Int8ValueSlice(src []*int8) []int8 {
dst := make([]int8, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Int8Map converts a string map of int8 values into a string
// map of int8 pointers
func Int8Map(src map[string]int8) map[string]*int8 {
dst := make(map[string]*int8)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Int8ValueMap converts a string map of int8 pointers into a string
// map of int8 values
func Int8ValueMap(src map[string]*int8) map[string]int8 {
dst := make(map[string]int8)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Int16 returns a pointer to the int16 value passed in.
func Int16(v int16) *int16 {
return &v
}
// Int16Value returns the value of the int16 pointer passed in or
// 0 if the pointer is nil.
func Int16Value(v *int16) int16 {
if v != nil {
return *v
}
return 0
}
// Int16Slice converts a slice of int16 values into a slice of
// int16 pointers
func Int16Slice(src []int16) []*int16 {
dst := make([]*int16, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Int16ValueSlice converts a slice of int16 pointers into a slice of
// int16 values
func Int16ValueSlice(src []*int16) []int16 {
dst := make([]int16, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Int16Map converts a string map of int16 values into a string
// map of int16 pointers
func Int16Map(src map[string]int16) map[string]*int16 {
dst := make(map[string]*int16)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Int16ValueMap converts a string map of int16 pointers into a string
// map of int16 values
func Int16ValueMap(src map[string]*int16) map[string]int16 {
dst := make(map[string]int16)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Int32 returns a pointer to the int32 value passed in.
func Int32(v int32) *int32 {
return &v
}
// Int32Value returns the value of the int32 pointer passed in or
// 0 if the pointer is nil.
func Int32Value(v *int32) int32 {
if v != nil {
return *v
}
return 0
}
// Int32Slice converts a slice of int32 values into a slice of
// int32 pointers
func Int32Slice(src []int32) []*int32 {
dst := make([]*int32, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Int32ValueSlice converts a slice of int32 pointers into a slice of
// int32 values
func Int32ValueSlice(src []*int32) []int32 {
dst := make([]int32, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Int32Map converts a string map of int32 values into a string
// map of int32 pointers
func Int32Map(src map[string]int32) map[string]*int32 {
dst := make(map[string]*int32)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Int32ValueMap converts a string map of int32 pointers into a string
// map of int32 values
func Int32ValueMap(src map[string]*int32) map[string]int32 {
dst := make(map[string]int32)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Int64 returns a pointer to the int64 value passed in.
func Int64(v int64) *int64 {
return &v
@ -238,6 +474,301 @@ func Int64ValueMap(src map[string]*int64) map[string]int64 {
return dst
}
// Uint8 returns a pointer to the uint8 value passed in.
func Uint8(v uint8) *uint8 {
return &v
}
// Uint8Value returns the value of the uint8 pointer passed in or
// 0 if the pointer is nil.
func Uint8Value(v *uint8) uint8 {
if v != nil {
return *v
}
return 0
}
// Uint8Slice converts a slice of uint8 values into a slice of
// uint8 pointers
func Uint8Slice(src []uint8) []*uint8 {
dst := make([]*uint8, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Uint8ValueSlice converts a slice of uint8 pointers into a slice of
// uint8 values
func Uint8ValueSlice(src []*uint8) []uint8 {
dst := make([]uint8, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Uint8Map converts a string map of uint8 values into a string
// map of uint8 pointers
func Uint8Map(src map[string]uint8) map[string]*uint8 {
dst := make(map[string]*uint8)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Uint8ValueMap converts a string map of uint8 pointers into a string
// map of uint8 values
func Uint8ValueMap(src map[string]*uint8) map[string]uint8 {
dst := make(map[string]uint8)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Uint16 returns a pointer to the uint16 value passed in.
func Uint16(v uint16) *uint16 {
return &v
}
// Uint16Value returns the value of the uint16 pointer passed in or
// 0 if the pointer is nil.
func Uint16Value(v *uint16) uint16 {
if v != nil {
return *v
}
return 0
}
// Uint16Slice converts a slice of uint16 values into a slice of
// uint16 pointers
func Uint16Slice(src []uint16) []*uint16 {
dst := make([]*uint16, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Uint16ValueSlice converts a slice of uint16 pointers into a slice of
// uint16 values
func Uint16ValueSlice(src []*uint16) []uint16 {
dst := make([]uint16, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Uint16Map converts a string map of uint16 values into a string
// map of uint16 pointers
func Uint16Map(src map[string]uint16) map[string]*uint16 {
dst := make(map[string]*uint16)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Uint16ValueMap converts a string map of uint16 pointers into a string
// map of uint16 values
func Uint16ValueMap(src map[string]*uint16) map[string]uint16 {
dst := make(map[string]uint16)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Uint32 returns a pointer to the uint32 value passed in.
func Uint32(v uint32) *uint32 {
return &v
}
// Uint32Value returns the value of the uint32 pointer passed in or
// 0 if the pointer is nil.
func Uint32Value(v *uint32) uint32 {
if v != nil {
return *v
}
return 0
}
// Uint32Slice converts a slice of uint32 values into a slice of
// uint32 pointers
func Uint32Slice(src []uint32) []*uint32 {
dst := make([]*uint32, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Uint32ValueSlice converts a slice of uint32 pointers into a slice of
// uint32 values
func Uint32ValueSlice(src []*uint32) []uint32 {
dst := make([]uint32, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Uint32Map converts a string map of uint32 values into a string
// map of uint32 pointers
func Uint32Map(src map[string]uint32) map[string]*uint32 {
dst := make(map[string]*uint32)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Uint32ValueMap converts a string map of uint32 pointers into a string
// map of uint32 values
func Uint32ValueMap(src map[string]*uint32) map[string]uint32 {
dst := make(map[string]uint32)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Uint64 returns a pointer to the uint64 value passed in.
func Uint64(v uint64) *uint64 {
return &v
}
// Uint64Value returns the value of the uint64 pointer passed in or
// 0 if the pointer is nil.
func Uint64Value(v *uint64) uint64 {
if v != nil {
return *v
}
return 0
}
// Uint64Slice converts a slice of uint64 values into a slice of
// uint64 pointers
func Uint64Slice(src []uint64) []*uint64 {
dst := make([]*uint64, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Uint64ValueSlice converts a slice of uint64 pointers into a slice of
// uint64 values
func Uint64ValueSlice(src []*uint64) []uint64 {
dst := make([]uint64, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Uint64Map converts a string map of uint64 values into a string
// map of uint64 pointers
func Uint64Map(src map[string]uint64) map[string]*uint64 {
dst := make(map[string]*uint64)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Uint64ValueMap converts a string map of uint64 pointers into a string
// map of uint64 values
func Uint64ValueMap(src map[string]*uint64) map[string]uint64 {
dst := make(map[string]uint64)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Float32 returns a pointer to the float32 value passed in.
func Float32(v float32) *float32 {
return &v
}
// Float32Value returns the value of the float32 pointer passed in or
// 0 if the pointer is nil.
func Float32Value(v *float32) float32 {
if v != nil {
return *v
}
return 0
}
// Float32Slice converts a slice of float32 values into a slice of
// float32 pointers
func Float32Slice(src []float32) []*float32 {
dst := make([]*float32, len(src))
for i := 0; i < len(src); i++ {
dst[i] = &(src[i])
}
return dst
}
// Float32ValueSlice converts a slice of float32 pointers into a slice of
// float32 values
func Float32ValueSlice(src []*float32) []float32 {
dst := make([]float32, len(src))
for i := 0; i < len(src); i++ {
if src[i] != nil {
dst[i] = *(src[i])
}
}
return dst
}
// Float32Map converts a string map of float32 values into a string
// map of float32 pointers
func Float32Map(src map[string]float32) map[string]*float32 {
dst := make(map[string]*float32)
for k, val := range src {
v := val
dst[k] = &v
}
return dst
}
// Float32ValueMap converts a string map of float32 pointers into a string
// map of float32 values
func Float32ValueMap(src map[string]*float32) map[string]float32 {
dst := make(map[string]float32)
for k, val := range src {
if val != nil {
dst[k] = *val
}
}
return dst
}
// Float64 returns a pointer to the float64 value passed in.
func Float64(v float64) *float64 {
return &v

View File

@ -159,9 +159,9 @@ func handleSendError(r *request.Request, err error) {
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
}
// Catch all other request errors.
// Catch all request errors, and let the default retrier determine
// if the error is retryable.
r.Error = awserr.New("RequestError", "send request failed", err)
r.Retryable = aws.Bool(true) // network errors are retryable
// Override the error with a context canceled error, if that was canceled.
ctx := r.Context()
@ -184,37 +184,39 @@ var ValidateResponseHandler = request.NamedHandler{Name: "core.ValidateResponseH
// AfterRetryHandler performs final checks to determine if the request should
// be retried and how long to delay.
var AfterRetryHandler = request.NamedHandler{Name: "core.AfterRetryHandler", Fn: func(r *request.Request) {
// If one of the other handlers already set the retry state
// we don't want to override it based on the service's state
if r.Retryable == nil || aws.BoolValue(r.Config.EnforceShouldRetryCheck) {
r.Retryable = aws.Bool(r.ShouldRetry(r))
}
if r.WillRetry() {
r.RetryDelay = r.RetryRules(r)
if sleepFn := r.Config.SleepDelay; sleepFn != nil {
// Support SleepDelay for backwards compatibility and testing
sleepFn(r.RetryDelay)
} else if err := aws.SleepWithContext(r.Context(), r.RetryDelay); err != nil {
r.Error = awserr.New(request.CanceledErrorCode,
"request context canceled", err)
r.Retryable = aws.Bool(false)
return
var AfterRetryHandler = request.NamedHandler{
Name: "core.AfterRetryHandler",
Fn: func(r *request.Request) {
// If one of the other handlers already set the retry state
// we don't want to override it based on the service's state
if r.Retryable == nil || aws.BoolValue(r.Config.EnforceShouldRetryCheck) {
r.Retryable = aws.Bool(r.ShouldRetry(r))
}
// when the expired token exception occurs the credentials
// need to be expired locally so that the next request to
// get credentials will trigger a credentials refresh.
if r.IsErrorExpired() {
r.Config.Credentials.Expire()
}
if r.WillRetry() {
r.RetryDelay = r.RetryRules(r)
r.RetryCount++
r.Error = nil
}
}}
if sleepFn := r.Config.SleepDelay; sleepFn != nil {
// Support SleepDelay for backwards compatibility and testing
sleepFn(r.RetryDelay)
} else if err := aws.SleepWithContext(r.Context(), r.RetryDelay); err != nil {
r.Error = awserr.New(request.CanceledErrorCode,
"request context canceled", err)
r.Retryable = aws.Bool(false)
return
}
// when the expired token exception occurs the credentials
// need to be expired locally so that the next request to
// get credentials will trigger a credentials refresh.
if r.IsErrorExpired() {
r.Config.Credentials.Expire()
}
r.RetryCount++
r.Error = nil
}
}}
// ValidateEndpointHandler is a request handler to validate a request had the
// appropriate Region and Endpoint set. Will set r.Error if the endpoint or

View File

@ -98,8 +98,8 @@ func NewProviderClient(cfg aws.Config, handlers request.Handlers, endpoint strin
return p
}
// NewCredentialsClient returns a Credentials wrapper for retrieving credentials
// from an arbitrary endpoint concurrently. The client will request the
// NewCredentialsClient returns a pointer to a new Credentials object
// wrapping the endpoint credentials Provider.
func NewCredentialsClient(cfg aws.Config, handlers request.Handlers, endpoint string, options ...func(*Provider)) *credentials.Credentials {
return credentials.NewCredentials(NewProviderClient(cfg, handlers, endpoint, options...))
}

View File

@ -76,12 +76,15 @@ func (p *WebIdentityRoleProvider) Retrieve() (credentials.Value, error) {
// uses unix time in nanoseconds to uniquely identify sessions.
sessionName = strconv.FormatInt(now().UnixNano(), 10)
}
resp, err := p.client.AssumeRoleWithWebIdentity(&sts.AssumeRoleWithWebIdentityInput{
req, resp := p.client.AssumeRoleWithWebIdentityRequest(&sts.AssumeRoleWithWebIdentityInput{
RoleArn: &p.roleARN,
RoleSessionName: &sessionName,
WebIdentityToken: aws.String(string(b)),
})
if err != nil {
// InvalidIdentityToken error is a temporary error that can occur
// when assuming an Role with a JWT web identity token.
req.RetryErrorCodes = append(req.RetryErrorCodes, sts.ErrCodeInvalidIdentityTokenException)
if err := req.Send(); err != nil {
return credentials.Value{}, awserr.New(ErrCodeWebIdentity, "failed to retrieve credentials", err)
}

View File

@ -16,25 +16,26 @@ var (
type metricChan struct {
ch chan metric
paused int64
paused *int64
}
func newMetricChan(size int) metricChan {
return metricChan{
ch: make(chan metric, size),
ch: make(chan metric, size),
paused: new(int64),
}
}
func (ch *metricChan) Pause() {
atomic.StoreInt64(&ch.paused, pausedEnum)
atomic.StoreInt64(ch.paused, pausedEnum)
}
func (ch *metricChan) Continue() {
atomic.StoreInt64(&ch.paused, runningEnum)
atomic.StoreInt64(ch.paused, runningEnum)
}
func (ch *metricChan) IsPaused() bool {
v := atomic.LoadInt64(&ch.paused)
v := atomic.LoadInt64(ch.paused)
return v == pausedEnum
}

View File

@ -152,18 +152,19 @@ type EC2IAMInfo struct {
// An EC2InstanceIdentityDocument provides the shape for unmarshaling
// an instance identity document
type EC2InstanceIdentityDocument struct {
DevpayProductCodes []string `json:"devpayProductCodes"`
AvailabilityZone string `json:"availabilityZone"`
PrivateIP string `json:"privateIp"`
Version string `json:"version"`
Region string `json:"region"`
InstanceID string `json:"instanceId"`
BillingProducts []string `json:"billingProducts"`
InstanceType string `json:"instanceType"`
AccountID string `json:"accountId"`
PendingTime time.Time `json:"pendingTime"`
ImageID string `json:"imageId"`
KernelID string `json:"kernelId"`
RamdiskID string `json:"ramdiskId"`
Architecture string `json:"architecture"`
DevpayProductCodes []string `json:"devpayProductCodes"`
MarketplaceProductCodes []string `json:"marketplaceProductCodes"`
AvailabilityZone string `json:"availabilityZone"`
PrivateIP string `json:"privateIp"`
Version string `json:"version"`
Region string `json:"region"`
InstanceID string `json:"instanceId"`
BillingProducts []string `json:"billingProducts"`
InstanceType string `json:"instanceType"`
AccountID string `json:"accountId"`
PendingTime time.Time `json:"pendingTime"`
ImageID string `json:"imageId"`
KernelID string `json:"kernelId"`
RamdiskID string `json:"ramdiskId"`
Architecture string `json:"architecture"`
}

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,7 @@ type Handlers struct {
Complete HandlerList
}
// Copy returns of this handler's lists.
// Copy returns a copy of this handler's lists.
func (h *Handlers) Copy() Handlers {
return Handlers{
Validate: h.Validate.copy(),
@ -42,7 +42,7 @@ func (h *Handlers) Copy() Handlers {
}
}
// Clear removes callback functions for all handlers
// Clear removes callback functions for all handlers.
func (h *Handlers) Clear() {
h.Validate.Clear()
h.Build.Clear()

View File

@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"io"
"net"
"net/http"
"net/url"
"reflect"
@ -65,6 +64,15 @@ type Request struct {
LastSignedAt time.Time
DisableFollowRedirects bool
// Additional API error codes that should be retried. IsErrorRetryable
// will consider these codes in addition to its built in cases.
RetryErrorCodes []string
// Additional API error codes that should be retried with throttle backoff
// delay. IsErrorThrottle will consider these codes in addition to its
// built in cases.
ThrottleErrorCodes []string
// A value greater than 0 instructs the request to be signed as Presigned URL
// You should not set this field directly. Instead use Request's
// Presign or PresignRequest methods.
@ -498,21 +506,17 @@ func (r *Request) Send() error {
if err := r.sendRequest(); err == nil {
return nil
} else if !shouldRetryError(r.Error) {
}
r.Handlers.Retry.Run(r)
r.Handlers.AfterRetry.Run(r)
if r.Error != nil || !aws.BoolValue(r.Retryable) {
return r.Error
}
if err := r.prepareRetry(); err != nil {
r.Error = err
return err
} else {
r.Handlers.Retry.Run(r)
r.Handlers.AfterRetry.Run(r)
if r.Error != nil || !aws.BoolValue(r.Retryable) {
return r.Error
}
if err := r.prepareRetry(); err != nil {
r.Error = err
return err
}
continue
}
}
}
@ -596,51 +600,6 @@ func AddToUserAgent(r *Request, s string) {
r.HTTPRequest.Header.Set("User-Agent", s)
}
type temporary interface {
Temporary() bool
}
func shouldRetryError(origErr error) bool {
switch err := origErr.(type) {
case awserr.Error:
if err.Code() == CanceledErrorCode {
return false
}
return shouldRetryError(err.OrigErr())
case *url.Error:
if strings.Contains(err.Error(), "connection refused") {
// Refused connections should be retried as the service may not yet
// be running on the port. Go TCP dial considers refused
// connections as not temporary.
return true
}
// *url.Error only implements Temporary after golang 1.6 but since
// url.Error only wraps the error:
return shouldRetryError(err.Err)
case temporary:
if netErr, ok := err.(*net.OpError); ok && netErr.Op == "dial" {
return true
}
// If the error is temporary, we want to allow continuation of the
// retry process
return err.Temporary() || isErrConnectionReset(origErr)
case nil:
// `awserr.Error.OrigErr()` can be nil, meaning there was an error but
// because we don't know the cause, it is marked as retryable. See
// TestRequest4xxUnretryable for an example.
return true
default:
switch err.Error() {
case "net/http: request canceled",
"net/http: request canceled while waiting for connection":
// known 1.5 error case when an http request is cancelled
return false
}
// here we don't know the error; so we allow a retry.
return true
}
}
// SanitizeHostForHeader removes default port from host and updates request.Host
func SanitizeHostForHeader(r *http.Request) {
host := getHost(r)

View File

@ -1,23 +1,41 @@
package request
import (
"net"
"net/url"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
)
// Retryer is an interface to control retry logic for a given service.
// The default implementation used by most services is the client.DefaultRetryer
// structure, which contains basic retry logic using exponential backoff.
// Retryer provides the interface drive the SDK's request retry behavior. The
// Retryer implementation is responsible for implementing exponential backoff,
// and determine if a request API error should be retried.
//
// client.DefaultRetryer is the SDK's default implementation of the Retryer. It
// uses the which uses the Request.IsErrorRetryable and Request.IsErrorThrottle
// methods to determine if the request is retried.
type Retryer interface {
// RetryRules return the retry delay that should be used by the SDK before
// making another request attempt for the failed request.
RetryRules(*Request) time.Duration
// ShouldRetry returns if the failed request is retryable.
//
// Implementations may consider request attempt count when determining if a
// request is retryable, but the SDK will use MaxRetries to limit the
// number of attempts a request are made.
ShouldRetry(*Request) bool
// MaxRetries is the number of times a request may be retried before
// failing.
MaxRetries() int
}
// WithRetryer sets a config Retryer value to the given Config returning it
// for chaining.
// WithRetryer sets a Retryer value to the given Config returning the Config
// value for chaining.
func WithRetryer(cfg *aws.Config, retryer Retryer) *aws.Config {
cfg.Retryer = retryer
return cfg
@ -76,10 +94,6 @@ var validParentCodes = map[string]struct{}{
ErrCodeRead: {},
}
type temporaryError interface {
Temporary() bool
}
func isNestedErrorRetryable(parentErr awserr.Error) bool {
if parentErr == nil {
return false
@ -98,7 +112,7 @@ func isNestedErrorRetryable(parentErr awserr.Error) bool {
return isCodeRetryable(aerr.Code())
}
if t, ok := err.(temporaryError); ok {
if t, ok := err.(temporary); ok {
return t.Temporary() || isErrConnectionReset(err)
}
@ -108,32 +122,90 @@ func isNestedErrorRetryable(parentErr awserr.Error) bool {
// IsErrorRetryable returns whether the error is retryable, based on its Code.
// Returns false if error is nil.
func IsErrorRetryable(err error) bool {
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
return isCodeRetryable(aerr.Code()) || isNestedErrorRetryable(aerr)
}
if err == nil {
return false
}
return shouldRetryError(err)
}
type temporary interface {
Temporary() bool
}
func shouldRetryError(origErr error) bool {
switch err := origErr.(type) {
case awserr.Error:
if err.Code() == CanceledErrorCode {
return false
}
if isNestedErrorRetryable(err) {
return true
}
origErr := err.OrigErr()
var shouldRetry bool
if origErr != nil {
shouldRetry := shouldRetryError(origErr)
if err.Code() == "RequestError" && !shouldRetry {
return false
}
}
if isCodeRetryable(err.Code()) {
return true
}
return shouldRetry
case *url.Error:
if strings.Contains(err.Error(), "connection refused") {
// Refused connections should be retried as the service may not yet
// be running on the port. Go TCP dial considers refused
// connections as not temporary.
return true
}
// *url.Error only implements Temporary after golang 1.6 but since
// url.Error only wraps the error:
return shouldRetryError(err.Err)
case temporary:
if netErr, ok := err.(*net.OpError); ok && netErr.Op == "dial" {
return true
}
// If the error is temporary, we want to allow continuation of the
// retry process
return err.Temporary() || isErrConnectionReset(origErr)
case nil:
// `awserr.Error.OrigErr()` can be nil, meaning there was an error but
// because we don't know the cause, it is marked as retryable. See
// TestRequest4xxUnretryable for an example.
return true
default:
switch err.Error() {
case "net/http: request canceled",
"net/http: request canceled while waiting for connection":
// known 1.5 error case when an http request is cancelled
return false
}
// here we don't know the error; so we allow a retry.
return true
}
return false
}
// IsErrorThrottle returns whether the error is to be throttled based on its code.
// Returns false if error is nil.
func IsErrorThrottle(err error) bool {
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
return isCodeThrottle(aerr.Code())
}
if aerr, ok := err.(awserr.Error); ok && aerr != nil {
return isCodeThrottle(aerr.Code())
}
return false
}
// IsErrorExpiredCreds returns whether the error code is a credential expiry error.
// Returns false if error is nil.
// IsErrorExpiredCreds returns whether the error code is a credential expiry
// error. Returns false if error is nil.
func IsErrorExpiredCreds(err error) bool {
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
return isCodeExpiredCreds(aerr.Code())
}
if aerr, ok := err.(awserr.Error); ok && aerr != nil {
return isCodeExpiredCreds(aerr.Code())
}
return false
}
@ -143,17 +215,44 @@ func IsErrorExpiredCreds(err error) bool {
//
// Alias for the utility function IsErrorRetryable
func (r *Request) IsErrorRetryable() bool {
if isErrCode(r.Error, r.RetryErrorCodes) {
return true
}
return IsErrorRetryable(r.Error)
}
// IsErrorThrottle returns whether the error is to be throttled based on its code.
// Returns false if the request has no Error set
// IsErrorThrottle returns whether the error is to be throttled based on its
// code. Returns false if the request has no Error set.
//
// Alias for the utility function IsErrorThrottle
func (r *Request) IsErrorThrottle() bool {
if isErrCode(r.Error, r.ThrottleErrorCodes) {
return true
}
if r.HTTPResponse != nil {
switch r.HTTPResponse.StatusCode {
case 429, 502, 503, 504:
return true
}
}
return IsErrorThrottle(r.Error)
}
func isErrCode(err error, codes []string) bool {
if aerr, ok := err.(awserr.Error); ok && aerr != nil {
for _, code := range codes {
if code == aerr.Code() {
return true
}
}
}
return false
}
// IsErrorExpired returns whether the error code is a credential expiry error.
// Returns false if the request has no Error set.
//

View File

@ -99,10 +99,10 @@ type envConfig struct {
CustomCABundle string
csmEnabled string
CSMEnabled bool
CSMEnabled *bool
CSMPort string
CSMClientID string
CSMHost string
CSMClientID string
// Enables endpoint discovery via environment variables.
//
@ -230,7 +230,11 @@ func envConfigLoad(enableSharedConfig bool) envConfig {
setFromEnvVal(&cfg.CSMHost, csmHostEnvKey)
setFromEnvVal(&cfg.CSMPort, csmPortEnvKey)
setFromEnvVal(&cfg.CSMClientID, csmClientIDEnvKey)
cfg.CSMEnabled = len(cfg.csmEnabled) > 0
if len(cfg.csmEnabled) != 0 {
v, _ := strconv.ParseBool(cfg.csmEnabled)
cfg.CSMEnabled = &v
}
regionKeys := regionEnvKeys
profileKeys := profileEnvKeys

View File

@ -104,9 +104,13 @@ func New(cfgs ...*aws.Config) *Session {
}
s := deprecatedNewSession(cfgs...)
if envCfg.CSMEnabled {
err := enableCSM(&s.Handlers, envCfg.CSMClientID,
envCfg.CSMHost, envCfg.CSMPort, s.Config.Logger)
if csmCfg, err := loadCSMConfig(envCfg, []string{}); err != nil {
if l := s.Config.Logger; l != nil {
l.Log(fmt.Sprintf("ERROR: failed to load CSM configuration, %v", err))
}
} else if csmCfg.Enabled {
err := enableCSM(&s.Handlers, csmCfg, s.Config.Logger)
if err != nil {
err = fmt.Errorf("failed to enable CSM, %v", err)
s.Config.Logger.Log("ERROR:", err.Error())
@ -132,7 +136,7 @@ func New(cfgs ...*aws.Config) *Session {
// to be built with retrieving credentials with AssumeRole set in the config.
//
// See the NewSessionWithOptions func for information on how to override or
// control through code how the Session will be created. Such as specifying the
// control through code how the Session will be created, such as specifying the
// config profile, and controlling if shared config is enabled or not.
func NewSession(cfgs ...*aws.Config) (*Session, error) {
opts := Options{}
@ -347,15 +351,12 @@ func deprecatedNewSession(cfgs ...*aws.Config) *Session {
return s
}
func enableCSM(handlers *request.Handlers,
clientID, host, port string,
logger aws.Logger,
) error {
func enableCSM(handlers *request.Handlers, cfg csmConfig, logger aws.Logger) error {
if logger != nil {
logger.Log("Enabling CSM")
}
r, err := csm.Start(clientID, csm.AddressWithDefaults(host, port))
r, err := csm.Start(cfg.ClientID, csm.AddressWithDefaults(cfg.Host, cfg.Port))
if err != nil {
return err
}
@ -395,7 +396,13 @@ func newSession(opts Options, envCfg envConfig, cfgs ...*aws.Config) (*Session,
// Load additional config from file(s)
sharedCfg, err := loadSharedConfig(envCfg.Profile, cfgFiles, envCfg.EnableSharedConfig)
if err != nil {
if _, ok := err.(SharedConfigProfileNotExistsError); !ok {
if len(envCfg.Profile) == 0 && !envCfg.EnableSharedConfig && (envCfg.Creds.HasKeys() || userCfg.Credentials != nil) {
// Special case where the user has not explicitly specified an AWS_PROFILE,
// or session.Options.profile, shared config is not enabled, and the
// environment has credentials, allow the shared config file to fail to
// load since the user has already provided credentials, and nothing else
// is required to be read file. Github(aws/aws-sdk-go#2455)
} else if _, ok := err.(SharedConfigProfileNotExistsError); !ok {
return nil, err
}
}
@ -410,9 +417,13 @@ func newSession(opts Options, envCfg envConfig, cfgs ...*aws.Config) (*Session,
}
initHandlers(s)
if envCfg.CSMEnabled {
err := enableCSM(&s.Handlers, envCfg.CSMClientID,
envCfg.CSMHost, envCfg.CSMPort, s.Config.Logger)
if csmCfg, err := loadCSMConfig(envCfg, cfgFiles); err != nil {
if l := s.Config.Logger; l != nil {
l.Log(fmt.Sprintf("ERROR: failed to load CSM configuration, %v", err))
}
} else if csmCfg.Enabled {
err = enableCSM(&s.Handlers, csmCfg, s.Config.Logger)
if err != nil {
return nil, err
}
@ -428,6 +439,46 @@ func newSession(opts Options, envCfg envConfig, cfgs ...*aws.Config) (*Session,
return s, nil
}
type csmConfig struct {
Enabled bool
Host string
Port string
ClientID string
}
var csmProfileName = "aws_csm"
func loadCSMConfig(envCfg envConfig, cfgFiles []string) (csmConfig, error) {
if envCfg.CSMEnabled != nil {
if *envCfg.CSMEnabled {
return csmConfig{
Enabled: true,
ClientID: envCfg.CSMClientID,
Host: envCfg.CSMHost,
Port: envCfg.CSMPort,
}, nil
}
return csmConfig{}, nil
}
sharedCfg, err := loadSharedConfig(csmProfileName, cfgFiles, false)
if err != nil {
if _, ok := err.(SharedConfigProfileNotExistsError); !ok {
return csmConfig{}, err
}
}
if sharedCfg.CSMEnabled != nil && *sharedCfg.CSMEnabled == true {
return csmConfig{
Enabled: true,
ClientID: sharedCfg.CSMClientID,
Host: sharedCfg.CSMHost,
Port: sharedCfg.CSMPort,
}, nil
}
return csmConfig{}, nil
}
func loadCustomCABundle(s *Session, bundle io.Reader) error {
var t *http.Transport
switch v := s.Config.HTTPClient.Transport.(type) {
@ -520,7 +571,7 @@ func initHandlers(s *Session) {
}
}
// Copy creates and returns a copy of the current Session, coping the config
// Copy creates and returns a copy of the current Session, copying the config
// and handlers. If any additional configs are provided they will be merged
// on top of the Session's copied config.
//

View File

@ -22,6 +22,12 @@ const (
mfaSerialKey = `mfa_serial` // optional
roleSessionNameKey = `role_session_name` // optional
// CSM options
csmEnabledKey = `csm_enabled`
csmHostKey = `csm_host`
csmPortKey = `csm_port`
csmClientIDKey = `csm_client_id`
// Additional Config fields
regionKey = `region`
@ -76,6 +82,12 @@ type sharedConfig struct {
//
// endpoint_discovery_enabled = true
EnableEndpointDiscovery *bool
// CSM Options
CSMEnabled *bool
CSMHost string
CSMPort string
CSMClientID string
}
type sharedConfigFile struct {
@ -251,10 +263,13 @@ func (cfg *sharedConfig) setFromIniFile(profile string, file sharedConfigFile, e
}
// Endpoint discovery
if section.Has(enableEndpointDiscoveryKey) {
v := section.Bool(enableEndpointDiscoveryKey)
cfg.EnableEndpointDiscovery = &v
}
updateBoolPtr(&cfg.EnableEndpointDiscovery, section, enableEndpointDiscoveryKey)
// CSM options
updateBoolPtr(&cfg.CSMEnabled, section, csmEnabledKey)
updateString(&cfg.CSMHost, section, csmHostKey)
updateString(&cfg.CSMPort, section, csmPortKey)
updateString(&cfg.CSMClientID, section, csmClientIDKey)
return nil
}
@ -348,6 +363,16 @@ func updateString(dst *string, section ini.Section, key string) {
*dst = section.String(key)
}
// updateBoolPtr will only update the dst with the value in the section key,
// key is present in the section.
func updateBoolPtr(dst **bool, section ini.Section, key string) {
if !section.Has(key) {
return
}
*dst = new(bool)
**dst = section.Bool(key)
}
// SharedConfigLoadError is an error for the shared config file failed to load.
type SharedConfigLoadError struct {
Filename string

View File

@ -5,4 +5,4 @@ package aws
const SDKName = "aws-sdk-go"
// SDKVersion is the version of this SDK
const SDKVersion = "1.22.2"
const SDKVersion = "1.24.1"

View File

@ -0,0 +1,15 @@
// +build go1.10
package sdkmath
import "math"
// Round returns the nearest integer, rounding half away from zero.
//
// Special cases are:
// Round(±0) = ±0
// Round(±Inf) = ±Inf
// Round(NaN) = NaN
func Round(x float64) float64 {
return math.Round(x)
}

View File

@ -0,0 +1,56 @@
// +build !go1.10
package sdkmath
import "math"
// Copied from the Go standard library's (Go 1.12) math/floor.go for use in
// Go version prior to Go 1.10.
const (
uvone = 0x3FF0000000000000
mask = 0x7FF
shift = 64 - 11 - 1
bias = 1023
signMask = 1 << 63
fracMask = 1<<shift - 1
)
// Round returns the nearest integer, rounding half away from zero.
//
// Special cases are:
// Round(±0) = ±0
// Round(±Inf) = ±Inf
// Round(NaN) = NaN
//
// Copied from the Go standard library's (Go 1.12) math/floor.go for use in
// Go version prior to Go 1.10.
func Round(x float64) float64 {
// Round is a faster implementation of:
//
// func Round(x float64) float64 {
// t := Trunc(x)
// if Abs(x-t) >= 0.5 {
// return t + Copysign(1, x)
// }
// return t
// }
bits := math.Float64bits(x)
e := uint(bits>>shift) & mask
if e < bias {
// Round abs(x) < 1 including denormals.
bits &= signMask // +-0
if e == bias-1 {
bits |= uvone // +-1
}
} else if e < bias+shift {
// Round any abs(x) >= 1 containing a fractional component [0,1).
//
// Numbers with larger exponents are returned unchanged since they
// must be either an integer, infinity, or NaN.
const half = 1 << (shift - 1)
e -= bias
bits += half >> e
bits &^= fracMask >> e
}
return math.Float64frombits(bits)
}

View File

@ -146,6 +146,9 @@ func unmarshalStatusCode(v reflect.Value, statusCode int) {
}
func unmarshalHeaderMap(r reflect.Value, headers http.Header, prefix string) error {
if len(headers) == 0 {
return nil
}
switch r.Interface().(type) {
case map[string]*string: // we only support string map value types
out := map[string]*string{}
@ -155,19 +158,28 @@ func unmarshalHeaderMap(r reflect.Value, headers http.Header, prefix string) err
out[k[len(prefix):]] = &v[0]
}
}
r.Set(reflect.ValueOf(out))
if len(out) != 0 {
r.Set(reflect.ValueOf(out))
}
}
return nil
}
func unmarshalHeader(v reflect.Value, header string, tag reflect.StructTag) error {
isJSONValue := tag.Get("type") == "jsonvalue"
if isJSONValue {
switch tag.Get("type") {
case "jsonvalue":
if len(header) == 0 {
return nil
}
} else if !v.IsValid() || (header == "" && v.Elem().Kind() != reflect.String) {
return nil
case "blob":
if len(header) == 0 {
return nil
}
default:
if !v.IsValid() || (header == "" && v.Elem().Kind() != reflect.String) {
return nil
}
}
switch v.Interface().(type) {
@ -178,7 +190,7 @@ func unmarshalHeader(v reflect.Value, header string, tag reflect.StructTag) erro
if err != nil {
return err
}
v.Set(reflect.ValueOf(&b))
v.Set(reflect.ValueOf(b))
case *bool:
b, err := strconv.ParseBool(header)
if err != nil {

View File

@ -39,7 +39,7 @@ func Build(r *request.Request) {
r.Error = awserr.NewRequestFailure(
awserr.New(request.ErrCodeSerialization,
"failed to encode rest XML request", err),
r.HTTPResponse.StatusCode,
0,
r.RequestID,
)
return

View File

@ -1,8 +1,11 @@
package protocol
import (
"math"
"strconv"
"time"
"github.com/aws/aws-sdk-go/internal/sdkmath"
)
// Names of time formats supported by the SDK
@ -13,12 +16,19 @@ const (
)
// Time formats supported by the SDK
// Output time is intended to not contain decimals
const (
// RFC 7231#section-7.1.1.1 timetamp format. e.g Tue, 29 Apr 2014 18:30:38 GMT
RFC822TimeFormat = "Mon, 2 Jan 2006 15:04:05 GMT"
// This format is used for output time without seconds precision
RFC822OutputTimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"
// RFC3339 a subset of the ISO8601 timestamp format. e.g 2014-04-29T18:30:38Z
ISO8601TimeFormat = "2006-01-02T15:04:05Z"
ISO8601TimeFormat = "2006-01-02T15:04:05.999999999Z"
// This format is used for output time without seconds precision
ISO8601OutputTimeFormat = "2006-01-02T15:04:05Z"
)
// IsKnownTimestampFormat returns if the timestamp format name
@ -42,9 +52,9 @@ func FormatTime(name string, t time.Time) string {
switch name {
case RFC822TimeFormatName:
return t.Format(RFC822TimeFormat)
return t.Format(RFC822OutputTimeFormat)
case ISO8601TimeFormatName:
return t.Format(ISO8601TimeFormat)
return t.Format(ISO8601OutputTimeFormat)
case UnixTimeFormatName:
return strconv.FormatInt(t.Unix(), 10)
default:
@ -62,10 +72,12 @@ func ParseTime(formatName, value string) (time.Time, error) {
return time.Parse(ISO8601TimeFormat, value)
case UnixTimeFormatName:
v, err := strconv.ParseFloat(value, 64)
_, dec := math.Modf(v)
dec = sdkmath.Round(dec*1e3) / 1e3 //Rounds 0.1229999 to 0.123
if err != nil {
return time.Time{}, err
}
return time.Unix(int64(v), 0), nil
return time.Unix(int64(v), int64(dec*(1e9))), nil
default:
panic("unknown timestamp format name, " + formatName)
}

File diff suppressed because it is too large Load Diff

View File

@ -8,65 +8,32 @@ import (
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/internal/sdkrand"
)
type retryer struct {
client.DefaultRetryer
}
const (
// customRetryerMinRetryDelay sets min retry delay
customRetryerMinRetryDelay = 1 * time.Second
func (d retryer) RetryRules(r *request.Request) time.Duration {
switch r.Operation.Name {
case opModifyNetworkInterfaceAttribute:
fallthrough
case opAssignPrivateIpAddresses:
return customRetryRule(r)
default:
return d.DefaultRetryer.RetryRules(r)
}
}
func customRetryRule(r *request.Request) time.Duration {
retryTimes := []time.Duration{
time.Second,
3 * time.Second,
5 * time.Second,
}
count := r.RetryCount
if count >= len(retryTimes) {
count = len(retryTimes) - 1
}
minTime := int(retryTimes[count])
return time.Duration(sdkrand.SeededRand.Intn(minTime) + minTime)
}
func setCustomRetryer(c *client.Client) {
maxRetries := aws.IntValue(c.Config.MaxRetries)
if c.Config.MaxRetries == nil || maxRetries == aws.UseServiceDefaultRetries {
maxRetries = 3
}
c.Retryer = retryer{
DefaultRetryer: client.DefaultRetryer{
NumMaxRetries: maxRetries,
},
}
}
// customRetryerMaxRetryDelay sets max retry delay
customRetryerMaxRetryDelay = 8 * time.Second
)
func init() {
initClient = func(c *client.Client) {
if c.Config.Retryer == nil {
// Only override the retryer with a custom one if the config
// does not already contain a retryer
setCustomRetryer(c)
}
}
initRequest = func(r *request.Request) {
if r.Operation.Name == opCopySnapshot { // fill the PresignedURL parameter
r.Handlers.Build.PushFront(fillPresignedURL)
}
// only set the retryer on request if config doesn't have a retryer
if r.Config.Retryer == nil && (r.Operation.Name == opModifyNetworkInterfaceAttribute || r.Operation.Name == opAssignPrivateIpAddresses) {
r.Retryer = client.DefaultRetryer{
NumMaxRetries: client.DefaultRetryerMaxNumRetries,
MinRetryDelay: customRetryerMinRetryDelay,
MinThrottleDelay: customRetryerMinRetryDelay,
MaxRetryDelay: customRetryerMaxRetryDelay,
MaxThrottleDelay: customRetryerMaxRetryDelay,
}
}
}
}

View File

@ -9,7 +9,7 @@
//
// To learn more, see the following resources:
//
// * Amazon EC2: Amazon EC2 product page (http://aws.amazon.com/ec2), Amazon
// * Amazon EC2: AmazonEC2 product page (http://aws.amazon.com/ec2), Amazon
// EC2 documentation (http://aws.amazon.com/documentation/ec2)
//
// * Amazon EBS: Amazon EBS product page (http://aws.amazon.com/ebs), Amazon

View File

@ -690,6 +690,10 @@ type EC2API interface {
DescribeElasticGpusWithContext(aws.Context, *ec2.DescribeElasticGpusInput, ...request.Option) (*ec2.DescribeElasticGpusOutput, error)
DescribeElasticGpusRequest(*ec2.DescribeElasticGpusInput) (*request.Request, *ec2.DescribeElasticGpusOutput)
DescribeExportImageTasks(*ec2.DescribeExportImageTasksInput) (*ec2.DescribeExportImageTasksOutput, error)
DescribeExportImageTasksWithContext(aws.Context, *ec2.DescribeExportImageTasksInput, ...request.Option) (*ec2.DescribeExportImageTasksOutput, error)
DescribeExportImageTasksRequest(*ec2.DescribeExportImageTasksInput) (*request.Request, *ec2.DescribeExportImageTasksOutput)
DescribeExportTasks(*ec2.DescribeExportTasksInput) (*ec2.DescribeExportTasksOutput, error)
DescribeExportTasksWithContext(aws.Context, *ec2.DescribeExportTasksInput, ...request.Option) (*ec2.DescribeExportTasksOutput, error)
DescribeExportTasksRequest(*ec2.DescribeExportTasksInput) (*request.Request, *ec2.DescribeExportTasksOutput)
@ -1272,6 +1276,10 @@ type EC2API interface {
ExportClientVpnClientConfigurationWithContext(aws.Context, *ec2.ExportClientVpnClientConfigurationInput, ...request.Option) (*ec2.ExportClientVpnClientConfigurationOutput, error)
ExportClientVpnClientConfigurationRequest(*ec2.ExportClientVpnClientConfigurationInput) (*request.Request, *ec2.ExportClientVpnClientConfigurationOutput)
ExportImage(*ec2.ExportImageInput) (*ec2.ExportImageOutput, error)
ExportImageWithContext(aws.Context, *ec2.ExportImageInput, ...request.Option) (*ec2.ExportImageOutput, error)
ExportImageRequest(*ec2.ExportImageInput) (*request.Request, *ec2.ExportImageOutput)
ExportTransitGatewayRoutes(*ec2.ExportTransitGatewayRoutesInput) (*ec2.ExportTransitGatewayRoutesOutput, error)
ExportTransitGatewayRoutesWithContext(aws.Context, *ec2.ExportTransitGatewayRoutesInput, ...request.Option) (*ec2.ExportTransitGatewayRoutesOutput, error)
ExportTransitGatewayRoutesRequest(*ec2.ExportTransitGatewayRoutesInput) (*request.Request, *ec2.ExportTransitGatewayRoutesOutput)
@ -1493,6 +1501,14 @@ type EC2API interface {
ModifyVpnConnectionWithContext(aws.Context, *ec2.ModifyVpnConnectionInput, ...request.Option) (*ec2.ModifyVpnConnectionOutput, error)
ModifyVpnConnectionRequest(*ec2.ModifyVpnConnectionInput) (*request.Request, *ec2.ModifyVpnConnectionOutput)
ModifyVpnTunnelCertificate(*ec2.ModifyVpnTunnelCertificateInput) (*ec2.ModifyVpnTunnelCertificateOutput, error)
ModifyVpnTunnelCertificateWithContext(aws.Context, *ec2.ModifyVpnTunnelCertificateInput, ...request.Option) (*ec2.ModifyVpnTunnelCertificateOutput, error)
ModifyVpnTunnelCertificateRequest(*ec2.ModifyVpnTunnelCertificateInput) (*request.Request, *ec2.ModifyVpnTunnelCertificateOutput)
ModifyVpnTunnelOptions(*ec2.ModifyVpnTunnelOptionsInput) (*ec2.ModifyVpnTunnelOptionsOutput, error)
ModifyVpnTunnelOptionsWithContext(aws.Context, *ec2.ModifyVpnTunnelOptionsInput, ...request.Option) (*ec2.ModifyVpnTunnelOptionsOutput, error)
ModifyVpnTunnelOptionsRequest(*ec2.ModifyVpnTunnelOptionsInput) (*request.Request, *ec2.ModifyVpnTunnelOptionsOutput)
MonitorInstances(*ec2.MonitorInstancesInput) (*ec2.MonitorInstancesOutput, error)
MonitorInstancesWithContext(aws.Context, *ec2.MonitorInstancesInput, ...request.Option) (*ec2.MonitorInstancesOutput, error)
MonitorInstancesRequest(*ec2.MonitorInstancesInput) (*request.Request, *ec2.MonitorInstancesOutput)
@ -1633,6 +1649,10 @@ type EC2API interface {
SearchTransitGatewayRoutesWithContext(aws.Context, *ec2.SearchTransitGatewayRoutesInput, ...request.Option) (*ec2.SearchTransitGatewayRoutesOutput, error)
SearchTransitGatewayRoutesRequest(*ec2.SearchTransitGatewayRoutesInput) (*request.Request, *ec2.SearchTransitGatewayRoutesOutput)
SendDiagnosticInterrupt(*ec2.SendDiagnosticInterruptInput) (*ec2.SendDiagnosticInterruptOutput, error)
SendDiagnosticInterruptWithContext(aws.Context, *ec2.SendDiagnosticInterruptInput, ...request.Option) (*ec2.SendDiagnosticInterruptOutput, error)
SendDiagnosticInterruptRequest(*ec2.SendDiagnosticInterruptInput) (*request.Request, *ec2.SendDiagnosticInterruptOutput)
StartInstances(*ec2.StartInstancesInput) (*ec2.StartInstancesOutput, error)
StartInstancesWithContext(aws.Context, *ec2.StartInstancesInput, ...request.Option) (*ec2.StartInstancesOutput, error)
StartInstancesRequest(*ec2.StartInstancesInput) (*request.Request, *ec2.StartInstancesOutput)

View File

@ -0,0 +1,81 @@
package s3manager
import (
"io"
"github.com/aws/aws-sdk-go/internal/sdkio"
)
// BufferedReadSeeker is buffered io.ReadSeeker
type BufferedReadSeeker struct {
r io.ReadSeeker
buffer []byte
readIdx, writeIdx int
}
// NewBufferedReadSeeker returns a new BufferedReadSeeker
// if len(b) == 0 then the buffer will be initialized to 64 KiB.
func NewBufferedReadSeeker(r io.ReadSeeker, b []byte) *BufferedReadSeeker {
if len(b) == 0 {
b = make([]byte, 64*1024)
}
return &BufferedReadSeeker{r: r, buffer: b}
}
func (b *BufferedReadSeeker) reset(r io.ReadSeeker) {
b.r = r
b.readIdx, b.writeIdx = 0, 0
}
// Read will read up len(p) bytes into p and will return
// the number of bytes read and any error that occurred.
// If the len(p) > the buffer size then a single read request
// will be issued to the underlying io.ReadSeeker for len(p) bytes.
// A Read request will at most perform a single Read to the underlying
// io.ReadSeeker, and may return < len(p) if serviced from the buffer.
func (b *BufferedReadSeeker) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return n, err
}
if b.readIdx == b.writeIdx {
if len(p) >= len(b.buffer) {
n, err = b.r.Read(p)
return n, err
}
b.readIdx, b.writeIdx = 0, 0
n, err = b.r.Read(b.buffer)
if n == 0 {
return n, err
}
b.writeIdx += n
}
n = copy(p, b.buffer[b.readIdx:b.writeIdx])
b.readIdx += n
return n, err
}
// Seek will position then underlying io.ReadSeeker to the given offset
// and will clear the buffer.
func (b *BufferedReadSeeker) Seek(offset int64, whence int) (int64, error) {
n, err := b.r.Seek(offset, whence)
b.reset(b.r)
return n, err
}
// ReadAt will read up to len(p) bytes at the given file offset.
// This will result in the buffer being cleared.
func (b *BufferedReadSeeker) ReadAt(p []byte, off int64) (int, error) {
_, err := b.Seek(off, sdkio.SeekStart)
if err != nil {
return 0, err
}
return b.Read(p)
}

View File

@ -0,0 +1,7 @@
// +build !windows
package s3manager
func defaultUploadBufferProvider() ReadSeekerWriteToProvider {
return nil
}

View File

@ -0,0 +1,5 @@
package s3manager
func defaultUploadBufferProvider() ReadSeekerWriteToProvider {
return NewBufferedReadSeekerWriteToPool(1024 * 1024)
}

View File

@ -99,7 +99,7 @@ func NewDownloader(c client.ConfigProvider, options ...func(*Downloader)) *Downl
// sess := session.Must(session.NewSession())
//
// // The S3 client the S3 Downloader will use
// s3Svc := s3.new(sess)
// s3Svc := s3.New(sess)
//
// // Create a downloader with the s3 client and default options
// downloader := s3manager.NewDownloaderWithClient(s3Svc)

View File

@ -0,0 +1,65 @@
package s3manager
import (
"io"
"sync"
)
// ReadSeekerWriteTo defines an interface implementing io.WriteTo and io.ReadSeeker
type ReadSeekerWriteTo interface {
io.ReadSeeker
io.WriterTo
}
// BufferedReadSeekerWriteTo wraps a BufferedReadSeeker with an io.WriteAt
// implementation.
type BufferedReadSeekerWriteTo struct {
*BufferedReadSeeker
}
// WriteTo writes to the given io.Writer from BufferedReadSeeker until there's no more data to write or
// an error occurs. Returns the number of bytes written and any error encountered during the write.
func (b *BufferedReadSeekerWriteTo) WriteTo(writer io.Writer) (int64, error) {
return io.Copy(writer, b.BufferedReadSeeker)
}
// ReadSeekerWriteToProvider provides an implementation of io.WriteTo for an io.ReadSeeker
type ReadSeekerWriteToProvider interface {
GetWriteTo(seeker io.ReadSeeker) (r ReadSeekerWriteTo, cleanup func())
}
// BufferedReadSeekerWriteToPool uses a sync.Pool to create and reuse
// []byte slices for buffering parts in memory
type BufferedReadSeekerWriteToPool struct {
pool sync.Pool
}
// NewBufferedReadSeekerWriteToPool will return a new BufferedReadSeekerWriteToPool that will create
// a pool of reusable buffers . If size is less then < 64 KiB then the buffer
// will default to 64 KiB. Reason: io.Copy from writers or readers that don't support io.WriteTo or io.ReadFrom
// respectively will default to copying 32 KiB.
func NewBufferedReadSeekerWriteToPool(size int) *BufferedReadSeekerWriteToPool {
if size < 65536 {
size = 65536
}
return &BufferedReadSeekerWriteToPool{
pool: sync.Pool{New: func() interface{} {
return make([]byte, size)
}},
}
}
// GetWriteTo will wrap the provided io.ReadSeeker with a BufferedReadSeekerWriteTo.
// The provided cleanup must be called after operations have been completed on the
// returned io.ReadSeekerWriteTo in order to signal the return of resources to the pool.
func (p *BufferedReadSeekerWriteToPool) GetWriteTo(seeker io.ReadSeeker) (r ReadSeekerWriteTo, cleanup func()) {
buffer := p.pool.Get().([]byte)
r = &BufferedReadSeekerWriteTo{BufferedReadSeeker: NewBufferedReadSeeker(seeker, buffer)}
cleanup = func() {
p.pool.Put(buffer)
}
return r, cleanup
}

View File

@ -162,6 +162,9 @@ type Uploader struct {
// List of request options that will be passed down to individual API
// operation requests made by the uploader.
RequestOptions []request.Option
// Defines the buffer strategy used when uploading a part
BufferProvider ReadSeekerWriteToProvider
}
// NewUploader creates a new Uploader instance to upload objects to S3. Pass In
@ -187,6 +190,7 @@ func NewUploader(c client.ConfigProvider, options ...func(*Uploader)) *Uploader
Concurrency: DefaultUploadConcurrency,
LeavePartsOnError: false,
MaxUploadParts: MaxUploadParts,
BufferProvider: defaultUploadBufferProvider(),
}
for _, option := range options {
@ -221,6 +225,7 @@ func NewUploaderWithClient(svc s3iface.S3API, options ...func(*Uploader)) *Uploa
Concurrency: DefaultUploadConcurrency,
LeavePartsOnError: false,
MaxUploadParts: MaxUploadParts,
BufferProvider: defaultUploadBufferProvider(),
}
for _, option := range options {
@ -357,7 +362,8 @@ type uploader struct {
readerPos int64 // current reader position
totalSize int64 // set to -1 if the size is not known
bufferPool sync.Pool
bufferPool sync.Pool
bufferUploadPool sync.Pool
}
// internal logic for deciding whether to upload a single part or use a
@ -373,15 +379,16 @@ func (u *uploader) upload() (*UploadOutput, error) {
}
// Do one read to determine if we have more than one part
reader, _, part, err := u.nextReader()
reader, _, part, cleanup, err := u.nextReader()
if err == io.EOF { // single part
return u.singlePart(reader)
return u.singlePart(reader, cleanup)
} else if err != nil {
cleanup()
return nil, awserr.New("ReadRequestBody", "read upload data failed", err)
}
mu := multiuploader{uploader: u}
return mu.upload(reader, part)
return mu.upload(reader, part, cleanup)
}
// init will initialize all default options.
@ -433,7 +440,7 @@ func (u *uploader) initSize() error {
// This operation increases the shared u.readerPos counter, but note that it
// does not need to be wrapped in a mutex because nextReader is only called
// from the main thread.
func (u *uploader) nextReader() (io.ReadSeeker, int, []byte, error) {
func (u *uploader) nextReader() (io.ReadSeeker, int, []byte, func(), error) {
type readerAtSeeker interface {
io.ReaderAt
io.ReadSeeker
@ -452,17 +459,32 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, []byte, error) {
}
}
reader := io.NewSectionReader(r, u.readerPos, n)
var (
reader io.ReadSeeker
cleanup func()
)
reader = io.NewSectionReader(r, u.readerPos, n)
if u.cfg.BufferProvider != nil {
reader, cleanup = u.cfg.BufferProvider.GetWriteTo(reader)
} else {
cleanup = func() {}
}
u.readerPos += n
return reader, int(n), nil, err
return reader, int(n), nil, cleanup, err
default:
part := u.bufferPool.Get().([]byte)
n, err := readFillBuf(r, part)
u.readerPos += int64(n)
return bytes.NewReader(part[0:n]), n, part, err
cleanup := func() {
u.bufferPool.Put(part)
}
return bytes.NewReader(part[0:n]), n, part, cleanup, err
}
}
@ -479,10 +501,12 @@ func readFillBuf(r io.Reader, b []byte) (offset int, err error) {
// singlePart contains upload logic for uploading a single chunk via
// a regular PutObject request. Multipart requests require at least two
// parts, or at least 5MB of data.
func (u *uploader) singlePart(buf io.ReadSeeker) (*UploadOutput, error) {
func (u *uploader) singlePart(r io.ReadSeeker, cleanup func()) (*UploadOutput, error) {
defer cleanup()
params := &s3.PutObjectInput{}
awsutil.Copy(params, u.in)
params.Body = buf
params.Body = r
// Need to use request form because URL generated in request is
// used in return.
@ -512,9 +536,10 @@ type multiuploader struct {
// keeps track of a single chunk of data being sent to S3.
type chunk struct {
buf io.ReadSeeker
part []byte
num int64
buf io.ReadSeeker
part []byte
num int64
cleanup func()
}
// completedParts is a wrapper to make parts sortable by their part number,
@ -527,7 +552,7 @@ func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].Pa
// upload will perform a multipart upload using the firstBuf buffer containing
// the first chunk of data.
func (u *multiuploader) upload(firstBuf io.ReadSeeker, firstPart []byte) (*UploadOutput, error) {
func (u *multiuploader) upload(firstBuf io.ReadSeeker, firstPart []byte, cleanup func()) (*UploadOutput, error) {
params := &s3.CreateMultipartUploadInput{}
awsutil.Copy(params, u.in)
@ -547,46 +572,30 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker, firstPart []byte) (*Uploa
// Send part 1 to the workers
var num int64 = 1
ch <- chunk{buf: firstBuf, part: firstPart, num: num}
ch <- chunk{buf: firstBuf, part: firstPart, num: num, cleanup: cleanup}
// Read and queue the rest of the parts
for u.geterr() == nil && err == nil {
var reader io.ReadSeeker
var nextChunkLen int
var part []byte
reader, nextChunkLen, part, err = u.nextReader()
var (
reader io.ReadSeeker
nextChunkLen int
part []byte
ok bool
)
if err != nil && err != io.EOF {
u.seterr(awserr.New(
"ReadRequestBody",
"read multipart upload data failed",
err))
break
}
if nextChunkLen == 0 {
// No need to upload empty part, if file was empty to start
// with empty single part would of been created and never
// started multipart upload.
reader, nextChunkLen, part, cleanup, err = u.nextReader()
ok, err = u.shouldContinue(num, nextChunkLen, err)
if !ok {
cleanup()
if err != nil {
u.seterr(err)
}
break
}
num++
// This upload exceeded maximum number of supported parts, error now.
if num > int64(u.cfg.MaxUploadParts) || num > int64(MaxUploadParts) {
var msg string
if num > int64(u.cfg.MaxUploadParts) {
msg = fmt.Sprintf("exceeded total allowed configured MaxUploadParts (%d). Adjust PartSize to fit in this limit",
u.cfg.MaxUploadParts)
} else {
msg = fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit",
MaxUploadParts)
}
u.seterr(awserr.New("TotalPartsExceeded", msg, nil))
break
}
ch <- chunk{buf: reader, part: part, num: num}
ch <- chunk{buf: reader, part: part, num: num, cleanup: cleanup}
}
// Close the channel, wait for workers, and complete upload
@ -620,6 +629,35 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker, firstPart []byte) (*Uploa
}, nil
}
func (u *multiuploader) shouldContinue(part int64, nextChunkLen int, err error) (bool, error) {
if err != nil && err != io.EOF {
return false, awserr.New("ReadRequestBody", "read multipart upload data failed", err)
}
if nextChunkLen == 0 {
// No need to upload empty part, if file was empty to start
// with empty single part would of been created and never
// started multipart upload.
return false, nil
}
part++
// This upload exceeded maximum number of supported parts, error now.
if part > int64(u.cfg.MaxUploadParts) || part > int64(MaxUploadParts) {
var msg string
if part > int64(u.cfg.MaxUploadParts) {
msg = fmt.Sprintf("exceeded total allowed configured MaxUploadParts (%d). Adjust PartSize to fit in this limit",
u.cfg.MaxUploadParts)
} else {
msg = fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit",
MaxUploadParts)
}
return false, awserr.New("TotalPartsExceeded", msg, nil)
}
return true, err
}
// readChunk runs in worker goroutines to pull chunks off of the ch channel
// and send() them as UploadPart requests.
func (u *multiuploader) readChunk(ch chan chunk) {
@ -651,9 +689,9 @@ func (u *multiuploader) send(c chunk) error {
SSECustomerKey: u.in.SSECustomerKey,
PartNumber: &c.num,
}
resp, err := u.cfg.S3.UploadPartWithContext(u.ctx, params, u.cfg.RequestOptions...)
// put the byte array back into the pool to conserve memory
u.bufferPool.Put(c.part)
c.cleanup()
if err != nil {
return err
}

View File

@ -0,0 +1,11 @@
package sts
import "github.com/aws/aws-sdk-go/aws/request"
func init() {
initRequest = customizeRequest
}
func customizeRequest(r *request.Request) {
r.RetryErrorCodes = append(r.RetryErrorCodes, ErrCodeIDPCommunicationErrorException)
}

3
vendor/modules.txt vendored
View File

@ -91,7 +91,7 @@ github.com/approvals/go-approval-tests/utils
github.com/armon/go-metrics
# github.com/armon/go-radix v1.0.0
github.com/armon/go-radix
# github.com/aws/aws-sdk-go v1.22.2
# github.com/aws/aws-sdk-go v1.24.1
github.com/aws/aws-sdk-go/aws
github.com/aws/aws-sdk-go/aws/awserr
github.com/aws/aws-sdk-go/aws/awsutil
@ -113,6 +113,7 @@ github.com/aws/aws-sdk-go/aws/signer/v4
github.com/aws/aws-sdk-go/internal/ini
github.com/aws/aws-sdk-go/internal/s3err
github.com/aws/aws-sdk-go/internal/sdkio
github.com/aws/aws-sdk-go/internal/sdkmath
github.com/aws/aws-sdk-go/internal/sdkrand
github.com/aws/aws-sdk-go/internal/sdkuri
github.com/aws/aws-sdk-go/internal/shareddefaults