HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor and Aaron Fabbri via lei)

(cherry picked from commit bff7c90a56)
Lei Xu 2015-11-05 18:33:53 -08:00
parent 1bb89cbac7
commit 01ae30796d
9 changed files with 561 additions and 97 deletions

CHANGES.txt

@@ -329,6 +329,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-11685. StorageException complaining " no lease ID" during HBase
     distributed log splitting (Duo Xu via cnauroth)
 
+    HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor
+    and Aaron Fabbri via lei)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

core-default.xml

@@ -844,17 +844,11 @@ for ldap providers in the same way as above does.
 
 <property>
   <name>fs.s3a.threads.max</name>
-  <value>256</value>
+  <value>10</value>
   <description> Maximum number of concurrent active (part)uploads,
     which each use a thread from the threadpool.</description>
 </property>
 
-<property>
-  <name>fs.s3a.threads.core</name>
-  <value>15</value>
-  <description>Number of core threads in the threadpool.</description>
-</property>
-
 <property>
   <name>fs.s3a.threads.keepalivetime</name>
   <value>60</value>
@@ -864,7 +858,7 @@ for ldap providers in the same way as above does.
 
 <property>
   <name>fs.s3a.max.total.tasks</name>
-  <value>1000</value>
+  <value>5</value>
   <description>Number of (part)uploads allowed to the queue before
     blocking additional uploads.</description>
 </property>
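
For reference, a small sketch (not part of this commit) of overriding these defaults programmatically. The property strings mirror the entries above and the constants in org.apache.hadoop.fs.s3a.Constants; the bucket URI and class name are hypothetical placeholders, and running it requires the hadoop-aws module plus valid AWS credentials.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3AUploadTuning {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt("fs.s3a.threads.max", 10);            // concurrent (part)uploads
    conf.setInt("fs.s3a.max.total.tasks", 5);         // queued uploads before submit() blocks
    conf.setLong("fs.s3a.threads.keepalivetime", 60); // idle-thread timeout, in seconds
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    // Uploads beyond the active + queued limit now block the submitting
    // thread instead of failing with a RejectedExecutionException.
    fs.close();
  }
}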

BlockingThreadPoolExecutorService.java (new file)

@@ -0,0 +1,274 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
* This ExecutorService blocks the submission of new tasks when its queue is
* already full by using a semaphore. Task submissions require permits, task
* completions release permits.
* <p>
* This is inspired by <a href = "https://github
* .com/apache/incubator-s4/blob/master/subprojects
* /s4-comm/src/main/java/org/apache/s4/comm/staging
* /BlockingThreadPoolExecutorService.java"> this s4 threadpool</a>
*/
public class BlockingThreadPoolExecutorService
extends ForwardingListeningExecutorService {
private static Logger LOG = LoggerFactory
.getLogger(BlockingThreadPoolExecutorService.class);
private Semaphore queueingPermits;
private ListeningExecutorService executorDelegatee;
private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
/**
* Returns a {@link java.util.concurrent.ThreadFactory} that names each
* created thread uniquely,
* with a common prefix.
*
* @param prefix The prefix of every created Thread's name
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
*/
public static ThreadFactory getNamedThreadFactory(final String prefix) {
SecurityManager s = System.getSecurityManager();
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final int poolNum = POOLNUMBER.getAndIncrement();
private final ThreadGroup group = threadGroup;
@Override
public Thread newThread(Runnable r) {
final String name =
prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
return new Thread(group, r, name);
}
};
}
/**
* Get a named {@link ThreadFactory} that just builds daemon threads.
*
* @param prefix name prefix for all threads created from the factory
* @return a thread factory that creates named, daemon threads with
* the supplied exception handler and normal priority
*/
private static ThreadFactory newDaemonThreadFactory(final String prefix) {
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = namedFactory.newThread(r);
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
};
}
/**
* A thread pool that blocks clients submitting additional tasks if
* there are already {@code activeTasks} running threads and {@code
* waitingTasks} tasks waiting in its queue.
*
* @param activeTasks maximum number of active tasks
* @param waitingTasks maximum number of waiting tasks
* @param keepAliveTime time until threads are cleaned up in {@code unit}
* @param unit time unit
* @param prefixName prefix of name for threads
*/
public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
long keepAliveTime, TimeUnit unit, String prefixName) {
super();
queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
/* Although we generally only expect up to waitingTasks tasks in the
queue, we need to be able to buffer all tasks in case dequeueing is
slower than enqueueing. */
final BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
ThreadPoolExecutor eventProcessingExecutor =
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
workQueue, newDaemonThreadFactory(prefixName),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
// This is not expected to happen.
LOG.error("Could not submit task to executor {}",
executor.toString());
}
});
eventProcessingExecutor.allowCoreThreadTimeOut(true);
executorDelegatee =
MoreExecutors.listeningDecorator(eventProcessingExecutor);
}
@Override
protected ListeningExecutorService delegate() {
return executorDelegatee;
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Futures.immediateFailedCheckedFuture(e);
}
return super.submit(new CallableWithPermitRelease<T>(task));
}
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Futures.immediateFailedCheckedFuture(e);
}
return super.submit(new RunnableWithPermitRelease(task), result);
}
@Override
public ListenableFuture<?> submit(Runnable task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Futures.immediateFailedCheckedFuture(e);
}
return super.submit(new RunnableWithPermitRelease(task));
}
@Override
public void execute(Runnable command) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
super.execute(new RunnableWithPermitRelease(command));
}
/**
* Releases a permit after the task is executed.
*/
class RunnableWithPermitRelease implements Runnable {
private Runnable delegatee;
public RunnableWithPermitRelease(Runnable delegatee) {
this.delegatee = delegatee;
}
@Override
public void run() {
try {
delegatee.run();
} finally {
queueingPermits.release();
}
}
}
/**
* Releases a permit after the task is completed.
*/
class CallableWithPermitRelease<T> implements Callable<T> {
private Callable<T> delegatee;
public CallableWithPermitRelease(Callable<T> delegatee) {
this.delegatee = delegatee;
}
@Override
public T call() throws Exception {
try {
return delegatee.call();
} finally {
queueingPermits.release();
}
}
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
throw new RuntimeException("Not implemented");
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException {
throw new RuntimeException("Not implemented");
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
throw new RuntimeException("Not implemented");
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new RuntimeException("Not implemented");
}
}
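
Illustration only (not part of the commit): a minimal sketch of the blocking behaviour of the class above, assuming a pool of 2 worker threads and 1 queue slot. The class name BlockingPoolDemo and the "demo" thread-name prefix are made up for the example.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService;

public class BlockingPoolDemo {
  public static void main(String[] args) throws Exception {
    // 2 active tasks + 1 waiting task = 3 queueing permits in total.
    BlockingThreadPoolExecutorService pool =
        new BlockingThreadPoolExecutorService(2, 1, 60, TimeUnit.SECONDS, "demo");
    Runnable sleeper = new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(500);   // stand-in for an upload
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    };
    for (int i = 0; i < 4; i++) {
      // The first three submissions return immediately; the fourth call
      // blocks inside submit() until a running task releases its permit.
      pool.submit(sleeper);
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}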

Constants.java

@@ -61,20 +61,15 @@ public class Constants {
 
   // the maximum number of threads to allow in the pool used by TransferManager
   public static final String MAX_THREADS = "fs.s3a.threads.max";
-  public static final int DEFAULT_MAX_THREADS = 256;
+  public static final int DEFAULT_MAX_THREADS = 10;
 
-  // the number of threads to keep in the pool used by TransferManager
-  public static final String CORE_THREADS = "fs.s3a.threads.core";
-  public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
-
-  // when the number of threads is greater than the core, this is the maximum time
-  // that excess idle threads will wait for new tasks before terminating.
+  // the time an idle thread waits before terminating
   public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
   public static final int DEFAULT_KEEPALIVE_TIME = 60;
 
-  // the maximum number of tasks that the LinkedBlockingQueue can hold
+  // the maximum number of tasks cached if all threads are already uploading
   public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
-  public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
+  public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
 
   // size of each of or multipart pieces in bytes
   public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";

S3AFastOutputStream.java

@@ -51,7 +51,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 
 /**
@@ -108,7 +108,7 @@ public class S3AFastOutputStream extends OutputStream {
       String bucket, String key, Progressable progress,
       FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
       String serverSideEncryptionAlgorithm, long partSize,
-      long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
+      long multiPartThreshold, ExecutorService threadPoolExecutor)
       throws IOException {
     this.bucket = bucket;
     this.key = key;

S3AFileSystem.java

@@ -26,11 +26,8 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
@@ -86,7 +83,7 @@ public class S3AFileSystem extends FileSystem {
   private int maxKeys;
   private long partSize;
   private TransferManager transfers;
-  private ThreadPoolExecutor threadPoolExecutor;
+  private ExecutorService threadPoolExecutor;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
@@ -95,55 +92,6 @@ public class S3AFileSystem extends FileSystem {
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
 
-  private static final AtomicInteger poolNumber = new AtomicInteger(1);
-
-  /**
-   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
-   * with a common prefix.
-   * @param prefix The prefix of every created Thread's name
-   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
-   */
-  public static ThreadFactory getNamedThreadFactory(final String prefix) {
-    SecurityManager s = System.getSecurityManager();
-    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
-        .getThreadGroup();
-
-    return new ThreadFactory() {
-      final AtomicInteger threadNumber = new AtomicInteger(1);
-      private final int poolNum = poolNumber.getAndIncrement();
-      final ThreadGroup group = threadGroup;
-
-      @Override
-      public Thread newThread(Runnable r) {
-        final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
-        return new Thread(group, r, name);
-      }
-    };
-  }
-
-  /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads.
-   * @param prefix name prefix for all threads created from the factory
-   * @return a thread factory that creates named, daemon threads with
-   *     the supplied exception handler and normal priority
-   */
-  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
-    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
-    return new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = namedFactory.newThread(r);
-        if (!t.isDaemon()) {
-          t.setDaemon(true);
-        }
-        if (t.getPriority() != Thread.NORM_PRIORITY) {
-          t.setPriority(Thread.NORM_PRIORITY);
-        }
-        return t;
-      }
-    };
-  }
-
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
    *   for this FileSystem
@@ -264,25 +212,19 @@ public class S3AFileSystem extends FileSystem {
     }
 
     int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
-    int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
-    if (maxThreads == 0) {
-      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+    if (maxThreads < 2) {
+      LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+      maxThreads = 2;
     }
-    if (coreThreads == 0) {
-      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+    int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
+    if (totalTasks < 1) {
+      LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
+      totalTasks = 1;
     }
     long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
-    LinkedBlockingQueue<Runnable> workQueue =
-      new LinkedBlockingQueue<>(maxThreads *
-          conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
-    threadPoolExecutor = new ThreadPoolExecutor(
-        coreThreads,
-        maxThreads,
-        keepAliveTime,
-        TimeUnit.SECONDS,
-        workQueue,
-        newDaemonThreadFactory("s3a-transfer-shared-"));
-    threadPoolExecutor.allowCoreThreadTimeOut(true);
+    threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
+        maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+        "s3a-transfer-shared");
 
     TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
     transferConfiguration.setMinimumUploadPartSize(partSize);
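
A worked sizing example (illustrative, not from the patch) of what the maxThreads + totalTasks argument above means with the new defaults; the class name is made up.

public class PoolSizingExample {
  public static void main(String[] args) {
    // New defaults: fs.s3a.threads.max = 10, fs.s3a.max.total.tasks = 5.
    int maxThreads = 10;
    int totalTasks = 5;
    // As wired up in initialize() above:
    int activeTasks  = maxThreads;               // 10 concurrent (part)uploads
    int waitingTasks = maxThreads + totalTasks;  // 15 slots in the work queue
    // The executor grants activeTasks + waitingTasks queueing permits, so
    // 25 submissions are accepted (10 running, 15 queued) before a client blocks.
    int permits = activeTasks + waitingTasks;
    System.out.println("submit() blocks after " + permits + " outstanding uploads");
  }
}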

index.md (hadoop-aws documentation)

@@ -231,17 +231,11 @@ If you do any of these: change your credentials immediately!
 
 <property>
   <name>fs.s3a.threads.max</name>
-  <value>256</value>
+  <value>10</value>
   <description> Maximum number of concurrent active (part)uploads,
     which each use a thread from the threadpool.</description>
 </property>
 
-<property>
-  <name>fs.s3a.threads.core</name>
-  <value>15</value>
-  <description>Number of core threads in the threadpool.</description>
-</property>
-
 <property>
   <name>fs.s3a.threads.keepalivetime</name>
   <value>60</value>
@@ -251,7 +245,7 @@ If you do any of these: change your credentials immediately!
 
 <property>
   <name>fs.s3a.max.total.tasks</name>
-  <value>1000</value>
+  <value>5</value>
   <description>Number of (part)uploads allowed to the queue before
     blocking additional uploads.</description>
 </property>

TestBlockingThreadPoolExecutorService.java (new file)

@@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.StopWatch;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
* Basic unit test for S3A's blocking executor service.
*/
public class TestBlockingThreadPoolExecutorService {
private static final Logger LOG = LoggerFactory.getLogger(
BlockingThreadPoolExecutorService.class);
private static final int NUM_ACTIVE_TASKS = 4;
private static final int NUM_WAITING_TASKS = 2;
private static final int TASK_SLEEP_MSEC = 100;
private static final int SHUTDOWN_WAIT_MSEC = 200;
private static final int SHUTDOWN_WAIT_TRIES = 5;
private static final int BLOCKING_THRESHOLD_MSEC = 50;
private static final Integer SOME_VALUE = 1337;
private static BlockingThreadPoolExecutorService tpe = null;
@AfterClass
public static void afterClass() throws Exception {
ensureDestroyed();
}
/**
* Basic test of running one trivial task.
*/
@Test
public void testSubmitCallable() throws Exception {
ensureCreated();
ListenableFuture<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get();
assertEquals(SOME_VALUE, v);
}
/**
* More involved test, including detecting blocking when at capacity.
*/
@Test
public void testSubmitRunnable() throws Exception {
ensureCreated();
int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
StopWatch stopWatch = new StopWatch().start();
for (int i = 0; i < totalTasks; i++) {
tpe.submit(sleeper);
assertDidntBlock(stopWatch);
}
tpe.submit(sleeper);
assertDidBlock(stopWatch);
}
@Test
public void testShutdown() throws Exception {
// Cover create / destroy, regardless of when this test case runs
ensureCreated();
ensureDestroyed();
// Cover create, execute, destroy, regardless of when test case runs
ensureCreated();
testSubmitRunnable();
ensureDestroyed();
}
// Helper functions, etc.
private void assertDidntBlock(StopWatch sw) {
try {
assertFalse("Non-blocking call took too long.",
sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
} finally {
sw.reset().start();
}
}
private void assertDidBlock(StopWatch sw) {
try {
if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
throw new RuntimeException("Blocking call returned too fast.");
}
} finally {
sw.reset().start();
}
}
private Runnable sleeper = new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
Thread.sleep(TASK_SLEEP_MSEC);
} catch (InterruptedException e) {
LOG.info("Thread {} interrupted.", name);
Thread.currentThread().interrupt();
}
}
};
private Callable<Integer> callableSleeper = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
sleeper.run();
return SOME_VALUE;
}
};
/**
* Helper function to create thread pool under test.
*/
private static void ensureCreated() throws Exception {
if (tpe == null) {
LOG.debug("Creating thread pool");
tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
}
}
/**
* Helper function to terminate thread pool under test, asserting that
* shutdown -> terminate works as expected.
*/
private static void ensureDestroyed() throws Exception {
if (tpe == null) {
return;
}
int shutdownTries = SHUTDOWN_WAIT_TRIES;
tpe.shutdown();
if (!tpe.isShutdown()) {
throw new RuntimeException("Shutdown had no effect.");
}
while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
TimeUnit.MILLISECONDS)) {
LOG.info("Waiting for thread pool shutdown.");
if (shutdownTries-- <= 0) {
LOG.error("Failed to terminate thread pool gracefully.");
break;
}
}
if (!tpe.isTerminated()) {
tpe.shutdownNow();
if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
TimeUnit.MILLISECONDS)) {
throw new RuntimeException(
"Failed to terminate thread pool in timely manner.");
}
}
tpe = null;
}
}

TestS3ABlockingThreadPool.java (new file)

@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* Demonstrate that the threadpool blocks additional client requests if
* its queue is full (rather than throwing an exception) by initiating an
* upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
* 4th part should not trigger an exception as it would with a
* non-blocking threadpool.
*/
public class TestS3ABlockingThreadPool {
private Configuration conf;
private S3AFileSystem fs;
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
protected Path getTestPath() {
return new Path("/tests3a");
}
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
conf.setInt(Constants.MAX_THREADS, 2);
conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(getTestPath(), true);
}
}
@Test
public void testRegularMultiPartUpload() throws Exception {
fs = S3ATestUtils.createTestFileSystem(conf);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
1024);
}
@Test
public void testFastMultiPartUpload() throws Exception {
conf.setBoolean(Constants.FAST_UPLOAD, true);
fs = S3ATestUtils.createTestFileSystem(conf);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
1024);
}
}