Revert "HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor and Aaron Fabbri via lei)"
This reverts commit 01ae30796d
.
This commit is contained in:
parent
836bfdd646
commit
47941858af
|
@ -326,9 +326,6 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-11685. StorageException complaining " no lease ID" during HBase
|
HADOOP-11685. StorageException complaining " no lease ID" during HBase
|
||||||
distributed log splitting (Duo Xu via cnauroth)
|
distributed log splitting (Duo Xu via cnauroth)
|
||||||
|
|
||||||
HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor
|
|
||||||
and Aaron Fabbri via lei)
|
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||||
|
|
|
@ -844,11 +844,17 @@ for ldap providers in the same way as above does.
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.threads.max</name>
|
<name>fs.s3a.threads.max</name>
|
||||||
<value>10</value>
|
<value>256</value>
|
||||||
<description> Maximum number of concurrent active (part)uploads,
|
<description> Maximum number of concurrent active (part)uploads,
|
||||||
which each use a thread from the threadpool.</description>
|
which each use a thread from the threadpool.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.threads.core</name>
|
||||||
|
<value>15</value>
|
||||||
|
<description>Number of core threads in the threadpool.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.threads.keepalivetime</name>
|
<name>fs.s3a.threads.keepalivetime</name>
|
||||||
<value>60</value>
|
<value>60</value>
|
||||||
|
@ -858,7 +864,7 @@ for ldap providers in the same way as above does.
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.max.total.tasks</name>
|
<name>fs.s3a.max.total.tasks</name>
|
||||||
<value>5</value>
|
<value>1000</value>
|
||||||
<description>Number of (part)uploads allowed to the queue before
|
<description>Number of (part)uploads allowed to the queue before
|
||||||
blocking additional uploads.</description>
|
blocking additional uploads.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
|
@ -1,274 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.RejectedExecutionHandler;
|
|
||||||
import java.util.concurrent.Semaphore;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This ExecutorService blocks the submission of new tasks when its queue is
|
|
||||||
* already full by using a semaphore. Task submissions require permits, task
|
|
||||||
* completions release permits.
|
|
||||||
* <p>
|
|
||||||
* This is inspired by <a href = "https://github
|
|
||||||
* .com/apache/incubator-s4/blob/master/subprojects
|
|
||||||
* /s4-comm/src/main/java/org/apache/s4/comm/staging
|
|
||||||
* /BlockingThreadPoolExecutorService.java"> this s4 threadpool</a>
|
|
||||||
*/
|
|
||||||
public class BlockingThreadPoolExecutorService
|
|
||||||
extends ForwardingListeningExecutorService {
|
|
||||||
|
|
||||||
private static Logger LOG = LoggerFactory
|
|
||||||
.getLogger(BlockingThreadPoolExecutorService.class);
|
|
||||||
|
|
||||||
private Semaphore queueingPermits;
|
|
||||||
private ListeningExecutorService executorDelegatee;
|
|
||||||
|
|
||||||
private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@link java.util.concurrent.ThreadFactory} that names each
|
|
||||||
* created thread uniquely,
|
|
||||||
* with a common prefix.
|
|
||||||
*
|
|
||||||
* @param prefix The prefix of every created Thread's name
|
|
||||||
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
|
|
||||||
*/
|
|
||||||
public static ThreadFactory getNamedThreadFactory(final String prefix) {
|
|
||||||
SecurityManager s = System.getSecurityManager();
|
|
||||||
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
|
|
||||||
Thread.currentThread().getThreadGroup();
|
|
||||||
|
|
||||||
return new ThreadFactory() {
|
|
||||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
|
||||||
private final int poolNum = POOLNUMBER.getAndIncrement();
|
|
||||||
private final ThreadGroup group = threadGroup;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
final String name =
|
|
||||||
prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
|
|
||||||
return new Thread(group, r, name);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a named {@link ThreadFactory} that just builds daemon threads.
|
|
||||||
*
|
|
||||||
* @param prefix name prefix for all threads created from the factory
|
|
||||||
* @return a thread factory that creates named, daemon threads with
|
|
||||||
* the supplied exception handler and normal priority
|
|
||||||
*/
|
|
||||||
private static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
|
||||||
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
|
||||||
return new ThreadFactory() {
|
|
||||||
@Override
|
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
Thread t = namedFactory.newThread(r);
|
|
||||||
if (!t.isDaemon()) {
|
|
||||||
t.setDaemon(true);
|
|
||||||
}
|
|
||||||
if (t.getPriority() != Thread.NORM_PRIORITY) {
|
|
||||||
t.setPriority(Thread.NORM_PRIORITY);
|
|
||||||
}
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A thread pool that that blocks clients submitting additional tasks if
|
|
||||||
* there are already {@code activeTasks} running threads and {@code
|
|
||||||
* waitingTasks} tasks waiting in its queue.
|
|
||||||
*
|
|
||||||
* @param activeTasks maximum number of active tasks
|
|
||||||
* @param waitingTasks maximum number of waiting tasks
|
|
||||||
* @param keepAliveTime time until threads are cleaned up in {@code unit}
|
|
||||||
* @param unit time unit
|
|
||||||
* @param prefixName prefix of name for threads
|
|
||||||
*/
|
|
||||||
public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
|
|
||||||
long keepAliveTime, TimeUnit unit, String prefixName) {
|
|
||||||
super();
|
|
||||||
queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
|
|
||||||
/* Although we generally only expect up to waitingTasks tasks in the
|
|
||||||
queue, we need to be able to buffer all tasks in case dequeueing is
|
|
||||||
slower than enqueueing. */
|
|
||||||
final BlockingQueue<Runnable> workQueue =
|
|
||||||
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
|
|
||||||
ThreadPoolExecutor eventProcessingExecutor =
|
|
||||||
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
|
|
||||||
workQueue, newDaemonThreadFactory(prefixName),
|
|
||||||
new RejectedExecutionHandler() {
|
|
||||||
@Override
|
|
||||||
public void rejectedExecution(Runnable r,
|
|
||||||
ThreadPoolExecutor executor) {
|
|
||||||
// This is not expected to happen.
|
|
||||||
LOG.error("Could not submit task to executor {}",
|
|
||||||
executor.toString());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
eventProcessingExecutor.allowCoreThreadTimeOut(true);
|
|
||||||
executorDelegatee =
|
|
||||||
MoreExecutors.listeningDecorator(eventProcessingExecutor);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListeningExecutorService delegate() {
|
|
||||||
return executorDelegatee;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> ListenableFuture<T> submit(Callable<T> task) {
|
|
||||||
try {
|
|
||||||
queueingPermits.acquire();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
return Futures.immediateFailedCheckedFuture(e);
|
|
||||||
}
|
|
||||||
return super.submit(new CallableWithPermitRelease<T>(task));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> ListenableFuture<T> submit(Runnable task, T result) {
|
|
||||||
try {
|
|
||||||
queueingPermits.acquire();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
return Futures.immediateFailedCheckedFuture(e);
|
|
||||||
}
|
|
||||||
return super.submit(new RunnableWithPermitRelease(task), result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<?> submit(Runnable task) {
|
|
||||||
try {
|
|
||||||
queueingPermits.acquire();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
return Futures.immediateFailedCheckedFuture(e);
|
|
||||||
}
|
|
||||||
return super.submit(new RunnableWithPermitRelease(task));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void execute(Runnable command) {
|
|
||||||
try {
|
|
||||||
queueingPermits.acquire();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
super.execute(new RunnableWithPermitRelease(command));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Releases a permit after the task is executed.
|
|
||||||
*/
|
|
||||||
class RunnableWithPermitRelease implements Runnable {
|
|
||||||
|
|
||||||
private Runnable delegatee;
|
|
||||||
|
|
||||||
public RunnableWithPermitRelease(Runnable delegatee) {
|
|
||||||
this.delegatee = delegatee;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
delegatee.run();
|
|
||||||
} finally {
|
|
||||||
queueingPermits.release();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Releases a permit after the task is completed.
|
|
||||||
*/
|
|
||||||
class CallableWithPermitRelease<T> implements Callable<T> {
|
|
||||||
|
|
||||||
private Callable<T> delegatee;
|
|
||||||
|
|
||||||
public CallableWithPermitRelease(Callable<T> delegatee) {
|
|
||||||
this.delegatee = delegatee;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T call() throws Exception {
|
|
||||||
try {
|
|
||||||
return delegatee.call();
|
|
||||||
} finally {
|
|
||||||
queueingPermits.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
|
|
||||||
throws InterruptedException {
|
|
||||||
throw new RuntimeException("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
|
|
||||||
long timeout, TimeUnit unit) throws InterruptedException {
|
|
||||||
throw new RuntimeException("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
|
|
||||||
throws InterruptedException, ExecutionException {
|
|
||||||
throw new RuntimeException("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
|
|
||||||
TimeUnit unit)
|
|
||||||
throws InterruptedException, ExecutionException, TimeoutException {
|
|
||||||
throw new RuntimeException("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -61,15 +61,20 @@ public class Constants {
|
||||||
|
|
||||||
// the maximum number of threads to allow in the pool used by TransferManager
|
// the maximum number of threads to allow in the pool used by TransferManager
|
||||||
public static final String MAX_THREADS = "fs.s3a.threads.max";
|
public static final String MAX_THREADS = "fs.s3a.threads.max";
|
||||||
public static final int DEFAULT_MAX_THREADS = 10;
|
public static final int DEFAULT_MAX_THREADS = 256;
|
||||||
|
|
||||||
// the time an idle thread waits before terminating
|
// the number of threads to keep in the pool used by TransferManager
|
||||||
|
public static final String CORE_THREADS = "fs.s3a.threads.core";
|
||||||
|
public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
|
||||||
|
|
||||||
|
// when the number of threads is greater than the core, this is the maximum time
|
||||||
|
// that excess idle threads will wait for new tasks before terminating.
|
||||||
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
|
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
|
||||||
public static final int DEFAULT_KEEPALIVE_TIME = 60;
|
public static final int DEFAULT_KEEPALIVE_TIME = 60;
|
||||||
|
|
||||||
// the maximum number of tasks cached if all threads are already uploading
|
// the maximum number of tasks that the LinkedBlockingQueue can hold
|
||||||
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
|
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
|
||||||
public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
|
public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
|
||||||
|
|
||||||
// size of each of or multipart pieces in bytes
|
// size of each of or multipart pieces in bytes
|
||||||
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
|
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
|
||||||
|
|
|
@ -51,7 +51,7 @@ import java.util.List;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -108,7 +108,7 @@ public class S3AFastOutputStream extends OutputStream {
|
||||||
String bucket, String key, Progressable progress,
|
String bucket, String key, Progressable progress,
|
||||||
FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
|
FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
|
||||||
String serverSideEncryptionAlgorithm, long partSize,
|
String serverSideEncryptionAlgorithm, long partSize,
|
||||||
long multiPartThreshold, ExecutorService threadPoolExecutor)
|
long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.bucket = bucket;
|
this.bucket = bucket;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
|
|
|
@ -26,8 +26,11 @@ import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
import com.amazonaws.AmazonServiceException;
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
@ -83,7 +86,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
private int maxKeys;
|
private int maxKeys;
|
||||||
private long partSize;
|
private long partSize;
|
||||||
private TransferManager transfers;
|
private TransferManager transfers;
|
||||||
private ExecutorService threadPoolExecutor;
|
private ThreadPoolExecutor threadPoolExecutor;
|
||||||
private long multiPartThreshold;
|
private long multiPartThreshold;
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
||||||
private CannedAccessControlList cannedACL;
|
private CannedAccessControlList cannedACL;
|
||||||
|
@ -92,6 +95,55 @@ public class S3AFileSystem extends FileSystem {
|
||||||
// The maximum number of entries that can be deleted in any call to s3
|
// The maximum number of entries that can be deleted in any call to s3
|
||||||
private static final int MAX_ENTRIES_TO_DELETE = 1000;
|
private static final int MAX_ENTRIES_TO_DELETE = 1000;
|
||||||
|
|
||||||
|
private static final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||||
|
/**
|
||||||
|
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
|
||||||
|
* with a common prefix.
|
||||||
|
* @param prefix The prefix of every created Thread's name
|
||||||
|
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
|
||||||
|
*/
|
||||||
|
public static ThreadFactory getNamedThreadFactory(final String prefix) {
|
||||||
|
SecurityManager s = System.getSecurityManager();
|
||||||
|
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
|
||||||
|
.getThreadGroup();
|
||||||
|
|
||||||
|
return new ThreadFactory() {
|
||||||
|
final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||||
|
private final int poolNum = poolNumber.getAndIncrement();
|
||||||
|
final ThreadGroup group = threadGroup;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
|
||||||
|
return new Thread(group, r, name);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a named {@link ThreadFactory} that just builds daemon threads.
|
||||||
|
* @param prefix name prefix for all threads created from the factory
|
||||||
|
* @return a thread factory that creates named, daemon threads with
|
||||||
|
* the supplied exception handler and normal priority
|
||||||
|
*/
|
||||||
|
private static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
||||||
|
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
||||||
|
return new ThreadFactory() {
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
Thread t = namedFactory.newThread(r);
|
||||||
|
if (!t.isDaemon()) {
|
||||||
|
t.setDaemon(true);
|
||||||
|
}
|
||||||
|
if (t.getPriority() != Thread.NORM_PRIORITY) {
|
||||||
|
t.setPriority(Thread.NORM_PRIORITY);
|
||||||
|
}
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/** Called after a new FileSystem instance is constructed.
|
/** Called after a new FileSystem instance is constructed.
|
||||||
* @param name a uri whose authority section names the host, port, etc.
|
* @param name a uri whose authority section names the host, port, etc.
|
||||||
* for this FileSystem
|
* for this FileSystem
|
||||||
|
@ -212,19 +264,25 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
|
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
|
||||||
if (maxThreads < 2) {
|
int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
|
||||||
LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
|
if (maxThreads == 0) {
|
||||||
maxThreads = 2;
|
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||||
}
|
}
|
||||||
int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
|
if (coreThreads == 0) {
|
||||||
if (totalTasks < 1) {
|
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||||
LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
|
|
||||||
totalTasks = 1;
|
|
||||||
}
|
}
|
||||||
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
|
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
|
||||||
threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
|
LinkedBlockingQueue<Runnable> workQueue =
|
||||||
maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
|
new LinkedBlockingQueue<>(maxThreads *
|
||||||
"s3a-transfer-shared");
|
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
|
||||||
|
threadPoolExecutor = new ThreadPoolExecutor(
|
||||||
|
coreThreads,
|
||||||
|
maxThreads,
|
||||||
|
keepAliveTime,
|
||||||
|
TimeUnit.SECONDS,
|
||||||
|
workQueue,
|
||||||
|
newDaemonThreadFactory("s3a-transfer-shared-"));
|
||||||
|
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
||||||
|
|
||||||
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
|
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
|
||||||
transferConfiguration.setMinimumUploadPartSize(partSize);
|
transferConfiguration.setMinimumUploadPartSize(partSize);
|
||||||
|
|
|
@ -231,11 +231,17 @@ If you do any of these: change your credentials immediately!
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.threads.max</name>
|
<name>fs.s3a.threads.max</name>
|
||||||
<value>10</value>
|
<value>256</value>
|
||||||
<description> Maximum number of concurrent active (part)uploads,
|
<description> Maximum number of concurrent active (part)uploads,
|
||||||
which each use a thread from the threadpool.</description>
|
which each use a thread from the threadpool.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.threads.core</name>
|
||||||
|
<value>15</value>
|
||||||
|
<description>Number of core threads in the threadpool.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.threads.keepalivetime</name>
|
<name>fs.s3a.threads.keepalivetime</name>
|
||||||
<value>60</value>
|
<value>60</value>
|
||||||
|
@ -245,7 +251,7 @@ If you do any of these: change your credentials immediately!
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.max.total.tasks</name>
|
<name>fs.s3a.max.total.tasks</name>
|
||||||
<value>5</value>
|
<value>1000</value>
|
||||||
<description>Number of (part)uploads allowed to the queue before
|
<description>Number of (part)uploads allowed to the queue before
|
||||||
blocking additional uploads.</description>
|
blocking additional uploads.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
|
@ -1,182 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p/>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p/>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a;
|
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
|
||||||
import org.apache.hadoop.util.StopWatch;
|
|
||||||
import org.junit.*;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Basic unit test for S3A's blocking executor service.
|
|
||||||
*/
|
|
||||||
public class TestBlockingThreadPoolExecutorService {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
|
||||||
BlockingThreadPoolExecutorService.class);
|
|
||||||
|
|
||||||
private static final int NUM_ACTIVE_TASKS = 4;
|
|
||||||
private static final int NUM_WAITING_TASKS = 2;
|
|
||||||
private static final int TASK_SLEEP_MSEC = 100;
|
|
||||||
private static final int SHUTDOWN_WAIT_MSEC = 200;
|
|
||||||
private static final int SHUTDOWN_WAIT_TRIES = 5;
|
|
||||||
private static final int BLOCKING_THRESHOLD_MSEC = 50;
|
|
||||||
|
|
||||||
private static final Integer SOME_VALUE = 1337;
|
|
||||||
|
|
||||||
private static BlockingThreadPoolExecutorService tpe = null;
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void afterClass() throws Exception {
|
|
||||||
ensureDestroyed();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Basic test of running one trivial task.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testSubmitCallable() throws Exception {
|
|
||||||
ensureCreated();
|
|
||||||
ListenableFuture<Integer> f = tpe.submit(callableSleeper);
|
|
||||||
Integer v = f.get();
|
|
||||||
assertEquals(SOME_VALUE, v);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* More involved test, including detecting blocking when at capacity.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testSubmitRunnable() throws Exception {
|
|
||||||
ensureCreated();
|
|
||||||
int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
|
|
||||||
StopWatch stopWatch = new StopWatch().start();
|
|
||||||
for (int i = 0; i < totalTasks; i++) {
|
|
||||||
tpe.submit(sleeper);
|
|
||||||
assertDidntBlock(stopWatch);
|
|
||||||
}
|
|
||||||
tpe.submit(sleeper);
|
|
||||||
assertDidBlock(stopWatch);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testShutdown() throws Exception {
|
|
||||||
// Cover create / destroy, regardless of when this test case runs
|
|
||||||
ensureCreated();
|
|
||||||
ensureDestroyed();
|
|
||||||
|
|
||||||
// Cover create, execute, destroy, regardless of when test case runs
|
|
||||||
ensureCreated();
|
|
||||||
testSubmitRunnable();
|
|
||||||
ensureDestroyed();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper functions, etc.
|
|
||||||
|
|
||||||
private void assertDidntBlock(StopWatch sw) {
|
|
||||||
try {
|
|
||||||
assertFalse("Non-blocking call took too long.",
|
|
||||||
sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
|
|
||||||
} finally {
|
|
||||||
sw.reset().start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertDidBlock(StopWatch sw) {
|
|
||||||
try {
|
|
||||||
if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
|
|
||||||
throw new RuntimeException("Blocking call returned too fast.");
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
sw.reset().start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Runnable sleeper = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
String name = Thread.currentThread().getName();
|
|
||||||
try {
|
|
||||||
Thread.sleep(TASK_SLEEP_MSEC);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.info("Thread {} interrupted.", name);
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private Callable<Integer> callableSleeper = new Callable<Integer>() {
|
|
||||||
@Override
|
|
||||||
public Integer call() throws Exception {
|
|
||||||
sleeper.run();
|
|
||||||
return SOME_VALUE;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to create thread pool under test.
|
|
||||||
*/
|
|
||||||
private static void ensureCreated() throws Exception {
|
|
||||||
if (tpe == null) {
|
|
||||||
LOG.debug("Creating thread pool");
|
|
||||||
tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
|
|
||||||
NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to terminate thread pool under test, asserting that
|
|
||||||
* shutdown -> terminate works as expected.
|
|
||||||
*/
|
|
||||||
private static void ensureDestroyed() throws Exception {
|
|
||||||
if (tpe == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
int shutdownTries = SHUTDOWN_WAIT_TRIES;
|
|
||||||
|
|
||||||
tpe.shutdown();
|
|
||||||
if (!tpe.isShutdown()) {
|
|
||||||
throw new RuntimeException("Shutdown had no effect.");
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
|
|
||||||
TimeUnit.MILLISECONDS)) {
|
|
||||||
LOG.info("Waiting for thread pool shutdown.");
|
|
||||||
if (shutdownTries-- <= 0) {
|
|
||||||
LOG.error("Failed to terminate thread pool gracefully.");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!tpe.isTerminated()) {
|
|
||||||
tpe.shutdownNow();
|
|
||||||
if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
|
|
||||||
TimeUnit.MILLISECONDS)) {
|
|
||||||
throw new RuntimeException(
|
|
||||||
"Failed to terminate thread pool in timely manner.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tpe = null;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,80 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.rules.Timeout;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Demonstrate that the threadpool blocks additional client requests if
|
|
||||||
* its queue is full (rather than throwing an exception) by initiating an
|
|
||||||
* upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
|
|
||||||
* 4th part should not trigger an exception as it would with a
|
|
||||||
* non-blocking threadpool.
|
|
||||||
*/
|
|
||||||
public class TestS3ABlockingThreadPool {
|
|
||||||
|
|
||||||
private Configuration conf;
|
|
||||||
private S3AFileSystem fs;
|
|
||||||
|
|
||||||
@Rule
|
|
||||||
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
|
|
||||||
|
|
||||||
protected Path getTestPath() {
|
|
||||||
return new Path("/tests3a");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
conf = new Configuration();
|
|
||||||
conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
|
|
||||||
conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
|
|
||||||
conf.setInt(Constants.MAX_THREADS, 2);
|
|
||||||
conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
if (fs != null) {
|
|
||||||
fs.delete(getTestPath(), true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRegularMultiPartUpload() throws Exception {
|
|
||||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
|
||||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
|
|
||||||
1024);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFastMultiPartUpload() throws Exception {
|
|
||||||
conf.setBoolean(Constants.FAST_UPLOAD, true);
|
|
||||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
|
||||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
|
|
||||||
1024);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue