HBASE-9480 Regions are unexpectedly made offline in certain failure conditions
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523303 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
41f0f6f9d1
commit
b7d127e5e7
|
@ -151,11 +151,21 @@ public class RegionState implements org.apache.hadoop.io.Writable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isPendingOpenOrOpeningOnServer(final ServerName sn) {
|
public boolean isPendingOpenOrOpeningOnServer(final ServerName sn) {
|
||||||
return isOnServer(sn) && (isPendingOpen() || isOpening());
|
return isOnServer(sn) && isPendingOpenOrOpening();
|
||||||
|
}
|
||||||
|
|
||||||
|
// A failed open is also treated as a kind of pending open
|
||||||
|
public boolean isPendingOpenOrOpening() {
|
||||||
|
return isPendingOpen() || isOpening() || isFailedOpen();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isPendingCloseOrClosingOnServer(final ServerName sn) {
|
public boolean isPendingCloseOrClosingOnServer(final ServerName sn) {
|
||||||
return isOnServer(sn) && (isPendingClose() || isClosing());
|
return isOnServer(sn) && isPendingCloseOrClosing();
|
||||||
|
}
|
||||||
|
|
||||||
|
// A failed close is also treated as a kind of pending close
|
||||||
|
public boolean isPendingCloseOrClosing() {
|
||||||
|
return isPendingClose() || isClosing() || isFailedClose();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isOnServer(final ServerName sn) {
|
public boolean isOnServer(final ServerName sn) {
|
||||||
|
|
|
@ -587,6 +587,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
// died before sending the query the first time.
|
// died before sending the query the first time.
|
||||||
regionStates.updateRegionState(rt, RegionState.State.CLOSING);
|
regionStates.updateRegionState(rt, RegionState.State.CLOSING);
|
||||||
final RegionState rs = regionStates.getRegionState(regionInfo);
|
final RegionState rs = regionStates.getRegionState(regionInfo);
|
||||||
|
final ClosedRegionHandler closedRegionHandler =
|
||||||
|
new ClosedRegionHandler(server, this, regionInfo);
|
||||||
this.executorService.submit(
|
this.executorService.submit(
|
||||||
new EventHandler(server, EventType.M_MASTER_RECOVERY) {
|
new EventHandler(server, EventType.M_MASTER_RECOVERY) {
|
||||||
@Override
|
@Override
|
||||||
|
@ -594,6 +596,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
|
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
|
||||||
try {
|
try {
|
||||||
unassign(regionInfo, rs, expectedVersion, null, true, null);
|
unassign(regionInfo, rs, expectedVersion, null, true, null);
|
||||||
|
if (regionStates.isRegionOffline(regionInfo)) {
|
||||||
|
closedRegionHandler.process();
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -2338,8 +2343,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
// The region is not open yet
|
// The region is not open yet
|
||||||
regionOffline(region);
|
regionOffline(region);
|
||||||
return;
|
return;
|
||||||
} else if (force && (state.isPendingClose()
|
} else if (force && state.isPendingCloseOrClosing()) {
|
||||||
|| state.isClosing() || state.isFailedClose())) {
|
|
||||||
LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
|
LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
|
||||||
" which is already " + state.getState() +
|
" which is already " + state.getState() +
|
||||||
" but forcing to send a CLOSE RPC again ");
|
" but forcing to send a CLOSE RPC again ");
|
||||||
|
@ -2355,6 +2359,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
unassign(region, state, versionOfClosingNode, dest, true, null);
|
unassign(region, state, versionOfClosingNode, dest, true, null);
|
||||||
|
if (regionStates.isRegionOffline(region)) {
|
||||||
|
new ClosedRegionHandler(server, this, region).process();
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,6 +138,13 @@ public class RegionStates {
|
||||||
return regionAssignments.containsKey(hri);
|
return regionAssignments.containsKey(hri);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return True if the specified region is offline.
|
||||||
|
*/
|
||||||
|
public synchronized boolean isRegionOffline(final HRegionInfo hri) {
|
||||||
|
return !isRegionInTransition(hri) && isRegionInState(hri, State.OFFLINE);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True if specified region is in specified state
|
* @return True if specified region is in specified state
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -2430,7 +2430,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||||
LOG.warn("Failed to close " + region.getRegionNameAsString() +
|
LOG.warn("Failed to close " + region.getRegionNameAsString() +
|
||||||
" - ignoring and continuing");
|
" - ignoring and continuing");
|
||||||
}
|
}
|
||||||
} catch (NotServingRegionException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to close " + region.getRegionNameAsString() +
|
LOG.warn("Failed to close " + region.getRegionNameAsString() +
|
||||||
" - ignoring and continuing", e);
|
" - ignoring and continuing", e);
|
||||||
}
|
}
|
||||||
|
@ -2457,14 +2457,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||||
* @param versionOfClosingNode the version of znode to compare when RS transitions the znode from
|
* @param versionOfClosingNode the version of znode to compare when RS transitions the znode from
|
||||||
* CLOSING state.
|
* CLOSING state.
|
||||||
* @return True if closed a region.
|
* @return True if closed a region.
|
||||||
* @throws NotServingRegionException if the region is not online or if a close
|
* @throws NotServingRegionException if the region is not online
|
||||||
* request is in progress.
|
* @throws RegionAlreadyInTransitionException if the region is already closing
|
||||||
*/
|
*/
|
||||||
protected boolean closeRegion(String encodedName, final boolean abort,
|
protected boolean closeRegion(String encodedName, final boolean abort,
|
||||||
final boolean zk, final int versionOfClosingNode, final ServerName sn)
|
final boolean zk, final int versionOfClosingNode, final ServerName sn)
|
||||||
throws NotServingRegionException {
|
throws NotServingRegionException, RegionAlreadyInTransitionException {
|
||||||
//Check for permissions to close.
|
//Check for permissions to close.
|
||||||
final HRegion actualRegion = this.getFromOnlineRegions(encodedName);
|
HRegion actualRegion = this.getFromOnlineRegions(encodedName);
|
||||||
if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
|
if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
|
||||||
try {
|
try {
|
||||||
actualRegion.getCoprocessorHost().preClose(false);
|
actualRegion.getCoprocessorHost().preClose(false);
|
||||||
|
@ -2486,7 +2486,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||||
LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
|
LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
|
||||||
" Doing a standard close now");
|
" Doing a standard close now");
|
||||||
return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
|
return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
|
||||||
} else {
|
}
|
||||||
|
// Let's get the region from the online region list again
|
||||||
|
actualRegion = this.getFromOnlineRegions(encodedName);
|
||||||
|
if (actualRegion == null) { // If already online, we still need to close it.
|
||||||
LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
|
LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
|
||||||
// The master deletes the znode when it receives this exception.
|
// The master deletes the znode when it receives this exception.
|
||||||
throw new NotServingRegionException("The region " + encodedName +
|
throw new NotServingRegionException("The region " + encodedName +
|
||||||
|
@ -2494,13 +2497,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||||
}
|
}
|
||||||
} else if (Boolean.FALSE.equals(previous)) {
|
} else if (Boolean.FALSE.equals(previous)) {
|
||||||
LOG.info("Received CLOSE for the region: " + encodedName +
|
LOG.info("Received CLOSE for the region: " + encodedName +
|
||||||
" ,which we are already trying to CLOSE");
|
" ,which we are already trying to CLOSE, but not completed yet");
|
||||||
// The master deletes the znode when it receives this exception.
|
// The master will retry till the region is closed. We need to do this since
|
||||||
throw new NotServingRegionException("The region " + encodedName +
|
// the region could fail to close somehow. If we mark the region closed in master
|
||||||
|
// while it is not, there could be data loss.
|
||||||
|
// If the region is stuck in closing for a while, and master runs out of retries,
|
||||||
|
// master will move the region to failed_to_close. Later on, if the region
|
||||||
|
// is indeed closed, master can properly re-assign it.
|
||||||
|
throw new RegionAlreadyInTransitionException("The region " + encodedName +
|
||||||
" was already closing. New CLOSE request is ignored.");
|
" was already closing. New CLOSE request is ignored.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (actualRegion == null){
|
if (actualRegion == null) {
|
||||||
LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
|
LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
|
||||||
this.regionsInTransitionInRS.remove(encodedName.getBytes());
|
this.regionsInTransitionInRS.remove(encodedName.getBytes());
|
||||||
// The master deletes the znode when it receives this exception.
|
// The master deletes the znode when it receives this exception.
|
||||||
|
|
|
@ -213,7 +213,8 @@ public class OpenRegionHandler extends EventHandler {
|
||||||
// really unlucky.
|
// really unlucky.
|
||||||
LOG.error("Race condition: we've finished to open a region, while a close was requested "
|
LOG.error("Race condition: we've finished to open a region, while a close was requested "
|
||||||
+ " on region=" + regionName + ". It can be a critical error, as a region that"
|
+ " on region=" + regionName + ". It can be a critical error, as a region that"
|
||||||
+ " should be closed is now opened.");
|
+ " should be closed is now opened. Closing it now");
|
||||||
|
cleanupFailedOpen(region);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -503,7 +504,10 @@ public class OpenRegionHandler extends EventHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupFailedOpen(final HRegion region) throws IOException {
|
void cleanupFailedOpen(final HRegion region) throws IOException {
|
||||||
if (region != null) region.close();
|
if (region != null) {
|
||||||
|
this.rsServices.removeFromOnlineRegions(region, null);
|
||||||
|
region.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isRegionStillOpening() {
|
private boolean isRegionStillOpening() {
|
||||||
|
|
|
@ -1792,6 +1792,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
||||||
return createMultiRegions(c, table, family, regionStartKeys);
|
return createMultiRegions(c, table, family, regionStartKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public int createMultiRegions(final Configuration c, final HTable table,
|
public int createMultiRegions(final Configuration c, final HTable table,
|
||||||
final byte[] columnFamily, byte [][] startKeys)
|
final byte[] columnFamily, byte [][] startKeys)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -3057,6 +3058,40 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
||||||
+ " on server " + server);
|
+ " on server " + server);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check to make sure the region is open on the specified
|
||||||
|
* region server, but not on any other one.
|
||||||
|
*/
|
||||||
|
public void assertRegionOnlyOnServer(
|
||||||
|
final HRegionInfo hri, final ServerName server,
|
||||||
|
final long timeout) throws IOException, InterruptedException {
|
||||||
|
long timeoutTime = System.currentTimeMillis() + timeout;
|
||||||
|
while (true) {
|
||||||
|
List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
|
||||||
|
if (regions.contains(hri)) {
|
||||||
|
List<JVMClusterUtil.RegionServerThread> rsThreads =
|
||||||
|
getHBaseCluster().getLiveRegionServerThreads();
|
||||||
|
for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
|
||||||
|
HRegionServer rs = rsThread.getRegionServer();
|
||||||
|
if (server.equals(rs.getServerName())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
|
||||||
|
for (HRegion r: hrs) {
|
||||||
|
assertTrue("Region should not be double assigned",
|
||||||
|
r.getRegionId() != hri.getRegionId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return; // good, we are happy
|
||||||
|
}
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now > timeoutTime) break;
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
fail("Could not find region " + hri.getRegionNameAsString()
|
||||||
|
+ " on server " + server);
|
||||||
|
}
|
||||||
|
|
||||||
public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
|
public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
|
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
|
||||||
|
|
|
@ -27,6 +27,7 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
|
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
@ -281,11 +283,10 @@ public class TestAssignmentManagerOnCluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests forcefully assign a region
|
|
||||||
* while it's closing and re-assigned.
|
|
||||||
*
|
|
||||||
* This test should not be flaky. If it is flaky, it means something
|
* This test should not be flaky. If it is flaky, it means something
|
||||||
* wrong with AssignmentManager which should be reported and fixed
|
* wrong with AssignmentManager which should be reported and fixed
|
||||||
|
*
|
||||||
|
* This tests forcefully assigning a region while it is closing and being re-assigned.
|
||||||
*/
|
*/
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testForceAssignWhileClosing() throws Exception {
|
public void testForceAssignWhileClosing() throws Exception {
|
||||||
|
@ -305,12 +306,12 @@ public class TestAssignmentManagerOnCluster {
|
||||||
AssignmentManager am = master.getAssignmentManager();
|
AssignmentManager am = master.getAssignmentManager();
|
||||||
assertTrue(am.waitForAssignment(hri));
|
assertTrue(am.waitForAssignment(hri));
|
||||||
|
|
||||||
MockRegionObserver.enabled = true;
|
MockRegionObserver.preCloseEnabled.set(true);
|
||||||
am.unassign(hri);
|
am.unassign(hri);
|
||||||
RegionState state = am.getRegionStates().getRegionState(hri);
|
RegionState state = am.getRegionStates().getRegionState(hri);
|
||||||
assertEquals(RegionState.State.FAILED_CLOSE, state.getState());
|
assertEquals(RegionState.State.FAILED_CLOSE, state.getState());
|
||||||
|
|
||||||
MockRegionObserver.enabled = false;
|
MockRegionObserver.preCloseEnabled.set(false);
|
||||||
am.unassign(hri, true);
|
am.unassign(hri, true);
|
||||||
|
|
||||||
// region is closing now, will be re-assigned automatically.
|
// region is closing now, will be re-assigned automatically.
|
||||||
|
@ -318,14 +319,15 @@ public class TestAssignmentManagerOnCluster {
|
||||||
// assigned properly and no double-assignment
|
// assigned properly and no double-assignment
|
||||||
am.assign(hri, true, true);
|
am.assign(hri, true, true);
|
||||||
|
|
||||||
// region should be closed and re-assigned
|
// let's check if it's assigned after it's out of transition
|
||||||
|
am.waitOnRegionToClearRegionsInTransition(hri);
|
||||||
assertTrue(am.waitForAssignment(hri));
|
assertTrue(am.waitForAssignment(hri));
|
||||||
|
|
||||||
ServerName serverName = master.getAssignmentManager().
|
ServerName serverName = master.getAssignmentManager().
|
||||||
getRegionStates().getRegionServerOfRegion(hri);
|
getRegionStates().getRegionServerOfRegion(hri);
|
||||||
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
|
TEST_UTIL.assertRegionOnlyOnServer(hri, serverName, 200);
|
||||||
} finally {
|
} finally {
|
||||||
MockRegionObserver.enabled = false;
|
MockRegionObserver.preCloseEnabled.set(false);
|
||||||
TEST_UTIL.deleteTable(Bytes.toBytes(table));
|
TEST_UTIL.deleteTable(Bytes.toBytes(table));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -351,12 +353,12 @@ public class TestAssignmentManagerOnCluster {
|
||||||
AssignmentManager am = master.getAssignmentManager();
|
AssignmentManager am = master.getAssignmentManager();
|
||||||
assertTrue(am.waitForAssignment(hri));
|
assertTrue(am.waitForAssignment(hri));
|
||||||
|
|
||||||
MockRegionObserver.enabled = true;
|
MockRegionObserver.preCloseEnabled.set(true);
|
||||||
am.unassign(hri);
|
am.unassign(hri);
|
||||||
RegionState state = am.getRegionStates().getRegionState(hri);
|
RegionState state = am.getRegionStates().getRegionState(hri);
|
||||||
assertEquals(RegionState.State.FAILED_CLOSE, state.getState());
|
assertEquals(RegionState.State.FAILED_CLOSE, state.getState());
|
||||||
|
|
||||||
MockRegionObserver.enabled = false;
|
MockRegionObserver.preCloseEnabled.set(false);
|
||||||
am.unassign(hri, true);
|
am.unassign(hri, true);
|
||||||
|
|
||||||
// region may still be assigned now since it's closing,
|
// region may still be assigned now since it's closing,
|
||||||
|
@ -369,7 +371,7 @@ public class TestAssignmentManagerOnCluster {
|
||||||
getRegionStates().getRegionServerOfRegion(hri);
|
getRegionStates().getRegionServerOfRegion(hri);
|
||||||
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
|
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
|
||||||
} finally {
|
} finally {
|
||||||
MockRegionObserver.enabled = false;
|
MockRegionObserver.preCloseEnabled.set(false);
|
||||||
TEST_UTIL.deleteTable(Bytes.toBytes(table));
|
TEST_UTIL.deleteTable(Bytes.toBytes(table));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -512,6 +514,114 @@ public class TestAssignmentManagerOnCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This tests region close hanging
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testCloseHang() throws Exception {
|
||||||
|
String table = "testCloseHang";
|
||||||
|
try {
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||||
|
admin.createTable(desc);
|
||||||
|
|
||||||
|
HTable meta = new HTable(conf, TableName.META_TABLE_NAME);
|
||||||
|
HRegionInfo hri = new HRegionInfo(
|
||||||
|
desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
|
||||||
|
MetaEditor.addRegionToMeta(meta, hri);
|
||||||
|
|
||||||
|
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
master.assignRegion(hri);
|
||||||
|
AssignmentManager am = master.getAssignmentManager();
|
||||||
|
assertTrue(am.waitForAssignment(hri));
|
||||||
|
|
||||||
|
MockRegionObserver.postCloseEnabled.set(true);
|
||||||
|
am.unassign(hri);
|
||||||
|
// Now the region should be pending_close or closing
|
||||||
|
// Unassign it again forcefully so that we can trigger already
|
||||||
|
// in transition exception. This test is to make sure this scenario
|
||||||
|
// is handled properly.
|
||||||
|
am.unassign(hri, true);
|
||||||
|
RegionState state = am.getRegionStates().getRegionState(hri);
|
||||||
|
assertEquals(RegionState.State.FAILED_CLOSE, state.getState());
|
||||||
|
|
||||||
|
// Let region closing move ahead. The region should be closed
|
||||||
|
// properly and re-assigned automatically
|
||||||
|
MockRegionObserver.postCloseEnabled.set(false);
|
||||||
|
|
||||||
|
// region may still be assigned now since it's closing,
|
||||||
|
// let's check if it's assigned after it's out of transition
|
||||||
|
am.waitOnRegionToClearRegionsInTransition(hri);
|
||||||
|
|
||||||
|
// region should be closed and re-assigned
|
||||||
|
assertTrue(am.waitForAssignment(hri));
|
||||||
|
ServerName serverName = master.getAssignmentManager().
|
||||||
|
getRegionStates().getRegionServerOfRegion(hri);
|
||||||
|
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
|
||||||
|
} finally {
|
||||||
|
MockRegionObserver.postCloseEnabled.set(false);
|
||||||
|
TEST_UTIL.deleteTable(Bytes.toBytes(table));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This tests region close racing with open
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testOpenCloseRacing() throws Exception {
|
||||||
|
String table = "testOpenCloseRacing";
|
||||||
|
try {
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||||
|
admin.createTable(desc);
|
||||||
|
|
||||||
|
HTable meta = new HTable(conf, TableName.META_TABLE_NAME);
|
||||||
|
HRegionInfo hri = new HRegionInfo(
|
||||||
|
desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
|
||||||
|
MetaEditor.addRegionToMeta(meta, hri);
|
||||||
|
|
||||||
|
MockRegionObserver.postOpenEnabled.set(true);
|
||||||
|
MockRegionObserver.postOpenCalled = false;
|
||||||
|
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
// Region will be opened, but it won't complete
|
||||||
|
master.assignRegion(hri);
|
||||||
|
long end = EnvironmentEdgeManager.currentTimeMillis() + 20000;
|
||||||
|
// Wait till postOpen is called
|
||||||
|
while (!MockRegionObserver.postOpenCalled ) {
|
||||||
|
assertFalse("Timed out waiting for postOpen to be called",
|
||||||
|
EnvironmentEdgeManager.currentTimeMillis() > end);
|
||||||
|
Thread.sleep(300);
|
||||||
|
}
|
||||||
|
|
||||||
|
AssignmentManager am = master.getAssignmentManager();
|
||||||
|
// Now let's unassign it, it should do nothing
|
||||||
|
am.unassign(hri);
|
||||||
|
RegionState state = am.getRegionStates().getRegionState(hri);
|
||||||
|
ServerName oldServerName = state.getServerName();
|
||||||
|
assertTrue(state.isPendingOpenOrOpening() && oldServerName != null);
|
||||||
|
|
||||||
|
// Now the region is stuck in opening
|
||||||
|
// Let's forcefully re-assign it to trigger closing/opening
|
||||||
|
// racing. This test is to make sure this scenario
|
||||||
|
// is handled properly.
|
||||||
|
MockRegionObserver.postOpenEnabled.set(false);
|
||||||
|
am.assign(hri, true, true);
|
||||||
|
|
||||||
|
// let's check if it's assigned after it's out of transition
|
||||||
|
am.waitOnRegionToClearRegionsInTransition(hri);
|
||||||
|
assertTrue(am.waitForAssignment(hri));
|
||||||
|
|
||||||
|
ServerName serverName = master.getAssignmentManager().
|
||||||
|
getRegionStates().getRegionServerOfRegion(hri);
|
||||||
|
TEST_UTIL.assertRegionOnlyOnServer(hri, serverName, 200);
|
||||||
|
assertFalse("Region should assigned on a new region server",
|
||||||
|
oldServerName.equals(serverName));
|
||||||
|
} finally {
|
||||||
|
MockRegionObserver.postOpenEnabled.set(false);
|
||||||
|
TEST_UTIL.deleteTable(Bytes.toBytes(table));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class MockLoadBalancer extends StochasticLoadBalancer {
|
static class MockLoadBalancer extends StochasticLoadBalancer {
|
||||||
// For this region, if specified, always assign to nowhere
|
// For this region, if specified, always assign to nowhere
|
||||||
static volatile String controledRegion = null;
|
static volatile String controledRegion = null;
|
||||||
|
@ -528,12 +638,44 @@ public class TestAssignmentManagerOnCluster {
|
||||||
|
|
||||||
public static class MockRegionObserver extends BaseRegionObserver {
|
public static class MockRegionObserver extends BaseRegionObserver {
|
||||||
// If enabled, fail all preClose calls
|
// If enabled, fail all preClose calls
|
||||||
static volatile boolean enabled = false;
|
static AtomicBoolean preCloseEnabled = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// If enabled, stall postClose calls
|
||||||
|
static AtomicBoolean postCloseEnabled = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// If enabled, stall postOpen calls
|
||||||
|
static AtomicBoolean postOpenEnabled = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// A flag to track if postOpen is called
|
||||||
|
static volatile boolean postOpenCalled = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c,
|
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||||
boolean abortRequested) throws IOException {
|
boolean abortRequested) throws IOException {
|
||||||
if (enabled) throw new IOException("fail preClose from coprocessor");
|
if (preCloseEnabled.get()) throw new IOException("fail preClose from coprocessor");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postClose(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||||
|
boolean abortRequested) {
|
||||||
|
stallOnFlag(postCloseEnabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
|
||||||
|
postOpenCalled = true;
|
||||||
|
stallOnFlag(postOpenEnabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stallOnFlag(final AtomicBoolean flag) {
|
||||||
|
try {
|
||||||
|
// If enabled, stall
|
||||||
|
while (flag.get()) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue