fs.s3a.threads.keepalivetime
60
@@ -858,7 +864,7 @@ for ldap providers in the same way as above does.
fs.s3a.max.total.tasks
- 5
+ 1000
Number of (part)uploads allowed to the queue before
blocking additional uploads.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
deleted file mode 100644
index 3baf6fc178d..00000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.ForwardingListeningExecutorService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
- * This ExecutorService blocks the submission of new tasks when its queue is
- * already full by using a semaphore. Task submissions require permits, task
- * completions release permits.
- *
- * This is inspired by this s4 threadpool
- */
-public class BlockingThreadPoolExecutorService
- extends ForwardingListeningExecutorService {
-
- private static Logger LOG = LoggerFactory
- .getLogger(BlockingThreadPoolExecutorService.class);
-
- private Semaphore queueingPermits;
- private ListeningExecutorService executorDelegatee;
-
- private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
-
- /**
- * Returns a {@link java.util.concurrent.ThreadFactory} that names each
- * created thread uniquely,
- * with a common prefix.
- *
- * @param prefix The prefix of every created Thread's name
- * @return a {@link java.util.concurrent.ThreadFactory} that names threads
- */
- public static ThreadFactory getNamedThreadFactory(final String prefix) {
- SecurityManager s = System.getSecurityManager();
- final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
-
- return new ThreadFactory() {
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final int poolNum = POOLNUMBER.getAndIncrement();
- private final ThreadGroup group = threadGroup;
-
- @Override
- public Thread newThread(Runnable r) {
- final String name =
- prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
- return new Thread(group, r, name);
- }
- };
- }
-
- /**
- * Get a named {@link ThreadFactory} that just builds daemon threads.
- *
- * @param prefix name prefix for all threads created from the factory
- * @return a thread factory that creates named, daemon threads with
- * the supplied exception handler and normal priority
- */
- private static ThreadFactory newDaemonThreadFactory(final String prefix) {
- final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
- return new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = namedFactory.newThread(r);
- if (!t.isDaemon()) {
- t.setDaemon(true);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
-
- };
- }
-
-
- /**
- * A thread pool that that blocks clients submitting additional tasks if
- * there are already {@code activeTasks} running threads and {@code
- * waitingTasks} tasks waiting in its queue.
- *
- * @param activeTasks maximum number of active tasks
- * @param waitingTasks maximum number of waiting tasks
- * @param keepAliveTime time until threads are cleaned up in {@code unit}
- * @param unit time unit
- * @param prefixName prefix of name for threads
- */
- public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
- long keepAliveTime, TimeUnit unit, String prefixName) {
- super();
- queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
- /* Although we generally only expect up to waitingTasks tasks in the
- queue, we need to be able to buffer all tasks in case dequeueing is
- slower than enqueueing. */
- final BlockingQueue workQueue =
- new LinkedBlockingQueue<>(waitingTasks + activeTasks);
- ThreadPoolExecutor eventProcessingExecutor =
- new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
- workQueue, newDaemonThreadFactory(prefixName),
- new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r,
- ThreadPoolExecutor executor) {
- // This is not expected to happen.
- LOG.error("Could not submit task to executor {}",
- executor.toString());
- }
- });
- eventProcessingExecutor.allowCoreThreadTimeOut(true);
- executorDelegatee =
- MoreExecutors.listeningDecorator(eventProcessingExecutor);
-
- }
-
- @Override
- protected ListeningExecutorService delegate() {
- return executorDelegatee;
- }
-
- @Override
- public ListenableFuture submit(Callable task) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new CallableWithPermitRelease(task));
- }
-
- @Override
- public ListenableFuture submit(Runnable task, T result) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new RunnableWithPermitRelease(task), result);
- }
-
- @Override
- public ListenableFuture> submit(Runnable task) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new RunnableWithPermitRelease(task));
- }
-
- @Override
- public void execute(Runnable command) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- super.execute(new RunnableWithPermitRelease(command));
- }
-
- /**
- * Releases a permit after the task is executed.
- */
- class RunnableWithPermitRelease implements Runnable {
-
- private Runnable delegatee;
-
- public RunnableWithPermitRelease(Runnable delegatee) {
- this.delegatee = delegatee;
- }
-
- @Override
- public void run() {
- try {
- delegatee.run();
- } finally {
- queueingPermits.release();
- }
-
- }
- }
-
- /**
- * Releases a permit after the task is completed.
- */
- class CallableWithPermitRelease implements Callable {
-
- private Callable delegatee;
-
- public CallableWithPermitRelease(Callable delegatee) {
- this.delegatee = delegatee;
- }
-
- @Override
- public T call() throws Exception {
- try {
- return delegatee.call();
- } finally {
- queueingPermits.release();
- }
- }
-
- }
-
- @Override
- public List> invokeAll(Collection extends Callable> tasks)
- throws InterruptedException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public List> invokeAll(Collection extends Callable> tasks,
- long timeout, TimeUnit unit) throws InterruptedException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public T invokeAny(Collection extends Callable> tasks)
- throws InterruptedException, ExecutionException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public T invokeAny(Collection extends Callable> tasks, long timeout,
- TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- throw new RuntimeException("Not implemented");
- }
-
-}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 99e85cfef54..fa81d935222 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -61,15 +61,20 @@ public class Constants {
// the maximum number of threads to allow in the pool used by TransferManager
public static final String MAX_THREADS = "fs.s3a.threads.max";
- public static final int DEFAULT_MAX_THREADS = 10;
+ public static final int DEFAULT_MAX_THREADS = 256;
- // the time an idle thread waits before terminating
+ // the number of threads to keep in the pool used by TransferManager
+ public static final String CORE_THREADS = "fs.s3a.threads.core";
+ public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
+
+ // when the number of threads is greater than the core, this is the maximum time
+ // that excess idle threads will wait for new tasks before terminating.
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
public static final int DEFAULT_KEEPALIVE_TIME = 60;
- // the maximum number of tasks cached if all threads are already uploading
+ // the maximum number of tasks that the LinkedBlockingQueue can hold
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
- public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
+ public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
// size of each of or multipart pieces in bytes
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 5558693557b..2e06fba2752 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -51,7 +51,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -108,7 +108,7 @@ public class S3AFastOutputStream extends OutputStream {
String bucket, String key, Progressable progress,
FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
String serverSideEncryptionAlgorithm, long partSize,
- long multiPartThreshold, ExecutorService threadPoolExecutor)
+ long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
throws IOException {
this.bucket = bucket;
this.key = key;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index ccb0cd26ac1..83be18423fe 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -26,8 +26,11 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
@@ -83,7 +86,7 @@ public class S3AFileSystem extends FileSystem {
private int maxKeys;
private long partSize;
private TransferManager transfers;
- private ExecutorService threadPoolExecutor;
+ private ThreadPoolExecutor threadPoolExecutor;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL;
@@ -92,6 +95,55 @@ public class S3AFileSystem extends FileSystem {
// The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000;
+ private static final AtomicInteger poolNumber = new AtomicInteger(1);
+ /**
+ * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
+ * with a common prefix.
+ * @param prefix The prefix of every created Thread's name
+ * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+ */
+ public static ThreadFactory getNamedThreadFactory(final String prefix) {
+ SecurityManager s = System.getSecurityManager();
+ final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
+ .getThreadGroup();
+
+ return new ThreadFactory() {
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final int poolNum = poolNumber.getAndIncrement();
+ final ThreadGroup group = threadGroup;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+ return new Thread(group, r, name);
+ }
+ };
+ }
+
+ /**
+ * Get a named {@link ThreadFactory} that just builds daemon threads.
+ * @param prefix name prefix for all threads created from the factory
+ * @return a thread factory that creates named, daemon threads with
+ * the supplied exception handler and normal priority
+ */
+ private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+ return new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = namedFactory.newThread(r);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+
+ };
+ }
+
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
* for this FileSystem
@@ -212,19 +264,25 @@ public class S3AFileSystem extends FileSystem {
}
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
- if (maxThreads < 2) {
- LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
- maxThreads = 2;
+ int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
+ if (maxThreads == 0) {
+ maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
- int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
- if (totalTasks < 1) {
- LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
- totalTasks = 1;
+ if (coreThreads == 0) {
+ coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
- threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
- maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
- "s3a-transfer-shared");
+ LinkedBlockingQueue workQueue =
+ new LinkedBlockingQueue<>(maxThreads *
+ conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
+ threadPoolExecutor = new ThreadPoolExecutor(
+ coreThreads,
+ maxThreads,
+ keepAliveTime,
+ TimeUnit.SECONDS,
+ workQueue,
+ newDaemonThreadFactory("s3a-transfer-shared-"));
+ threadPoolExecutor.allowCoreThreadTimeOut(true);
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 0260e26d4bf..6df15e68cf4 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -231,11 +231,17 @@ If you do any of these: change your credentials immediately!
fs.s3a.threads.max
- 10
+ 256
Maximum number of concurrent active (part)uploads,
which each use a thread from the threadpool.
+
+ fs.s3a.threads.core
+ 15
+ Number of core threads in the threadpool.
+
+
fs.s3a.threads.keepalivetime
60
@@ -245,7 +251,7 @@ If you do any of these: change your credentials immediately!
fs.s3a.max.total.tasks
- 5
+ 1000
Number of (part)uploads allowed to the queue before
blocking additional uploads.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
deleted file mode 100644
index 25a895862ea..00000000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.hadoop.util.StopWatch;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-/**
- * Basic unit test for S3A's blocking executor service.
- */
-public class TestBlockingThreadPoolExecutorService {
-
- private static final Logger LOG = LoggerFactory.getLogger(
- BlockingThreadPoolExecutorService.class);
-
- private static final int NUM_ACTIVE_TASKS = 4;
- private static final int NUM_WAITING_TASKS = 2;
- private static final int TASK_SLEEP_MSEC = 100;
- private static final int SHUTDOWN_WAIT_MSEC = 200;
- private static final int SHUTDOWN_WAIT_TRIES = 5;
- private static final int BLOCKING_THRESHOLD_MSEC = 50;
-
- private static final Integer SOME_VALUE = 1337;
-
- private static BlockingThreadPoolExecutorService tpe = null;
-
- @AfterClass
- public static void afterClass() throws Exception {
- ensureDestroyed();
- }
-
- /**
- * Basic test of running one trivial task.
- */
- @Test
- public void testSubmitCallable() throws Exception {
- ensureCreated();
- ListenableFuture f = tpe.submit(callableSleeper);
- Integer v = f.get();
- assertEquals(SOME_VALUE, v);
- }
-
- /**
- * More involved test, including detecting blocking when at capacity.
- */
- @Test
- public void testSubmitRunnable() throws Exception {
- ensureCreated();
- int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
- StopWatch stopWatch = new StopWatch().start();
- for (int i = 0; i < totalTasks; i++) {
- tpe.submit(sleeper);
- assertDidntBlock(stopWatch);
- }
- tpe.submit(sleeper);
- assertDidBlock(stopWatch);
- }
-
- @Test
- public void testShutdown() throws Exception {
- // Cover create / destroy, regardless of when this test case runs
- ensureCreated();
- ensureDestroyed();
-
- // Cover create, execute, destroy, regardless of when test case runs
- ensureCreated();
- testSubmitRunnable();
- ensureDestroyed();
- }
-
- // Helper functions, etc.
-
- private void assertDidntBlock(StopWatch sw) {
- try {
- assertFalse("Non-blocking call took too long.",
- sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
- } finally {
- sw.reset().start();
- }
- }
-
- private void assertDidBlock(StopWatch sw) {
- try {
- if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
- throw new RuntimeException("Blocking call returned too fast.");
- }
- } finally {
- sw.reset().start();
- }
- }
-
- private Runnable sleeper = new Runnable() {
- @Override
- public void run() {
- String name = Thread.currentThread().getName();
- try {
- Thread.sleep(TASK_SLEEP_MSEC);
- } catch (InterruptedException e) {
- LOG.info("Thread {} interrupted.", name);
- Thread.currentThread().interrupt();
- }
- }
- };
-
- private Callable callableSleeper = new Callable() {
- @Override
- public Integer call() throws Exception {
- sleeper.run();
- return SOME_VALUE;
- }
- };
-
- /**
- * Helper function to create thread pool under test.
- */
- private static void ensureCreated() throws Exception {
- if (tpe == null) {
- LOG.debug("Creating thread pool");
- tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
- NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
- }
- }
-
- /**
- * Helper function to terminate thread pool under test, asserting that
- * shutdown -> terminate works as expected.
- */
- private static void ensureDestroyed() throws Exception {
- if (tpe == null) {
- return;
- }
- int shutdownTries = SHUTDOWN_WAIT_TRIES;
-
- tpe.shutdown();
- if (!tpe.isShutdown()) {
- throw new RuntimeException("Shutdown had no effect.");
- }
-
- while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
- TimeUnit.MILLISECONDS)) {
- LOG.info("Waiting for thread pool shutdown.");
- if (shutdownTries-- <= 0) {
- LOG.error("Failed to terminate thread pool gracefully.");
- break;
- }
- }
- if (!tpe.isTerminated()) {
- tpe.shutdownNow();
- if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
- TimeUnit.MILLISECONDS)) {
- throw new RuntimeException(
- "Failed to terminate thread pool in timely manner.");
- }
- }
- tpe = null;
- }
-}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
deleted file mode 100644
index bd738b25423..00000000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-/**
- * Demonstrate that the threadpool blocks additional client requests if
- * its queue is full (rather than throwing an exception) by initiating an
- * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
- * 4th part should not trigger an exception as it would with a
- * non-blocking threadpool.
- */
-public class TestS3ABlockingThreadPool {
-
- private Configuration conf;
- private S3AFileSystem fs;
-
- @Rule
- public Timeout testTimeout = new Timeout(30 * 60 * 1000);
-
- protected Path getTestPath() {
- return new Path("/tests3a");
- }
-
- @Before
- public void setUp() throws Exception {
- conf = new Configuration();
- conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
- conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
- conf.setInt(Constants.MAX_THREADS, 2);
- conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
- }
-
- @After
- public void tearDown() throws Exception {
- if (fs != null) {
- fs.delete(getTestPath(), true);
- }
- }
-
- @Test
- public void testRegularMultiPartUpload() throws Exception {
- fs = S3ATestUtils.createTestFileSystem(conf);
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
- 1024);
- }
-
- @Test
- public void testFastMultiPartUpload() throws Exception {
- conf.setBoolean(Constants.FAST_UPLOAD, true);
- fs = S3ATestUtils.createTestFileSystem(conf);
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
- 1024);
-
- }
-}