From 47941858af3411c49170a69c6a9ba406fb816516 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 6 Nov 2015 09:52:21 -0800 Subject: [PATCH] Revert "HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor and Aaron Fabbri via lei)" This reverts commit 01ae30796d0a120e4317e819533fb52bdae76885. --- .../hadoop-common/CHANGES.txt | 3 - .../src/main/resources/core-default.xml | 10 +- .../BlockingThreadPoolExecutorService.java | 274 ------------------ .../org/apache/hadoop/fs/s3a/Constants.java | 13 +- .../hadoop/fs/s3a/S3AFastOutputStream.java | 4 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 82 +++++- .../site/markdown/tools/hadoop-aws/index.md | 10 +- ...TestBlockingThreadPoolExecutorService.java | 182 ------------ .../fs/s3a/TestS3ABlockingThreadPool.java | 80 ----- 9 files changed, 97 insertions(+), 561 deletions(-) delete mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d30f0d55b81..e75693eb1be 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -326,9 +326,6 @@ Release 2.8.0 - UNRELEASED HADOOP-11685. StorageException complaining " no lease ID" during HBase distributed log splitting (Duo Xu via cnauroth) - HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor - and Aaron Fabbri via lei) - OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 5607489b71a..fc09ddfc37a 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -844,11 +844,17 @@ for ldap providers in the same way as above does. fs.s3a.threads.max - 10 + 256 Maximum number of concurrent active (part)uploads, which each use a thread from the threadpool. + + fs.s3a.threads.core + 15 + Number of core threads in the threadpool. + + fs.s3a.threads.keepalivetime 60 @@ -858,7 +864,7 @@ for ldap providers in the same way as above does. fs.s3a.max.total.tasks - 5 + 1000 Number of (part)uploads allowed to the queue before blocking additional uploads. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java deleted file mode 100644 index 3baf6fc178d..00000000000 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.ForwardingListeningExecutorService; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -/** - * This ExecutorService blocks the submission of new tasks when its queue is - * already full by using a semaphore. Task submissions require permits, task - * completions release permits. - *

- * This is inspired by this s4 threadpool - */ -public class BlockingThreadPoolExecutorService - extends ForwardingListeningExecutorService { - - private static Logger LOG = LoggerFactory - .getLogger(BlockingThreadPoolExecutorService.class); - - private Semaphore queueingPermits; - private ListeningExecutorService executorDelegatee; - - private static final AtomicInteger POOLNUMBER = new AtomicInteger(1); - - /** - * Returns a {@link java.util.concurrent.ThreadFactory} that names each - * created thread uniquely, - * with a common prefix. - * - * @param prefix The prefix of every created Thread's name - * @return a {@link java.util.concurrent.ThreadFactory} that names threads - */ - public static ThreadFactory getNamedThreadFactory(final String prefix) { - SecurityManager s = System.getSecurityManager(); - final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - - return new ThreadFactory() { - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final int poolNum = POOLNUMBER.getAndIncrement(); - private final ThreadGroup group = threadGroup; - - @Override - public Thread newThread(Runnable r) { - final String name = - prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); - return new Thread(group, r, name); - } - }; - } - - /** - * Get a named {@link ThreadFactory} that just builds daemon threads. - * - * @param prefix name prefix for all threads created from the factory - * @return a thread factory that creates named, daemon threads with - * the supplied exception handler and normal priority - */ - private static ThreadFactory newDaemonThreadFactory(final String prefix) { - final ThreadFactory namedFactory = getNamedThreadFactory(prefix); - return new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = namedFactory.newThread(r); - if (!t.isDaemon()) { - t.setDaemon(true); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - - }; - } - - - /** - * A thread pool that that blocks clients submitting additional tasks if - * there are already {@code activeTasks} running threads and {@code - * waitingTasks} tasks waiting in its queue. - * - * @param activeTasks maximum number of active tasks - * @param waitingTasks maximum number of waiting tasks - * @param keepAliveTime time until threads are cleaned up in {@code unit} - * @param unit time unit - * @param prefixName prefix of name for threads - */ - public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks, - long keepAliveTime, TimeUnit unit, String prefixName) { - super(); - queueingPermits = new Semaphore(waitingTasks + activeTasks, false); - /* Although we generally only expect up to waitingTasks tasks in the - queue, we need to be able to buffer all tasks in case dequeueing is - slower than enqueueing. */ - final BlockingQueue workQueue = - new LinkedBlockingQueue<>(waitingTasks + activeTasks); - ThreadPoolExecutor eventProcessingExecutor = - new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit, - workQueue, newDaemonThreadFactory(prefixName), - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, - ThreadPoolExecutor executor) { - // This is not expected to happen. - LOG.error("Could not submit task to executor {}", - executor.toString()); - } - }); - eventProcessingExecutor.allowCoreThreadTimeOut(true); - executorDelegatee = - MoreExecutors.listeningDecorator(eventProcessingExecutor); - - } - - @Override - protected ListeningExecutorService delegate() { - return executorDelegatee; - } - - @Override - public ListenableFuture submit(Callable task) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new CallableWithPermitRelease(task)); - } - - @Override - public ListenableFuture submit(Runnable task, T result) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new RunnableWithPermitRelease(task), result); - } - - @Override - public ListenableFuture submit(Runnable task) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new RunnableWithPermitRelease(task)); - } - - @Override - public void execute(Runnable command) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - super.execute(new RunnableWithPermitRelease(command)); - } - - /** - * Releases a permit after the task is executed. - */ - class RunnableWithPermitRelease implements Runnable { - - private Runnable delegatee; - - public RunnableWithPermitRelease(Runnable delegatee) { - this.delegatee = delegatee; - } - - @Override - public void run() { - try { - delegatee.run(); - } finally { - queueingPermits.release(); - } - - } - } - - /** - * Releases a permit after the task is completed. - */ - class CallableWithPermitRelease implements Callable { - - private Callable delegatee; - - public CallableWithPermitRelease(Callable delegatee) { - this.delegatee = delegatee; - } - - @Override - public T call() throws Exception { - try { - return delegatee.call(); - } finally { - queueingPermits.release(); - } - } - - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - throw new RuntimeException("Not implemented"); - } - - @Override - public List> invokeAll(Collection> tasks, - long timeout, TimeUnit unit) throws InterruptedException { - throw new RuntimeException("Not implemented"); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - throw new RuntimeException("Not implemented"); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, - TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throw new RuntimeException("Not implemented"); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 99e85cfef54..fa81d935222 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -61,15 +61,20 @@ public class Constants { // the maximum number of threads to allow in the pool used by TransferManager public static final String MAX_THREADS = "fs.s3a.threads.max"; - public static final int DEFAULT_MAX_THREADS = 10; + public static final int DEFAULT_MAX_THREADS = 256; - // the time an idle thread waits before terminating + // the number of threads to keep in the pool used by TransferManager + public static final String CORE_THREADS = "fs.s3a.threads.core"; + public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS; + + // when the number of threads is greater than the core, this is the maximum time + // that excess idle threads will wait for new tasks before terminating. public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime"; public static final int DEFAULT_KEEPALIVE_TIME = 60; - // the maximum number of tasks cached if all threads are already uploading + // the maximum number of tasks that the LinkedBlockingQueue can hold public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks"; - public static final int DEFAULT_MAX_TOTAL_TASKS = 5; + public static final int DEFAULT_MAX_TOTAL_TASKS = 1000; // size of each of or multipart pieces in bytes public static final String MULTIPART_SIZE = "fs.s3a.multipart.size"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 5558693557b..2e06fba2752 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -51,7 +51,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** @@ -108,7 +108,7 @@ public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs, String bucket, String key, Progressable progress, FileSystem.Statistics statistics, CannedAccessControlList cannedACL, String serverSideEncryptionAlgorithm, long partSize, - long multiPartThreshold, ExecutorService threadPoolExecutor) + long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor) throws IOException { this.bucket = bucket; this.key = key; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index ccb0cd26ac1..83be18423fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -26,8 +26,11 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; @@ -83,7 +86,7 @@ public class S3AFileSystem extends FileSystem { private int maxKeys; private long partSize; private TransferManager transfers; - private ExecutorService threadPoolExecutor; + private ThreadPoolExecutor threadPoolExecutor; private long multiPartThreshold; public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); private CannedAccessControlList cannedACL; @@ -92,6 +95,55 @@ public class S3AFileSystem extends FileSystem { // The maximum number of entries that can be deleted in any call to s3 private static final int MAX_ENTRIES_TO_DELETE = 1000; + private static final AtomicInteger poolNumber = new AtomicInteger(1); + /** + * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, + * with a common prefix. + * @param prefix The prefix of every created Thread's name + * @return a {@link java.util.concurrent.ThreadFactory} that names threads + */ + public static ThreadFactory getNamedThreadFactory(final String prefix) { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() + .getThreadGroup(); + + return new ThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + private final int poolNum = poolNumber.getAndIncrement(); + final ThreadGroup group = threadGroup; + + @Override + public Thread newThread(Runnable r) { + final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); + return new Thread(group, r, name); + } + }; + } + + /** + * Get a named {@link ThreadFactory} that just builds daemon threads. + * @param prefix name prefix for all threads created from the factory + * @return a thread factory that creates named, daemon threads with + * the supplied exception handler and normal priority + */ + private static ThreadFactory newDaemonThreadFactory(final String prefix) { + final ThreadFactory namedFactory = getNamedThreadFactory(prefix); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = namedFactory.newThread(r); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + + }; + } + /** Called after a new FileSystem instance is constructed. * @param name a uri whose authority section names the host, port, etc. * for this FileSystem @@ -212,19 +264,25 @@ public void initialize(URI name, Configuration conf) throws IOException { } int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); - if (maxThreads < 2) { - LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2."); - maxThreads = 2; + int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; } - int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS); - if (totalTasks < 1) { - LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1."); - totalTasks = 1; + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; } long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); - threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads, - maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS, - "s3a-transfer-shared"); + LinkedBlockingQueue workQueue = + new LinkedBlockingQueue<>(maxThreads * + conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS)); + threadPoolExecutor = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + newDaemonThreadFactory("s3a-transfer-shared-")); + threadPoolExecutor.allowCoreThreadTimeOut(true); TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); transferConfiguration.setMinimumUploadPartSize(partSize); diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 0260e26d4bf..6df15e68cf4 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -231,11 +231,17 @@ If you do any of these: change your credentials immediately! fs.s3a.threads.max - 10 + 256 Maximum number of concurrent active (part)uploads, which each use a thread from the threadpool. + + fs.s3a.threads.core + 15 + Number of core threads in the threadpool. + + fs.s3a.threads.keepalivetime 60 @@ -245,7 +251,7 @@ If you do any of these: change your credentials immediately! fs.s3a.max.total.tasks - 5 + 1000 Number of (part)uploads allowed to the queue before blocking additional uploads. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java deleted file mode 100644 index 25a895862ea..00000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.util.StopWatch; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -/** - * Basic unit test for S3A's blocking executor service. - */ -public class TestBlockingThreadPoolExecutorService { - - private static final Logger LOG = LoggerFactory.getLogger( - BlockingThreadPoolExecutorService.class); - - private static final int NUM_ACTIVE_TASKS = 4; - private static final int NUM_WAITING_TASKS = 2; - private static final int TASK_SLEEP_MSEC = 100; - private static final int SHUTDOWN_WAIT_MSEC = 200; - private static final int SHUTDOWN_WAIT_TRIES = 5; - private static final int BLOCKING_THRESHOLD_MSEC = 50; - - private static final Integer SOME_VALUE = 1337; - - private static BlockingThreadPoolExecutorService tpe = null; - - @AfterClass - public static void afterClass() throws Exception { - ensureDestroyed(); - } - - /** - * Basic test of running one trivial task. - */ - @Test - public void testSubmitCallable() throws Exception { - ensureCreated(); - ListenableFuture f = tpe.submit(callableSleeper); - Integer v = f.get(); - assertEquals(SOME_VALUE, v); - } - - /** - * More involved test, including detecting blocking when at capacity. - */ - @Test - public void testSubmitRunnable() throws Exception { - ensureCreated(); - int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS; - StopWatch stopWatch = new StopWatch().start(); - for (int i = 0; i < totalTasks; i++) { - tpe.submit(sleeper); - assertDidntBlock(stopWatch); - } - tpe.submit(sleeper); - assertDidBlock(stopWatch); - } - - @Test - public void testShutdown() throws Exception { - // Cover create / destroy, regardless of when this test case runs - ensureCreated(); - ensureDestroyed(); - - // Cover create, execute, destroy, regardless of when test case runs - ensureCreated(); - testSubmitRunnable(); - ensureDestroyed(); - } - - // Helper functions, etc. - - private void assertDidntBlock(StopWatch sw) { - try { - assertFalse("Non-blocking call took too long.", - sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC); - } finally { - sw.reset().start(); - } - } - - private void assertDidBlock(StopWatch sw) { - try { - if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) { - throw new RuntimeException("Blocking call returned too fast."); - } - } finally { - sw.reset().start(); - } - } - - private Runnable sleeper = new Runnable() { - @Override - public void run() { - String name = Thread.currentThread().getName(); - try { - Thread.sleep(TASK_SLEEP_MSEC); - } catch (InterruptedException e) { - LOG.info("Thread {} interrupted.", name); - Thread.currentThread().interrupt(); - } - } - }; - - private Callable callableSleeper = new Callable() { - @Override - public Integer call() throws Exception { - sleeper.run(); - return SOME_VALUE; - } - }; - - /** - * Helper function to create thread pool under test. - */ - private static void ensureCreated() throws Exception { - if (tpe == null) { - LOG.debug("Creating thread pool"); - tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS, - NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest"); - } - } - - /** - * Helper function to terminate thread pool under test, asserting that - * shutdown -> terminate works as expected. - */ - private static void ensureDestroyed() throws Exception { - if (tpe == null) { - return; - } - int shutdownTries = SHUTDOWN_WAIT_TRIES; - - tpe.shutdown(); - if (!tpe.isShutdown()) { - throw new RuntimeException("Shutdown had no effect."); - } - - while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, - TimeUnit.MILLISECONDS)) { - LOG.info("Waiting for thread pool shutdown."); - if (shutdownTries-- <= 0) { - LOG.error("Failed to terminate thread pool gracefully."); - break; - } - } - if (!tpe.isTerminated()) { - tpe.shutdownNow(); - if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, - TimeUnit.MILLISECONDS)) { - throw new RuntimeException( - "Failed to terminate thread pool in timely manner."); - } - } - tpe = null; - } -} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java deleted file mode 100644 index bd738b25423..00000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - -/** - * Demonstrate that the threadpool blocks additional client requests if - * its queue is full (rather than throwing an exception) by initiating an - * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The - * 4th part should not trigger an exception as it would with a - * non-blocking threadpool. - */ -public class TestS3ABlockingThreadPool { - - private Configuration conf; - private S3AFileSystem fs; - - @Rule - public Timeout testTimeout = new Timeout(30 * 60 * 1000); - - protected Path getTestPath() { - return new Path("/tests3a"); - } - - @Before - public void setUp() throws Exception { - conf = new Configuration(); - conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024); - conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024); - conf.setInt(Constants.MAX_THREADS, 2); - conf.setInt(Constants.MAX_TOTAL_TASKS, 1); - } - - @After - public void tearDown() throws Exception { - if (fs != null) { - fs.delete(getTestPath(), true); - } - } - - @Test - public void testRegularMultiPartUpload() throws Exception { - fs = S3ATestUtils.createTestFileSystem(conf); - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 * - 1024); - } - - @Test - public void testFastMultiPartUpload() throws Exception { - conf.setBoolean(Constants.FAST_UPLOAD, true); - fs = S3ATestUtils.createTestFileSystem(conf); - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 * - 1024); - - } -}