From 176bd88890cc698310be8ae9b03a2d899da9f352 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 9 Mar 2021 12:01:29 -0800 Subject: [PATCH] HADOOP-16080. hadoop-aws does not work with hadoop-client-api. (#2522) Contributed by Chao Sun. (Cherry-picked via PR #2575) --- .../apache/hadoop/fs/cosn/CosNFileSystem.java | 6 +++--- .../BlockingThreadPoolExecutorService.java | 5 +---- .../util/SemaphoredDelegatingExecutor.java | 21 +++++++++---------- .../hadoop/fs/TestFileSystemCaching.java | 6 ++++-- .../fs/aliyun/oss/AliyunOSSFileSystem.java | 5 ++--- .../dev-support/findbugs-exclude.xml | 6 ++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 3 ++- .../hadoop/fs/s3a/impl/DeleteOperation.java | 4 +++- .../hadoop/fs/s3a/impl/StoreContext.java | 12 ++++++----- .../fs/s3a/impl/StoreContextBuilder.java | 7 +++---- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 19 ++++++++++------- ...TestBlockingThreadPoolExecutorService.java | 4 ++-- .../s3a/impl/ITestPartialRenamesDeletes.java | 12 ++++++----- 13 files changed, 61 insertions(+), 49 deletions(-) diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java index 94b10ad4401..4dda1260731 100644 --- a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java +++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java @@ -28,11 +28,11 @@ import java.util.Map; import java.util.HashMap; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -71,8 +71,8 @@ public class CosNFileSystem extends FileSystem { private String owner = "Unknown"; private String group = "Unknown"; - private ListeningExecutorService boundedIOThreadPool; - private ListeningExecutorService boundedCopyThreadPool; + private ExecutorService boundedIOThreadPool; + private ExecutorService boundedCopyThreadPool; public CosNFileSystem() { } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java index 451b5f5d6ce..d08e84f99de 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java @@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; - import org.apache.hadoop.classification.InterfaceAudience; /** @@ -105,8 +103,7 @@ public final class BlockingThreadPoolExecutorService private BlockingThreadPoolExecutorService(int permitCount, ThreadPoolExecutor eventProcessingExecutor) { - super(MoreExecutors.listeningDecorator(eventProcessingExecutor), - permitCount, false); + super(eventProcessingExecutor, permitCount, false); this.eventProcessingExecutor = eventProcessingExecutor; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java index 10471c93656..c4c11e57b37 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java @@ -18,10 +18,8 @@ package org.apache.hadoop.util; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -31,6 +29,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -55,10 +54,10 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTO @SuppressWarnings("NullableProblems") @InterfaceAudience.Private public class SemaphoredDelegatingExecutor extends - ForwardingListeningExecutorService { + ForwardingExecutorService { private final Semaphore queueingPermits; - private final ListeningExecutorService executorDelegatee; + private final ExecutorService executorDelegatee; private final int permitCount; private final DurationTrackerFactory trackerFactory; @@ -70,7 +69,7 @@ public class SemaphoredDelegatingExecutor extends * @param trackerFactory duration tracker factory. */ public SemaphoredDelegatingExecutor( - ListeningExecutorService executorDelegatee, + ExecutorService executorDelegatee, int permitCount, boolean fair, DurationTrackerFactory trackerFactory) { @@ -89,14 +88,14 @@ public class SemaphoredDelegatingExecutor extends * @param fair should the semaphore be "fair" */ public SemaphoredDelegatingExecutor( - ListeningExecutorService executorDelegatee, + ExecutorService executorDelegatee, int permitCount, boolean fair) { this(executorDelegatee, permitCount, fair, null); } @Override - protected ListeningExecutorService delegate() { + protected ExecutorService delegate() { return executorDelegatee; } @@ -127,7 +126,7 @@ public class SemaphoredDelegatingExecutor extends } @Override - public ListenableFuture submit(Callable task) { + public Future submit(Callable task) { try (DurationTracker ignored = trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); @@ -139,7 +138,7 @@ public class SemaphoredDelegatingExecutor extends } @Override - public ListenableFuture submit(Runnable task, T result) { + public Future submit(Runnable task, T result) { try (DurationTracker ignored = trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); @@ -151,7 +150,7 @@ public class SemaphoredDelegatingExecutor extends } @Override - public ListenableFuture submit(Runnable task) { + public Future submit(Runnable task) { try (DurationTracker ignored = trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java index 01abeaaf577..67a933bb9e3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -423,9 +424,10 @@ public class TestFileSystemCaching extends HadoopTestBase { // only one instance can be created at a time. URI uri = new URI("blocking://a"); ListeningExecutorService pool = - BlockingThreadPoolExecutorService.newInstance(count * 2, 0, + MoreExecutors.listeningDecorator( + BlockingThreadPoolExecutorService.newInstance(count * 2, 0, 10, TimeUnit.SECONDS, - "creation-threads"); + "creation-threads")); // submit a set of requests to create an FS instance. // the semaphore will block all but one, and that will block until diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index 66fbd89b323..759484e4239 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem { private int maxKeys; private int maxReadAheadPartNumber; private int maxConcurrentCopyTasksPerDir; - private ListeningExecutorService boundedThreadPool; - private ListeningExecutorService boundedCopyThreadPool; + private ExecutorService boundedThreadPool; + private ExecutorService boundedCopyThreadPool; private static final PathFilter DEFAULT_FILTER = new PathFilter() { @Override diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index 1ebf8587e88..861eb83584e 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -84,4 +84,10 @@ + + + + + + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 7506a5ed669..4243a4f1a5d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -42,6 +42,7 @@ import java.util.Optional; import java.util.Set; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -262,7 +263,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private long partSize; private boolean enableMultiObjectsDelete; private TransferManager transfers; - private ListeningExecutorService boundedThreadPool; + private ExecutorService boundedThreadPool; private ThreadPoolExecutor unboundedThreadPool; private int executorCapacity; private long multiPartThreshold; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java index b47c7ad3aa0..2292179b3fd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsResult; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,7 +208,8 @@ public class DeleteOperation extends ExecutingStoreOperation { "page size out of range: %s", pageSize); this.pageSize = pageSize; metadataStore = context.getMetadataStore(); - executor = context.createThrottledExecutor(1); + executor = MoreExecutors.listeningDecorator( + context.createThrottledExecutor(1)); } public long getFilesDeleted() { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java index 28be486b438..88231d8af9c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.net.URI; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -127,7 +129,7 @@ public class StoreContext { final Configuration configuration, final String username, final UserGroupInformation owner, - final ListeningExecutorService executor, + final ExecutorService executor, final int executorCapacity, final Invoker invoker, final S3AStatisticsContext instrumentation, @@ -144,7 +146,7 @@ public class StoreContext { this.configuration = configuration; this.username = username; this.owner = owner; - this.executor = executor; + this.executor = MoreExecutors.listeningDecorator(executor); this.executorCapacity = executorCapacity; this.invoker = invoker; this.instrumentation = instrumentation; @@ -179,7 +181,7 @@ public class StoreContext { return username; } - public ListeningExecutorService getExecutor() { + public ExecutorService getExecutor() { return executor; } @@ -310,7 +312,7 @@ public class StoreContext { * @param capacity maximum capacity of this executor. * @return an executor for submitting work. */ - public ListeningExecutorService createThrottledExecutor(int capacity) { + public ExecutorService createThrottledExecutor(int capacity) { return new SemaphoredDelegatingExecutor(executor, capacity, true); } @@ -320,7 +322,7 @@ public class StoreContext { * {@link #executorCapacity}. * @return a new executor for exclusive use by the caller. */ - public ListeningExecutorService createThrottledExecutor() { + public ExecutorService createThrottledExecutor() { return createThrottledExecutor(executorCapacity); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java index e1f2cb15b82..13953f9c985 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java @@ -19,8 +19,7 @@ package org.apache.hadoop.fs.s3a.impl; import java.net.URI; - -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Invoker; @@ -46,7 +45,7 @@ public class StoreContextBuilder { private UserGroupInformation owner; - private ListeningExecutorService executor; + private ExecutorService executor; private int executorCapacity; @@ -96,7 +95,7 @@ public class StoreContextBuilder { } public StoreContextBuilder setExecutor( - final ListeningExecutorService ex) { + final ExecutorService ex) { this.executor = ex; return this; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index eaf9ee22f91..b963e7e2532 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -67,6 +67,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -454,7 +455,8 @@ public class DynamoDBMetadataStore implements MetadataStore, instrumentation = context.getInstrumentation() .getS3GuardInstrumentation(); username = context.getUsername(); - executor = context.createThrottledExecutor(); + executor = MoreExecutors.listeningDecorator( + context.createThrottledExecutor()); ttlTimeProvider = Preconditions.checkNotNull( context.getTimeProvider(), "ttlTimeProvider must not be null"); @@ -509,13 +511,14 @@ public class DynamoDBMetadataStore implements MetadataStore, // the executor capacity for work. int executorCapacity = intOption(conf, EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1); - executor = BlockingThreadPoolExecutorService.newInstance( - executorCapacity, - executorCapacity * 2, - longOption(conf, KEEPALIVE_TIME, - DEFAULT_KEEPALIVE_TIME, 0), - TimeUnit.SECONDS, - "s3a-ddb-" + tableName); + executor = MoreExecutors.listeningDecorator( + BlockingThreadPoolExecutorService.newInstance( + executorCapacity, + executorCapacity * 2, + longOption(conf, KEEPALIVE_TIME, + DEFAULT_KEEPALIVE_TIME, 0), + TimeUnit.SECONDS, + "s3a-ddb-" + tableName)); initDataAccessRetries(conf); this.ttlTimeProvider = ttlTp; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java index ce20cc3aa2d..55423273b95 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.s3a; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.StopWatch; @@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -70,7 +70,7 @@ public class ITestBlockingThreadPoolExecutorService { @Test public void testSubmitCallable() throws Exception { ensureCreated(); - ListenableFuture f = tpe.submit(callableSleeper); + Future f = tpe.submit(callableSleeper); Integer v = f.get(); assertEquals(SOME_VALUE, v); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index e20e936454b..c920be13230 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -34,6 +34,7 @@ import java.util.stream.Stream; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import org.apache.hadoop.thirdparty.com.google.common.base.Charsets; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;; import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; @@ -128,11 +129,12 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase { * For submitting work. */ private static final ListeningExecutorService EXECUTOR = - BlockingThreadPoolExecutorService.newInstance( - EXECUTOR_THREAD_COUNT, - EXECUTOR_THREAD_COUNT * 2, - 30, TimeUnit.SECONDS, - "test-operations"); + MoreExecutors.listeningDecorator( + BlockingThreadPoolExecutorService.newInstance( + EXECUTOR_THREAD_COUNT, + EXECUTOR_THREAD_COUNT * 2, + 30, TimeUnit.SECONDS, + "test-operations")); /**