HADOOP-15039/HADOOP-15189. Move SemaphoredDelegatingExecutor to hadoop-common
Contributed by Genmao Yu
(cherry picked from commit 312c571694
)
This commit is contained in:
parent
d69df0b596
commit
d9875b046f
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
* this s4 threadpool</a>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
final class BlockingThreadPoolExecutorService
|
||||
public final class BlockingThreadPoolExecutorService
|
||||
extends SemaphoredDelegatingExecutor {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
|
@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService
|
|||
* @return a thread factory that creates named, daemon threads with
|
||||
* the supplied exception handler and normal priority
|
||||
*/
|
||||
static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
||||
public static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
||||
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
||||
return new ThreadFactory() {
|
||||
@Override
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
|
@ -42,17 +42,13 @@ import java.util.concurrent.TimeoutException;
|
|||
* This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
|
||||
* contains the thread pool logic, whereas this isolates the semaphore
|
||||
* and submit logic for use with other thread pools and delegation models.
|
||||
* In particular, it <i>permits multiple per stream executors to share a
|
||||
* single per-FS-instance executor; the latter to throttle overall
|
||||
* load from the the FS, the others to limit the amount of load which
|
||||
* a single output stream can generate.</i>
|
||||
* <p>
|
||||
* This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
|
||||
* this s4 threadpool</a>
|
||||
*/
|
||||
@SuppressWarnings("NullableProblems")
|
||||
@InterfaceAudience.Private
|
||||
class SemaphoredDelegatingExecutor extends
|
||||
public class SemaphoredDelegatingExecutor extends
|
||||
ForwardingListeningExecutorService {
|
||||
|
||||
private final Semaphore queueingPermits;
|
||||
|
@ -65,7 +61,8 @@ class SemaphoredDelegatingExecutor extends
|
|||
* @param permitCount number of permits into the queue permitted
|
||||
* @param fair should the semaphore be "fair"
|
||||
*/
|
||||
SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
|
||||
public SemaphoredDelegatingExecutor(
|
||||
ListeningExecutorService executorDelegatee,
|
||||
int permitCount,
|
||||
boolean fair) {
|
||||
this.permitCount = permitCount;
|
||||
|
@ -190,7 +187,7 @@ class SemaphoredDelegatingExecutor extends
|
|||
|
||||
private Runnable delegatee;
|
||||
|
||||
public RunnableWithPermitRelease(Runnable delegatee) {
|
||||
RunnableWithPermitRelease(Runnable delegatee) {
|
||||
this.delegatee = delegatee;
|
||||
}
|
||||
|
||||
|
@ -212,7 +209,7 @@ class SemaphoredDelegatingExecutor extends
|
|||
|
||||
private Callable<T> delegatee;
|
||||
|
||||
public CallableWithPermitRelease(Callable<T> delegatee) {
|
||||
CallableWithPermitRelease(Callable<T> delegatee) {
|
||||
this.delegatee = delegatee;
|
||||
}
|
||||
|
|
@ -103,8 +103,10 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
|||
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
||||
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
||||
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
||||
import org.apache.hadoop.util.StopWatch;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
|
|
Loading…
Reference in New Issue