From c7c5d560844947131dc99acc88bd9cce80c42520 Mon Sep 17 00:00:00 2001 From: GeorryHuang <215175212@qq.com> Date: Sun, 27 Jun 2021 21:58:18 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-26015=20Should=20implement=20getRegionSer?= =?UTF-8?q?vers(boolean)=20method=20in=20Asyn=E2=80=A6=20(#3406)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Duo Zhang --- .../hadoop/hbase/client/AsyncAdmin.java | 30 +++++++++++++++++++ .../client/TestAsyncClusterAdminApi.java | 21 +++++++++++++ 2 files changed, 51 insertions(+) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 5e68ea8c1e2..6b8fda79f07 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import com.google.protobuf.RpcChannel; import java.util.Arrays; import java.util.Collection; @@ -49,6 +51,8 @@ import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.access.UserPermission; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + /** * The asynchronous administrative API for HBase. * @since 2.0.0 @@ -1058,6 +1062,32 @@ public interface AsyncAdmin { .thenApply(ClusterMetrics::getServersName); } + default CompletableFuture> getRegionServers( + boolean excludeDecommissionedRS) { + CompletableFuture> future = new CompletableFuture<>(); + addListener( + getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)).thenApply(ClusterMetrics::getServersName), + (allServers, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + if (!excludeDecommissionedRS) { + future.complete(allServers); + } else { + addListener(listDecommissionedRegionServers(), (decomServers, decomErr) -> { + if (decomErr != null) { + future.completeExceptionally(decomErr); + } else { + future.complete(allServers.stream().filter(s -> !decomServers.contains(s)) + .collect(ImmutableList.toImmutableList())); + } + }); + } + } + }); + return future; + } + /** * @return a list of master coprocessors wrapped by {@link CompletableFuture} */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java index d9f828286c9..0afed41e206 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -262,6 +263,26 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { } } + @Test + public void testGetRegionServers() throws Exception{ + List serverNames = new ArrayList<>(admin.getRegionServers(true).get()); + assertEquals(2, serverNames.size()); + + List serversToDecom = new ArrayList<>(); + ServerName serverToDecommission = serverNames.get(0); + + serversToDecom.add(serverToDecommission); + admin.decommissionRegionServers(serversToDecom, false).join(); + + assertEquals(1, admin.getRegionServers(true).get().size()); + assertEquals(2, admin.getRegionServers(false).get().size()); + + admin.recommissionRegionServer(serverToDecommission, Collections.emptyList()).join(); + + assertEquals(2, admin.getRegionServers(true).get().size()); + assertEquals(2, admin.getRegionServers(false).get().size()); + } + private void compareRegionLoads(Collection regionLoadCluster, Collection regionLoads) {