From 4368f090571da142f3202eb0aceb930f0c5b30f5 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sat, 8 Jul 2017 14:43:22 +0800 Subject: [PATCH] HBASE-18318 Implement updateConfiguration/stopMaster/stopRegionServer/shutdown methods --- .../hadoop/hbase/client/AsyncAdmin.java | 28 +++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 25 ++++ .../hbase/client/RawAsyncHBaseAdmin.java | 78 ++++++++++++ .../hbase/client/TestAsyncAdminBase.java | 6 +- .../client/TestAsyncClusterAdminApi.java | 81 ++++++++++++- .../client/TestAsyncClusterAdminApi2.java | 112 ++++++++++++++++++ .../client/TestAsyncProcedureAdminApi.java | 3 + 7 files changed, 327 insertions(+), 6 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi2.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 65c9faf55b9..9538a4878b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -823,6 +823,34 @@ public interface AsyncAdmin { return getRegionLoads(serverName, Optional.empty()); } + /** + * Shuts down the HBase cluster. + */ + CompletableFuture shutdown(); + + /** + * Shuts down the current HBase master only. + */ + CompletableFuture stopMaster(); + + /** + * Stop the designated regionserver. + * @param serverName + */ + CompletableFuture stopRegionServer(ServerName serverName); + + /** + * Update the configuration and trigger an online config change on the regionserver. + * @param serverName : The server whose config needs to be updated. + */ + CompletableFuture updateConfiguration(ServerName serverName); + + /** + * Update the configuration and trigger an online config change on all the masters and + * regionservers. + */ + CompletableFuture updateConfiguration(); + /** * Get a list of {@link RegionLoad} of all regions hosted on a region seerver for a table. * @param serverName diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 311bda44704..8e2b3c016ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -458,6 +458,31 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.getClusterStatus()); } + @Override + public CompletableFuture shutdown() { + return wrap(rawAdmin.shutdown()); + } + + @Override + public CompletableFuture stopMaster() { + return wrap(rawAdmin.stopMaster()); + } + + @Override + public CompletableFuture stopRegionServer(ServerName serverName) { + return wrap(rawAdmin.stopRegionServer(serverName)); + } + + @Override + public CompletableFuture updateConfiguration(ServerName serverName) { + return wrap(rawAdmin.updateConfiguration(serverName)); + } + + @Override + public CompletableFuture updateConfiguration() { + return wrap(rawAdmin.updateConfiguration()); + } + @Override public CompletableFuture> getRegionLoads(ServerName serverName, Optional tableName) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 5ba8248d32d..2c6dd77e981 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -97,6 +97,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLo import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; @@ -196,8 +200,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -2343,6 +2351,76 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { resp -> ProtobufUtil.convert(resp.getClusterStatus()))).call(); } + @Override + public CompletableFuture shutdown() { + return this + . newMasterCaller() + .action( + (controller, stub) -> this. call(controller, + stub, ShutdownRequest.newBuilder().build(), + (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call(); + } + + @Override + public CompletableFuture stopMaster() { + return this + . newMasterCaller() + .action( + (controller, stub) -> this. call(controller, + stub, StopMasterRequest.newBuilder().build(), + (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call(); + } + + @Override + public CompletableFuture stopRegionServer(ServerName serverName) { + StopServerRequest request = + RequestConverter.buildStopServerRequest("Called by admin client " + + this.connection.toString()); + return this + . newAdminCaller() + .action( + (controller, stub) -> this. adminCall( + controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), + resp -> null)).serverName(serverName).call(); + } + + @Override + public CompletableFuture updateConfiguration(ServerName serverName) { + return this + . newAdminCaller() + .action( + (controller, stub) -> this + . adminCall( + controller, stub, UpdateConfigurationRequest.getDefaultInstance(), + (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) + .serverName(serverName).call(); + } + + @Override + public CompletableFuture updateConfiguration() { + CompletableFuture future = new CompletableFuture(); + getClusterStatus().whenComplete( + (status, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + List> futures = new ArrayList<>(); + status.getServers().forEach((server) -> futures.add(updateConfiguration(server))); + futures.add(updateConfiguration(status.getMaster())); + status.getBackupMasters().forEach(master -> futures.add(updateConfiguration(master))); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) + .whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + } + }); + return future; + } + @Override public CompletableFuture> getRegionLoads(ServerName serverName, Optional tableName) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java index b182563f2ac..52d29464241 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java @@ -98,14 +98,14 @@ public abstract class TestAsyncAdminBase { } @Before - public void setUp() { - admin = ASYNC_CONN.getAdmin(); + public void setUp() throws Exception { + admin = getAdmin.get(); String methodName = testName.getMethodName(); tableName = TableName.valueOf(methodName.substring(0, methodName.length() - 3)); } @After - public void tearDown() { + public void tearDown() throws Exception { admin.listTableNames(Optional.of(Pattern.compile(tableName.getNameAsString() + ".*")), false) .whenCompleteAsync((tables, err) -> { if (tables != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java index e8f638029ef..6a3c8e1c703 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java @@ -20,12 +20,18 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; @@ -33,8 +39,8 @@ import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -45,11 +51,78 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; @RunWith(Parameterized.class) -@Category({ MiscTests.class, MediumTests.class }) +@Category({ ClientTests.class, MediumTests.class }) public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { + private final Path cnfPath = FileSystems.getDefault().getPath("target/test-classes/hbase-site.xml"); + private final Path cnf2Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site2.xml"); + private final Path cnf3Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site3.xml"); + @Test - public void testRegionLoad() throws Exception { + public void testRegionServerOnlineConfigChange() throws Exception { + replaceHBaseSiteXML(); + admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join()); + + // Check the configuration of the RegionServers + TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { + Configuration conf = thread.getRegionServer().getConfiguration(); + assertEquals(1000, conf.getInt("hbase.custom.config", 0)); + }); + + restoreHBaseSiteXML(); + } + + @Test + public void testMasterOnlineConfigChange() throws Exception { + replaceHBaseSiteXML(); + ServerName master = admin.getMaster().get(); + admin.updateConfiguration(master).join(); + admin.getBackupMasters().get() + .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join()); + + // Check the configuration of the Masters + TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { + Configuration conf = thread.getMaster().getConfiguration(); + assertEquals(1000, conf.getInt("hbase.custom.config", 0)); + }); + + restoreHBaseSiteXML(); + } + + @Test + public void testAllClusterOnlineConfigChange() throws IOException { + replaceHBaseSiteXML(); + admin.updateConfiguration().join(); + + // Check the configuration of the Masters + TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { + Configuration conf = thread.getMaster().getConfiguration(); + assertEquals(1000, conf.getInt("hbase.custom.config", 0)); + }); + + // Check the configuration of the RegionServers + TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { + Configuration conf = thread.getRegionServer().getConfiguration(); + assertEquals(1000, conf.getInt("hbase.custom.config", 0)); + }); + + restoreHBaseSiteXML(); + } + + private void replaceHBaseSiteXML() throws IOException { + // make a backup of hbase-site.xml + Files.copy(cnfPath, cnf3Path, StandardCopyOption.REPLACE_EXISTING); + // update hbase-site.xml by overwriting it + Files.copy(cnf2Path, cnfPath, StandardCopyOption.REPLACE_EXISTING); + } + + private void restoreHBaseSiteXML() throws IOException { + // restore hbase-site.xml + Files.copy(cnf3Path, cnfPath, StandardCopyOption.REPLACE_EXISTING); + } + + @Test + public void testGetRegionLoads() throws Exception { // Turn off the balancer admin.setBalancerOn(false).join(); TableName[] tables = @@ -57,6 +130,8 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { TableName.valueOf(tableName.getNameAsString() + "2"), TableName.valueOf(tableName.getNameAsString() + "3") }; createAndLoadTable(tables); + // Sleep to wait region server report + Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); // Check if regions match with the regionLoad from the server Collection servers = admin.getRegionServers().get(); for (ServerName serverName : servers) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi2.java new file mode 100644 index 00000000000..16d97baac89 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi2.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Only used to test stopMaster/stopRegionServer/shutdown methods. + */ +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncClusterAdminApi2 extends TestAsyncAdminBase { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + // do nothing + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.startMiniCluster(2, 3); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + admin = ASYNC_CONN.getAdmin(); + } + + @After + public void tearDown() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testStop() throws Exception { + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + assertFalse(rs.isStopped()); + admin.stopRegionServer(rs.getServerName()).join(); + assertTrue(rs.isStopped()); + + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + assertFalse(master.isStopped()); + admin.stopMaster().join(); + assertTrue(master.isStopped()); + } + + @Test + public void testShutdown() throws Exception { + TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { + assertFalse(thread.getMaster().isStopped()); + }); + TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { + assertFalse(thread.getRegionServer().isStopped()); + }); + + admin.shutdown().join(); + TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { + while (!thread.getMaster().isStopped()) { + trySleep(100, TimeUnit.MILLISECONDS); + } + }); + TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { + while (!thread.getRegionServer().isStopped()) { + trySleep(100, TimeUnit.MILLISECONDS); + } + }); + } + + private void trySleep(long timeout, TimeUnit unit) { + try { + unit.sleep(timeout); + } catch (InterruptedException e) { + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java index 12c699b75af..b7fac061412 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java @@ -32,6 +32,8 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.HashMap; import java.util.List; @@ -46,6 +48,7 @@ import static org.junit.Assert.assertTrue; /** * Class to test asynchronous procedure admin operations. */ +@RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {