HBASE-20368 Fix RIT stuck when a rsgroup has no online servers but AM's pendingAssignQueue is cleared (#354)
parent 72e58a8fed
commit 9d5e5adaf0
@@ -140,7 +140,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
       }
     }
     for (RegionStateNode state : master.getAssignmentManager().getRegionsInTransition()) {
-      if (state.getRegionLocation().getAddress().equals(server)) {
+      if (state.getRegionLocation() != null &&
+          state.getRegionLocation().getAddress().equals(server)) {
         addRegion(regions, state.getRegionInfo());
       }
     }
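For context, a minimal sketch of what the new guard protects against: a region that is still in transition may not have a location yet, so RegionStateNode.getRegionLocation() can return null and the old, unguarded call would dereference it while collecting a group member's regions. The class and method names below are illustrative only, not part of the patch.

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.net.Address;

    // Hypothetical helper mirroring the guarded check in the hunk above.
    final class RegionLocationGuard {
      // 'location' stands for what RegionStateNode.getRegionLocation() returned;
      // it may be null while the region is in transition, so check before use.
      static boolean isOnServer(ServerName location, Address server) {
        return location != null && location.getAddress().equals(server);
      }
    }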
@@ -226,6 +226,13 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
       if(candidateList.size() > 0) {
         assignments.putAll(this.internalBalancer.retainAssignment(
           currentAssignmentMap, candidateList));
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No available servers to assign regions: {}",
+            RegionInfo.getShortNameToLog(regionList));
+        }
+        assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
+          .addAll(regionList);
       }
     }
 
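A minimal sketch of what the new else branch accomplishes (identifiers come from the hunk itself; the helper name assignGroup is illustrative, and LoadBalancer is assumed to be org.apache.hadoop.hbase.master.LoadBalancer, which defines BOGUS_SERVER_NAME): when a group has no online candidate servers, its regions are mapped to the bogus sentinel instead of being dropped from the returned plan, so they remain visible as regions in transition rather than silently disappearing.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.master.LoadBalancer;

    // Illustrative helper: park every region of a server-less group on the bogus
    // sentinel so the assignment plan still contains them.
    final class BogusFallbackSketch {
      static void assignGroup(Map<ServerName, List<RegionInfo>> assignments,
          List<ServerName> candidateList, List<RegionInfo> regionList) {
        if (candidateList.isEmpty()) {
          assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
              .addAll(regionList);
        }
        // when candidates exist, the group's internal balancer places the regions,
        // as in the hunk above
      }
    }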
@@ -240,16 +247,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
       ServerName server = this.internalBalancer.randomAssignment(region,
         candidateList);
       if (server != null) {
-        if (!assignments.containsKey(server)) {
-          assignments.put(server, new ArrayList<>());
-        }
-        assignments.get(server).add(region);
+        assignments.computeIfAbsent(server, s -> new ArrayList<>()).add(region);
       } else {
         //if not server is available assign to bogus so it ends up in RIT
-        if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
-          assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>());
-        }
-        assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
+        assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
+          .add(region);
       }
     }
     return assignments;
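The second hunk in this file is a pure cleanup: the three-step contains/put/add pattern on the assignments map collapses into Map.computeIfAbsent. A self-contained illustration of the idiom with toy types (plain strings, not HBase classes):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class ComputeIfAbsentDemo {
      public static void main(String[] args) {
        Map<String, List<String>> assignments = new HashMap<>();

        // Before: check for the key, create the list, then add.
        if (!assignments.containsKey("server-a")) {
          assignments.put("server-a", new ArrayList<>());
        }
        assignments.get("server-a").add("region-1");

        // After: one expression, as in the hunk above.
        assignments.computeIfAbsent("server-b", s -> new ArrayList<>()).add("region-2");

        System.out.println(assignments); // both servers now map to their regions
      }
    }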
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hbase.rsgroup;
 
+import static org.apache.hadoop.hbase.util.Threads.sleep;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -24,13 +32,16 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -67,13 +78,12 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
 
   @After
   public void afterMethod() throws Exception {
-    // tearDownAfterMethod();
+    tearDownAfterMethod();
   }
 
   @Test
   public void testKillRS() throws Exception {
     RSGroupInfo appInfo = addGroup("appInfo", 1);
 
     final TableName tableName = TableName.valueOf(tablePrefix + "_ns", name.getMethodName());
     admin.createNamespace(NamespaceDescriptor.create(tableName.getNamespaceAsString())
       .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build());
@@ -89,7 +99,7 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
     });
 
     ServerName targetServer = getServerName(appInfo.getServers().iterator().next());
-    Assert.assertEquals(1, admin.getRegions(targetServer).size());
+    assertEquals(1, admin.getRegions(targetServer).size());
 
     try {
       // stopping may cause an exception
@@ -131,7 +141,87 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
     });
 
     ServerName targetServer1 = getServerName(newServers.iterator().next());
-    Assert.assertEquals(1, admin.getRegions(targetServer1).size());
-    Assert.assertEquals(tableName, admin.getRegions(targetServer1).get(0).getTable());
+    assertEquals(1, admin.getRegions(targetServer1).size());
+    assertEquals(tableName, admin.getRegions(targetServer1).get(0).getTable());
   }
 
+  @Test
+  public void testKillAllRSInGroup() throws Exception {
+    // create a rsgroup and move two regionservers to it
+    String groupName = "my_group";
+    int groupRSCount = 2;
+    addGroup(groupName, groupRSCount);
+
+    // create a table, and move it to my_group
+    Table t = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("f"), 5);
+    TEST_UTIL.loadTable(t, Bytes.toBytes("f"));
+    Set<TableName> toAddTables = new HashSet<>();
+    toAddTables.add(tableName);
+    rsGroupAdmin.moveTables(toAddTables, groupName);
+    assertTrue(rsGroupAdmin.getRSGroupInfo(groupName).getTables().contains(tableName));
+    TEST_UTIL.waitTableAvailable(tableName, 30000);
+
+    // check my_group servers and table regions
+    Set<Address> servers = rsGroupAdmin.getRSGroupInfo(groupName).getServers();
+    assertEquals(2, servers.size());
+    LOG.debug("group servers {}", servers);
+    for (RegionInfo tr :
+        master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName)) {
+      assertTrue(servers.contains(
+        master.getAssignmentManager().getRegionStates().getRegionAssignments()
+          .get(tr).getAddress()));
+    }
+
+    // Move a region, to ensure there exists a region whose 'lastHost' is in my_group
+    // ('lastHost' of other regions are in 'default' group)
+    // and check if all table regions are online
+    List<ServerName> gsn = new ArrayList<>();
+    for(Address addr : servers){
+      gsn.add(getServerName(addr));
+    }
+    assertEquals(2, gsn.size());
+    for(Map.Entry<RegionInfo, ServerName> entry :
+        master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()){
+      if(entry.getKey().getTable().equals(tableName)){
+        LOG.debug("move region {} from {} to {}", entry.getKey().getRegionNameAsString(),
+          entry.getValue(), gsn.get(1 - gsn.indexOf(entry.getValue())));
+        TEST_UTIL.moveRegionAndWait(entry.getKey(), gsn.get(1 - gsn.indexOf(entry.getValue())));
+        break;
+      }
+    }
+    TEST_UTIL.waitTableAvailable(tableName, 30000);
+
+    // case 1: stop all the regionservers in my_group, and restart a regionserver in my_group,
+    // and then check if all table regions are online
+    for(Address addr : rsGroupAdmin.getRSGroupInfo(groupName).getServers()) {
+      TEST_UTIL.getMiniHBaseCluster().stopRegionServer(getServerName(addr));
+    }
+    // better wait for a while for region reassign
+    sleep(10000);
+    assertEquals(NUM_SLAVES_BASE - gsn.size(),
+      TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
+    TEST_UTIL.getMiniHBaseCluster().startRegionServer(gsn.get(0).getHostname(),
+      gsn.get(0).getPort());
+    assertEquals(NUM_SLAVES_BASE - gsn.size() + 1,
+      TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
+    TEST_UTIL.waitTableAvailable(tableName, 30000);
+
+    // case 2: stop all the regionservers in my_group, and move another
+    // regionserver(from the 'default' group) to my_group,
+    // and then check if all table regions are online
+    for(JVMClusterUtil.RegionServerThread rst :
+        TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()){
+      if(rst.getRegionServer().getServerName().getAddress().equals(gsn.get(0).getAddress())){
+        TEST_UTIL.getMiniHBaseCluster().stopRegionServer(rst.getRegionServer().getServerName());
+        break;
+      }
+    }
+    sleep(10000);
+    assertEquals(NUM_SLAVES_BASE - gsn.size(),
+      TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
+    ServerName newServer = master.getServerManager().getOnlineServersList().get(0);
+    rsGroupAdmin.moveServers(Sets.newHashSet(newServer.getAddress()), groupName);
+    // wait and check if table regions are online
+    TEST_UTIL.waitTableAvailable(tableName, 30000);
+  }
 }
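One non-obvious expression in the new test is gsn.get(1 - gsn.indexOf(entry.getValue())): since gsn holds exactly two servers, 1 - indexOf(current) selects the other server as the destination of the region move. A toy illustration with plain strings, not HBase types:

    import java.util.Arrays;
    import java.util.List;

    public final class OtherElementTrick {
      public static void main(String[] args) {
        List<String> gsn = Arrays.asList("rs-a", "rs-b");  // exactly two entries
        String current = "rs-a";
        String other = gsn.get(1 - gsn.indexOf(current));  // 1 - 0 = 1 -> "rs-b"
        System.out.println(other);
      }
    }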
@@ -265,7 +265,9 @@ public class MiniHBaseCluster extends HBaseCluster {
 
   @Override
   public void startRegionServer(String hostname, int port) throws IOException {
-    this.startRegionServer();
+    final Configuration newConf = HBaseConfiguration.create(conf);
+    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
+    startRegionServer(newConf);
   }
 
   @Override
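The new test restarts a region server on the host and port it recorded before the kill, which only works because this overload now pins the requested port instead of ignoring it and starting a server on a fresh configuration. A minimal sketch of the configuration-override pattern, assuming only the HConstants.REGIONSERVER_PORT key shown in the hunk (the helper name buildRsConf is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    // Clone the cluster configuration and pin the region server port before starting,
    // so the restarted server comes back on the port the caller asked for.
    final class RsConfSketch {
      static Configuration buildRsConf(Configuration base, int port) {
        Configuration newConf = HBaseConfiguration.create(base);
        newConf.setInt(HConstants.REGIONSERVER_PORT, port);
        return newConf;
      }
    }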
@@ -400,12 +402,17 @@ public class MiniHBaseCluster extends HBaseCluster {
   public JVMClusterUtil.RegionServerThread startRegionServer()
       throws IOException {
     final Configuration newConf = HBaseConfiguration.create(conf);
+    return startRegionServer(newConf);
+  }
+
+  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
+      throws IOException {
     User rsUser =
-        HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
+        HBaseTestingUtility.getDifferentUser(configuration, ".hfs."+index++);
     JVMClusterUtil.RegionServerThread t = null;
     try {
       t = hbaseCluster.addRegionServer(
-          newConf, hbaseCluster.getRegionServers().size(), rsUser);
+          configuration, hbaseCluster.getRegionServers().size(), rsUser);
       t.start();
       t.waitForServerOnline();
     } catch (InterruptedException ie) {