diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java similarity index 97% rename from hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java index 5b25730ba52..a9e4d682d9b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.s3a; +package org.apache.hadoop.util; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience; * this s4 threadpool */ @InterfaceAudience.Private -final class BlockingThreadPoolExecutorService +public final class BlockingThreadPoolExecutorService extends SemaphoredDelegatingExecutor { private static Logger LOG = LoggerFactory @@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService * @return a thread factory that creates named, daemon threads with * the supplied exception handler and normal priority */ - static ThreadFactory newDaemonThreadFactory(final String prefix) { + public static ThreadFactory newDaemonThreadFactory(final String prefix) { final ThreadFactory namedFactory = getNamedThreadFactory(prefix); return new ThreadFactory() { @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java similarity index 92% rename from hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java index 6b21912871e..bcc19e35e85 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.s3a; +package org.apache.hadoop.util; import com.google.common.util.concurrent.ForwardingListeningExecutorService; import com.google.common.util.concurrent.Futures; @@ -42,17 +42,13 @@ import java.util.concurrent.TimeoutException; * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code * contains the thread pool logic, whereas this isolates the semaphore * and submit logic for use with other thread pools and delegation models. - * In particular, it permits multiple per stream executors to share a - * single per-FS-instance executor; the latter to throttle overall - * load from the the FS, the others to limit the amount of load which - * a single output stream can generate. *
* This is inspired by
* this s4 threadpool
*/
@SuppressWarnings("NullableProblems")
@InterfaceAudience.Private
-class SemaphoredDelegatingExecutor extends
+public class SemaphoredDelegatingExecutor extends
ForwardingListeningExecutorService {
private final Semaphore queueingPermits;
@@ -65,7 +61,8 @@ class SemaphoredDelegatingExecutor extends
* @param permitCount number of permits into the queue permitted
* @param fair should the semaphore be "fair"
*/
- SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
+ public SemaphoredDelegatingExecutor(
+ ListeningExecutorService executorDelegatee,
int permitCount,
boolean fair) {
this.permitCount = permitCount;
@@ -190,7 +187,7 @@ class SemaphoredDelegatingExecutor extends
private Runnable delegatee;
- public RunnableWithPermitRelease(Runnable delegatee) {
+ RunnableWithPermitRelease(Runnable delegatee) {
this.delegatee = delegatee;
}
@@ -212,7 +209,7 @@ class SemaphoredDelegatingExecutor extends
private Callable