From 2800e73c2789f3d41fab618b35a25c59a45bf947 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 5 Jan 2015 12:59:48 +0000 Subject: [PATCH] HADOOP-11446 S3AOutputStream should use shared thread pool to avoid OutOfMemoryError --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/fs/s3a/Constants.java | 17 ++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 91 ++++++++++++++++++- .../apache/hadoop/fs/s3a/S3AOutputStream.java | 13 +-- 4 files changed, 109 insertions(+), 15 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 635b0b03ba3..451ce61709c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -313,6 +313,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11039. ByteBufferReadable API doc is inconsistent with the implementations. (Yi Liu via Colin P. McCabe) + HADOOP-11446. S3AOutputStream should use shared thread pool to + avoid OutOfMemoryError. (Ted Yu via stevel) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index ee4bf684a71..f1b5d3df5c9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -41,6 +41,23 @@ public class Constants { public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum"; public static final int DEFAULT_MAX_PAGING_KEYS = 5000; + // the maximum number of threads to allow in the pool used by TransferManager + public static final String MAX_THREADS = "fs.s3a.threads.max"; + public static final int DEFAULT_MAX_THREADS = 256; + + // the number of threads to keep in the pool used by TransferManager + public static final String CORE_THREADS = "fs.s3a.threads.core"; + public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS; + + // when the number of threads is greater than the core, this is the maximum time + // that excess idle threads will wait for new tasks before terminating. + public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime"; + public static final int DEFAULT_KEEPALIVE_TIME = 60; + + // the maximum number of tasks that the LinkedBlockingQueue can hold + public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks"; + public static final int DEFAULT_MAX_TOTAL_TASKS = 1000; + // size of each of or multipart pieces in bytes public static final String MULTIPART_SIZE = "fs.s3a.multipart.size"; public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 457351d0242..e6b15577b0a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -26,6 +26,11 @@ import java.net.URI; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.s3.S3Credentials; @@ -77,6 +82,7 @@ public class S3AFileSystem extends FileSystem { private String bucket; private int maxKeys; private long partSize; + private TransferManager transfers; private int partSizeThreshold; public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); private CannedAccessControlList cannedACL; @@ -85,6 +91,55 @@ public class S3AFileSystem extends FileSystem { // The maximum number of entries that can be deleted in any call to s3 private static final int MAX_ENTRIES_TO_DELETE = 1000; + private static final AtomicInteger poolNumber = new AtomicInteger(1); + /** + * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, + * with a common prefix. + * @param prefix The prefix of every created Thread's name + * @return a {@link java.util.concurrent.ThreadFactory} that names threads + */ + public static ThreadFactory getNamedThreadFactory(final String prefix) { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() + .getThreadGroup(); + + return new ThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + private final int poolNum = poolNumber.getAndIncrement(); + final ThreadGroup group = threadGroup; + + @Override + public Thread newThread(Runnable r) { + final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); + return new Thread(group, r, name); + } + }; + } + + /** + * Get a named {@link ThreadFactory} that just builds daemon threads. + * @param prefix name prefix for all threads created from the factory + * @return a thread factory that creates named, daemon threads with + * the supplied exception handler and normal priority + */ + private static ThreadFactory newDaemonThreadFactory(final String prefix) { + final ThreadFactory namedFactory = getNamedThreadFactory(prefix); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = namedFactory.newThread(r); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + + }; + } + /** Called after a new FileSystem instance is constructed. * @param name a uri whose authority section names the host, port, etc. * for this FileSystem @@ -93,7 +148,6 @@ public class S3AFileSystem extends FileSystem { public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); - uri = URI.create(name.getScheme() + "://" + name.getAuthority()); workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory()); @@ -138,6 +192,34 @@ public class S3AFileSystem extends FileSystem { partSizeThreshold = 5 * 1024 * 1024; } + int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); + int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; + } + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; + } + long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); + LinkedBlockingQueue workQueue = + new LinkedBlockingQueue(maxThreads * + conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS)); + ThreadPoolExecutor tpe = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + newDaemonThreadFactory("s3a-transfer-shared-")); + tpe.allowCoreThreadTimeOut(true); + + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMinimumUploadPartSize(partSize); + transferConfiguration.setMultipartUploadThreshold(partSizeThreshold); + + transfers = new TransferManager(s3, tpe); + transfers.setConfiguration(transferConfiguration); + String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); if (!cannedACLName.isEmpty()) { cannedACL = CannedAccessControlList.valueOf(cannedACLName); @@ -155,11 +237,10 @@ public class S3AFileSystem extends FileSystem { DEFAULT_PURGE_EXISTING_MULTIPART_AGE); if (purgeExistingMultipart) { - TransferManager transferManager = new TransferManager(s3); Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); - transferManager.abortMultipartUploads(bucket, purgeBefore); - transferManager.shutdownNow(false); + transfers.abortMultipartUploads(bucket, purgeBefore); + transfers.shutdownNow(false); } serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); @@ -245,7 +326,7 @@ public class S3AFileSystem extends FileSystem { } // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file - return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this, + return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this, bucket, key, progress, cannedACL, statistics, serverSideEncryptionAlgorithm), null); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index 7783b998111..2b611b6fdf1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -49,7 +49,7 @@ public class S3AOutputStream extends OutputStream { private boolean closed; private String key; private String bucket; - private AmazonS3Client client; + private TransferManager transfers; private Progressable progress; private long partSize; private int partSizeThreshold; @@ -61,14 +61,14 @@ public class S3AOutputStream extends OutputStream { public static final Logger LOG = S3AFileSystem.LOG; - public S3AOutputStream(Configuration conf, AmazonS3Client client, + public S3AOutputStream(Configuration conf, TransferManager transfers, S3AFileSystem fs, String bucket, String key, Progressable progress, CannedAccessControlList cannedACL, FileSystem.Statistics statistics, String serverSideEncryptionAlgorithm) throws IOException { this.bucket = bucket; this.key = key; - this.client = client; + this.transfers = transfers; this.progress = progress; this.fs = fs; this.cannedACL = cannedACL; @@ -114,13 +114,6 @@ public class S3AOutputStream extends OutputStream { try { - TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); - transferConfiguration.setMinimumUploadPartSize(partSize); - transferConfiguration.setMultipartUploadThreshold(partSizeThreshold); - - TransferManager transfers = new TransferManager(client); - transfers.setConfiguration(transferConfiguration); - final ObjectMetadata om = new ObjectMetadata(); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { om.setServerSideEncryption(serverSideEncryptionAlgorithm);