HBASE-22414 Interruption of moving regions in RSGroup will cause regions on wrong rs
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
380f80f316
commit
74731c2a46
@ -27,7 +27,10 @@ import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
@ -54,15 +57,27 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
@InterfaceAudience.Private
|
||||
public class RSGroupAdminServer implements RSGroupAdmin {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminServer.class);
|
||||
public static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE = "should keep at least " +
|
||||
static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE = "should keep at least " +
|
||||
"one server in 'default' RSGroup.";
|
||||
|
||||
private MasterServices master;
|
||||
private final RSGroupInfoManager rsGroupInfoManager;
|
||||
|
||||
/** Define the config key of retries threshold when movements failed */
|
||||
//made package private for testing
|
||||
static final String FAILED_MOVE_MAX_RETRY = "hbase.rsgroup.move.max.retry";
|
||||
|
||||
/** Define the default number of retries */
|
||||
//made package private for testing
|
||||
static final int DEFAULT_MAX_RETRY_VALUE = 50;
|
||||
|
||||
private int moveMaxRetry;
|
||||
|
||||
public RSGroupAdminServer(MasterServices master, RSGroupInfoManager rsGroupInfoManager) {
|
||||
this.master = master;
|
||||
this.rsGroupInfoManager = rsGroupInfoManager;
|
||||
this.moveMaxRetry = master.getConfiguration().getInt(FAILED_MOVE_MAX_RETRY,
|
||||
DEFAULT_MAX_RETRY_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -202,40 +217,19 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
||||
* @throws IOException if moving the server and tables fail
|
||||
*/
|
||||
private void moveServerRegionsFromGroup(Set<Address> servers, String targetGroupName)
|
||||
throws IOException {
|
||||
boolean hasRegionsToMove;
|
||||
RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
|
||||
Set<Address> allSevers = new HashSet<>(servers);
|
||||
do {
|
||||
hasRegionsToMove = false;
|
||||
for (Iterator<Address> iter = allSevers.iterator(); iter.hasNext();) {
|
||||
Address rs = iter.next();
|
||||
// Get regions that are associated with this server and filter regions by group tables.
|
||||
for (RegionInfo region : getRegions(rs)) {
|
||||
if (!targetGrp.containsTable(region.getTable())) {
|
||||
LOG.info("Moving server region {}, which do not belong to RSGroup {}",
|
||||
region.getShortNameToLog(), targetGroupName);
|
||||
this.master.getAssignmentManager().move(region);
|
||||
if (master.getAssignmentManager().getRegionStates().
|
||||
getRegionState(region).isFailedOpen()) {
|
||||
continue;
|
||||
}
|
||||
hasRegionsToMove = true;
|
||||
}
|
||||
throws IOException {
|
||||
moveRegionsBetweenGroups(servers, targetGroupName,
|
||||
rs -> getRegions(rs),
|
||||
info -> {
|
||||
try {
|
||||
RSGroupInfo group = getRSGroupInfo(targetGroupName);
|
||||
return group.containsTable(info.getTable());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!hasRegionsToMove) {
|
||||
LOG.info("Server {} has no more regions to move for RSGroup", rs.getHostname());
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
try {
|
||||
rsGroupInfoManager.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Sleep interrupted", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} while (hasRegionsToMove);
|
||||
},
|
||||
rs -> rs.getHostname());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -246,23 +240,87 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
||||
* @throws IOException if moving the region fails
|
||||
*/
|
||||
private void moveTableRegionsToGroup(Set<TableName> tables, String targetGroupName)
|
||||
throws IOException {
|
||||
RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
|
||||
for (TableName table : tables) {
|
||||
if (master.getAssignmentManager().isTableDisabled(table)) {
|
||||
LOG.debug("Skipping move regions because the table {} is disabled", table);
|
||||
continue;
|
||||
}
|
||||
LOG.info("Moving region(s) for table {} to RSGroup {}", table, targetGroupName);
|
||||
for (RegionInfo region : master.getAssignmentManager().getRegionStates()
|
||||
.getRegionsOfTable(table)) {
|
||||
ServerName sn =
|
||||
master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
|
||||
if (!targetGrp.containsServer(sn.getAddress())) {
|
||||
LOG.info("Moving region {} to RSGroup {}", region.getShortNameToLog(), targetGroupName);
|
||||
master.getAssignmentManager().move(region);
|
||||
throws IOException {
|
||||
moveRegionsBetweenGroups(tables, targetGroupName,
|
||||
table -> {
|
||||
if (master.getAssignmentManager().isTableDisabled(table)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return master.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
|
||||
},
|
||||
info -> {
|
||||
try {
|
||||
RSGroupInfo group = getRSGroupInfo(targetGroupName);
|
||||
ServerName sn =
|
||||
master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(info);
|
||||
return group.containsServer(sn.getAddress());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
},
|
||||
table -> table.getNameWithNamespaceInclAsString());
|
||||
}
|
||||
|
||||
private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, String targetGroupName,
|
||||
Function<T, List<RegionInfo>> getRegionsInfo, Function<RegionInfo, Boolean> validation,
|
||||
Function<T, String> getOwnerName) throws IOException {
|
||||
boolean hasRegionsToMove;
|
||||
int retry = 0;
|
||||
Set<T> allOwners = new HashSet<>(regionsOwners);
|
||||
Set<String> failedRegions = new HashSet<>();
|
||||
IOException toThrow = null;
|
||||
do {
|
||||
hasRegionsToMove = false;
|
||||
for (Iterator<T> iter = allOwners.iterator(); iter.hasNext(); ) {
|
||||
T owner = iter.next();
|
||||
// Get regions that are associated with this server and filter regions by group tables.
|
||||
for (RegionInfo region : getRegionsInfo.apply(owner)) {
|
||||
if (!validation.apply(region)) {
|
||||
LOG.info("Moving region {}, which do not belong to RSGroup {}",
|
||||
region.getShortNameToLog(), targetGroupName);
|
||||
try {
|
||||
this.master.getAssignmentManager().move(region);
|
||||
failedRegions.remove(region.getRegionNameAsString());
|
||||
} catch (IOException ioe) {
|
||||
LOG.debug("Move region {} from group failed, will retry, current retry time is {}",
|
||||
region.getShortNameToLog(), retry, ioe);
|
||||
toThrow = ioe;
|
||||
failedRegions.add(region.getRegionNameAsString());
|
||||
}
|
||||
if (master.getAssignmentManager().getRegionStates().
|
||||
getRegionState(region).isFailedOpen()) {
|
||||
continue;
|
||||
}
|
||||
hasRegionsToMove = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasRegionsToMove) {
|
||||
LOG.info("No more regions to move from {} to RSGroup", getOwnerName.apply(owner));
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
retry++;
|
||||
try {
|
||||
rsGroupInfoManager.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Sleep interrupted", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} while (hasRegionsToMove && retry <= moveMaxRetry);
|
||||
|
||||
//has up to max retry time or there are no more regions to move
|
||||
if (hasRegionsToMove) {
|
||||
// print failed moved regions, for later process conveniently
|
||||
String msg = String
|
||||
.format("move regions for group %s failed, failed regions: %s", targetGroupName,
|
||||
failedRegions);
|
||||
LOG.error(msg);
|
||||
throw new DoNotRetryIOException(
|
||||
msg + ", just record the last failed region's cause, more details in server log",
|
||||
toThrow);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.rsgroup;
|
||||
|
||||
import static org.apache.hadoop.hbase.rsgroup.RSGroupAdminServer.DEFAULT_MAX_RETRY_VALUE;
|
||||
import static org.apache.hadoop.hbase.util.Threads.sleep;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@ -29,6 +31,8 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Function;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
@ -36,9 +40,12 @@ import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.constraint.ConstraintException;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
@ -52,7 +59,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
|
||||
@Category({ MediumTests.class })
|
||||
@Category({ LargeTests.class })
|
||||
public class TestRSGroupsAdmin2 extends TestRSGroupsBase {
|
||||
|
||||
@ClassRule
|
||||
@ -459,4 +466,210 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase {
|
||||
Assert.assertEquals(null, rsGroupAdmin.getRSGroupInfo(fooGroup.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedMoveBeforeRetryExhaustedWhenMoveServer() throws Exception {
|
||||
String groupName = getGroupName(name.getMethodName());
|
||||
rsGroupAdmin.addRSGroup(groupName);
|
||||
final RSGroupInfo newGroup = rsGroupAdmin.getRSGroupInfo(groupName);
|
||||
Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
|
||||
10);
|
||||
|
||||
// start thread to recover region state
|
||||
final ServerName movedServer = gotPair.getFirst();
|
||||
final RegionStateNode rsn = gotPair.getSecond();
|
||||
AtomicBoolean changed = new AtomicBoolean(false);
|
||||
Thread t1 = recoverRegionStateThread(movedServer,
|
||||
server -> master.getAssignmentManager().getRegionsOnServer(movedServer), rsn, changed);
|
||||
t1.start();
|
||||
|
||||
// move target server to group
|
||||
Thread t2 = new Thread(() -> {
|
||||
LOG.info("thread2 start running, to move regions");
|
||||
try {
|
||||
rsGroupAdmin.moveServers(Sets.newHashSet(movedServer.getAddress()), newGroup.getName());
|
||||
} catch (IOException e) {
|
||||
LOG.error("move server error", e);
|
||||
}
|
||||
});
|
||||
t2.start();
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
|
||||
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() {
|
||||
if (changed.get()) {
|
||||
return master.getAssignmentManager().getRegionsOnServer(movedServer).size() == 0 && !rsn
|
||||
.getRegionLocation().equals(movedServer);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedMoveBeforeRetryExhaustedWhenMoveTable() throws Exception {
|
||||
final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
|
||||
Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
|
||||
5);
|
||||
|
||||
// move table to group
|
||||
Thread t2 = new Thread(() -> {
|
||||
LOG.info("thread2 start running, to move regions");
|
||||
try {
|
||||
rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName());
|
||||
} catch (IOException e) {
|
||||
LOG.error("move server error", e);
|
||||
}
|
||||
});
|
||||
t2.start();
|
||||
|
||||
// start thread to recover region state
|
||||
final ServerName ss = gotPair.getFirst();
|
||||
final RegionStateNode rsn = gotPair.getSecond();
|
||||
AtomicBoolean changed = new AtomicBoolean(false);
|
||||
|
||||
Thread t1 = recoverRegionStateThread(ss, server -> {
|
||||
List<RegionInfo> regions = master.getAssignmentManager().getRegionsOnServer(ss);
|
||||
List<RegionInfo> tableRegions = new ArrayList<>();
|
||||
for (RegionInfo regionInfo : regions) {
|
||||
if (regionInfo.getTable().equals(tableName)) {
|
||||
tableRegions.add(regionInfo);
|
||||
}
|
||||
}
|
||||
return tableRegions;
|
||||
}, rsn, changed);
|
||||
t1.start();
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
|
||||
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() {
|
||||
if (changed.get()) {
|
||||
boolean serverHasTableRegions = false;
|
||||
for (RegionInfo regionInfo : master.getAssignmentManager().getRegionsOnServer(ss)) {
|
||||
if (regionInfo.getTable().equals(tableName)) {
|
||||
serverHasTableRegions = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return !serverHasTableRegions && !rsn.getRegionLocation().equals(ss);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private <T> Thread recoverRegionStateThread(T owner, Function<T, List<RegionInfo>> getRegions,
|
||||
RegionStateNode rsn, AtomicBoolean changed){
|
||||
return new Thread(() -> {
|
||||
LOG.info("thread1 start running, will recover region state");
|
||||
long current = System.currentTimeMillis();
|
||||
// wait until there is only left the region we changed state and recover its state.
|
||||
// wait time is set according to the number of max retries, all except failed regions will be
|
||||
// moved in one retry, and will sleep 1s until next retry.
|
||||
while (System.currentTimeMillis() - current <= DEFAULT_MAX_RETRY_VALUE * 1000) {
|
||||
List<RegionInfo> regions = getRegions.apply(owner);
|
||||
LOG.debug("server table region size is:{}", regions.size());
|
||||
assert regions.size() >= 1;
|
||||
// when there is exactly one region left, we can determine the move operation encountered
|
||||
// exception caused by the strange region state.
|
||||
if (regions.size() == 1) {
|
||||
assertEquals(regions.get(0).getRegionNameAsString(),
|
||||
rsn.getRegionInfo().getRegionNameAsString());
|
||||
rsn.setState(RegionState.State.OPEN);
|
||||
LOG.info("set region {} state OPEN", rsn.getRegionInfo().getRegionNameAsString());
|
||||
changed.set(true);
|
||||
break;
|
||||
}
|
||||
sleep(5000);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedMoveWhenMoveServer() throws Exception{
|
||||
String groupName = getGroupName(name.getMethodName());
|
||||
rsGroupAdmin.addRSGroup(groupName);
|
||||
final RSGroupInfo newGroup = rsGroupAdmin.getRSGroupInfo(groupName);
|
||||
Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
|
||||
10);
|
||||
try{
|
||||
rsGroupAdmin.moveServers(Sets.newHashSet(gotPair.getFirst().getAddress()),
|
||||
newGroup.getName());
|
||||
fail("should get IOException when retry exhausted but there still exists failed moved "
|
||||
+ "regions");
|
||||
}catch (IOException e){
|
||||
assertTrue(e.getMessage().contains(
|
||||
gotPair.getSecond().getRegionInfo().getRegionNameAsString()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedMoveWhenMoveTable() throws Exception{
|
||||
final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
|
||||
Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
|
||||
5);
|
||||
try{
|
||||
rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName());
|
||||
fail("should get IOException when retry exhausted but there still exists failed moved "
|
||||
+ "regions");
|
||||
}catch (IOException e){
|
||||
assertTrue(e.getMessage().contains(
|
||||
gotPair.getSecond().getRegionInfo().getRegionNameAsString()));
|
||||
}
|
||||
}
|
||||
|
||||
private Pair<ServerName, RegionStateNode> createTableWithRegionSplitting(RSGroupInfo rsGroupInfo,
|
||||
int tableRegionCount) throws Exception{
|
||||
final byte[] familyNameBytes = Bytes.toBytes("f");
|
||||
// All the regions created below will be assigned to the default group.
|
||||
TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, tableRegionCount);
|
||||
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
List<String> regions = getTableRegionMap().get(tableName);
|
||||
if (regions == null) {
|
||||
return false;
|
||||
}
|
||||
return getTableRegionMap().get(tableName).size() >= tableRegionCount;
|
||||
}
|
||||
});
|
||||
|
||||
return randomlySetOneRegionStateToSplitting(rsGroupInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Randomly choose a region to set state.
|
||||
* @param newGroup target group
|
||||
* @return source server of region, and region state
|
||||
* @throws IOException if methods called throw
|
||||
*/
|
||||
private Pair<ServerName, RegionStateNode> randomlySetOneRegionStateToSplitting(
|
||||
RSGroupInfo newGroup) throws IOException{
|
||||
// get target server to move, which should has more than one regions
|
||||
// randomly set a region state to SPLITTING to make move fail
|
||||
Map<ServerName, List<String>> assignMap = getTableServerRegionMap().get(tableName);
|
||||
String rregion = null;
|
||||
ServerName toMoveServer = null;
|
||||
for (ServerName server : assignMap.keySet()) {
|
||||
rregion = assignMap.get(server).size() > 1 &&
|
||||
!newGroup.containsServer(server.getAddress()) ? assignMap.get(server).get(0) : null;
|
||||
if (rregion != null) {
|
||||
toMoveServer = server;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert toMoveServer != null;
|
||||
RegionInfo ri = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().
|
||||
getRegionInfo(Bytes.toBytesBinary(rregion));
|
||||
RegionStateNode rsn =
|
||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
||||
.getRegionStateNode(ri);
|
||||
rsn.setState(RegionState.State.SPLITTING);
|
||||
return new Pair<>(toMoveServer, rsn);
|
||||
}
|
||||
}
|
||||
|
@ -100,11 +100,13 @@ public abstract class TestRSGroupsBase {
|
||||
RSGroupBasedLoadBalancer.class.getName());
|
||||
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName());
|
||||
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
|
||||
TEST_UTIL.getConfiguration().setInt(
|
||||
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
|
||||
NUM_SLAVES_BASE - 1);
|
||||
TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 100000);
|
||||
|
||||
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
|
||||
initialize();
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user