HBASE-18318 Implement updateConfiguration/stopMaster/stopRegionServer/shutdown methods

This commit is contained in:
Guanghao Zhang 2017-07-08 14:43:22 +08:00
parent da80839ecc
commit 4368f09057
7 changed files with 327 additions and 6 deletions

View File

@ -823,6 +823,34 @@ public interface AsyncAdmin {
return getRegionLoads(serverName, Optional.empty());
}
/**
* Shuts down the HBase cluster.
*/
CompletableFuture<Void> shutdown();
/**
* Shuts down the current HBase master only.
*/
CompletableFuture<Void> stopMaster();
/**
* Stop the designated regionserver.
* @param serverName
*/
CompletableFuture<Void> stopRegionServer(ServerName serverName);
/**
* Update the configuration and trigger an online config change on the regionserver.
* @param serverName : The server whose config needs to be updated.
*/
CompletableFuture<Void> updateConfiguration(ServerName serverName);
/**
* Update the configuration and trigger an online config change on all the masters and
* regionservers.
*/
CompletableFuture<Void> updateConfiguration();
/**
* Get a list of {@link RegionLoad} of all regions hosted on a region seerver for a table.
* @param serverName

View File

@ -458,6 +458,31 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.getClusterStatus());
}
@Override
public CompletableFuture<Void> shutdown() {
return wrap(rawAdmin.shutdown());
}
@Override
public CompletableFuture<Void> stopMaster() {
return wrap(rawAdmin.stopMaster());
}
@Override
public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
return wrap(rawAdmin.stopRegionServer(serverName));
}
@Override
public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
return wrap(rawAdmin.updateConfiguration(serverName));
}
@Override
public CompletableFuture<Void> updateConfiguration() {
return wrap(rawAdmin.updateConfiguration());
}
@Override
public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
Optional<TableName> tableName) {

View File

@ -97,6 +97,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLo
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@ -196,8 +200,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
@ -2343,6 +2351,76 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
resp -> ProtobufUtil.convert(resp.getClusterStatus()))).call();
}
@Override
public CompletableFuture<Void> shutdown() {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
stub, ShutdownRequest.newBuilder().build(),
(s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call();
}
@Override
public CompletableFuture<Void> stopMaster() {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller,
stub, StopMasterRequest.newBuilder().build(),
(s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call();
}
@Override
public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
StopServerRequest request =
RequestConverter.buildStopServerRequest("Called by admin client "
+ this.connection.toString());
return this
.<Void> newAdminCaller()
.action(
(controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
resp -> null)).serverName(serverName).call();
}
@Override
public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
return this
.<Void> newAdminCaller()
.action(
(controller, stub) -> this
.<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
(s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
.serverName(serverName).call();
}
@Override
public CompletableFuture<Void> updateConfiguration() {
CompletableFuture<Void> future = new CompletableFuture<Void>();
getClusterStatus().whenComplete(
(status, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
List<CompletableFuture<Void>> futures = new ArrayList<>();
status.getServers().forEach((server) -> futures.add(updateConfiguration(server)));
futures.add(updateConfiguration(status.getMaster()));
status.getBackupMasters().forEach(master -> futures.add(updateConfiguration(master)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
.whenComplete((result, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(result);
}
});
}
});
return future;
}
@Override
public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
Optional<TableName> tableName) {

View File

@ -98,14 +98,14 @@ public abstract class TestAsyncAdminBase {
}
@Before
public void setUp() {
admin = ASYNC_CONN.getAdmin();
public void setUp() throws Exception {
admin = getAdmin.get();
String methodName = testName.getMethodName();
tableName = TableName.valueOf(methodName.substring(0, methodName.length() - 3));
}
@After
public void tearDown() {
public void tearDown() throws Exception {
admin.listTableNames(Optional.of(Pattern.compile(tableName.getNameAsString() + ".*")), false)
.whenCompleteAsync((tables, err) -> {
if (tables != null) {

View File

@ -20,12 +20,18 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
@ -33,8 +39,8 @@ import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -45,11 +51,78 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@RunWith(Parameterized.class)
@Category({ MiscTests.class, MediumTests.class })
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
private final Path cnfPath = FileSystems.getDefault().getPath("target/test-classes/hbase-site.xml");
private final Path cnf2Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site2.xml");
private final Path cnf3Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site3.xml");
@Test
public void testRegionLoad() throws Exception {
public void testRegionServerOnlineConfigChange() throws Exception {
replaceHBaseSiteXML();
admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join());
// Check the configuration of the RegionServers
TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
Configuration conf = thread.getRegionServer().getConfiguration();
assertEquals(1000, conf.getInt("hbase.custom.config", 0));
});
restoreHBaseSiteXML();
}
@Test
public void testMasterOnlineConfigChange() throws Exception {
replaceHBaseSiteXML();
ServerName master = admin.getMaster().get();
admin.updateConfiguration(master).join();
admin.getBackupMasters().get()
.forEach(backupMaster -> admin.updateConfiguration(backupMaster).join());
// Check the configuration of the Masters
TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
Configuration conf = thread.getMaster().getConfiguration();
assertEquals(1000, conf.getInt("hbase.custom.config", 0));
});
restoreHBaseSiteXML();
}
@Test
public void testAllClusterOnlineConfigChange() throws IOException {
replaceHBaseSiteXML();
admin.updateConfiguration().join();
// Check the configuration of the Masters
TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
Configuration conf = thread.getMaster().getConfiguration();
assertEquals(1000, conf.getInt("hbase.custom.config", 0));
});
// Check the configuration of the RegionServers
TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
Configuration conf = thread.getRegionServer().getConfiguration();
assertEquals(1000, conf.getInt("hbase.custom.config", 0));
});
restoreHBaseSiteXML();
}
private void replaceHBaseSiteXML() throws IOException {
// make a backup of hbase-site.xml
Files.copy(cnfPath, cnf3Path, StandardCopyOption.REPLACE_EXISTING);
// update hbase-site.xml by overwriting it
Files.copy(cnf2Path, cnfPath, StandardCopyOption.REPLACE_EXISTING);
}
private void restoreHBaseSiteXML() throws IOException {
// restore hbase-site.xml
Files.copy(cnf3Path, cnfPath, StandardCopyOption.REPLACE_EXISTING);
}
@Test
public void testGetRegionLoads() throws Exception {
// Turn off the balancer
admin.setBalancerOn(false).join();
TableName[] tables =
@ -57,6 +130,8 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
TableName.valueOf(tableName.getNameAsString() + "2"),
TableName.valueOf(tableName.getNameAsString() + "3") };
createAndLoadTable(tables);
// Sleep to wait region server report
Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
// Check if regions match with the regionLoad from the server
Collection<ServerName> servers = admin.getRegionServers().get();
for (ServerName serverName : servers) {

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Only used to test stopMaster/stopRegionServer/shutdown methods.
*/
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncClusterAdminApi2 extends TestAsyncAdminBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
// do nothing
}
@Before
public void setUp() throws Exception {
TEST_UTIL.startMiniCluster(2, 3);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
admin = ASYNC_CONN.getAdmin();
}
@After
public void tearDown() throws Exception {
IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testStop() throws Exception {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
assertFalse(rs.isStopped());
admin.stopRegionServer(rs.getServerName()).join();
assertTrue(rs.isStopped());
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
assertFalse(master.isStopped());
admin.stopMaster().join();
assertTrue(master.isStopped());
}
@Test
public void testShutdown() throws Exception {
TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
assertFalse(thread.getMaster().isStopped());
});
TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
assertFalse(thread.getRegionServer().isStopped());
});
admin.shutdown().join();
TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> {
while (!thread.getMaster().isStopped()) {
trySleep(100, TimeUnit.MILLISECONDS);
}
});
TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
while (!thread.getRegionServer().isStopped()) {
trySleep(100, TimeUnit.MILLISECONDS);
}
});
}
private void trySleep(long timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException e) {
}
}
}

View File

@ -32,6 +32,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.HashMap;
import java.util.List;
@ -46,6 +48,7 @@ import static org.junit.Assert.assertTrue;
/**
* Class to test asynchronous procedure admin operations.
*/
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {