fs.s3a.threads.keepalivetime
60
@@ -894,7 +888,7 @@
fs.s3a.max.total.tasks
- 1000
+ 5
Number of (part)uploads allowed to the queue before
blocking additional uploads.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
new file mode 100644
index 00000000000..fc8ae877473
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ *
+ * This is inspired by
+ * this s4 threadpool
+ */
+public class BlockingThreadPoolExecutorService
+ extends ForwardingListeningExecutorService {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BlockingThreadPoolExecutorService.class);
+
+ private Semaphore queueingPermits;
+ private ListeningExecutorService executorDelegatee;
+
+ private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
+
+ /**
+ * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+ * created thread uniquely,
+ * with a common prefix.
+ *
+ * @param prefix The prefix of every created Thread's name
+ * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+ */
+ public static ThreadFactory getNamedThreadFactory(final String prefix) {
+ SecurityManager s = System.getSecurityManager();
+ final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+
+ return new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final int poolNum = POOLNUMBER.getAndIncrement();
+ private final ThreadGroup group = threadGroup;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ final String name =
+ prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+ return new Thread(group, r, name);
+ }
+ };
+ }
+
+ /**
+ * Get a named {@link ThreadFactory} that just builds daemon threads.
+ *
+ * @param prefix name prefix for all threads created from the factory
+ * @return a thread factory that creates named, daemon threads with
+ * the supplied exception handler and normal priority
+ */
+ private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+ return new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = namedFactory.newThread(r);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+
+ };
+ }
+
+
+ /**
+ * A thread pool that that blocks clients submitting additional tasks if
+ * there are already {@code activeTasks} running threads and {@code
+ * waitingTasks} tasks waiting in its queue.
+ *
+ * @param activeTasks maximum number of active tasks
+ * @param waitingTasks maximum number of waiting tasks
+ * @param keepAliveTime time until threads are cleaned up in {@code unit}
+ * @param unit time unit
+ * @param prefixName prefix of name for threads
+ */
+ public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
+ long keepAliveTime, TimeUnit unit, String prefixName) {
+ super();
+ queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
+ /* Although we generally only expect up to waitingTasks tasks in the
+ queue, we need to be able to buffer all tasks in case dequeueing is
+ slower than enqueueing. */
+    final BlockingQueue<Runnable> workQueue =
+        new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+ ThreadPoolExecutor eventProcessingExecutor =
+ new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
+ workQueue, newDaemonThreadFactory(prefixName),
+ new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r,
+ ThreadPoolExecutor executor) {
+ // This is not expected to happen.
+ LOG.error("Could not submit task to executor {}",
+ executor.toString());
+ }
+ });
+ eventProcessingExecutor.allowCoreThreadTimeOut(true);
+ executorDelegatee =
+ MoreExecutors.listeningDecorator(eventProcessingExecutor);
+
+ }
+
+ @Override
+ protected ListeningExecutorService delegate() {
+ return executorDelegatee;
+ }
+
+ @Override
+  public <T> ListenableFuture<T> submit(Callable<T> task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new CallableWithPermitRelease(task));
+ }
+
+ @Override
+  public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task), result);
+ }
+
+ @Override
+  public ListenableFuture<?> submit(Runnable task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task));
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ super.execute(new RunnableWithPermitRelease(command));
+ }
+
+ /**
+ * Releases a permit after the task is executed.
+ */
+ class RunnableWithPermitRelease implements Runnable {
+
+ private Runnable delegatee;
+
+ public RunnableWithPermitRelease(Runnable delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public void run() {
+ try {
+ delegatee.run();
+ } finally {
+ queueingPermits.release();
+ }
+
+ }
+ }
+
+ /**
+ * Releases a permit after the task is completed.
+ */
+  class CallableWithPermitRelease<T> implements Callable<T> {
+
+    private Callable<T> delegatee;
+
+    public CallableWithPermitRelease(Callable<T> delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public T call() throws Exception {
+ try {
+ return delegatee.call();
+ } finally {
+ queueingPermits.release();
+ }
+ }
+
+ }
+
+ @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+ long timeout, TimeUnit unit) throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ throw new RuntimeException("Not implemented");
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 71668db0e86..218d5ce238e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -88,20 +88,20 @@ public final class Constants {
// the maximum number of threads to allow in the pool used by TransferManager
public static final String MAX_THREADS = "fs.s3a.threads.max";
- public static final int DEFAULT_MAX_THREADS = 256;
+ public static final int DEFAULT_MAX_THREADS = 10;
- // the number of threads to keep in the pool used by TransferManager
+ // unused option: maintained for compile-time compatibility.
+ // if set, a warning is logged in S3A during init
+ @Deprecated
public static final String CORE_THREADS = "fs.s3a.threads.core";
- public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
- // when the number of threads is greater than the core, this is the maximum time
- // that excess idle threads will wait for new tasks before terminating.
+ // the time an idle thread waits before terminating
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
public static final int DEFAULT_KEEPALIVE_TIME = 60;
- // the maximum number of tasks that the LinkedBlockingQueue can hold
+ // the maximum number of tasks cached if all threads are already uploading
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
- public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
+ public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
// size of each of or multipart pieces in bytes
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 7a985c6f95c..5509d369d9d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -49,7 +49,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -110,7 +110,7 @@ public class S3AFastOutputStream extends OutputStream {
CannedAccessControlList cannedACL,
long partSize,
long multiPartThreshold,
- ThreadPoolExecutor threadPoolExecutor)
+ ExecutorService threadPoolExecutor)
throws IOException {
this.bucket = bucket;
this.key = key;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9a3fcf25081..23106fbfc35 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -29,11 +29,9 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.AmazonClientException;
@@ -121,7 +119,7 @@ public class S3AFileSystem extends FileSystem {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
- private ThreadPoolExecutor threadPoolExecutor;
+ private ExecutorService threadPoolExecutor;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL;
@@ -130,62 +128,12 @@ public class S3AFileSystem extends FileSystem {
private S3AStorageStatistics storageStatistics;
private long readAhead;
private S3AInputPolicy inputPolicy;
+ private static final AtomicBoolean warnedOfCoreThreadDeprecation =
+ new AtomicBoolean(false);
// The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000;
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
-
- /**
- * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
- * with a common prefix.
- * @param prefix The prefix of every created Thread's name
- * @return a {@link java.util.concurrent.ThreadFactory} that names threads
- */
- public static ThreadFactory getNamedThreadFactory(final String prefix) {
- SecurityManager s = System.getSecurityManager();
- final ThreadGroup threadGroup = (s != null)
- ? s.getThreadGroup()
- : Thread.currentThread().getThreadGroup();
-
- return new ThreadFactory() {
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final int poolNum = poolNumber.getAndIncrement();
- private final ThreadGroup group = threadGroup;
-
- @Override
- public Thread newThread(Runnable r) {
- final String name = String.format("%s-pool%03d-t%04d",
- prefix, poolNum, threadNumber.getAndIncrement());
- return new Thread(group, r, name);
- }
- };
- }
-
- /**
- * Get a named {@link ThreadFactory} that just builds daemon threads.
- * @param prefix name prefix for all threads created from the factory
- * @return a thread factory that creates named, daemon threads with
- * the supplied exception handler and normal priority
- */
- private static ThreadFactory newDaemonThreadFactory(final String prefix) {
- final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
- return new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = namedFactory.newThread(r);
- if (!t.isDaemon()) {
- t.setDaemon(true);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
-
- };
- }
-
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
* for this FileSystem
@@ -258,27 +206,27 @@ public class S3AFileSystem extends FileSystem {
}
});
- int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
- int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
- if (maxThreads == 0) {
- maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+ if (conf.get("fs.s3a.threads.core") != null &&
+ warnedOfCoreThreadDeprecation.compareAndSet(false, true)) {
+ LoggerFactory.getLogger(
+ "org.apache.hadoop.conf.Configuration.deprecation")
+ .warn("Unsupported option \"fs.s3a.threads.core\"" +
+ " will be ignored {}", conf.get("fs.s3a.threads.core"));
}
- if (coreThreads == 0) {
- coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+ int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
+ if (maxThreads < 2) {
+ LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+ maxThreads = 2;
}
- long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
- DEFAULT_KEEPALIVE_TIME, 0);
-    LinkedBlockingQueue<Runnable> workQueue =
- new LinkedBlockingQueue<>(maxThreads *
- intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1));
- threadPoolExecutor = new ThreadPoolExecutor(
- coreThreads,
- maxThreads,
- keepAliveTime,
- TimeUnit.SECONDS,
- workQueue,
- newDaemonThreadFactory("s3a-transfer-shared-"));
- threadPoolExecutor.allowCoreThreadTimeOut(true);
+ int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
+ if (totalTasks < 1) {
+      LOG.warn(MAX_TOTAL_TASKS + " must be at least 1: forcing to 1.");
+ totalTasks = 1;
+ }
+ long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+ threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
+ maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+ "s3a-transfer-shared");
initTransferManager();
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 9213a132b8c..c97b6473b1b 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -495,17 +495,11 @@ this capability.
fs.s3a.threads.max
- 256
+ 10
Maximum number of concurrent active (part)uploads,
which each use a thread from the threadpool.
-
- fs.s3a.threads.core
- 15
- Number of core threads in the threadpool.
-
-
fs.s3a.threads.keepalivetime
60
@@ -515,7 +509,7 @@ this capability.
fs.s3a.max.total.tasks
- 1000
+ 5
Number of (part)uploads allowed to the queue before
blocking additional uploads.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
new file mode 100644
index 00000000000..25a895862ea
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.util.StopWatch;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Basic unit test for S3A's blocking executor service.
+ */
+public class TestBlockingThreadPoolExecutorService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ BlockingThreadPoolExecutorService.class);
+
+ private static final int NUM_ACTIVE_TASKS = 4;
+ private static final int NUM_WAITING_TASKS = 2;
+ private static final int TASK_SLEEP_MSEC = 100;
+ private static final int SHUTDOWN_WAIT_MSEC = 200;
+ private static final int SHUTDOWN_WAIT_TRIES = 5;
+ private static final int BLOCKING_THRESHOLD_MSEC = 50;
+
+ private static final Integer SOME_VALUE = 1337;
+
+ private static BlockingThreadPoolExecutorService tpe = null;
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ ensureDestroyed();
+ }
+
+ /**
+ * Basic test of running one trivial task.
+ */
+ @Test
+ public void testSubmitCallable() throws Exception {
+ ensureCreated();
+    ListenableFuture<Integer> f = tpe.submit(callableSleeper);
+ Integer v = f.get();
+ assertEquals(SOME_VALUE, v);
+ }
+
+ /**
+ * More involved test, including detecting blocking when at capacity.
+ */
+ @Test
+ public void testSubmitRunnable() throws Exception {
+ ensureCreated();
+ int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
+ StopWatch stopWatch = new StopWatch().start();
+ for (int i = 0; i < totalTasks; i++) {
+ tpe.submit(sleeper);
+ assertDidntBlock(stopWatch);
+ }
+ tpe.submit(sleeper);
+ assertDidBlock(stopWatch);
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+ // Cover create / destroy, regardless of when this test case runs
+ ensureCreated();
+ ensureDestroyed();
+
+ // Cover create, execute, destroy, regardless of when test case runs
+ ensureCreated();
+ testSubmitRunnable();
+ ensureDestroyed();
+ }
+
+ // Helper functions, etc.
+
+ private void assertDidntBlock(StopWatch sw) {
+ try {
+ assertFalse("Non-blocking call took too long.",
+ sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
+ } finally {
+ sw.reset().start();
+ }
+ }
+
+ private void assertDidBlock(StopWatch sw) {
+ try {
+ if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
+ throw new RuntimeException("Blocking call returned too fast.");
+ }
+ } finally {
+ sw.reset().start();
+ }
+ }
+
+ private Runnable sleeper = new Runnable() {
+ @Override
+ public void run() {
+ String name = Thread.currentThread().getName();
+ try {
+ Thread.sleep(TASK_SLEEP_MSEC);
+ } catch (InterruptedException e) {
+ LOG.info("Thread {} interrupted.", name);
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+
+  private Callable<Integer> callableSleeper = new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ sleeper.run();
+ return SOME_VALUE;
+ }
+ };
+
+ /**
+ * Helper function to create thread pool under test.
+ */
+ private static void ensureCreated() throws Exception {
+ if (tpe == null) {
+ LOG.debug("Creating thread pool");
+ tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
+ NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
+ }
+ }
+
+ /**
+ * Helper function to terminate thread pool under test, asserting that
+ * shutdown -> terminate works as expected.
+ */
+ private static void ensureDestroyed() throws Exception {
+ if (tpe == null) {
+ return;
+ }
+ int shutdownTries = SHUTDOWN_WAIT_TRIES;
+
+ tpe.shutdown();
+ if (!tpe.isShutdown()) {
+ throw new RuntimeException("Shutdown had no effect.");
+ }
+
+ while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+ TimeUnit.MILLISECONDS)) {
+ LOG.info("Waiting for thread pool shutdown.");
+ if (shutdownTries-- <= 0) {
+ LOG.error("Failed to terminate thread pool gracefully.");
+ break;
+ }
+ }
+ if (!tpe.isTerminated()) {
+ tpe.shutdownNow();
+ if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+ TimeUnit.MILLISECONDS)) {
+ throw new RuntimeException(
+ "Failed to terminate thread pool in timely manner.");
+ }
+ }
+ tpe = null;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
new file mode 100644
index 00000000000..bd738b25423
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Demonstrate that the threadpool blocks additional client requests if
+ * its queue is full (rather than throwing an exception) by initiating an
+ * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
+ * 4th part should not trigger an exception as it would with a
+ * non-blocking threadpool.
+ */
+public class TestS3ABlockingThreadPool {
+
+ private Configuration conf;
+ private S3AFileSystem fs;
+
+ @Rule
+ public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+ protected Path getTestPath() {
+ return new Path("/tests3a");
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new Configuration();
+ conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+ conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+ conf.setInt(Constants.MAX_THREADS, 2);
+ conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.delete(getTestPath(), true);
+ }
+ }
+
+ @Test
+ public void testRegularMultiPartUpload() throws Exception {
+ fs = S3ATestUtils.createTestFileSystem(conf);
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+ 1024);
+ }
+
+ @Test
+ public void testFastMultiPartUpload() throws Exception {
+ conf.setBoolean(Constants.FAST_UPLOAD, true);
+ fs = S3ATestUtils.createTestFileSystem(conf);
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+ 1024);
+
+ }
+}