
469 lines
13 KiB
Raw Normal View History

2017-03-04 05:06:32 -05:00
package oss
import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"strconv"
)

// CopyFile is multipart copy object
2017-03-04 05:06:32 -05:00
2019-10-14 10:21:52 -04:00
// srcBucketName source bucket name
// srcObjectKey source object name
// destObjectKey target object name in the form of bucketname.objectkey
// partSize the part size in byte.
// options object's contraints. Check out function InitiateMultipartUpload.
2017-03-04 05:06:32 -05:00
2019-10-14 10:21:52 -04:00
// error it's nil if the operation succeeds, otherwise it's an error object.
2017-03-04 05:06:32 -05:00
func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
destBucketName := bucket.BucketName
if partSize < MinPartSize || partSize > MaxPartSize {
return errors.New("oss: part size invalid range (1024KB, 5GB]")
2019-10-14 10:21:52 -04:00
cpConf := getCpConfig(options)
2017-03-04 05:06:32 -05:00
routines := getRoutines(options)
2019-10-14 10:21:52 -04:00
if cpConf != nil && cpConf.IsEnable {
cpFilePath := getCopyCpFilePath(cpConf, srcBucketName, srcObjectKey, destBucketName, destObjectKey)
if cpFilePath != "" {
return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey, partSize, options, cpFilePath, routines)
2017-03-04 05:06:32 -05:00
return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
partSize, options, routines)
2019-10-14 10:21:52 -04:00
func getCopyCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destBucket, destObject string) string {
if cpConf.FilePath == "" && cpConf.DirPath != "" {
dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
cpFileName := getCpFileName(src, dest)
cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
return cpConf.FilePath
2017-03-04 05:06:32 -05:00
2019-10-14 10:21:52 -04:00
// ----- Concurrently copy without checkpoint ---------
// copyWorkerArg defines the copy worker arguments
2017-03-04 05:06:32 -05:00
type copyWorkerArg struct {
bucket *Bucket
imur InitiateMultipartUploadResult
srcBucketName string
srcObjectKey string
options []Option
hook copyPartHook
2019-10-14 10:21:52 -04:00
// copyPartHook is the hook for testing purpose
2017-03-04 05:06:32 -05:00
type copyPartHook func(part copyPart) error
var copyPartHooker copyPartHook = defaultCopyPartHook
func defaultCopyPartHook(part copyPart) error {
return nil
2019-10-14 10:21:52 -04:00
// copyWorker copies worker
2017-03-04 05:06:32 -05:00
func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
for chunk := range jobs {
if err := arg.hook(chunk); err != nil {
failed <- err
chunkSize := chunk.End - chunk.Start + 1
part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
chunk.Start, chunkSize, chunk.Number, arg.options...)
if err != nil {
failed <- err
select {
case <-die:
results <- part
2019-10-14 10:21:52 -04:00
// copyScheduler
2017-03-04 05:06:32 -05:00
func copyScheduler(jobs chan copyPart, parts []copyPart) {
for _, part := range parts {
jobs <- part
2019-10-14 10:21:52 -04:00
// copyPart describes one part of a multipart copy as a byte range of the
// source object. Both Start and End are inclusive.
type copyPart struct {
	Number int   // Part number (from 1 to 10,000)
	Start  int64 // The start index in the source file.
	End    int64 // The end index in the source file
}
// getCopyParts calculates copy parts
func getCopyParts(objectSize, partSize int64) []copyPart {
2017-03-04 05:06:32 -05:00
parts := []copyPart{}
part := copyPart{}
i := 0
for offset := int64(0); offset < objectSize; offset += partSize {
part.Number = i + 1
part.Start = offset
part.End = GetPartEnd(offset, objectSize, partSize)
parts = append(parts, part)
2019-10-14 10:21:52 -04:00
return parts
2017-03-04 05:06:32 -05:00
2019-10-14 10:21:52 -04:00
// getSrcObjectBytes gets the source file size
2017-03-04 05:06:32 -05:00
func getSrcObjectBytes(parts []copyPart) int64 {
var ob int64
for _, part := range parts {
ob += (part.End - part.Start + 1)
return ob
2019-10-14 10:21:52 -04:00
// copyFile is a concurrently copy without checkpoint
2017-03-04 05:06:32 -05:00
func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
partSize int64, options []Option, routines int) error {
descBucket, err := bucket.Client.Bucket(destBucketName)
srcBucket, err := bucket.Client.Bucket(srcBucketName)
listener := getProgressListener(options)
2019-10-14 10:21:52 -04:00
payerOptions := []Option{}
payer := getPayer(options)
if payer != "" {
payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
if err != nil {
return err
objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
2017-03-04 05:06:32 -05:00
if err != nil {
return err
2019-10-14 10:21:52 -04:00
// Get copy parts
parts := getCopyParts(objectSize, partSize)
// Initialize the multipart upload
2017-03-04 05:06:32 -05:00
imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
if err != nil {
return err
jobs := make(chan copyPart, len(parts))
results := make(chan UploadPart, len(parts))
failed := make(chan error)
die := make(chan bool)
var completedBytes int64
totalBytes := getSrcObjectBytes(parts)
event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
publishProgress(listener, event)
2019-10-14 10:21:52 -04:00
// Start to copy workers
arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
2017-03-04 05:06:32 -05:00
for w := 1; w <= routines; w++ {
go copyWorker(w, arg, jobs, results, failed, die)
2019-10-14 10:21:52 -04:00
// Start the scheduler
2017-03-04 05:06:32 -05:00
go copyScheduler(jobs, parts)
2019-10-14 10:21:52 -04:00
// Wait for the parts finished.
2017-03-04 05:06:32 -05:00
completed := 0
ups := make([]UploadPart, len(parts))
for completed < len(parts) {
select {
case part := <-results:
ups[part.PartNumber-1] = part
completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
publishProgress(listener, event)
case err := <-failed:
2019-10-14 10:21:52 -04:00
descBucket.AbortMultipartUpload(imur, payerOptions...)
2017-03-04 05:06:32 -05:00
event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
publishProgress(listener, event)
return err
if completed >= len(parts) {
event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
publishProgress(listener, event)
2019-10-14 10:21:52 -04:00
// Complete the multipart upload
_, err = descBucket.CompleteMultipartUpload(imur, ups, payerOptions...)
2017-03-04 05:06:32 -05:00
if err != nil {
2019-10-14 10:21:52 -04:00
bucket.AbortMultipartUpload(imur, payerOptions...)
2017-03-04 05:06:32 -05:00
return err
return nil
2019-10-14 10:21:52 -04:00
// ----- Concurrently copy with checkpoint -----
2017-03-04 05:06:32 -05:00
const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"
type copyCheckpoint struct {
2019-10-14 10:21:52 -04:00
Magic string // Magic
MD5 string // CP content MD5
SrcBucketName string // Source bucket
SrcObjectKey string // Source object
DestBucketName string // Target bucket
DestObjectKey string // Target object
CopyID string // Copy ID
ObjStat objectStat // Object stat
Parts []copyPart // Copy parts
CopyParts []UploadPart // The uploaded parts
PartStat []bool // The part status
2017-03-04 05:06:32 -05:00
2019-10-14 10:21:52 -04:00
// isValid checks if the data is valid which means CP is valid and object is not updated.
func (cp copyCheckpoint) isValid(meta http.Header) (bool, error) {
// Compare CP's magic number and the MD5.
2017-03-04 05:06:32 -05:00
cpb := cp
cpb.MD5 = ""
js, _ := json.Marshal(cpb)
sum := md5.Sum(js)
b64 := base64.StdEncoding.EncodeToString(sum[:])
if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
return false, nil
objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
if err != nil {
return false, err
2019-10-14 10:21:52 -04:00
// Compare the object size and last modified time and etag.
2017-03-04 05:06:32 -05:00
if cp.ObjStat.Size != objectSize ||
cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
return false, nil
return true, nil
2019-10-14 10:21:52 -04:00
// load loads from the checkpoint file
2017-03-04 05:06:32 -05:00
func (cp *copyCheckpoint) load(filePath string) error {
contents, err := ioutil.ReadFile(filePath)
if err != nil {
return err
err = json.Unmarshal(contents, cp)
return err
2019-10-14 10:21:52 -04:00
// update updates the parts status
2017-03-04 05:06:32 -05:00
func (cp *copyCheckpoint) update(part UploadPart) {
cp.CopyParts[part.PartNumber-1] = part
cp.PartStat[part.PartNumber-1] = true
2019-10-14 10:21:52 -04:00
// dump dumps the CP to the file
2017-03-04 05:06:32 -05:00
func (cp *copyCheckpoint) dump(filePath string) error {
bcp := *cp
2019-10-14 10:21:52 -04:00
// Calculate MD5
2017-03-04 05:06:32 -05:00
bcp.MD5 = ""
js, err := json.Marshal(bcp)
if err != nil {
return err
sum := md5.Sum(js)
b64 := base64.StdEncoding.EncodeToString(sum[:])
bcp.MD5 = b64
2019-10-14 10:21:52 -04:00
// Serialization
2017-03-04 05:06:32 -05:00
js, err = json.Marshal(bcp)
if err != nil {
return err
2019-10-14 10:21:52 -04:00
// Dump
2017-03-04 05:06:32 -05:00
return ioutil.WriteFile(filePath, js, FilePermMode)
2019-10-14 10:21:52 -04:00
// todoParts returns unfinished parts
2017-03-04 05:06:32 -05:00
func (cp copyCheckpoint) todoParts() []copyPart {
dps := []copyPart{}
for i, ps := range cp.PartStat {
if !ps {
dps = append(dps, cp.Parts[i])
return dps
2019-10-14 10:21:52 -04:00
// getCompletedBytes returns finished bytes count
2017-03-04 05:06:32 -05:00
func (cp copyCheckpoint) getCompletedBytes() int64 {
var completedBytes int64
for i, part := range cp.Parts {
if cp.PartStat[i] {
completedBytes += (part.End - part.Start + 1)
return completedBytes
2019-10-14 10:21:52 -04:00
// prepare initializes the multipart upload
func (cp *copyCheckpoint) prepare(meta http.Header, srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
2017-03-04 05:06:32 -05:00
partSize int64, options []Option) error {
2019-10-14 10:21:52 -04:00
// CP
2017-03-04 05:06:32 -05:00
cp.Magic = copyCpMagic
cp.SrcBucketName = srcBucket.BucketName
cp.SrcObjectKey = srcObjectKey
cp.DestBucketName = destBucket.BucketName
cp.DestObjectKey = destObjectKey
objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
if err != nil {
return err
cp.ObjStat.Size = objectSize
cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
2019-10-14 10:21:52 -04:00
// Parts
cp.Parts = getCopyParts(objectSize, partSize)
2017-03-04 05:06:32 -05:00
cp.PartStat = make([]bool, len(cp.Parts))
for i := range cp.PartStat {
cp.PartStat[i] = false
cp.CopyParts = make([]UploadPart, len(cp.Parts))
2019-10-14 10:21:52 -04:00
// Init copy
2017-03-04 05:06:32 -05:00
imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
if err != nil {
return err
cp.CopyID = imur.UploadID
return nil
2019-10-14 10:21:52 -04:00
func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
2017-03-04 05:06:32 -05:00
imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
Key: cp.DestObjectKey, UploadID: cp.CopyID}
2019-10-14 10:21:52 -04:00
_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
2017-03-04 05:06:32 -05:00
if err != nil {
return err
return err
2019-10-14 10:21:52 -04:00
// copyFileWithCp is concurrently copy with checkpoint
2017-03-04 05:06:32 -05:00
func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
partSize int64, options []Option, cpFilePath string, routines int) error {
descBucket, err := bucket.Client.Bucket(destBucketName)
srcBucket, err := bucket.Client.Bucket(srcBucketName)
listener := getProgressListener(options)
2019-10-14 10:21:52 -04:00
payerOptions := []Option{}
payer := getPayer(options)
if payer != "" {
payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
// Load CP data
2017-03-04 05:06:32 -05:00
ccp := copyCheckpoint{}
err = ccp.load(cpFilePath)
if err != nil {
2019-10-14 10:21:52 -04:00
// Make sure the object is not updated.
meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
if err != nil {
return err
// Load error or the CP data is invalid---reinitialize
valid, err := ccp.isValid(meta)
2017-03-04 05:06:32 -05:00
if err != nil || !valid {
2019-10-14 10:21:52 -04:00
if err = ccp.prepare(meta, srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
2017-03-04 05:06:32 -05:00
return err
2019-10-14 10:21:52 -04:00
// Unfinished parts
2017-03-04 05:06:32 -05:00
parts := ccp.todoParts()
imur := InitiateMultipartUploadResult{
Bucket: destBucketName,
Key: destObjectKey,
UploadID: ccp.CopyID}
jobs := make(chan copyPart, len(parts))
results := make(chan UploadPart, len(parts))
failed := make(chan error)
die := make(chan bool)
completedBytes := ccp.getCompletedBytes()
event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size)
publishProgress(listener, event)
2019-10-14 10:21:52 -04:00
// Start the worker coroutines
arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
2017-03-04 05:06:32 -05:00
for w := 1; w <= routines; w++ {
go copyWorker(w, arg, jobs, results, failed, die)
2019-10-14 10:21:52 -04:00
// Start the scheduler
2017-03-04 05:06:32 -05:00
go copyScheduler(jobs, parts)
2019-10-14 10:21:52 -04:00
// Wait for the parts completed.
2017-03-04 05:06:32 -05:00
completed := 0
for completed < len(parts) {
select {
case part := <-results:
completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size)
publishProgress(listener, event)
case err := <-failed:
event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size)
publishProgress(listener, event)
return err
if completed >= len(parts) {
event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
publishProgress(listener, event)
2019-10-14 10:21:52 -04:00
return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, payerOptions)
2017-03-04 05:06:32 -05:00