package ufsdk import ( "bytes" "crypto/md5" "encoding/json" "fmt" "io" "math" "net/http" "net/url" "strconv" "strings" "sync" ) //MultipartState 用于保存分片上传的中间状态 type MultipartState struct { BlkSize int //服务器返回的分片大小 uploadID string mimeType string keyName string etags map[int]string mux sync.Mutex } //UnmarshalJSON custom unmarshal json func (m *MultipartState) UnmarshalJSON(bytes []byte) error { tmp := struct { BlkSize int `json:"BlkSize"` UploadID string `json:"UploadId"` }{} err := json.Unmarshal(bytes, &tmp) if err != nil { return err } m.BlkSize = tmp.BlkSize m.uploadID = tmp.UploadID return nil } type uploadChan struct { etag string err error } //MPut 分片上传一个文件,filePath 是本地文件所在的路径,内部会自动对文件进行分片上传,上传的方式是同步一片一片的上传。 //mimeType 如果为空的话,会调用 net/http 里面的 DetectContentType 进行检测。 //keyName 表示传到 ufile 的文件名。 //大于 100M 的文件推荐使用本接口上传。 func (u *UFileRequest) MPut(filePath, keyName, mimeType string) error { file, err := openFile(filePath) if err != nil { return err } defer file.Close() if mimeType == "" { mimeType = getMimeType(file) } state, err := u.InitiateMultipartUpload(keyName, mimeType) if err != nil { return err } chunk := make([]byte, state.BlkSize) var pos int for { bytesRead, fileErr := file.Read(chunk) if fileErr == io.EOF || bytesRead == 0 { //后面直接读到了结尾 break } buf := bytes.NewBuffer(chunk[:bytesRead]) err := u.UploadPart(buf, state, pos) if err != nil { u.AbortMultipartUpload(state) return err } pos++ } return u.FinishMultipartUpload(state) } //AsyncMPut 异步分片上传一个文件,filePath 是本地文件所在的路径,内部会自动对文件进行分片上传,上传的方式是使用异步的方式同时传多个分片的块。 //mimeType 如果为空的话,会调用 net/http 里面的 DetectContentType 进行检测。 //keyName 表示传到 ufile 的文件名。 //大于 100M 的文件推荐使用本接口上传。 //同时并发上传的分片数量为10 func (u *UFileRequest) AsyncMPut(filePath, keyName, mimeType string) error { return u.AsyncUpload(filePath, keyName, mimeType, 10) } //AsyncUpload AsyncMPut 的升级版, jobs 表示同时并发的数量。 func (u *UFileRequest) AsyncUpload(filePath, keyName, mimeType string, jobs int) error { if jobs <= 0 { jobs = 1 } if jobs >= 30 { jobs = 10 } file, err := openFile(filePath) if err != nil { return err } defer file.Close() if mimeType == "" { mimeType = getMimeType(file) } state, err := u.InitiateMultipartUpload(keyName, mimeType) if err != nil { return err } fsize := getFileSize(file) chunkCount := divideCeil(fsize, int64(state.BlkSize)) //向上取整 concurrentChan := make(chan error, jobs) for i := 0; i != jobs; i++ { concurrentChan <- nil } wg := &sync.WaitGroup{} for i := 0; i != chunkCount; i++ { uploadErr := <-concurrentChan //最初允许启动 10 个 goroutine,超出10个后,有分片返回才会开新的goroutine. if uploadErr != nil { err = uploadErr break // 中间如果出现错误立即停止继续上传 } wg.Add(1) go func(pos int) { defer wg.Done() offset := int64(state.BlkSize * pos) chunk := make([]byte, state.BlkSize) bytesRead, _ := file.ReadAt(chunk, offset) e := u.UploadPart(bytes.NewBuffer(chunk[:bytesRead]), state, pos) concurrentChan <- e //跑完一个 goroutine 后,发信号表示可以开启新的 goroutine。 }(i) } wg.Wait() //等待所有任务返回 if err == nil { //再次检查剩余上传完的分片是否有错误 loopCheck: for { select { case e := <-concurrentChan: err = e if err != nil { break loopCheck } default: break loopCheck } } } close(concurrentChan) if err != nil { u.AbortMultipartUpload(state) return err } return u.FinishMultipartUpload(state) } //AbortMultipartUpload 取消分片上传,如果掉用 UploadPart 出现错误,可以调用本函数取消分片上传。 //state 参数是 InitiateMultipartUpload 返回的 func (u *UFileRequest) AbortMultipartUpload(state *MultipartState) error { query := &url.Values{} query.Add("uploadId", state.uploadID) reqURL := u.genFileURL(state.keyName) + "?" + query.Encode() req, err := http.NewRequest("DELETE", reqURL, nil) if err != nil { return err } authorization := u.Auth.Authorization("DELETE", u.BucketName, state.keyName, req.Header) req.Header.Add("authorization", authorization) return u.request(req) } //InitiateMultipartUpload 初始化分片上传,返回一个 state 用于后续的 UploadPart, FinishMultipartUpload, AbortMultipartUpload 的接口。 // //keyName 表示传到 ufile 的文件名。 // //mimeType 表示文件的 mimeType, 传空会报错,你可以使用 GetFileMimeType 方法检测文件的 mimeType。如果您上传的不是文件,您可以使用 http.DetectContentType https://golang.org/src/net/http/sniff.go?s=646:688#L11进行检测。 func (u *UFileRequest) InitiateMultipartUpload(keyName, mimeType string) (*MultipartState, error) { reqURL := u.genFileURL(keyName) + "?uploads" req, err := http.NewRequest("POST", reqURL, nil) if err != nil { return nil, err } // if mimeType == "" { // return nil, fmt.Errorf("Mime Type 不能为空!!!") // } req.Header.Add("Content-Type", mimeType) for k, v := range u.RequestHeader { for i := 0; i < len(v); i++ { req.Header.Add(k, v[i]) } } authorization := u.Auth.Authorization("POST", u.BucketName, keyName, req.Header) req.Header.Add("authorization", authorization) err = u.request(req) if err != nil { return nil, err } response := new(MultipartState) err = json.Unmarshal(u.LastResponseBody, response) if err != nil { return nil, err } response.keyName = keyName response.etags = make(map[int]string) response.mimeType = mimeType return response, err } //UploadPart 上传一个分片,buf 就是分片数据,buf 的数据块大小必须为 state.BlkSize,否则会报错。 //pardNumber 表示第几个分片,从 0 开始。例如一个文件按 state.BlkSize 分为 5 块,那么分片分别是 0,1,2,3,4。 //state 参数是 InitiateMultipartUpload 返回的 func (u *UFileRequest) UploadPart(buf *bytes.Buffer, state *MultipartState, partNumber int) error { query := &url.Values{} query.Add("uploadId", state.uploadID) query.Add("partNumber", strconv.Itoa(partNumber)) reqURL := u.genFileURL(state.keyName) + "?" + query.Encode() req, err := http.NewRequest("PUT", reqURL, buf) if err != nil { return err } if u.verifyUploadMD5 { md5Str := fmt.Sprintf("%x", md5.Sum(buf.Bytes())) req.Header.Add("Content-MD5", md5Str) } req.Header.Add("Content-Type", state.mimeType) authorization := u.Auth.Authorization("PUT", u.BucketName, state.keyName, req.Header) req.Header.Add("Authorization", authorization) req.Header.Add("Content-Length", strconv.Itoa(buf.Len())) resp, err := u.requestWithResp(req) if err != nil { return err } defer resp.Body.Close() etag := strings.Trim(resp.Header.Get("Etag"), "\"") //为保证线程安全,这里就不保留 lastResponse if etag == "" { etag = strings.Trim(resp.Header.Get("ETag"), "\"") //为保证线程安全,这里就不保留 lastResponse } state.mux.Lock() state.etags[partNumber] = etag state.mux.Unlock() return nil } //FinishMultipartUpload 完成分片上传。分片上传必须要调用的接口。 //state 参数是 InitiateMultipartUpload 返回的 func (u *UFileRequest) FinishMultipartUpload(state *MultipartState) error { query := &url.Values{} query.Add("uploadId", state.uploadID) reqURL := u.genFileURL(state.keyName) + "?" + query.Encode() var etagsStr string etagLen := len(state.etags) for i := 0; i != etagLen; i++ { etagsStr += state.etags[i] if i != etagLen-1 { etagsStr += "," } } req, err := http.NewRequest("POST", reqURL, strings.NewReader(etagsStr)) if err != nil { return err } req.Header.Add("Content-Type", state.mimeType) authorization := u.Auth.Authorization("POST", u.BucketName, state.keyName, req.Header) req.Header.Add("Authorization", authorization) req.Header.Add("Content-Length", strconv.Itoa(len(etagsStr))) return u.request(req) } func divideCeil(a, b int64) int { div := float64(a) / float64(b) c := math.Ceil(div) return int(c) }