HADOOP-11463 Replace method-local TransferManager object with S3AFileSystem#transfers. (Ted Yu via stevel)
This commit is contained in:
parent
20660b7a67
commit
4e7ad4f0a8
|
@ -544,6 +544,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HADOOP-11492. Bump up curator version to 2.7.1. (Arun Suresh and
|
HADOOP-11492. Bump up curator version to 2.7.1. (Arun Suresh and
|
||||||
Karthik Kambatla via kasha)
|
Karthik Kambatla via kasha)
|
||||||
|
|
||||||
|
HADOOP-11463 Replace method-local TransferManager object with
|
||||||
|
S3AFileSystem#transfers. (Ted Yu via stevel)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
|
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
|
||||||
|
|
|
@ -292,7 +292,6 @@ public class S3AFileSystem extends FileSystem {
|
||||||
Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
|
Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
|
||||||
|
|
||||||
transfers.abortMultipartUploads(bucket, purgeBefore);
|
transfers.abortMultipartUploads(bucket, purgeBefore);
|
||||||
transfers.shutdownNow(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
||||||
|
@ -995,13 +994,6 @@ public class S3AFileSystem extends FileSystem {
|
||||||
LocalFileSystem local = getLocal(getConf());
|
LocalFileSystem local = getLocal(getConf());
|
||||||
File srcfile = local.pathToFile(src);
|
File srcfile = local.pathToFile(src);
|
||||||
|
|
||||||
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
|
|
||||||
transferConfiguration.setMinimumUploadPartSize(partSize);
|
|
||||||
transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
|
|
||||||
|
|
||||||
TransferManager transfers = new TransferManager(s3);
|
|
||||||
transfers.setConfiguration(transferConfiguration);
|
|
||||||
|
|
||||||
final ObjectMetadata om = new ObjectMetadata();
|
final ObjectMetadata om = new ObjectMetadata();
|
||||||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
||||||
om.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
om.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
||||||
|
@ -1029,8 +1021,6 @@ public class S3AFileSystem extends FileSystem {
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException("Got interrupted, cancelling");
|
throw new IOException("Got interrupted, cancelling");
|
||||||
} finally {
|
|
||||||
transfers.shutdownNow(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will delete unnecessary fake parent directories
|
// This will delete unnecessary fake parent directories
|
||||||
|
@ -1041,6 +1031,18 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
try {
|
||||||
|
super.close();
|
||||||
|
} finally {
|
||||||
|
if (transfers != null) {
|
||||||
|
transfers.shutdownNow(true);
|
||||||
|
transfers = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override getCononicalServiceName because we don't support token in S3A
|
* Override getCononicalServiceName because we don't support token in S3A
|
||||||
*/
|
*/
|
||||||
|
@ -1055,12 +1057,6 @@ public class S3AFileSystem extends FileSystem {
|
||||||
LOG.debug("copyFile " + srcKey + " -> " + dstKey);
|
LOG.debug("copyFile " + srcKey + " -> " + dstKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
|
|
||||||
transferConfiguration.setMultipartCopyPartSize(partSize);
|
|
||||||
|
|
||||||
TransferManager transfers = new TransferManager(s3);
|
|
||||||
transfers.setConfiguration(transferConfiguration);
|
|
||||||
|
|
||||||
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
|
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
|
||||||
final ObjectMetadata dstom = srcom.clone();
|
final ObjectMetadata dstom = srcom.clone();
|
||||||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
||||||
|
@ -1089,8 +1085,6 @@ public class S3AFileSystem extends FileSystem {
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException("Got interrupted, cancelling");
|
throw new IOException("Got interrupted, cancelling");
|
||||||
} finally {
|
|
||||||
transfers.shutdownNow(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue