diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index ee426752e00..1e54e6f3c82 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -866,6 +866,12 @@ for ldap providers in the same way as above does. Minimum age in seconds of multipart uploads to purge + + fs.s3a.signing-algorithm + Override the default signing algorithm so legacy + implementations can still be used + + fs.s3a.buffer.dir ${hadoop.tmp.dir}/s3a diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml index c2fe2bf6eee..f48b4394d0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml @@ -132,7 +132,7 @@ com.amazonaws - aws-java-sdk + aws-java-sdk-s3 org.eclipse.jdt @@ -175,7 +175,7 @@ com.amazonaws - aws-java-sdk + aws-java-sdk-s3 org.eclipse.jdt diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 6686018f4a0..0ce28969708 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -656,8 +656,8 @@ com.amazonaws - aws-java-sdk - 1.7.4 + aws-java-sdk-s3 + 1.10.6 org.apache.mina diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index efb3268feb2..30126acee3c 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -119,7 +119,7 @@ com.amazonaws - aws-java-sdk + aws-java-sdk-s3 compile diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 3486dfbedfd..fe8dd77b7b4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -82,8 +82,8 @@ public class Constants { // minimum size in bytes before we start a multipart uploads or copy public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold"; - public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE; - + public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE; + // comma separated list of directories public static final String BUFFER_DIR = "fs.s3a.buffer.dir"; @@ -111,7 +111,10 @@ public class Constants { // s3 server-side encryption public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "fs.s3a.server-side-encryption-algorithm"; - + + //override signature algorithm used for signing requests + public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm"; + public static final String S3N_FOLDER_SUFFIX = "_$folder$"; public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size"; public static final String FS_S3A = "s3a"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 68195817ee4..2e06fba2752 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -267,7 +267,7 @@ public class S3AFastOutputStream extends OutputStream { private ObjectMetadata createDefaultMetadata() { ObjectMetadata om = new ObjectMetadata(); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setServerSideEncryption(serverSideEncryptionAlgorithm); + om.setSSEAlgorithm(serverSideEncryptionAlgorithm); } return om; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 91a606cf1f4..f9e937f8f4d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -87,7 +87,7 @@ public class S3AFileSystem extends FileSystem { private long partSize; private TransferManager transfers; private ThreadPoolExecutor threadPoolExecutor; - private int multiPartThreshold; + private long multiPartThreshold; public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); private CannedAccessControlList cannedACL; private String serverSideEncryptionAlgorithm; @@ -191,8 +191,12 @@ public class S3AFileSystem extends FileSystem { DEFAULT_ESTABLISH_TIMEOUT)); awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT)); + String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); + if(!signerOverride.isEmpty()) { + awsConf.setSignerOverride(signerOverride); + } - String proxyHost = conf.getTrimmed(PROXY_HOST,""); + String proxyHost = conf.getTrimmed(PROXY_HOST, ""); int proxyPort = conf.getInt(PROXY_PORT, -1); if (!proxyHost.isEmpty()) { awsConf.setProxyHost(proxyHost); @@ -246,7 +250,7 @@ public class S3AFileSystem extends FileSystem { maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, + multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); if (partSize < 5 * 1024 * 1024) { @@ -403,7 +407,7 @@ public class S3AFileSystem extends FileSystem { if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, key, progress, statistics, cannedACL, - serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold, + serverSideEncryptionAlgorithm, partSize, multiPartThreshold, threadPoolExecutor), statistics); } // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file @@ -1027,7 +1031,7 @@ public class S3AFileSystem extends FileSystem { final ObjectMetadata om = new ObjectMetadata(); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setServerSideEncryption(serverSideEncryptionAlgorithm); + om.setSSEAlgorithm(serverSideEncryptionAlgorithm); } PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); putObjectRequest.setCannedAcl(cannedACL); @@ -1035,8 +1039,8 @@ public class S3AFileSystem extends FileSystem { ProgressListener progressListener = new ProgressListener() { public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventCode()) { - case ProgressEvent.PART_COMPLETED_EVENT_CODE: + switch (progressEvent.getEventType()) { + case TRANSFER_PART_COMPLETED_EVENT: statistics.incrementWriteOps(1); break; default: @@ -1091,7 +1095,7 @@ public class S3AFileSystem extends FileSystem { ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); final ObjectMetadata dstom = srcom.clone(); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - dstom.setServerSideEncryption(serverSideEncryptionAlgorithm); + dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); } CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey); copyObjectRequest.setCannedAccessControlList(cannedACL); @@ -1099,8 +1103,8 @@ public class S3AFileSystem extends FileSystem { ProgressListener progressListener = new ProgressListener() { public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventCode()) { - case ProgressEvent.PART_COMPLETED_EVENT_CODE: + switch (progressEvent.getEventType()) { + case TRANSFER_PART_COMPLETED_EVENT: statistics.incrementWriteOps(1); break; default: @@ -1187,7 +1191,7 @@ public class S3AFileSystem extends FileSystem { final ObjectMetadata om = new ObjectMetadata(); om.setContentLength(0L); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setServerSideEncryption(serverSideEncryptionAlgorithm); + om.setSSEAlgorithm(serverSideEncryptionAlgorithm); } PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om); putObjectRequest.setCannedAcl(cannedACL); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index 2b611b6fdf1..3e079f23679 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressEventType; import com.amazonaws.event.ProgressListener; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.CannedAccessControlList; @@ -41,6 +42,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; +import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; import static org.apache.hadoop.fs.s3a.Constants.*; public class S3AOutputStream extends OutputStream { @@ -52,7 +55,7 @@ public class S3AOutputStream extends OutputStream { private TransferManager transfers; private Progressable progress; private long partSize; - private int partSizeThreshold; + private long partSizeThreshold; private S3AFileSystem fs; private CannedAccessControlList cannedACL; private FileSystem.Statistics statistics; @@ -76,7 +79,8 @@ public class S3AOutputStream extends OutputStream { this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); + partSizeThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, + DEFAULT_MIN_MULTIPART_THRESHOLD); if (conf.get(BUFFER_DIR, null) != null) { lDirAlloc = new LocalDirAllocator(BUFFER_DIR); @@ -116,7 +120,7 @@ public class S3AOutputStream extends OutputStream { try { final ObjectMetadata om = new ObjectMetadata(); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setServerSideEncryption(serverSideEncryptionAlgorithm); + om.setSSEAlgorithm(serverSideEncryptionAlgorithm); } PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile); putObjectRequest.setCannedAcl(cannedACL); @@ -184,8 +188,9 @@ public class S3AOutputStream extends OutputStream { } // There are 3 http ops here, but this should be close enough for now - if (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE || - progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) { + ProgressEventType pet = progressEvent.getEventType(); + if (pet == TRANSFER_PART_STARTED_EVENT || + pet == TRANSFER_COMPLETED_EVENT) { statistics.incrementWriteOps(1); } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index e0389c05caa..5d45e0ab646 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -288,6 +288,12 @@ If you do any of these: change your credentials immediately! Minimum age in seconds of multipart uploads to purge + + fs.s3a.signing-algorithm + Override the default signing algorithm so legacy + implementations can still be used + + fs.s3a.buffer.dir ${hadoop.tmp.dir}/s3a