diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index 30ddf8c6a66..6923b9513af 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*; +import com.aliyun.oss.ClientException; import com.aliyun.oss.common.auth.CredentialsProvider; import com.aliyun.oss.common.auth.DefaultCredentialProvider; import com.aliyun.oss.common.auth.DefaultCredentials; @@ -782,7 +783,7 @@ private boolean multipartCopy(String srcKey, long dataLen, String dstKey) { ossClient.completeMultipartUpload(completeMultipartUploadRequest); LOG.debug(completeMultipartUploadResult.getETag()); return true; - } catch (Exception e) { + } catch (OSSException | ClientException e) { AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); ossClient.abortMultipartUpload(abortMultipartUploadRequest); diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java index bcd00dc50eb..b12e3f0ca57 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java @@ -227,12 +227,12 @@ public synchronized int available() throws IOException { } @Override - public void seek(long pos) throws IOException { + public synchronized void seek(long pos) throws IOException { checkNotClosed(); if (position == pos) { return; } else if (pos > position && pos < position + partRemaining) { - wrappedStream.skip(pos - position); + AliyunOSSUtils.skipFully(wrappedStream, pos - position); position = pos; } else { reopen(pos); @@ -240,7 +240,7 @@ public void seek(long pos) throws IOException { } @Override - public long getPos() throws IOException { + public synchronized long getPos() throws IOException { checkNotClosed(); return position; } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java index 589e014f452..654b81dab41 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.List; +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSSException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -113,7 +115,9 @@ public synchronized void close() throws IOException { multipartUploadObject(); } } finally { - tmpFile.delete(); + if (!tmpFile.delete()) { + LOG.warn("Can not delete file: " + tmpFile); + } } } @@ -174,7 +178,7 @@ private void multipartUploadObject() throws IOException { FileInputStream fis = new FileInputStream(object); try { long skipBytes = partSize * i; - fis.skip(skipBytes); + AliyunOSSUtils.skipFully(fis, skipBytes); long size = (partSize < dataLen - skipBytes) ? partSize : dataLen - skipBytes; UploadPartRequest uploadPartRequest = new UploadPartRequest(); @@ -198,7 +202,7 @@ private void multipartUploadObject() throws IOException { CompleteMultipartUploadResult completeMultipartUploadResult = ossClient.completeMultipartUpload(completeMultipartUploadRequest); LOG.debug(completeMultipartUploadResult.getETag()); - } catch (Exception e) { + } catch (OSSException | ClientException e) { AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest(bucketName, key, uploadId); ossClient.abortMultipartUpload(abortMultipartUploadRequest); diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java index 3f66a4fc5ee..9acde00606a 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.aliyun.oss; import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URLDecoder; @@ -116,7 +117,7 @@ static public String getPassword(Configuration conf, String key, String val) /** * Extract the user information details from a URI. - * @param name URI of the filesystem + * @param name URI of the filesystem. * @return a login tuple, possibly empty. */ public static UserInfo extractLoginDetails(URI name) { @@ -148,4 +149,27 @@ public static UserInfo extractLoginDetails(URI name) { throw new RuntimeException(e); } } + + /** + * Skips the requested number of bytes or fail if there are not enough left. + * This allows for the possibility that {@link InputStream#skip(long)} may not + * skip as many bytes as requested (most likely because of reaching EOF). + * @param is the input stream to skip. + * @param n the number of bytes to skip. + * @throws IOException thrown when skipped less number of bytes. + */ + public static void skipFully(InputStream is, long n) throws IOException { + long total = 0; + long cur = 0; + + do { + cur = is.skip(n - total); + total += cur; + } while((total < n) && (cur > 0)); + + if (total < n) { + throw new IOException("Failed to skip " + n + " bytes, possibly due " + + "to EOF."); + } + } }