diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java index a408833f79e..23c75cc091a 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.rsgroup; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.ReflectionUtils; /** @@ -81,6 +81,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc private RSGroupInfoManager infoManager; private LoadBalancer internalBalancer; + /** + * Set this key to {@code true} to allow region fallback. + * Fallback to the default rsgroup first, then fallback to any group if no online servers in + * default rsgroup. + * Please keep balancer switch on at the same time, which is relied on to correct misplaced + * regions + */ + public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable"; + + private boolean fallbackEnabled = false; + //used during reflection by LoadBalancerFactory @InterfaceAudience.Private public RSGroupBasedLoadBalancer() { @@ -133,11 +144,16 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc } Map> correctedState = correctAssignments(clusterState); - List regionPlans = new ArrayList(); + List regionPlans = new ArrayList<>(); List misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME); for (HRegionInfo regionInfo : misplacedRegions) { - regionPlans.add(new RegionPlan(regionInfo, null, null)); + if (fallbackEnabled) { + regionPlans.add(new RegionPlan(regionInfo, findServerForRegion(clusterState, regionInfo), + null)); + } else { + regionPlans.add(new RegionPlan(regionInfo, null, null)); + } } try { // Record which region servers have been processed,so as to skip them after processed @@ -172,23 +188,19 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc public Map> roundRobinAssignment( List regions, List servers) throws HBaseIOException { Map> assignments = Maps.newHashMap(); - ListMultimap regionMap = ArrayListMultimap.create(); - ListMultimap serverMap = ArrayListMultimap.create(); - generateGroupMaps(regions, servers, regionMap, serverMap); - for(String groupKey : regionMap.keySet()) { - if (regionMap.get(groupKey).size() > 0) { - Map> result = - this.internalBalancer.roundRobinAssignment( - regionMap.get(groupKey), - serverMap.get(groupKey)); - if(result != null) { - if(result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) && - assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)){ - assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll( - result.get(LoadBalancer.BOGUS_SERVER_NAME)); - } else { - assignments.putAll(result); + List, List>> pairs = + generateGroupAssignments(regions, servers); + for (Pair, List> pair : pairs) { + Map> result = this.internalBalancer + .roundRobinAssignment(pair.getFirst(), pair.getSecond()); + if (result != null) { + for (Map.Entry> entry : result.entrySet()) { + ServerName serverName = entry.getKey(); + List regionInfos = entry.getValue(); + if (!assignments.containsKey(serverName)) { + assignments.put(serverName, Lists.newArrayList()); } + assignments.get(serverName).addAll(regionInfos); } } } @@ -199,56 +211,24 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc public Map> retainAssignment( Map regions, List servers) throws HBaseIOException { try { - Map> assignments = new TreeMap>(); - ListMultimap groupToRegion = ArrayListMultimap.create(); - Set misplacedRegions = getMisplacedRegions(regions); - for (HRegionInfo region : regions.keySet()) { - if (!misplacedRegions.contains(region)) { - String groupName = infoManager.getRSGroupOfTable(region.getTable()); - if (groupName == null) { - LOG.debug("Group not found for table " + region.getTable() + ", using default"); - groupName = RSGroupInfo.DEFAULT_GROUP; + Map> assignments = new TreeMap<>(); + List, List>> pairs = + generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers); + for (Pair, List> pair : pairs) { + List regionList = pair.getFirst(); + Map currentAssignmentMap = Maps.newTreeMap(); + for (HRegionInfo regionInfo: regionList) { + currentAssignmentMap.put(regionInfo, regions.get(regionInfo)); + } + Map> pairResult = + this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond()); + for (Map.Entry> entry : pairResult.entrySet()) { + ServerName serverName = entry.getKey(); + List regionInfos = entry.getValue(); + if (!assignments.containsKey(serverName)) { + assignments.put(serverName, Lists.newArrayList()); } - groupToRegion.put(groupName, region); - } - } - // Now the "groupToRegion" map has only the regions which have correct - // assignments. - for (String key : groupToRegion.keySet()) { - Map currentAssignmentMap = new TreeMap(); - List regionList = groupToRegion.get(key); - RSGroupInfo info = infoManager.getRSGroup(key); - List candidateList = filterOfflineServers(info, servers); - for (HRegionInfo region : regionList) { - currentAssignmentMap.put(region, regions.get(region)); - } - if(candidateList.size() > 0) { - assignments.putAll(this.internalBalancer.retainAssignment( - currentAssignmentMap, candidateList)); - } - } - - for (HRegionInfo region : misplacedRegions) { - String groupName = infoManager.getRSGroupOfTable(region.getTable()); - if (groupName == null) { - LOG.debug("Group not found for table " + region.getTable() + ", using default"); - groupName = RSGroupInfo.DEFAULT_GROUP; - } - RSGroupInfo info = infoManager.getRSGroup(groupName); - List candidateList = filterOfflineServers(info, servers); - ServerName server = this.internalBalancer.randomAssignment(region, - candidateList); - if (server != null) { - if (!assignments.containsKey(server)) { - assignments.put(server, new ArrayList()); - } - assignments.get(server).add(region); - } else { - //if not server is available assign to bogus so it ends up in RIT - if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) { - assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList()); - } - assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region); + assignments.get(serverName).addAll(regionInfos); } } return assignments; @@ -266,19 +246,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc @Override public ServerName randomAssignment(HRegionInfo region, List servers) throws HBaseIOException { - ListMultimap regionMap = LinkedListMultimap.create(); - ListMultimap serverMap = LinkedListMultimap.create(); - generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap); - List filteredServers = serverMap.get(regionMap.keySet().iterator().next()); + List, List>> pairs = + generateGroupAssignments(Lists.newArrayList(region), servers); + List filteredServers = pairs.iterator().next().getSecond(); return this.internalBalancer.randomAssignment(region, filteredServers); } - private void generateGroupMaps( - List regions, - List servers, - ListMultimap regionMap, - ListMultimap serverMap) throws HBaseIOException { + private List, List>> generateGroupAssignments( + List regions, List servers) throws HBaseIOException { try { + ListMultimap regionMap = ArrayListMultimap.create(); + ListMultimap serverMap = ArrayListMultimap.create(); for (HRegionInfo region : regions) { String groupName = infoManager.getRSGroupOfTable(region.getTable()); if (groupName == null) { @@ -290,12 +268,29 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc for (String groupKey : regionMap.keySet()) { RSGroupInfo info = infoManager.getRSGroup(groupKey); serverMap.putAll(groupKey, filterOfflineServers(info, servers)); - if(serverMap.get(groupKey).size() < 1) { - serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME); + } + + List, List>> result = Lists.newArrayList(); + List fallbackRegions = Lists.newArrayList(); + for (String groupKey : regionMap.keySet()) { + if (serverMap.get(groupKey).isEmpty()) { + fallbackRegions.addAll(regionMap.get(groupKey)); + } else { + result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey))); } } + if (!fallbackRegions.isEmpty()) { + List candidates = null; + if (isFallbackEnabled()) { + candidates = getFallBackCandidates(servers); + } + candidates = (candidates == null || candidates.isEmpty()) ? + Lists.newArrayList(BOGUS_SERVER_NAME) : candidates; + result.add(Pair.newPair(fallbackRegions, candidates)); + } + return result; } catch(IOException e) { - throw new HBaseIOException("Failed to generate group maps", e); + throw new HBaseIOException("Failed to generate group assignments", e); } } @@ -368,6 +363,18 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc return misplacedRegions; } + private ServerName findServerForRegion( + Map> existingAssignments, HRegionInfo region) { + for (Map.Entry> entry : existingAssignments.entrySet()) { + if (entry.getValue().contains(region)) { + return entry.getKey(); + } + } + + throw new IllegalStateException("Could not find server for region " + + region.getShortNameToLog()); + } + private Map> correctAssignments( Map> existingAssignments) { Map> correctAssignments = @@ -434,12 +441,18 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc internalBalancer.setMasterServices(masterServices); internalBalancer.setConf(config); internalBalancer.initialize(); + // init fallback groups + this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); } public boolean isOnline() { return infoManager != null && infoManager.isOnline(); } + public boolean isFallbackEnabled() { + return fallbackEnabled; + } + @Override public void regionOnline(HRegionInfo regionInfo, ServerName sn) { } @@ -450,6 +463,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc @Override public void onConfigurationChange(Configuration conf) { + boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); + if (fallbackEnabled != newFallbackEnabled) { + LOG.info("Changing the value of " + FALLBACK_GROUP_ENABLE_KEY + " from " + fallbackEnabled + + " to " + newFallbackEnabled); + fallbackEnabled = newFallbackEnabled; + } internalBalancer.onConfigurationChange(conf); } @@ -470,4 +489,15 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc public void updateBalancerStatus(boolean status) { internalBalancer.updateBalancerStatus(status); } + + private List getFallBackCandidates(List servers) { + List serverNames = null; + try { + RSGroupInfo info = infoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); + serverNames = filterOfflineServers(info, servers); + } catch (IOException e) { + LOG.error("Failed to get default rsgroup info to fallback", e); + } + return serverNames == null || serverNames.isEmpty() ? servers : serverNames; + } } diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 6799e69fb9f..ce108a25221 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -503,12 +503,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } private synchronized void updateDefaultServers( - Set
server) throws IOException { + Set
server) { RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); RSGroupInfo newInfo = new RSGroupInfo(info.getName(), server, info.getTables()); HashMap newGroupMap = Maps.newHashMap(rsGroupMap); newGroupMap.put(newInfo.getName(), newInfo); - flushConfig(newGroupMap); + // do not need to persist, as we do not persist default group. + rsGroupMap = Collections.unmodifiableMap(newGroupMap); } @Override diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java index 6170cc1c857..6cbdc88c4cc 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -186,4 +186,21 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase { .roundRobinAssignment(regions, onlineServers); assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size()); } + + @Test + public void testOnConfigurationChange() { + // fallbackEnabled default is false + assertFalse(loadBalancer.isFallbackEnabled()); + + // change FALLBACK_GROUP_ENABLE_KEY from false to true + Configuration conf = loadBalancer.getConf(); + conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, true); + loadBalancer.onConfigurationChange(conf); + assertTrue(loadBalancer.isFallbackEnabled()); + + // restore + conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, false); + loadBalancer.onConfigurationChange(conf); + assertFalse(loadBalancer.isFallbackEnabled()); + } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java index 3d19df008d3..1b30c4fde5f 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; public abstract class TestRSGroupsBase { protected static final Log LOG = LogFactory.getLog(TestRSGroupsBase.class); @@ -102,6 +103,7 @@ public abstract class TestRSGroupsBase { admin = TEST_UTIL.getHBaseAdmin(); cluster = TEST_UTIL.getHBaseCluster(); master = ((MiniHBaseCluster)cluster).getMaster(); + master.balanceSwitch(true); //wait for balancer to come online TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { @@ -277,6 +279,21 @@ public abstract class TestRSGroupsBase { return groupPrefix+"_"+baseName+"_"+rand.nextInt(Integer.MAX_VALUE); } + /** + * The server name in group does not contain the start code, this method will find out the start + * code and construct the ServerName object. + */ + protected final ServerName getServerName(Address addr) { + for (JVMClusterUtil.RegionServerThread rsThread: + TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { + ServerName sn = rsThread.getRegionServer().getServerName(); + if (sn.getAddress().equals(addr)) { + return sn; + } + } + return null; + } + public static class CPMasterObserver extends BaseMasterObserver { boolean preBalanceRSGroupCalled = false; boolean postBalanceRSGroupCalled = false; diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java new file mode 100644 index 00000000000..2d4d72fe375 --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.rsgroup; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ MediumTests.class }) +public class TestRSGroupsFallback extends TestRSGroupsBase { + protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroupsFallback.class); + + private static final String FALLBACK_GROUP = "fallback"; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, + true); + TEST_UTIL.getConfiguration().setFloat( + "hbase.master.balancer.stochastic.tableSkewCost", 6000); + TEST_UTIL.getConfiguration().set( + HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + RSGroupBasedLoadBalancer.class.getName()); + TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName()); + TEST_UTIL.getConfiguration().setBoolean( + HConstants.ZOOKEEPER_USEMULTI, + true); + TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1); + TEST_UTIL.getConfiguration().setInt( + ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, + NUM_SLAVES_BASE - 1); + TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + initialize(); + master.balanceSwitch(true); + } + + @AfterClass + public static void tearDown() throws Exception { + tearDownAfterClass(); + } + + @Before + public void beforeMethod() throws Exception { + setUpBeforeMethod(); + } + + @After + public void afterMethod() throws Exception { + tearDownAfterMethod(); + } + + @Test + public void testFallback() throws Exception { + // add fallback group + addGroup(rsGroupAdmin, FALLBACK_GROUP, 1); + // add test group + String groupName = "appInfo"; + RSGroupInfo appInfo = addGroup(rsGroupAdmin, groupName, 1); + final TableName tableName = TableName.valueOf(tablePrefix + "_ns", "_testFallback"); + admin.createNamespace( + NamespaceDescriptor.create(tableName.getNamespaceAsString()) + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build()); + final HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor("f")); + admin.createTable(desc); + //wait for created table to be assigned + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return getTableRegionMap().get(desc.getTableName()) != null; + } + }); + TEST_UTIL.waitUntilAllRegionsAssigned(tableName); + + // server of test group crash, regions move to default group + crashRsInGroup(groupName); + assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP); + + // server of default group crash, regions move to any other group + crashRsInGroup(RSGroupInfo.DEFAULT_GROUP); + assertRegionsInGroup(tableName, FALLBACK_GROUP); + + // add a new server to default group, regions move to default group + JVMClusterUtil.RegionServerThread t = + TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000); + assertTrue(master.balance()); + assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP); + + // add a new server to test group, regions move back + JVMClusterUtil.RegionServerThread t1 = + TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000); + rsGroupAdmin.moveServers(Collections.singleton(t.getRegionServer().getServerName() + .getAddress()), groupName); + assertTrue(master.balance()); + assertRegionsInGroup(tableName, groupName); + + TEST_UTIL.getMiniHBaseCluster().killRegionServer(t.getRegionServer().getServerName()); + TEST_UTIL.getMiniHBaseCluster().killRegionServer(t1.getRegionServer().getServerName()); + + TEST_UTIL.deleteTable(tableName); + } + + private void assertRegionsInGroup(TableName table, String group) throws IOException { + ProcedureExecutor procExecutor = TEST_UTIL.getMiniHBaseCluster() + .getMaster().getMasterProcedureExecutor(); + for (ProcedureInfo procInfo: procExecutor.listProcedures()) { + LOG.debug("Waiting for " + procInfo.getProcName() + " " + procInfo.toString()); + waitProcedure(procExecutor, procInfo, 10000); + } + RSGroupInfo rsGroup = rsGroupAdmin.getRSGroupInfo(group); + for (HRegionInfo region: master.getAssignmentManager().getRegionStates() + .getRegionsOfTable(table)) { + Address regionOnServer = master.getAssignmentManager().getRegionStates() + .getRegionAssignments().get(region).getAddress(); + assertTrue(rsGroup.getServers().contains(regionOnServer)); + } + } + + private void crashRsInGroup(String groupName) throws Exception { + for (Address server : rsGroupAdmin.getRSGroupInfo(groupName).getServers()) { + final ServerName sn = getServerName(server); + TEST_UTIL.getMiniHBaseCluster().killRegionServer(sn); + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() { + return master.getServerManager().isServerDead(sn); + } + }); + } + Threads.sleep(1000); + TEST_UTIL.waitUntilNoRegionsInTransition(60000); + } + + private void waitProcedure(ProcedureExecutor procExecutor, + ProcedureInfo procInfo, long timeout) { + long start = EnvironmentEdgeManager.currentTime(); + while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { + if (procInfo.getProcState() == ProcedureProtos.ProcedureState.INITIALIZING || + (procExecutor.isRunning() && !procExecutor.isFinished(procInfo.getProcId()))) { + Threads.sleep(1000); + } else { + break; + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 714e72b41ed..d14024164dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; @@ -393,6 +394,34 @@ public class MiniHBaseCluster extends HBaseCluster { return t; } + /** + * Starts a region server thread and waits until its processed by master. Throws an exception + * when it can't start a region server or when the region server is not processed by master + * within the timeout. + * + * @return New RegionServerThread + */ + public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) + throws IOException { + + JVMClusterUtil.RegionServerThread t = startRegionServer(); + ServerName rsServerName = t.getRegionServer().getServerName(); + + long start = EnvironmentEdgeManager.currentTime(); + while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { + ClusterStatus clusterStatus = getMaster().getClusterStatus(); + if (clusterStatus != null && clusterStatus.getLiveServersLoad().containsKey(rsServerName)) { + return t; + } + Threads.sleep(100); + } + if (t.getRegionServer().isOnline()) { + throw new IOException("RS: " + rsServerName + " online, but not processed by master"); + } else { + throw new IOException("RS: " + rsServerName + " is offline"); + } + } + /** * Cause a region server to exit doing basic clean up only on its way out. * @param serverNumber Used as index into a list.