HADOOP-11463 Replace method-local TransferManager object with S3AFileSystem#transfers. (Ted Yu via stevel)

This commit is contained in:
Steve Loughran 2015-02-05 12:19:49 +00:00
parent 970fdc3ad9
commit 4cd9657b71
2 changed files with 15 additions and 18 deletions

View File

@ -175,6 +175,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11492. Bump up curator version to 2.7.1. (Arun Suresh and HADOOP-11492. Bump up curator version to 2.7.1. (Arun Suresh and
Karthik Kambatla via kasha) Karthik Kambatla via kasha)
HADOOP-11463 Replace method-local TransferManager object with
S3AFileSystem#transfers. (Ted Yu via stevel)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11323. WritableComparator#compare keeps reference to byte array. HADOOP-11323. WritableComparator#compare keeps reference to byte array.

View File

@ -292,7 +292,6 @@ public class S3AFileSystem extends FileSystem {
Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
transfers.abortMultipartUploads(bucket, purgeBefore); transfers.abortMultipartUploads(bucket, purgeBefore);
transfers.shutdownNow(false);
} }
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
@ -995,13 +994,6 @@ public class S3AFileSystem extends FileSystem {
LocalFileSystem local = getLocal(getConf()); LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src); File srcfile = local.pathToFile(src);
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
TransferManager transfers = new TransferManager(s3);
transfers.setConfiguration(transferConfiguration);
final ObjectMetadata om = new ObjectMetadata(); final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setServerSideEncryption(serverSideEncryptionAlgorithm); om.setServerSideEncryption(serverSideEncryptionAlgorithm);
@ -1029,8 +1021,6 @@ public class S3AFileSystem extends FileSystem {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Got interrupted, cancelling"); throw new IOException("Got interrupted, cancelling");
} finally {
transfers.shutdownNow(false);
} }
// This will delete unnecessary fake parent directories // This will delete unnecessary fake parent directories
@ -1041,6 +1031,18 @@ public class S3AFileSystem extends FileSystem {
} }
} }
@Override
public void close() throws IOException {
try {
super.close();
} finally {
if (transfers != null) {
transfers.shutdownNow(true);
transfers = null;
}
}
}
/** /**
* Override getCononicalServiceName because we don't support token in S3A * Override getCononicalServiceName because we don't support token in S3A
*/ */
@ -1055,12 +1057,6 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("copyFile " + srcKey + " -> " + dstKey); LOG.debug("copyFile " + srcKey + " -> " + dstKey);
} }
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMultipartCopyPartSize(partSize);
TransferManager transfers = new TransferManager(s3);
transfers.setConfiguration(transferConfiguration);
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
final ObjectMetadata dstom = srcom.clone(); final ObjectMetadata dstom = srcom.clone();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
@ -1089,8 +1085,6 @@ public class S3AFileSystem extends FileSystem {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Got interrupted, cancelling"); throw new IOException("Got interrupted, cancelling");
} finally {
transfers.shutdownNow(false);
} }
} }