HBASE-25849 Backport HBASE-22738, HBASE-24760 & HBASE-25298 to branch-1 (#3581)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
caroliney14 2021-08-17 06:55:47 -07:00 committed by GitHub
parent 75844c8c39
commit ecac266633
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 370 additions and 82 deletions

View File

@ -21,7 +21,6 @@
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.ReflectionUtils;
/**
@ -81,6 +81,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
private RSGroupInfoManager infoManager;
private LoadBalancer internalBalancer;
/**
* Set this key to {@code true} to allow region fallback.
* Regions fall back to the default rsgroup first, then to any group if the default rsgroup has
* no online servers.
* Please keep the balancer switch on at the same time; it is relied on to correct misplaced
* regions.
*/
public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
private boolean fallbackEnabled = false;
//used during reflection by LoadBalancerFactory
@InterfaceAudience.Private
public RSGroupBasedLoadBalancer() {
@ -133,11 +144,16 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
}
Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();
List<RegionPlan> regionPlans = new ArrayList<>();
List<HRegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
for (HRegionInfo regionInfo : misplacedRegions) {
regionPlans.add(new RegionPlan(regionInfo, null, null));
if (fallbackEnabled) {
regionPlans.add(new RegionPlan(regionInfo, findServerForRegion(clusterState, regionInfo),
null));
} else {
regionPlans.add(new RegionPlan(regionInfo, null, null));
}
}
try {
// Record which region servers have been processed so as to skip them after processing
@ -172,23 +188,19 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
ListMultimap<String,HRegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
generateGroupMaps(regions, servers, regionMap, serverMap);
for(String groupKey : regionMap.keySet()) {
if (regionMap.get(groupKey).size() > 0) {
Map<ServerName, List<HRegionInfo>> result =
this.internalBalancer.roundRobinAssignment(
regionMap.get(groupKey),
serverMap.get(groupKey));
if(result != null) {
if(result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)){
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll(
result.get(LoadBalancer.BOGUS_SERVER_NAME));
} else {
assignments.putAll(result);
List<Pair<List<HRegionInfo>, List<ServerName>>> pairs =
generateGroupAssignments(regions, servers);
for (Pair<List<HRegionInfo>, List<ServerName>> pair : pairs) {
Map<ServerName, List<HRegionInfo>> result = this.internalBalancer
.roundRobinAssignment(pair.getFirst(), pair.getSecond());
if (result != null) {
for (Map.Entry<ServerName, List<HRegionInfo>> entry : result.entrySet()) {
ServerName serverName = entry.getKey();
List<HRegionInfo> regionInfos = entry.getValue();
if (!assignments.containsKey(serverName)) {
assignments.put(serverName, Lists.<HRegionInfo>newArrayList());
}
assignments.get(serverName).addAll(regionInfos);
}
}
}
@ -199,56 +211,24 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
try {
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
Set<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
for (HRegionInfo region : regions.keySet()) {
if (!misplacedRegions.contains(region)) {
String groupName = infoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + region.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<>();
List<Pair<List<HRegionInfo>, List<ServerName>>> pairs =
generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
for (Pair<List<HRegionInfo>, List<ServerName>> pair : pairs) {
List<HRegionInfo> regionList = pair.getFirst();
Map<HRegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
for (HRegionInfo regionInfo: regionList) {
currentAssignmentMap.put(regionInfo, regions.get(regionInfo));
}
Map<ServerName, List<HRegionInfo>> pairResult =
this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
for (Map.Entry<ServerName, List<HRegionInfo>> entry : pairResult.entrySet()) {
ServerName serverName = entry.getKey();
List<HRegionInfo> regionInfos = entry.getValue();
if (!assignments.containsKey(serverName)) {
assignments.put(serverName, Lists.<HRegionInfo>newArrayList());
}
groupToRegion.put(groupName, region);
}
}
// Now the "groupToRegion" map has only the regions which have correct
// assignments.
for (String key : groupToRegion.keySet()) {
Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
List<HRegionInfo> regionList = groupToRegion.get(key);
RSGroupInfo info = infoManager.getRSGroup(key);
List<ServerName> candidateList = filterOfflineServers(info, servers);
for (HRegionInfo region : regionList) {
currentAssignmentMap.put(region, regions.get(region));
}
if(candidateList.size() > 0) {
assignments.putAll(this.internalBalancer.retainAssignment(
currentAssignmentMap, candidateList));
}
}
for (HRegionInfo region : misplacedRegions) {
String groupName = infoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + region.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
RSGroupInfo info = infoManager.getRSGroup(groupName);
List<ServerName> candidateList = filterOfflineServers(info, servers);
ServerName server = this.internalBalancer.randomAssignment(region,
candidateList);
if (server != null) {
if (!assignments.containsKey(server)) {
assignments.put(server, new ArrayList<HRegionInfo>());
}
assignments.get(server).add(region);
} else {
//if not server is available assign to bogus so it ends up in RIT
if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<HRegionInfo>());
}
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
assignments.get(serverName).addAll(regionInfos);
}
}
return assignments;
@ -266,19 +246,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
@Override
public ServerName randomAssignment(HRegionInfo region,
List<ServerName> servers) throws HBaseIOException {
ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
List<Pair<List<HRegionInfo>, List<ServerName>>> pairs =
generateGroupAssignments(Lists.newArrayList(region), servers);
List<ServerName> filteredServers = pairs.iterator().next().getSecond();
return this.internalBalancer.randomAssignment(region, filteredServers);
}
private void generateGroupMaps(
List<HRegionInfo> regions,
List<ServerName> servers,
ListMultimap<String, HRegionInfo> regionMap,
ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
private List<Pair<List<HRegionInfo>, List<ServerName>>> generateGroupAssignments(
List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
try {
ListMultimap<String, HRegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
for (HRegionInfo region : regions) {
String groupName = infoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
@ -290,12 +268,29 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
for (String groupKey : regionMap.keySet()) {
RSGroupInfo info = infoManager.getRSGroup(groupKey);
serverMap.putAll(groupKey, filterOfflineServers(info, servers));
if(serverMap.get(groupKey).size() < 1) {
serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
}
List<Pair<List<HRegionInfo>, List<ServerName>>> result = Lists.newArrayList();
List<HRegionInfo> fallbackRegions = Lists.newArrayList();
for (String groupKey : regionMap.keySet()) {
if (serverMap.get(groupKey).isEmpty()) {
fallbackRegions.addAll(regionMap.get(groupKey));
} else {
result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
}
}
if (!fallbackRegions.isEmpty()) {
List<ServerName> candidates = null;
if (isFallbackEnabled()) {
candidates = getFallBackCandidates(servers);
}
candidates = (candidates == null || candidates.isEmpty()) ?
Lists.newArrayList(BOGUS_SERVER_NAME) : candidates;
result.add(Pair.newPair(fallbackRegions, candidates));
}
return result;
} catch(IOException e) {
throw new HBaseIOException("Failed to generate group maps", e);
throw new HBaseIOException("Failed to generate group assignments", e);
}
}
@ -368,6 +363,18 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
return misplacedRegions;
}
/**
 * Finds the server whose region list in {@code existingAssignments} contains {@code region}.
 *
 * @param existingAssignments server to hosted-regions map to search
 * @param region the region to look up
 * @return the key of the first entry (in map iteration order) whose list contains the region
 * @throws IllegalStateException if no entry's list contains the region
 */
private ServerName findServerForRegion(
    Map<ServerName, List<HRegionInfo>> existingAssignments, HRegionInfo region) {
  for (Map.Entry<ServerName, List<HRegionInfo>> assignment : existingAssignments.entrySet()) {
    List<HRegionInfo> hostedRegions = assignment.getValue();
    if (!hostedRegions.contains(region)) {
      // Not on this server, keep scanning.
      continue;
    }
    return assignment.getKey();
  }
  throw new IllegalStateException("Could not find server for region "
      + region.getShortNameToLog());
}
private Map<ServerName, List<HRegionInfo>> correctAssignments(
Map<ServerName, List<HRegionInfo>> existingAssignments) {
Map<ServerName, List<HRegionInfo>> correctAssignments =
@ -434,12 +441,18 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
internalBalancer.setMasterServices(masterServices);
internalBalancer.setConf(config);
internalBalancer.initialize();
// init fallback groups
this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
}
/**
 * @return {@code true} only when an rsgroup info manager has been set and it reports itself
 *   online; {@code false} when there is no info manager yet
 */
public boolean isOnline() {
  if (infoManager == null) {
    return false;
  }
  return infoManager.isOnline();
}
/**
 * @return the current value of the fallback flag, which is read from
 *   {@link #FALLBACK_GROUP_ENABLE_KEY}
 */
public boolean isFallbackEnabled() {
  return this.fallbackEnabled;
}
@Override
public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
}
@ -450,6 +463,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
/**
 * Re-reads {@link #FALLBACK_GROUP_ENABLE_KEY} (default {@code false}) from the supplied
 * configuration, updates the fallback flag if the value differs from the current one, and then
 * forwards the configuration to the internal balancer.
 *
 * @param conf the updated configuration
 */
@Override
public void onConfigurationChange(Configuration conf) {
  final boolean requestedFallback = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
  if (requestedFallback != fallbackEnabled) {
    // Log before assigning so the message still shows the old value.
    LOG.info("Changing the value of " + FALLBACK_GROUP_ENABLE_KEY + " from " + fallbackEnabled
        + " to " + requestedFallback);
    fallbackEnabled = requestedFallback;
  }
  // The wrapped balancer is always notified, whether or not the fallback flag changed.
  internalBalancer.onConfigurationChange(conf);
}
@ -470,4 +489,15 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
public void updateBalancerStatus(boolean status) {
  // Pure delegation: this balancer keeps no status of its own, it hands the flag to the
  // wrapped internal balancer.
  this.internalBalancer.updateBalancerStatus(status);
}
/**
 * Picks the servers that regions without a usable group may fall back to.
 * The online servers of the default rsgroup are preferred; if that lookup fails with an
 * {@link IOException} (logged, not rethrown) or yields nothing, the full {@code servers} list is
 * returned instead.
 *
 * @param servers the servers to filter against
 * @return the filtered default-group servers when there are any, otherwise {@code servers}
 */
private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
  try {
    RSGroupInfo defaultGroup = infoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
    List<ServerName> defaultGroupServers = filterOfflineServers(defaultGroup, servers);
    if (defaultGroupServers != null && !defaultGroupServers.isEmpty()) {
      return defaultGroupServers;
    }
  } catch (IOException e) {
    LOG.error("Failed to get default rsgroup info to fallback", e);
  }
  // Nothing usable in the default group: any of the given servers is a candidate.
  return servers;
}
}

View File

@ -503,12 +503,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
private synchronized void updateDefaultServers(
Set<Address> server) throws IOException {
Set<Address> server) {
RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newInfo = new RSGroupInfo(info.getName(), server, info.getTables());
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(newInfo.getName(), newInfo);
flushConfig(newGroupMap);
// do not need to persist, as we do not persist default group.
rsGroupMap = Collections.unmodifiableMap(newGroupMap);
}
@Override

View File

@ -186,4 +186,21 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
.roundRobinAssignment(regions, onlineServers);
assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size());
}
/**
 * Verifies that {@code onConfigurationChange} picks up changes to
 * {@link RSGroupBasedLoadBalancer#FALLBACK_GROUP_ENABLE_KEY} in both directions.
 */
@Test
public void testOnConfigurationChange() {
  // fallbackEnabled default is false
  assertFalse(loadBalancer.isFallbackEnabled());

  Configuration conf = loadBalancer.getConf();
  try {
    // change FALLBACK_GROUP_ENABLE_KEY from false to true
    conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, true);
    loadBalancer.onConfigurationChange(conf);
    assertTrue(loadBalancer.isFallbackEnabled());
  } finally {
    // restore in a finally block: the balancer's own Configuration object is mutated in place,
    // so a failed assertion above must not leave fallback switched on for whatever uses this
    // balancer afterwards
    conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, false);
    loadBalancer.onConfigurationChange(conf);
  }
  assertFalse(loadBalancer.isFallbackEnabled());
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
public abstract class TestRSGroupsBase {
protected static final Log LOG = LogFactory.getLog(TestRSGroupsBase.class);
@ -102,6 +103,7 @@ public abstract class TestRSGroupsBase {
admin = TEST_UTIL.getHBaseAdmin();
cluster = TEST_UTIL.getHBaseCluster();
master = ((MiniHBaseCluster)cluster).getMaster();
master.balanceSwitch(true);
//wait for balancer to come online
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@ -277,6 +279,21 @@ public abstract class TestRSGroupsBase {
return groupPrefix+"_"+baseName+"_"+rand.nextInt(Integer.MAX_VALUE);
}
/**
 * Resolves a group member address to a full {@link ServerName}.
 * The server name stored in a group carries no start code, so this scans the region server
 * threads of the mini cluster for one whose address equals {@code addr}.
 *
 * @param addr address to look for
 * @return the {@link ServerName} of the first matching region server thread, or {@code null}
 *   when none matches
 */
protected final ServerName getServerName(Address addr) {
  ServerName resolved = null;
  for (JVMClusterUtil.RegionServerThread thread :
      TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
    ServerName candidate = thread.getRegionServer().getServerName();
    if (candidate.getAddress().equals(addr)) {
      resolved = candidate;
      break;
    }
  }
  return resolved;
}
public static class CPMasterObserver extends BaseMasterObserver {
boolean preBalanceRSGroupCalled = false;
boolean postBalanceRSGroupCalled = false;

View File

@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Mini-cluster test for rsgroup region fallback. With
 * {@link RSGroupBasedLoadBalancer#FALLBACK_GROUP_ENABLE_KEY} switched on, it checks where the
 * regions of a table end up as the servers of its group, and then of the default group, are
 * killed and later replaced.
 */
@Category({ MediumTests.class })
public class TestRSGroupsFallback extends TestRSGroupsBase {
  protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroupsFallback.class);
  private static final String FALLBACK_GROUP = "fallback";

  /**
   * Configures the rsgroup balancer with fallback enabled, starts the mini cluster and runs the
   * shared initialization of the base class.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();

    // Balancer: rsgroup based, fallback on.
    TEST_UTIL.getConfiguration().setBoolean(
        RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, true);
    TEST_UTIL.getConfiguration().setFloat(
        "hbase.master.balancer.stochastic.tableSkewCost", 6000);
    TEST_UTIL.getConfiguration().set(
        HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName());

    // Master coprocessors: the rsgroup admin endpoint plus the observer from the base class.
    final String masterCoprocessors =
        RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName();
    TEST_UTIL.getConfiguration().set(
        CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, masterCoprocessors);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);

    final int slaveCount = NUM_SLAVES_BASE - 1;
    TEST_UTIL.startMiniCluster(slaveCount);
    // NOTE(review): these two settings are applied after the cluster has been started, exactly
    // as before; confirm whether they are meant to take effect for this run.
    TEST_UTIL.getConfiguration().setInt(
        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, slaveCount);
    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);

    initialize();
    master.balanceSwitch(true);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    tearDownAfterClass();
  }

  @Before
  public void beforeMethod() throws Exception {
    setUpBeforeMethod();
  }

  @After
  public void afterMethod() throws Exception {
    tearDownAfterMethod();
  }

  /**
   * Walks a table through the fallback chain: own group, then default group, then the
   * "fallback" group, and back again as servers are added.
   */
  @Test
  public void testFallback() throws Exception {
    // Group expected to host the regions once the default group has lost its servers too.
    addGroup(rsGroupAdmin, FALLBACK_GROUP, 1);

    // Group the test table is bound to through its namespace.
    String groupName = "appInfo";
    RSGroupInfo appInfo = addGroup(rsGroupAdmin, groupName, 1);
    final TableName tableName = TableName.valueOf(tablePrefix + "_ns", "_testFallback");
    NamespaceDescriptor namespace =
        NamespaceDescriptor.create(tableName.getNamespaceAsString())
            .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName())
            .build();
    admin.createNamespace(namespace);

    final HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor("f"));
    admin.createTable(desc);

    // Block until the new table shows up in the region map and all its regions are assigned.
    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return getTableRegionMap().get(desc.getTableName()) != null;
      }
    });
    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);

    // Step 1: kill the servers of the test group; regions are expected in the default group.
    crashRsInGroup(groupName);
    assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);

    // Step 2: kill the servers of the default group; regions are expected in the fallback group.
    crashRsInGroup(RSGroupInfo.DEFAULT_GROUP);
    assertRegionsInGroup(tableName, FALLBACK_GROUP);

    // Step 3: start a fresh region server and balance; regions are expected back in default.
    JVMClusterUtil.RegionServerThread firstNewServer =
        TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
    assertTrue(master.balance());
    assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);

    // Step 4: start one more region server, move the server started in step 3 into the test
    // group and balance; regions are expected back in the test group.
    JVMClusterUtil.RegionServerThread secondNewServer =
        TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
    Address firstNewAddress = firstNewServer.getRegionServer().getServerName().getAddress();
    rsGroupAdmin.moveServers(Collections.singleton(firstNewAddress), groupName);
    assertTrue(master.balance());
    assertRegionsInGroup(tableName, groupName);

    // Cleanup: stop both servers started by this test and drop the table.
    TEST_UTIL.getMiniHBaseCluster().killRegionServer(
        firstNewServer.getRegionServer().getServerName());
    TEST_UTIL.getMiniHBaseCluster().killRegionServer(
        secondNewServer.getRegionServer().getServerName());
    TEST_UTIL.deleteTable(tableName);
  }

  /**
   * Waits (up to 10 seconds each) for the procedures currently listed by the master, then
   * asserts that every region of {@code table} is assigned to a server of {@code group}.
   */
  private void assertRegionsInGroup(TableName table, String group) throws IOException {
    ProcedureExecutor<MasterProcedureEnv> executor =
        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
    for (ProcedureInfo pending : executor.listProcedures()) {
      LOG.debug("Waiting for " + pending.getProcName() + " " + pending.toString());
      waitProcedure(executor, pending, 10000);
    }

    RSGroupInfo expectedGroup = rsGroupAdmin.getRSGroupInfo(group);
    for (HRegionInfo hri :
        master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
      Address hostingAddress = master.getAssignmentManager().getRegionStates()
          .getRegionAssignments().get(hri).getAddress();
      assertTrue(expectedGroup.getServers().contains(hostingAddress));
    }
  }

  /**
   * Kills every server of the named group, waiting after each kill until the master reports
   * the server as dead, then waits for regions in transition to drain.
   */
  private void crashRsInGroup(String groupName) throws Exception {
    for (Address memberAddress : rsGroupAdmin.getRSGroupInfo(groupName).getServers()) {
      final ServerName victim = getServerName(memberAddress);
      TEST_UTIL.getMiniHBaseCluster().killRegionServer(victim);
      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() {
          return master.getServerManager().isServerDead(victim);
        }
      });
    }
    // Short pause before checking regions in transition.
    Threads.sleep(1000);
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
  }

  /**
   * Polls once per second until the procedure is neither in the INITIALIZING state nor still
   * unfinished on a running executor, or until {@code timeout} has elapsed.
   */
  private void waitProcedure(ProcedureExecutor<MasterProcedureEnv> procExecutor,
      ProcedureInfo procInfo, long timeout) {
    final long startedAt = EnvironmentEdgeManager.currentTime();
    while ((EnvironmentEdgeManager.currentTime() - startedAt) < timeout) {
      boolean stillPending =
          procInfo.getProcState() == ProcedureProtos.ProcedureState.INITIALIZING
              || (procExecutor.isRunning() && !procExecutor.isFinished(procInfo.getProcId()));
      if (!stillPending) {
        return;
      }
      Threads.sleep(1000);
    }
  }
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@ -393,6 +394,34 @@ public class MiniHBaseCluster extends HBaseCluster {
return t;
}
/**
 * Starts a region server thread and waits until it is processed by the master, i.e. until the
 * master's cluster status lists it among the live servers.
 *
 * @param timeout maximum time to wait, measured with {@link EnvironmentEdgeManager#currentTime()}
 * @return New RegionServerThread
 * @throws IOException if the region server was not seen by the master within the timeout; the
 *   message tells whether the region server itself reports being online
 */
public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
    throws IOException {
  final JVMClusterUtil.RegionServerThread rsThread = startRegionServer();
  final ServerName rsServerName = rsThread.getRegionServer().getServerName();
  final long startedAt = EnvironmentEdgeManager.currentTime();
  while ((EnvironmentEdgeManager.currentTime() - startedAt) < timeout) {
    ClusterStatus status = getMaster().getClusterStatus();
    boolean seenByMaster =
        status != null && status.getLiveServersLoad().containsKey(rsServerName);
    if (seenByMaster) {
      return rsThread;
    }
    // Poll again shortly.
    Threads.sleep(100);
  }
  // Timed out: report which side is lagging.
  final String reason = rsThread.getRegionServer().isOnline()
      ? " online, but not processed by master"
      : " is offline";
  throw new IOException("RS: " + rsServerName + reason);
}
/**
* Cause a region server to exit doing basic clean up only on its way out.
* @param serverNumber Used as index into a list.