HBASE-22414 Interruption of moving regions in RSGroup will cause regions on wrong rs

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
haxiaolin 2019-06-18 15:16:16 +08:00 committed by Wellington Chevreuil
parent 5333d8f1b4
commit 5c26aa887e
2 changed files with 249 additions and 18 deletions

View File

@ -204,6 +204,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
private void moveServerRegionsFromGroup(Set<Address> servers, String targetGroupName)
throws IOException {
boolean hasRegionsToMove;
int retry = 0;
RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
Set<Address> allSevers = new HashSet<>(servers);
do {
@ -215,7 +216,12 @@ public class RSGroupAdminServer implements RSGroupAdmin {
if (!targetGrp.containsTable(region.getTable())) {
LOG.info("Moving server region {}, which do not belong to RSGroup {}",
region.getShortNameToLog(), targetGroupName);
this.master.getAssignmentManager().move(region);
try {
this.master.getAssignmentManager().move(region);
}catch (IOException ioe){
LOG.error("Move region {} from group failed, will retry, current retry time is {}",
region.getShortNameToLog(), retry, ioe);
}
if (master.getAssignmentManager().getRegionStates().
getRegionState(region).isFailedOpen()) {
continue;
@ -229,13 +235,15 @@ public class RSGroupAdminServer implements RSGroupAdmin {
iter.remove();
}
}
retry++;
try {
rsGroupInfoManager.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted", e);
Thread.currentThread().interrupt();
}
} while (hasRegionsToMove);
} while (hasRegionsToMove && retry <= 50);
}
/**
@ -247,23 +255,49 @@ public class RSGroupAdminServer implements RSGroupAdmin {
*/
private void moveTableRegionsToGroup(Set<TableName> tables, String targetGroupName)
throws IOException {
boolean hasRegionsToMove;
int retry = 0;
RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
for (TableName table : tables) {
if (master.getAssignmentManager().isTableDisabled(table)) {
LOG.debug("Skipping move regions because the table {} is disabled", table);
continue;
}
LOG.info("Moving region(s) for table {} to RSGroup {}", table, targetGroupName);
for (RegionInfo region : master.getAssignmentManager().getRegionStates()
.getRegionsOfTable(table)) {
ServerName sn =
master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
if (!targetGrp.containsServer(sn.getAddress())) {
LOG.info("Moving region {} to RSGroup {}", region.getShortNameToLog(), targetGroupName);
master.getAssignmentManager().move(region);
Set<TableName> allTables = new HashSet<>(tables);
do {
hasRegionsToMove = false;
for (Iterator<TableName> iter = allTables.iterator(); iter.hasNext(); ) {
TableName table = iter.next();
if (master.getAssignmentManager().isTableDisabled(table)) {
LOG.debug("Skipping move regions because the table {} is disabled", table);
continue;
}
LOG.info("Moving region(s) for table {} to RSGroup {}", table, targetGroupName);
for (RegionInfo region : master.getAssignmentManager().getRegionStates()
.getRegionsOfTable(table)) {
ServerName sn =
master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
if (!targetGrp.containsServer(sn.getAddress())) {
LOG.info("Moving region {} to RSGroup {}", region.getShortNameToLog(), targetGroupName);
try {
master.getAssignmentManager().move(region);
}catch (IOException ioe){
LOG.error("Move region {} to group failed, will retry, current retry time is {}",
region.getShortNameToLog(), retry, ioe);
}
hasRegionsToMove = true;
}
}
if (!hasRegionsToMove) {
LOG.info("Table {} has no more regions to move for RSGroup", table.getNameAsString());
iter.remove();
}
}
}
retry++;
try {
rsGroupInfoManager.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted", e);
Thread.currentThread().interrupt();
}
} while (hasRegionsToMove && retry <= 50);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.rsgroup;
import static org.apache.hadoop.hbase.util.Threads.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -29,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
@ -36,8 +38,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
@ -52,7 +56,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@Category({ MediumTests.class })
@Category({ LargeTests.class })
public class TestRSGroupsAdmin2 extends TestRSGroupsBase {
@ClassRule
@ -459,4 +463,197 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase {
Assert.assertEquals(null, rsGroupAdmin.getRSGroupInfo(fooGroup.getName()));
}
/**
 * Forces one region on the server being moved into the SPLITTING state, starts
 * {@code moveServers}, restores the region to OPEN once it is the only region left on that
 * server, and then waits until the server holds no regions at all.
 */
@Test
public void testFailedMoveWhenMoveServer() throws Exception {
  final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
  final byte[] familyNameBytes = Bytes.toBytes("f");
  final int tableRegionCount = 10;
  // All the regions created below will be assigned to the default group.
  TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, tableRegionCount);
  TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      List<String> regions = getTableRegionMap().get(tableName);
      return regions != null && regions.size() >= tableRegionCount;
    }
  });

  // Pick the first server outside the new group that hosts more than one region of the table,
  // and take its first region as the one whose state will be tampered with.
  Map<ServerName, List<String>> assignMap = getTableServerRegionMap().get(tableName);
  String rregion = null;
  ServerName toMoveServer = null;
  for (Map.Entry<ServerName, List<String>> entry : assignMap.entrySet()) {
    List<String> serverRegions = entry.getValue();
    if (serverRegions.size() > 1 && !newGroup.containsServer(entry.getKey().getAddress())) {
      rregion = serverRegions.get(0);
      toMoveServer = entry.getKey();
      break;
    }
  }
  // Use a JUnit assertion: the Java assert keyword is a no-op when the JVM runs without -ea.
  Assert.assertNotNull("no server outside the new group hosts more than one region",
    toMoveServer);
  RegionInfo ri = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().
    getRegionInfo(Bytes.toBytesBinary(rregion));
  RegionStateNode rsn =
    TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
      .getRegionStateNode(ri);
  // Put the chosen region into SPLITTING; the test expects the move of this region to fail
  // while it is in that state.
  rsn.setState(RegionState.State.SPLITTING);

  // start thread to recover region state
  final ServerName movedServer = toMoveServer;
  final String sregion = rregion;
  AtomicBoolean changed = new AtomicBoolean(false);
  Thread t1 = new Thread(() -> {
    LOG.debug("thread1 start running, will recover region state");
    long current = System.currentTimeMillis();
    while (System.currentTimeMillis() - current <= 50000) {
      List<RegionInfo> regions = master.getAssignmentManager().getRegionsOnServer(movedServer);
      LOG.debug("server region size is:{}", regions.size());
      assertTrue("the SPLITTING region should still be on the server", regions.size() >= 1);
      // when there is exactly one region left, we can determine the move operation encountered
      // exception caused by the strange region state.
      if (regions.size() == 1) {
        // JUnit order is (expected, actual).
        assertEquals(sregion, regions.get(0).getRegionNameAsString());
        rsn.setState(RegionState.State.OPEN);
        LOG.info("set region {} state OPEN", sregion);
        changed.set(true);
        break;
      }
      sleep(5000);
    }
  });
  t1.start();

  // move target server to group
  Thread t2 = new Thread(() -> {
    LOG.info("thread2 start running, to move regions");
    try {
      rsGroupAdmin.moveServers(Sets.newHashSet(movedServer.getAddress()), newGroup.getName());
    } catch (IOException e) {
      LOG.error("move server error", e);
    }
  });
  t2.start();

  t1.join();
  t2.join();

  TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() {
      if (!changed.get()) {
        // The recovery thread never restored the region state, so there is nothing to verify.
        return false;
      }
      // Call equals() on the known non-null server so that a region without a location
      // does not raise a NullPointerException inside the predicate.
      return master.getAssignmentManager().getRegionsOnServer(movedServer).size() == 0
        && !movedServer.equals(rsn.getRegionLocation());
    }
  });
}
/**
 * Forces one region of the table into the SPLITTING state, starts {@code moveTables},
 * restores the region to OPEN once it is the only region of the table left on its source
 * server, and then waits until that server holds no region of the table.
 */
@Test
public void testFailedMoveWhenMoveTable() throws Exception {
  final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
  final byte[] familyNameBytes = Bytes.toBytes("f");
  final int tableRegionCount = 5;
  // All the regions created below will be assigned to the default group.
  TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, tableRegionCount);
  TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      List<String> regions = getTableRegionMap().get(tableName);
      return regions != null && regions.size() >= tableRegionCount;
    }
  });

  // Pick the first server outside the new group that hosts at least one region of the table,
  // and take its first region as the one whose state will be tampered with.
  Map<ServerName, List<String>> assignMap = getTableServerRegionMap().get(tableName);
  String rregion = null;
  ServerName srcServer = null;
  for (Map.Entry<ServerName, List<String>> entry : assignMap.entrySet()) {
    List<String> serverRegions = entry.getValue();
    if (serverRegions.size() >= 1 && !newGroup.containsServer(entry.getKey().getAddress())) {
      rregion = serverRegions.get(0);
      srcServer = entry.getKey();
      break;
    }
  }
  // Use a JUnit assertion: the Java assert keyword is a no-op when the JVM runs without -ea.
  Assert.assertNotNull("no server outside the new group hosts a region of the table",
    srcServer);
  RegionInfo ri = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().
    getRegionInfo(Bytes.toBytesBinary(rregion));
  RegionStateNode rsn =
    TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
      .getRegionStateNode(ri);
  // Put the chosen region into SPLITTING; the test expects the move of this region to fail
  // while it is in that state.
  rsn.setState(RegionState.State.SPLITTING);

  // move table to group
  Thread t2 = new Thread(() -> {
    LOG.info("thread2 start running, to move regions");
    try {
      rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName());
    } catch (IOException e) {
      LOG.error("move table error", e);
    }
  });
  t2.start();

  // start thread to recover region state
  final ServerName ss = srcServer;
  final String sregion = rregion;
  AtomicBoolean changed = new AtomicBoolean(false);
  Thread t1 = new Thread(() -> {
    LOG.info("thread1 start running, will recover region state");
    long current = System.currentTimeMillis();
    while (System.currentTimeMillis() - current <= 50000) {
      // Only regions of the table under test matter; the server may host other regions too.
      List<RegionInfo> tableRegions = new ArrayList<>();
      for (RegionInfo regionInfo : master.getAssignmentManager().getRegionsOnServer(ss)) {
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(regionInfo);
        }
      }
      LOG.debug("server table region size is:{}", tableRegions.size());
      assertTrue("the SPLITTING region should still be on the server",
        tableRegions.size() >= 1);
      // when there is exactly one region left, we can determine the move operation encountered
      // exception caused by the strange region state.
      if (tableRegions.size() == 1) {
        // JUnit order is (expected, actual).
        assertEquals(sregion, tableRegions.get(0).getRegionNameAsString());
        rsn.setState(RegionState.State.OPEN);
        LOG.info("set region {} state OPEN", sregion);
        changed.set(true);
        break;
      }
      sleep(5000);
    }
  });
  t1.start();

  t1.join();
  t2.join();

  TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() {
      if (!changed.get()) {
        // The recovery thread never restored the region state, so there is nothing to verify.
        return false;
      }
      for (RegionInfo regionInfo : master.getAssignmentManager().getRegionsOnServer(ss)) {
        if (regionInfo.getTable().equals(tableName)) {
          // The source server still hosts a region of the table.
          return false;
        }
      }
      // Call equals() on the known non-null server so that a region without a location
      // does not raise a NullPointerException inside the predicate.
      return !ss.equals(rsn.getRegionLocation());
    }
  });
}
}