diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 7e2645257be..9647ed47c64 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -29,8 +29,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1512,18 +1510,18 @@ public interface AsyncAdmin { * channel -> xxxService.newStub(channel) * * - * @param stubMaker a delegation to the actual {@code newStub} call. - * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link ServiceCaller} for more details. + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. * @param serverNames the given list of region servers - * @param the type of the asynchronous stub - * @param the type of the return value - * @return a list of return values of the protobuf rpc call, wrapped by a {@link CompletableFuture}. + * @param the type of the asynchronous stub + * @param the type of the return value + * @return Map of each region server to its result of the protobuf rpc call, wrapped by a + * {@link CompletableFuture}. * @see ServiceCaller */ - CompletableFuture> coprocessorService(Function stubMaker, - ServiceCaller callable, List serverNames) throws ExecutionException, - InterruptedException, TimeoutException; + CompletableFuture> coprocessorService( + Function stubMaker, ServiceCaller callable, List serverNames); /** * List all the dead region servers. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 647045a0c33..b395096309c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -23,9 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.regex.Pattern; import org.apache.hadoop.hbase.CacheEvictionStats; @@ -809,9 +807,8 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture> coprocessorService(Function stubMaker, - ServiceCaller callable, List serverNames) - throws ExecutionException, InterruptedException, TimeoutException { + public CompletableFuture> coprocessorService( + Function stubMaker, ServiceCaller callable, List serverNames) { return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverNames)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d972bbc2ab7..f3f31f00468 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -36,11 +36,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -99,8 +95,8 @@ import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3511,40 +3507,27 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture> coprocessorService(Function stubMaker, - ServiceCaller callable, List serverNames) - throws ExecutionException, InterruptedException, TimeoutException { - List stubs = new ArrayList<>(serverNames.size()); - for(ServerName serverName : serverNames) { - RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( - this. newServerCaller().serverName(serverName)); - S stub = stubMaker.apply(channel); - stubs.add(stub); - } - - return CompletableFuture.supplyAsync(() -> { - ExecutorService executorService = Executors.newFixedThreadPool(serverNames.size(), - new ThreadFactoryBuilder().setNameFormat("coproc-service-%d").setDaemon(true).build()); - List> completableFutureList = new ArrayList<>(); - for (S stub : stubs) { - CompletableFuture future = new CompletableFuture<>(); - completableFutureList.add(future); - ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); - executorService.execute(() -> callable.call(stub, controller, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); + public CompletableFuture> coprocessorService( + Function stubMaker, ServiceCaller callable, List serverNames) { + CompletableFuture> future = new CompletableFuture<>(); + Map resultMap = new HashMap<>(); + for (ServerName rs : serverNames) { + FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e) -> { + boolean done; + synchronized (resultMap) { + if (e != null) { + resultMap.put(rs, e); } else { - future.complete(resp); + resultMap.put(rs, r); } - })); - } - List listValues = new ArrayList<>(serverNames.size()); - for(CompletableFuture completableFuture : completableFutureList) { - listValues.add(completableFuture.join()); - } - executorService.shutdownNow(); - return listValues; - }); + done = resultMap.size() == serverNames.size(); + } + if (done) { + future.complete(resultMap); + } + }); + } + return future; } @Override diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java index 17f8ecc8bf9..ca51555ed37 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java @@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.coprocessor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -46,7 +47,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.Service; -import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; @@ -76,54 +76,47 @@ public class TestAsyncRegionServersCoprocessorEndpoint extends TestAsyncAdminBas } @Test - public void testRegionServerCoprocessorServiceWithMultipleServers() throws Exception { - final List regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + public void testRegionServersCoprocessorService() + throws ExecutionException, InterruptedException { + final List regionServerThreads = + TEST_UTIL.getHBaseCluster().getRegionServerThreads(); List serverNames = new ArrayList<>(); - for (JVMClusterUtil.RegionServerThread t : regionServerThreads) { - serverNames.add(t.getRegionServer().getServerName()); - } - DummyRegionServerEndpointProtos.DummyRequest request = - DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); - List responses = - admin. coprocessorService( - DummyRegionServerEndpointProtos.DummyService::newStub, - (s, c, done) -> s.dummyCall(c, request, done), serverNames) - .get(); + regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName())); - assertEquals(responses.size(), serverNames.size()); - for (DummyResponse response : responses) { - assertEquals(DUMMY_VALUE, response.getValue()); - } + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = + admin. coprocessorService(DummyService::newStub, + (s, c, done) -> s.dummyCall(c, request, done), serverNames).get(); + + resultMap.forEach((k, v) -> { + assertTrue(v instanceof DummyResponse); + DummyResponse resp = (DummyResponse) v; + assertEquals(DUMMY_VALUE, resp.getValue()); + }); } @Test - public void testRegionServerCoprocessorServiceErrorWithMultipleServers() throws Exception { - final List regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + public void testRegionServerCoprocessorsServiceError() + throws ExecutionException, InterruptedException { + final List regionServerThreads = + TEST_UTIL.getHBaseCluster().getRegionServerThreads(); List serverNames = new ArrayList<>(); - for (JVMClusterUtil.RegionServerThread t : regionServerThreads) { - serverNames.add(t.getRegionServer().getServerName()); - } - DummyRegionServerEndpointProtos.DummyRequest request = - DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); - try { - admin. coprocessorService( - DummyRegionServerEndpointProtos.DummyService::newStub, - (s, c, done) -> s.dummyThrow(c, request, done), serverNames) - .get(); - fail("Should have thrown an exception"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof RetriesExhaustedException); - assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); - } + regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName())); + + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = + admin. coprocessorService(DummyService::newStub, + (s, c, done) -> s.dummyThrow(c, request, done), serverNames).get(); + + resultMap.forEach((k, v) -> { + assertTrue(v instanceof RetriesExhaustedException); + Throwable e = (Throwable) v; + assertTrue(e.getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); + }); } public static class DummyRegionServerEndpoint extends DummyService implements RegionServerCoprocessor { - public DummyRegionServerEndpoint() { - - } @Override public Iterable getServices() { return Collections.singleton(this);