HBASE-20368 Fix RIT stuck when a rsgroup has no online servers but AM's pendingAssginQueue is cleared (#354)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
6c834b974c
commit
abf3d183cd
|
@ -125,7 +125,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (RegionStateNode state : master.getAssignmentManager().getRegionsInTransition()) {
|
for (RegionStateNode state : master.getAssignmentManager().getRegionsInTransition()) {
|
||||||
if (state.getRegionLocation().getAddress().equals(server)) {
|
if (state.getRegionLocation() != null &&
|
||||||
|
state.getRegionLocation().getAddress().equals(server)) {
|
||||||
addRegion(regions, state.getRegionInfo());
|
addRegion(regions, state.getRegionInfo());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -222,6 +222,13 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
||||||
if(candidateList.size() > 0) {
|
if(candidateList.size() > 0) {
|
||||||
assignments.putAll(this.internalBalancer.retainAssignment(
|
assignments.putAll(this.internalBalancer.retainAssignment(
|
||||||
currentAssignmentMap, candidateList));
|
currentAssignmentMap, candidateList));
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("No available servers to assign regions: {}",
|
||||||
|
RegionInfo.getShortNameToLog(regionList));
|
||||||
|
}
|
||||||
|
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
|
||||||
|
.addAll(regionList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,16 +243,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
||||||
ServerName server = this.internalBalancer.randomAssignment(region,
|
ServerName server = this.internalBalancer.randomAssignment(region,
|
||||||
candidateList);
|
candidateList);
|
||||||
if (server != null) {
|
if (server != null) {
|
||||||
if (!assignments.containsKey(server)) {
|
assignments.computeIfAbsent(server, s -> new ArrayList<>()).add(region);
|
||||||
assignments.put(server, new ArrayList<>());
|
|
||||||
}
|
|
||||||
assignments.get(server).add(region);
|
|
||||||
} else {
|
} else {
|
||||||
//if not server is available assign to bogus so it ends up in RIT
|
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
|
||||||
if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
|
.add(region);
|
||||||
assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>());
|
|
||||||
}
|
|
||||||
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return assignments;
|
return assignments;
|
||||||
|
|
|
@ -17,6 +17,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.rsgroup;
|
package org.apache.hadoop.hbase.rsgroup;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.util.Threads.sleep;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
|
@ -24,13 +32,16 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.net.Address;
|
import org.apache.hadoop.hbase.net.Address;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
@ -67,13 +78,12 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void afterMethod() throws Exception {
|
public void afterMethod() throws Exception {
|
||||||
// tearDownAfterMethod();
|
tearDownAfterMethod();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testKillRS() throws Exception {
|
public void testKillRS() throws Exception {
|
||||||
RSGroupInfo appInfo = addGroup("appInfo", 1);
|
RSGroupInfo appInfo = addGroup("appInfo", 1);
|
||||||
|
|
||||||
final TableName tableName = TableName.valueOf(tablePrefix + "_ns", name.getMethodName());
|
final TableName tableName = TableName.valueOf(tablePrefix + "_ns", name.getMethodName());
|
||||||
admin.createNamespace(NamespaceDescriptor.create(tableName.getNamespaceAsString())
|
admin.createNamespace(NamespaceDescriptor.create(tableName.getNamespaceAsString())
|
||||||
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build());
|
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build());
|
||||||
|
@ -89,7 +99,7 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
|
||||||
});
|
});
|
||||||
|
|
||||||
ServerName targetServer = getServerName(appInfo.getServers().iterator().next());
|
ServerName targetServer = getServerName(appInfo.getServers().iterator().next());
|
||||||
Assert.assertEquals(1, admin.getRegions(targetServer).size());
|
assertEquals(1, admin.getRegions(targetServer).size());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// stopping may cause an exception
|
// stopping may cause an exception
|
||||||
|
@ -131,7 +141,87 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
|
||||||
});
|
});
|
||||||
|
|
||||||
ServerName targetServer1 = getServerName(newServers.iterator().next());
|
ServerName targetServer1 = getServerName(newServers.iterator().next());
|
||||||
Assert.assertEquals(1, admin.getRegions(targetServer1).size());
|
assertEquals(1, admin.getRegions(targetServer1).size());
|
||||||
Assert.assertEquals(tableName, admin.getRegions(targetServer1).get(0).getTable());
|
assertEquals(tableName, admin.getRegions(targetServer1).get(0).getTable());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKillAllRSInGroup() throws Exception {
|
||||||
|
// create a rsgroup and move two regionservers to it
|
||||||
|
String groupName = "my_group";
|
||||||
|
int groupRSCount = 2;
|
||||||
|
addGroup(groupName, groupRSCount);
|
||||||
|
|
||||||
|
// create a table, and move it to my_group
|
||||||
|
Table t = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("f"), 5);
|
||||||
|
TEST_UTIL.loadTable(t, Bytes.toBytes("f"));
|
||||||
|
Set<TableName> toAddTables = new HashSet<>();
|
||||||
|
toAddTables.add(tableName);
|
||||||
|
rsGroupAdmin.moveTables(toAddTables, groupName);
|
||||||
|
assertTrue(rsGroupAdmin.getRSGroupInfo(groupName).getTables().contains(tableName));
|
||||||
|
TEST_UTIL.waitTableAvailable(tableName, 30000);
|
||||||
|
|
||||||
|
// check my_group servers and table regions
|
||||||
|
Set<Address> servers = rsGroupAdmin.getRSGroupInfo(groupName).getServers();
|
||||||
|
assertEquals(2, servers.size());
|
||||||
|
LOG.debug("group servers {}", servers);
|
||||||
|
for (RegionInfo tr :
|
||||||
|
master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName)) {
|
||||||
|
assertTrue(servers.contains(
|
||||||
|
master.getAssignmentManager().getRegionStates().getRegionAssignments()
|
||||||
|
.get(tr).getAddress()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move a region, to ensure there exists a region whose 'lastHost' is in my_group
|
||||||
|
// ('lastHost' of other regions are in 'default' group)
|
||||||
|
// and check if all table regions are online
|
||||||
|
List<ServerName> gsn = new ArrayList<>();
|
||||||
|
for(Address addr : servers){
|
||||||
|
gsn.add(getServerName(addr));
|
||||||
|
}
|
||||||
|
assertEquals(2, gsn.size());
|
||||||
|
for(Map.Entry<RegionInfo, ServerName> entry :
|
||||||
|
master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()){
|
||||||
|
if(entry.getKey().getTable().equals(tableName)){
|
||||||
|
LOG.debug("move region {} from {} to {}", entry.getKey().getRegionNameAsString(),
|
||||||
|
entry.getValue(), gsn.get(1 - gsn.indexOf(entry.getValue())));
|
||||||
|
TEST_UTIL.moveRegionAndWait(entry.getKey(), gsn.get(1 - gsn.indexOf(entry.getValue())));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TEST_UTIL.waitTableAvailable(tableName, 30000);
|
||||||
|
|
||||||
|
// case 1: stop all the regionservers in my_group, and restart a regionserver in my_group,
|
||||||
|
// and then check if all table regions are online
|
||||||
|
for(Address addr : rsGroupAdmin.getRSGroupInfo(groupName).getServers()) {
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().stopRegionServer(getServerName(addr));
|
||||||
|
}
|
||||||
|
// better wait for a while for region reassign
|
||||||
|
sleep(10000);
|
||||||
|
assertEquals(NUM_SLAVES_BASE - gsn.size(),
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().startRegionServer(gsn.get(0).getHostname(),
|
||||||
|
gsn.get(0).getPort());
|
||||||
|
assertEquals(NUM_SLAVES_BASE - gsn.size() + 1,
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
|
||||||
|
TEST_UTIL.waitTableAvailable(tableName, 30000);
|
||||||
|
|
||||||
|
// case 2: stop all the regionservers in my_group, and move another
|
||||||
|
// regionserver(from the 'default' group) to my_group,
|
||||||
|
// and then check if all table regions are online
|
||||||
|
for(JVMClusterUtil.RegionServerThread rst :
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()){
|
||||||
|
if(rst.getRegionServer().getServerName().getAddress().equals(gsn.get(0).getAddress())){
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().stopRegionServer(rst.getRegionServer().getServerName());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sleep(10000);
|
||||||
|
assertEquals(NUM_SLAVES_BASE - gsn.size(),
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
|
||||||
|
ServerName newServer = master.getServerManager().getOnlineServersList().get(0);
|
||||||
|
rsGroupAdmin.moveServers(Sets.newHashSet(newServer.getAddress()), groupName);
|
||||||
|
// wait and check if table regions are online
|
||||||
|
TEST_UTIL.waitTableAvailable(tableName, 30000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -269,7 +269,9 @@ public class MiniHBaseCluster extends HBaseCluster {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startRegionServer(String hostname, int port) throws IOException {
|
public void startRegionServer(String hostname, int port) throws IOException {
|
||||||
this.startRegionServer();
|
final Configuration newConf = HBaseConfiguration.create(conf);
|
||||||
|
newConf.setInt(HConstants.REGIONSERVER_PORT, port);
|
||||||
|
startRegionServer(newConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -404,12 +406,17 @@ public class MiniHBaseCluster extends HBaseCluster {
|
||||||
public JVMClusterUtil.RegionServerThread startRegionServer()
|
public JVMClusterUtil.RegionServerThread startRegionServer()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final Configuration newConf = HBaseConfiguration.create(conf);
|
final Configuration newConf = HBaseConfiguration.create(conf);
|
||||||
|
return startRegionServer(newConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
|
||||||
|
throws IOException {
|
||||||
User rsUser =
|
User rsUser =
|
||||||
HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
|
HBaseTestingUtility.getDifferentUser(configuration, ".hfs."+index++);
|
||||||
JVMClusterUtil.RegionServerThread t = null;
|
JVMClusterUtil.RegionServerThread t = null;
|
||||||
try {
|
try {
|
||||||
t = hbaseCluster.addRegionServer(
|
t = hbaseCluster.addRegionServer(
|
||||||
newConf, hbaseCluster.getRegionServers().size(), rsUser);
|
configuration, hbaseCluster.getRegionServers().size(), rsUser);
|
||||||
t.start();
|
t.start();
|
||||||
t.waitForServerOnline();
|
t.waitForServerOnline();
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
|
|
Loading…
Reference in New Issue