HADOOP-12851. S3AFileSystem Uptake of ProviderUtils.excludeIncompatibleCredentialProviders. Contributed by Larry McCay.
(cherry picked from commitd251e55415
) (cherry picked from commite402371b6a
)
This commit is contained in:
parent
475a277e60
commit
f1236c5d7c
|
@ -1061,6 +1061,10 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-12813. Migrate TestRPC and related codes to rebase on
|
HADOOP-12813. Migrate TestRPC and related codes to rebase on
|
||||||
ProtobufRpcEngine. (Kai Zheng via wheat9)
|
ProtobufRpcEngine. (Kai Zheng via wheat9)
|
||||||
|
|
||||||
|
HADOOP-12851. S3AFileSystem Uptake of
|
||||||
|
ProviderUtils.excludeIncompatibleCredentialProviders.
|
||||||
|
(Larry McCay via cnauroth)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.security.ProviderUtils;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
@ -170,16 +171,16 @@ public class S3AFileSystem extends FileSystem {
|
||||||
bucket = name.getHost();
|
bucket = name.getHost();
|
||||||
|
|
||||||
ClientConfiguration awsConf = new ClientConfiguration();
|
ClientConfiguration awsConf = new ClientConfiguration();
|
||||||
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
||||||
DEFAULT_MAXIMUM_CONNECTIONS));
|
DEFAULT_MAXIMUM_CONNECTIONS));
|
||||||
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
||||||
DEFAULT_SECURE_CONNECTIONS);
|
DEFAULT_SECURE_CONNECTIONS);
|
||||||
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
||||||
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
||||||
DEFAULT_MAX_ERROR_RETRIES));
|
DEFAULT_MAX_ERROR_RETRIES));
|
||||||
awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT,
|
awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT,
|
||||||
DEFAULT_ESTABLISH_TIMEOUT));
|
DEFAULT_ESTABLISH_TIMEOUT));
|
||||||
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
||||||
DEFAULT_SOCKET_TIMEOUT));
|
DEFAULT_SOCKET_TIMEOUT));
|
||||||
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
||||||
if(!signerOverride.isEmpty()) {
|
if(!signerOverride.isEmpty()) {
|
||||||
|
@ -321,9 +322,9 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initMultipartUploads(Configuration conf) {
|
private void initMultipartUploads(Configuration conf) {
|
||||||
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
||||||
DEFAULT_PURGE_EXISTING_MULTIPART);
|
DEFAULT_PURGE_EXISTING_MULTIPART);
|
||||||
long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
|
long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
|
||||||
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
|
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
|
||||||
|
|
||||||
if (purgeExistingMultipart) {
|
if (purgeExistingMultipart) {
|
||||||
|
@ -355,9 +356,11 @@ public class S3AFileSystem extends FileSystem {
|
||||||
accessKey = userInfo;
|
accessKey = userInfo;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||||
|
conf, S3AFileSystem.class);
|
||||||
if (accessKey == null) {
|
if (accessKey == null) {
|
||||||
try {
|
try {
|
||||||
final char[] key = conf.getPassword(ACCESS_KEY);
|
final char[] key = c.getPassword(ACCESS_KEY);
|
||||||
if (key != null) {
|
if (key != null) {
|
||||||
accessKey = (new String(key)).trim();
|
accessKey = (new String(key)).trim();
|
||||||
}
|
}
|
||||||
|
@ -367,7 +370,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
if (secretKey == null) {
|
if (secretKey == null) {
|
||||||
try {
|
try {
|
||||||
final char[] pass = conf.getPassword(SECRET_KEY);
|
final char[] pass = c.getPassword(SECRET_KEY);
|
||||||
if (pass != null) {
|
if (pass != null) {
|
||||||
secretKey = (new String(pass)).trim();
|
secretKey = (new String(pass)).trim();
|
||||||
}
|
}
|
||||||
|
@ -448,7 +451,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
throw new FileNotFoundException("Can't open " + f + " because it is a directory");
|
throw new FileNotFoundException("Can't open " + f + " because it is a directory");
|
||||||
}
|
}
|
||||||
|
|
||||||
return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
|
return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
|
||||||
fileStatus.getLen(), s3, statistics));
|
fileStatus.getLen(), s3, statistics));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -483,7 +486,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
|
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
|
||||||
return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
|
return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
|
||||||
bucket, key, progress, cannedACL, statistics,
|
bucket, key, progress, cannedACL, statistics,
|
||||||
serverSideEncryptionAlgorithm), null);
|
serverSideEncryptionAlgorithm), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -494,7 +497,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
* @param progress for reporting progress if it is not null.
|
* @param progress for reporting progress if it is not null.
|
||||||
* @throws IOException indicating that append is not supported.
|
* @throws IOException indicating that append is not supported.
|
||||||
*/
|
*/
|
||||||
public FSDataOutputStream append(Path f, int bufferSize,
|
public FSDataOutputStream append(Path f, int bufferSize,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
throw new IOException("Not supported");
|
throw new IOException("Not supported");
|
||||||
}
|
}
|
||||||
|
@ -504,8 +507,8 @@ public class S3AFileSystem extends FileSystem {
|
||||||
* Renames Path src to Path dst. Can take place on local fs
|
* Renames Path src to Path dst. Can take place on local fs
|
||||||
* or remote DFS.
|
* or remote DFS.
|
||||||
*
|
*
|
||||||
* Warning: S3 does not support renames. This method does a copy which can
|
* Warning: S3 does not support renames. This method does a copy which can
|
||||||
* take S3 some time to execute with large files and directories. Since
|
* take S3 some time to execute with large files and directories. Since
|
||||||
* there is no Progressable passed in, this can time out jobs.
|
* there is no Progressable passed in, this can time out jobs.
|
||||||
*
|
*
|
||||||
* Note: This implementation differs with other S3 drivers. Specifically:
|
* Note: This implementation differs with other S3 drivers. Specifically:
|
||||||
|
@ -618,7 +621,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
||||||
new ArrayList<>();
|
new ArrayList<>();
|
||||||
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
|
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
|
||||||
// delete unnecessary fake directory.
|
// delete unnecessary fake directory.
|
||||||
|
@ -724,7 +727,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!recursive && !status.isEmptyDirectory()) {
|
if (!recursive && !status.isEmptyDirectory()) {
|
||||||
throw new IOException("Path is a folder: " + f +
|
throw new IOException("Path is a folder: " + f +
|
||||||
" and it is not an empty directory");
|
" and it is not an empty directory");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -755,7 +758,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
//request.setDelimiter("/");
|
//request.setDelimiter("/");
|
||||||
request.setMaxKeys(maxKeys);
|
request.setMaxKeys(maxKeys);
|
||||||
|
|
||||||
List<DeleteObjectsRequest.KeyVersion> keys =
|
List<DeleteObjectsRequest.KeyVersion> keys =
|
||||||
new ArrayList<>();
|
new ArrayList<>();
|
||||||
ObjectListing objects = s3.listObjects(request);
|
ObjectListing objects = s3.listObjects(request);
|
||||||
statistics.incrementReadOps(1);
|
statistics.incrementReadOps(1);
|
||||||
|
@ -859,7 +862,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
LOG.debug("Adding: fd: " + keyPath);
|
LOG.debug("Adding: fd: " + keyPath);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
result.add(new S3AFileStatus(summary.getSize(),
|
result.add(new S3AFileStatus(summary.getSize(),
|
||||||
dateToLong(summary.getLastModified()), keyPath,
|
dateToLong(summary.getLastModified()), keyPath,
|
||||||
getDefaultBlockSize(f.makeQualified(uri, workingDir))));
|
getDefaultBlockSize(f.makeQualified(uri, workingDir))));
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -927,7 +930,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
* @param f path to create
|
* @param f path to create
|
||||||
* @param permission to apply to f
|
* @param permission to apply to f
|
||||||
*/
|
*/
|
||||||
// TODO: If we have created an empty file at /foo/bar and we then call
|
// TODO: If we have created an empty file at /foo/bar and we then call
|
||||||
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
|
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
|
||||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -950,7 +953,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
FileStatus fileStatus = getFileStatus(fPart);
|
FileStatus fileStatus = getFileStatus(fPart);
|
||||||
if (fileStatus.isFile()) {
|
if (fileStatus.isFile()) {
|
||||||
throw new FileAlreadyExistsException(String.format(
|
throw new FileAlreadyExistsException(String.format(
|
||||||
"Can't make directory for path '%s' since it is a file.",
|
"Can't make directory for path '%s' since it is a file.",
|
||||||
fPart));
|
fPart));
|
||||||
}
|
}
|
||||||
} catch (FileNotFoundException fnfe) {
|
} catch (FileNotFoundException fnfe) {
|
||||||
|
@ -1056,9 +1059,9 @@ public class S3AFileSystem extends FileSystem {
|
||||||
if (!objects.getCommonPrefixes().isEmpty()
|
if (!objects.getCommonPrefixes().isEmpty()
|
||||||
|| objects.getObjectSummaries().size() > 0) {
|
|| objects.getObjectSummaries().size() > 0) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Found path as directory (with /): " +
|
LOG.debug("Found path as directory (with /): " +
|
||||||
objects.getCommonPrefixes().size() + "/" +
|
objects.getCommonPrefixes().size() + "/" +
|
||||||
objects.getObjectSummaries().size());
|
objects.getObjectSummaries().size());
|
||||||
|
|
||||||
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
||||||
LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
|
LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
|
||||||
|
@ -1104,7 +1107,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
* @param dst path
|
* @param dst path
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
||||||
Path dst) throws IOException {
|
Path dst) throws IOException {
|
||||||
String key = pathToKey(dst);
|
String key = pathToKey(dst);
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.junit.rules.Timeout;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -318,4 +319,37 @@ public class TestS3AConfiguration {
|
||||||
assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
|
assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
|
||||||
assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
|
assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExcludingS3ACredentialProvider() throws Exception {
|
||||||
|
// set up conf to have a cred provider
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final File file = tempDir.newFile("test.jks");
|
||||||
|
final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
|
||||||
|
file.toURI());
|
||||||
|
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
|
||||||
|
"jceks://s3a/foobar," + jks.toString());
|
||||||
|
|
||||||
|
// first make sure that the s3a based provider is removed
|
||||||
|
Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||||
|
conf, S3AFileSystem.class);
|
||||||
|
String newPath = conf.get(
|
||||||
|
CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH);
|
||||||
|
assertFalse("Provider Path incorrect", newPath.contains("s3a://"));
|
||||||
|
|
||||||
|
// now let's make sure the new path is created by the S3AFileSystem
|
||||||
|
// and the integration still works. Let's provision the keys through
|
||||||
|
// the altered configuration instance and then try and access them
|
||||||
|
// using the original config with the s3a provider in the path.
|
||||||
|
provisionAccessKeys(c);
|
||||||
|
|
||||||
|
S3AFileSystem s3afs = new S3AFileSystem();
|
||||||
|
conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
|
||||||
|
URI uriWithUserInfo = new URI("s3a://123:456@foobar");
|
||||||
|
S3AFileSystem.AWSAccessKeys creds =
|
||||||
|
s3afs.getAWSAccessKeys(uriWithUserInfo, conf);
|
||||||
|
assertEquals("AccessKey incorrect.", "123", creds.getAccessKey());
|
||||||
|
assertEquals("SecretKey incorrect.", "456", creds.getAccessSecret());
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue