HADOOP-11091. Eliminate old configuration parameter names from s3a (dsw via cmccabe)
(cherry picked from commit 0ac760a58d
)
This commit is contained in:
parent
b4d9aca066
commit
f80b10e1e7
|
@ -186,6 +186,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HADOOP-11074. Move s3-related FS connector code to hadoop-aws (David S.
|
HADOOP-11074. Move s3-related FS connector code to hadoop-aws (David S.
|
||||||
Wang via Colin Patrick McCabe)
|
Wang via Colin Patrick McCabe)
|
||||||
|
|
||||||
|
HADOOP-11091. Eliminate old configuration parameter names from s3a (David
|
||||||
|
S. Wang via Colin Patrick McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
|
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
|
||||||
|
|
|
@ -21,46 +21,37 @@ package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
public class Constants {
|
public class Constants {
|
||||||
// s3 access key
|
// s3 access key
|
||||||
public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId";
|
public static final String ACCESS_KEY = "fs.s3a.access.key";
|
||||||
public static final String NEW_ACCESS_KEY = "fs.s3a.access.key";
|
|
||||||
|
|
||||||
// s3 secret key
|
// s3 secret key
|
||||||
public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey";
|
public static final String SECRET_KEY = "fs.s3a.secret.key";
|
||||||
public static final String NEW_SECRET_KEY = "fs.s3a.secret.key";
|
|
||||||
|
|
||||||
// number of simultaneous connections to s3
|
// number of simultaneous connections to s3
|
||||||
public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections";
|
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
|
||||||
public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
|
|
||||||
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
|
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
|
||||||
|
|
||||||
// connect to s3 over ssl?
|
// connect to s3 over ssl?
|
||||||
public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections";
|
public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
|
||||||
public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
|
|
||||||
public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
|
public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
|
||||||
|
|
||||||
// number of times we should retry errors
|
// number of times we should retry errors
|
||||||
public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries";
|
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
|
||||||
public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
|
|
||||||
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
|
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
|
||||||
|
|
||||||
// seconds until we give up on a connection to s3
|
// seconds until we give up on a connection to s3
|
||||||
public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout";
|
public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
|
||||||
public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
|
|
||||||
public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
|
public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
|
||||||
|
|
||||||
// number of records to get while paging through a directory listing
|
// number of records to get while paging through a directory listing
|
||||||
public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys";
|
public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
|
||||||
public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
|
|
||||||
public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
|
public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
|
||||||
|
|
||||||
// size of each of or multipart pieces in bytes
|
// size of each of or multipart pieces in bytes
|
||||||
public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize";
|
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
|
||||||
public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size";
|
|
||||||
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
|
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
|
||||||
|
|
||||||
// minimum size in bytes before we start a multipart uploads or copy
|
// minimum size in bytes before we start a multipart uploads or copy
|
||||||
public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize";
|
public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
|
||||||
public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
|
|
||||||
public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
|
public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
|
||||||
|
|
||||||
// comma separated list of directories
|
// comma separated list of directories
|
||||||
|
@ -68,18 +59,15 @@ public class Constants {
|
||||||
|
|
||||||
// private | public-read | public-read-write | authenticated-read |
|
// private | public-read | public-read-write | authenticated-read |
|
||||||
// log-delivery-write | bucket-owner-read | bucket-owner-full-control
|
// log-delivery-write | bucket-owner-read | bucket-owner-full-control
|
||||||
public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL";
|
public static final String CANNED_ACL = "fs.s3a.acl.default";
|
||||||
public static final String NEW_CANNED_ACL = "fs.s3a.acl.default";
|
|
||||||
public static final String DEFAULT_CANNED_ACL = "";
|
public static final String DEFAULT_CANNED_ACL = "";
|
||||||
|
|
||||||
// should we try to purge old multipart uploads when starting up
|
// should we try to purge old multipart uploads when starting up
|
||||||
public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart";
|
public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
|
||||||
public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
|
|
||||||
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
|
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
|
||||||
|
|
||||||
// purge any multipart uploads older than this number of seconds
|
// purge any multipart uploads older than this number of seconds
|
||||||
public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge";
|
public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
|
||||||
public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
|
|
||||||
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
|
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
|
||||||
|
|
||||||
// s3 server-side encryption
|
// s3 server-side encryption
|
||||||
|
|
|
@ -95,8 +95,8 @@ public class S3AFileSystem extends FileSystem {
|
||||||
this.getWorkingDirectory());
|
this.getWorkingDirectory());
|
||||||
|
|
||||||
// Try to get our credentials or just connect anonymously
|
// Try to get our credentials or just connect anonymously
|
||||||
String accessKey = conf.get(NEW_ACCESS_KEY, conf.get(OLD_ACCESS_KEY, null));
|
String accessKey = conf.get(ACCESS_KEY, null);
|
||||||
String secretKey = conf.get(NEW_SECRET_KEY, conf.get(OLD_SECRET_KEY, null));
|
String secretKey = conf.get(SECRET_KEY, null);
|
||||||
|
|
||||||
String userInfo = name.getUserInfo();
|
String userInfo = name.getUserInfo();
|
||||||
if (userInfo != null) {
|
if (userInfo != null) {
|
||||||
|
@ -118,37 +118,33 @@ public class S3AFileSystem extends FileSystem {
|
||||||
bucket = name.getHost();
|
bucket = name.getHost();
|
||||||
|
|
||||||
ClientConfiguration awsConf = new ClientConfiguration();
|
ClientConfiguration awsConf = new ClientConfiguration();
|
||||||
awsConf.setMaxConnections(conf.getInt(NEW_MAXIMUM_CONNECTIONS,
|
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
||||||
conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS)));
|
DEFAULT_MAXIMUM_CONNECTIONS));
|
||||||
awsConf.setProtocol(conf.getBoolean(NEW_SECURE_CONNECTIONS,
|
awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS,
|
||||||
conf.getBoolean(OLD_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) ?
|
DEFAULT_SECURE_CONNECTIONS) ? Protocol.HTTPS : Protocol.HTTP);
|
||||||
Protocol.HTTPS : Protocol.HTTP);
|
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
||||||
awsConf.setMaxErrorRetry(conf.getInt(NEW_MAX_ERROR_RETRIES,
|
DEFAULT_MAX_ERROR_RETRIES));
|
||||||
conf.getInt(OLD_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES)));
|
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
||||||
awsConf.setSocketTimeout(conf.getInt(NEW_SOCKET_TIMEOUT,
|
DEFAULT_SOCKET_TIMEOUT));
|
||||||
conf.getInt(OLD_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT)));
|
|
||||||
|
|
||||||
s3 = new AmazonS3Client(credentials, awsConf);
|
s3 = new AmazonS3Client(credentials, awsConf);
|
||||||
|
|
||||||
maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS,
|
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
|
||||||
conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS));
|
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
||||||
partSize = conf.getLong(NEW_MULTIPART_SIZE,
|
partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
|
||||||
conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
|
DEFAULT_MIN_MULTIPART_THRESHOLD);
|
||||||
partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD,
|
|
||||||
conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
|
|
||||||
|
|
||||||
if (partSize < 5 * 1024 * 1024) {
|
if (partSize < 5 * 1024 * 1024) {
|
||||||
LOG.error(NEW_MULTIPART_SIZE + " must be at least 5 MB");
|
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
|
||||||
partSize = 5 * 1024 * 1024;
|
partSize = 5 * 1024 * 1024;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partSizeThreshold < 5 * 1024 * 1024) {
|
if (partSizeThreshold < 5 * 1024 * 1024) {
|
||||||
LOG.error(NEW_MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
|
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
|
||||||
partSizeThreshold = 5 * 1024 * 1024;
|
partSizeThreshold = 5 * 1024 * 1024;
|
||||||
}
|
}
|
||||||
|
|
||||||
String cannedACLName = conf.get(NEW_CANNED_ACL,
|
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
|
||||||
conf.get(OLD_CANNED_ACL, DEFAULT_CANNED_ACL));
|
|
||||||
if (!cannedACLName.isEmpty()) {
|
if (!cannedACLName.isEmpty()) {
|
||||||
cannedACL = CannedAccessControlList.valueOf(cannedACLName);
|
cannedACL = CannedAccessControlList.valueOf(cannedACLName);
|
||||||
} else {
|
} else {
|
||||||
|
@ -159,10 +155,10 @@ public class S3AFileSystem extends FileSystem {
|
||||||
throw new IOException("Bucket " + bucket + " does not exist");
|
throw new IOException("Bucket " + bucket + " does not exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean purgeExistingMultipart = conf.getBoolean(NEW_PURGE_EXISTING_MULTIPART,
|
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
||||||
conf.getBoolean(OLD_PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART));
|
DEFAULT_PURGE_EXISTING_MULTIPART);
|
||||||
long purgeExistingMultipartAge = conf.getLong(NEW_PURGE_EXISTING_MULTIPART_AGE,
|
long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
|
||||||
conf.getLong(OLD_PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE));
|
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
|
||||||
|
|
||||||
if (purgeExistingMultipart) {
|
if (purgeExistingMultipart) {
|
||||||
TransferManager transferManager = new TransferManager(s3);
|
TransferManager transferManager = new TransferManager(s3);
|
||||||
|
|
|
@ -75,10 +75,8 @@ public class S3AOutputStream extends OutputStream {
|
||||||
this.statistics = statistics;
|
this.statistics = statistics;
|
||||||
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
|
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
|
||||||
|
|
||||||
partSize = conf.getLong(NEW_MULTIPART_SIZE,
|
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
||||||
conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
|
partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
|
||||||
partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD,
|
|
||||||
conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
|
|
||||||
|
|
||||||
if (conf.get(BUFFER_DIR, null) != null) {
|
if (conf.get(BUFFER_DIR, null) != null) {
|
||||||
lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
|
lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
|
||||||
|
|
Loading…
Reference in New Issue