From c48bb67123e7bd622c567393097d81665dc5fff8 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sun, 9 Jul 2017 19:51:59 +0800 Subject: [PATCH] HBASE-18316 Implement async admin operations for draining region servers --- .../hadoop/hbase/client/AsyncAdmin.java | 19 +++- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 15 +++ .../hadoop/hbase/client/HBaseAdmin.java | 21 +--- .../hbase/client/RawAsyncHBaseAdmin.java | 44 ++++++++ .../shaded/protobuf/RequestConverter.java | 23 +++- .../hbase/client/TestAsyncDrainAdminApi.java | 101 ++++++++++++++++++ 6 files changed, 203 insertions(+), 20 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 8411a5bafc1..65c9faf55b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; import java.util.List; import java.util.Collection; import java.util.Map; @@ -771,6 +770,24 @@ public interface AsyncAdmin { */ CompletableFuture> listProcedures(); + /** + * Mark a region server as draining to prevent additional regions from getting assigned to it. + * @param servers + */ + CompletableFuture drainRegionServers(List servers); + + /** + * List region servers marked as draining to not get additional regions assigned to them. + * @return List of draining region servers wrapped by {@link CompletableFuture} + */ + CompletableFuture> listDrainingRegionServers(); + + /** + * Remove drain from a region server to allow additional regions assignments. + * @param servers List of region servers to remove drain from. + */ + CompletableFuture removeDrainFromRegionServers(List servers); + /** * @return cluster status wrapped by {@link CompletableFuture} */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 7c572db5dcb..311bda44704 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -438,6 +438,21 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.listProcedures()); } + @Override + public CompletableFuture drainRegionServers(List servers) { + return wrap(rawAdmin.drainRegionServers(servers)); + } + + @Override + public CompletableFuture> listDrainingRegionServers() { + return wrap(rawAdmin.listDrainingRegionServers()); + } + + @Override + public CompletableFuture removeDrainFromRegionServers(List servers) { + return wrap(rawAdmin.removeDrainFromRegionServers(servers)); + } + @Override public CompletableFuture getClusterStatus() { return wrap(rawAdmin.getClusterStatus()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 7518b9cff60..3b099efc6f1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -4036,19 +4036,11 @@ public class HBaseAdmin implements Admin { @Override public void drainRegionServers(List servers) throws IOException { - final List pbServers = new ArrayList<>(servers.size()); - for (ServerName server : servers) { - // Parse to ServerName to do simple validation. - ServerName.parseServerName(server.toString()); - pbServers.add(ProtobufUtil.toServerName(server)); - } - executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { @Override public Void rpcCall() throws ServiceException { - DrainRegionServersRequest req = - DrainRegionServersRequest.newBuilder().addAllServerName(pbServers).build(); - master.drainRegionServers(getRpcController(), req); + master.drainRegionServers(getRpcController(), + RequestConverter.buildDrainRegionServersRequest(servers)); return null; } }); @@ -4073,17 +4065,10 @@ public class HBaseAdmin implements Admin { @Override public void removeDrainFromRegionServers(List servers) throws IOException { - final List pbServers = new ArrayList<>(servers.size()); - for (ServerName server : servers) { - pbServers.add(ProtobufUtil.toServerName(server)); - } - executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { @Override public Void rpcCall() throws ServiceException { - RemoveDrainFromRegionServersRequest req = RemoveDrainFromRegionServersRequest.newBuilder() - .addAllServerName(pbServers).build(); - master.removeDrainFromRegionServers(getRpcController(), req); + master.removeDrainFromRegionServers(getRpcController(), RequestConverter.buildRemoveDrainFromRegionServersRequest(servers)); return null; } }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index e8c15a53097..5ba8248d32d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -118,6 +118,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTab import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; @@ -156,6 +158,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest; @@ -176,6 +180,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; @@ -1915,6 +1921,44 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { .collect(Collectors.toList()))).call(); } + @Override + public CompletableFuture drainRegionServers(List servers) { + return this + . newMasterCaller() + .action( + (controller, stub) -> this + . call(controller, stub, + RequestConverter.buildDrainRegionServersRequest(servers), + (s, c, req, done) -> s.drainRegionServers(c, req, done), resp -> null)).call(); + } + + @Override + public CompletableFuture> listDrainingRegionServers() { + return this + .> newMasterCaller() + .action( + (controller, stub) -> this + .> call( + controller, + stub, + ListDrainingRegionServersRequest.newBuilder().build(), + (s, c, req, done) -> s.listDrainingRegionServers(c, req, done), + resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) + .collect(Collectors.toList()))).call(); + } + + @Override + public CompletableFuture removeDrainFromRegionServers(List servers) { + return this + . newMasterCaller() + .action( + (controller, stub) -> this + . call( + controller, stub, RequestConverter + .buildRemoveDrainFromRegionServersRequest(servers), (s, c, req, done) -> s + .removeDrainFromRegionServers(c, req, done), resp -> null)).call(); + } + /** * Get the region location for the passed region name. The region name may be a full region name * or encoded region name. If the region does not found, then it'll throw an diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 5c44b4e0da2..8f726ec0d7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; - import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; @@ -93,9 +93,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; @@ -1808,4 +1810,23 @@ public final class RequestConverter { public static GetQuotaStatesRequest buildGetQuotaStatesRequest() { return GET_QUOTA_STATES_REQUEST; } + + public static DrainRegionServersRequest buildDrainRegionServersRequest(List servers) { + return DrainRegionServersRequest.newBuilder().addAllServerName(toProtoServerNames(servers)) + .build(); + } + + public static RemoveDrainFromRegionServersRequest buildRemoveDrainFromRegionServersRequest( + List servers) { + return RemoveDrainFromRegionServersRequest.newBuilder() + .addAllServerName(toProtoServerNames(servers)).build(); + } + + private static List toProtoServerNames(List servers) { + List pbServers = new ArrayList<>(servers.size()); + for (ServerName server : servers) { + pbServers.add(ProtobufUtil.toServerName(server)); + } + return pbServers; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java new file mode 100644 index 00000000000..88bda10b9b3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncDrainAdminApi.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncDrainAdminApi extends TestAsyncAdminBase { + + /* + * This test drains all regions so cannot be run in parallel with other tests. + */ + @Ignore @Test(timeout = 30000) + public void testDrainRegionServers() throws Exception { + List drainingServers = admin.listDrainingRegionServers().get(); + assertTrue(drainingServers.isEmpty()); + + // Drain all region servers. + Collection clusterServers = admin.getRegionServers().get(); + drainingServers = new ArrayList<>(); + for (ServerName server : clusterServers) { + drainingServers.add(server); + } + admin.drainRegionServers(drainingServers).join(); + + // Check that drain lists all region servers. + drainingServers = admin.listDrainingRegionServers().get(); + assertEquals(clusterServers.size(), drainingServers.size()); + for (ServerName server : clusterServers) { + assertTrue(drainingServers.contains(server)); + } + + // Try for 20 seconds to create table (new region). Will not complete because all RSs draining. + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()); + final Runnable createTable = new Thread() { + @Override + public void run() { + try { + admin.createTable(builder.build()).join(); + } catch (Exception ioe) { + assertTrue(false); // Should not get IOException. + } + } + }; + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final java.util.concurrent.Future future = executor.submit(createTable); + executor.shutdown(); + try { + future.get(20, TimeUnit.SECONDS); + } catch (TimeoutException ie) { + assertTrue(true); // Expecting timeout to happen. + } + + // Kill executor if still processing. + if (!executor.isTerminated()) { + executor.shutdownNow(); + assertTrue(true); + } + + // Remove drain list. + admin.removeDrainFromRegionServers(drainingServers); + drainingServers = admin.listDrainingRegionServers().get(); + assertTrue(drainingServers.isEmpty()); + } +}