HBASE-12005 Split/merge fails if master restarts before PONR

This commit is contained in:
Jimmy Xiang 2014-09-17 17:39:27 -07:00
parent 74c6b33e84
commit b4c07e9a80
5 changed files with 276 additions and 60 deletions

View File

@ -272,6 +272,54 @@ public class RegionState {
return serverName != null && serverName.equals(sn);
}
public boolean isMergingOnServer(final ServerName sn) {
return isOnServer(sn) && isMerging();
}
public boolean isMergingNewOnServer(final ServerName sn) {
return isOnServer(sn) && isMergingNew();
}
public boolean isMergingNewOrOpenedOnServer(final ServerName sn) {
return isOnServer(sn) && (isMergingNew() || isOpened());
}
public boolean isMergingNewOrOfflineOnServer(final ServerName sn) {
return isOnServer(sn) && (isMergingNew() || isOffline());
}
public boolean isSplittingOnServer(final ServerName sn) {
return isOnServer(sn) && isSplitting();
}
public boolean isSplittingNewOnServer(final ServerName sn) {
return isOnServer(sn) && isSplittingNew();
}
public boolean isSplittingOrOpenedOnServer(final ServerName sn) {
return isOnServer(sn) && (isSplitting() || isOpened());
}
public boolean isSplittingOrSplitOnServer(final ServerName sn) {
return isOnServer(sn) && (isSplitting() || isSplit());
}
public boolean isClosingOnServer(final ServerName sn) {
return isOnServer(sn) && isClosing();
}
public boolean isOpeningOrFailedOpenOnServer(final ServerName sn) {
return isOnServer(sn) && (isOpening() || isFailedOpen());
}
public boolean isOpeningOrOpenedOnServer(final ServerName sn) {
return isOnServer(sn) && (isOpening() || isOpened());
}
public boolean isOpenedOnServer(final ServerName sn) {
return isOnServer(sn) && isOpened();
}
/**
* Check if a region state can transition to offline
*/

View File

@ -2051,11 +2051,11 @@ public class AssignmentManager {
// The region must be opening on this server.
// If current state is failed_open on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isOnServer(serverName)
|| !(current.isOpening() || current.isFailedOpen())) {
if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
return hri.getShortNameToLog() + " is not opening on " + serverName;
}
// Just return in case of retrying
if (current.isFailedOpen()) {
return null;
}
@ -2106,11 +2106,11 @@ public class AssignmentManager {
// The region must be opening on this server.
// If current state is already opened on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isOnServer(serverName)
|| !(current.isOpening() || current.isOpened())) {
if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not opening on " + serverName;
}
// Just return in case of retrying
if (current.isOpened()) {
return null;
}
@ -2140,7 +2140,7 @@ public class AssignmentManager {
// already have moved away from closed state. 2. On the region server side, we
// don't care much about the response for this transition. We only make sure
// master has got and processed this report, either successfully or not.
if (current == null || !current.isOnServer(serverName) || !current.isClosing()) {
if (current == null || !current.isClosingOnServer(serverName)) {
return hri.getShortNameToLog() + " is not closing on " + serverName;
}
if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
@ -2162,11 +2162,11 @@ public class AssignmentManager {
// The region must be opened on this server.
// If current state is already splitting on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isOnServer(serverName)
|| !(current.isOpened() || current.isSplitting())) {
if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not opening on " + serverName;
}
// Just return in case of retrying
if (current.isSplitting()) {
return null;
}
@ -2194,7 +2194,7 @@ public class AssignmentManager {
final ServerName serverName, final RegionStateTransition transition) {
// The region must be splitting on this server, and the daughters must be in
// splitting_new state. To check RPC retry, we use server holding info.
if (current == null || !current.isOnServer(serverName) || !current.isSplitting()) {
if (current == null || !current.isSplittingOnServer(serverName)) {
return hri.getShortNameToLog() + " is not splitting on " + serverName;
}
@ -2202,13 +2202,23 @@ public class AssignmentManager {
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
|| !rs_b.isOnServer(serverName) || !rs_a.isSplittingNew()
|| !rs_b.isSplittingNew()) {
// Master could have restarted and lost the new region
// states, if so, they must be lost together
if (rs_a == null && rs_b == null) {
rs_a = regionStates.createRegionState(
a, State.SPLITTING_NEW, serverName, null);
rs_b = regionStates.createRegionState(
b, State.SPLITTING_NEW, serverName, null);
}
if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
|| rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
return "Some daughter is not known to be splitting on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
// Just return in case of retrying
if (!regionStates.isRegionOnServer(hri, serverName)) {
return null;
}
@ -2228,11 +2238,11 @@ public class AssignmentManager {
// splitting_new state.
// If current state is already split on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isOnServer(serverName)
|| !(current.isSplitting() || current.isSplit())) {
if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
return hri.getShortNameToLog() + " is not splitting on " + serverName;
}
// Just return in case of retrying
if (current.isSplit()) {
return null;
}
@ -2241,9 +2251,8 @@ public class AssignmentManager {
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
|| !rs_b.isOnServer(serverName) || !rs_a.isSplittingNew()
|| !rs_b.isSplittingNew()) {
if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
|| rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
return "Some daughter is not known to be splitting on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
@ -2278,11 +2287,11 @@ public class AssignmentManager {
// The region must be splitting on this server, and the daughters must be in
// splitting_new state.
// If the region is in open state, it could be an RPC retry.
if (current == null || !current.isOnServer(serverName)
|| !(current.isSplitting() || current.isOpened())) {
if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not splitting on " + serverName;
}
// Just return in case of retrying
if (current.isOpened()) {
return null;
}
@ -2291,9 +2300,8 @@ public class AssignmentManager {
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
|| !rs_b.isOnServer(serverName) || !rs_a.isSplittingNew()
|| !rs_b.isSplittingNew()) {
if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
|| rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
return "Some daughter is not known to be splitting on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
@ -2312,11 +2320,11 @@ public class AssignmentManager {
final ServerName serverName, final RegionStateTransition transition) {
// The region must be new, and the daughters must be open on this server.
// If the region is in merge_new state, it could be an RPC retry.
if (current != null && (!current.isOnServer(serverName)
|| !current.isMergingNew())) {
if (current != null && !current.isMergingNewOnServer(serverName)) {
return "Merging daughter region already exists, p=" + current;
}
// Just return in case of retrying
if (current != null) {
return null;
}
@ -2330,9 +2338,8 @@ public class AssignmentManager {
try {
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
|| !rs_b.isOnServer(serverName) || !rs_a.isOpened()
|| !rs_b.isOpened()) {
if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
|| rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
return "Some daughter is not in a state to merge on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
@ -2353,7 +2360,7 @@ public class AssignmentManager {
final ServerName serverName, final RegionStateTransition transition) {
// The region must be in merging_new state, and the daughters must be
// merging. To check RPC retry, we use server holding info.
if (current == null || !current.isOnServer(serverName) || !current.isMergingNew()) {
if (current != null && !current.isMergingNewOnServer(serverName)) {
return hri.getShortNameToLog() + " is not merging on " + serverName;
}
@ -2361,13 +2368,19 @@ public class AssignmentManager {
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
|| !rs_b.isOnServer(serverName) || !rs_a.isMerging()
|| !rs_b.isMerging()) {
if (rs_a == null || !rs_a.isMergingOnServer(serverName)
|| rs_b == null || !rs_b.isMergingOnServer(serverName)) {
return "Some daughter is not known to be merging on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
// Master could have restarted and lost the new region state
if (current == null) {
regionStates.createRegionState(
hri, State.MERGING_NEW, serverName, null);
}
// Just return in case of retrying
if (regionStates.isRegionOnServer(hri, serverName)) {
return null;
}
@ -2387,11 +2400,11 @@ public class AssignmentManager {
// merging on this server.
// If current state is already opened on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isOnServer(serverName)
|| !(current.isMergingNew() || current.isOpened())) {
if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not merging on " + serverName;
}
// Just return in case of retrying
if (current.isOpened()) {
return null;
}
@ -2400,9 +2413,8 @@ public class AssignmentManager {
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
|| !rs_b.isOnServer(serverName) || !rs_a.isMerging()
|| !rs_b.isMerging()) {
if (rs_a == null || !rs_a.isMergingOnServer(serverName)
|| rs_b == null || !rs_b.isMergingOnServer(serverName)) {
return "Some daughter is not known to be merging on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
@ -2433,11 +2445,11 @@ public class AssignmentManager {
// The region must be in merging_new state, and the daughters must be
// merging on this server.
// If the region is in offline state, it could be an RPC retry.
if (current == null || !current.isOnServer(serverName)
|| !(current.isMergingNew() || current.isOffline())) {
if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
return hri.getShortNameToLog() + " is not merging on " + serverName;
}
// Just return in case of retrying
if (current.isOffline()) {
return null;
}
@ -2446,9 +2458,8 @@ public class AssignmentManager {
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
|| !rs_b.isOnServer(serverName) || !rs_a.isMerging()
|| !rs_b.isMerging()) {
if (rs_a == null || !rs_a.isMergingOnServer(serverName)
|| rs_b == null || !rs_b.isMergingOnServer(serverName)) {
return "Some daughter is not known to be merging on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}

View File

@ -335,7 +335,7 @@ public class RegionStates {
}
lastAssignments.put(encodedName, lastHost);
regionAssignments.put(hri, lastHost);
} else if (!regionState.isUnassignable()) {
} else if (!isOneOfStates(regionState, State.MERGED, State.SPLIT, State.OFFLINE)) {
regionsInTransition.put(encodedName, regionState);
}
if (lastHost != null && newState != State.SPLIT) {

View File

@ -26,12 +26,15 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -51,8 +54,13 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -60,12 +68,15 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.base.Joiner;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Like {@link TestRegionMergeTransaction} in that we're testing
@ -97,7 +108,7 @@ public class TestRegionMergeTransactionOnCluster {
@BeforeClass
public static void beforeAllTests() throws Exception {
// Start a cluster
TEST_UTIL.startMiniCluster(NB_SERVERS);
TEST_UTIL.startMiniCluster(1, NB_SERVERS, null, MyMaster.class, null);
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
master = cluster.getMaster();
master.balanceSwitch(false);
@ -156,6 +167,31 @@ public class TestRegionMergeTransactionOnCluster {
table.close();
}
/**
* Not really restarting the master. Simulate it by clear of new region
* state since it is not persisted, will be lost after master restarts.
*/
@Test
public void testMergeAndRestartingMaster() throws Exception {
LOG.info("Starting testMergeAndRestartingMaster");
final TableName tableName = TableName.valueOf("testMergeAndRestartingMaster");
// Create table and load data.
Table table = createTableAndLoadData(master, tableName);
try {
MyMasterRpcServices.enabled.set(true);
// Merge 1st and 2nd region
mergeRegionsAndVerifyRegionNum(master, tableName, 0, 1,
INITIAL_REGION_NUM - 1);
} finally {
MyMasterRpcServices.enabled.set(false);
}
table.close();
}
@SuppressWarnings("deprecation")
@Test
public void testCleanMergeReference() throws Exception {
@ -433,4 +469,45 @@ public class TestRegionMergeTransactionOnCluster {
assertEquals(expectedRegionNum, rowCount);
scanner.close();
}
// Make it public so that JVMClusterUtil can access it.
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new MyMasterRpcServices(this);
}
}
static class MyMasterRpcServices extends MasterRpcServices {
static AtomicBoolean enabled = new AtomicBoolean(false);
private HMaster myMaster;
public MyMasterRpcServices(HMaster master) throws IOException {
super(master);
myMaster = master;
}
@Override
public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
ReportRegionStateTransitionRequest req) throws ServiceException {
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req);
if (enabled.get() && req.getTransition(0).getTransitionCode()
== TransitionCode.READY_TO_MERGE && !resp.hasErrorMessage()) {
RegionStates regionStates = myMaster.getAssignmentManager().getRegionStates();
for (RegionState regionState: regionStates.getRegionsInTransition().values()) {
// Find the merging_new region and remove it
if (regionState.isMergingNew()) {
regionStates.deleteRegion(regionState.getRegion());
}
}
}
return resp;
}
}
}

View File

@ -30,6 +30,7 @@ import java.io.InterruptedIOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -73,10 +74,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -98,6 +103,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
@ -106,6 +112,7 @@ import com.google.protobuf.ServiceException;
* is tests against a bare {@link HRegion}.
*/
@Category({RegionServerTests.class, LargeTests.class})
@SuppressWarnings("deprecation")
public class TestSplitTransactionOnCluster {
private static final Log LOG =
LogFactory.getLog(TestSplitTransactionOnCluster.class);
@ -120,7 +127,7 @@ public class TestSplitTransactionOnCluster {
@BeforeClass public static void before() throws Exception {
TESTING_UTIL.getConfiguration().setInt("hbase.balancer.period", 60000);
TESTING_UTIL.startMiniCluster(NB_SERVERS);
TESTING_UTIL.startMiniCluster(1, NB_SERVERS, null, MyMaster.class, null);
}
@AfterClass public static void after() throws Exception {
@ -211,7 +218,6 @@ public class TestSplitTransactionOnCluster {
}
}
@Test(timeout = 60000)
@SuppressWarnings("deprecation")
public void testSplitFailedCompactionAndSplit() throws Exception {
final byte[] tableName = Bytes.toBytes("testSplitFailedCompactionAndSplit");
Configuration conf = TESTING_UTIL.getConfiguration();
@ -573,7 +579,7 @@ public class TestSplitTransactionOnCluster {
this.admin.split(hri.getRegionNameAsString());
checkAndGetDaughters(tableName);
MockMasterWithoutCatalogJanitor master = abortAndWaitForMaster();
HMaster master = abortAndWaitForMaster();
this.admin = new HBaseAdmin(TESTING_UTIL.getConfiguration());
@ -826,6 +832,52 @@ public class TestSplitTransactionOnCluster {
}
}
/**
* Not really restarting the master. Simulate it by clear of new region
* state since it is not persisted, will be lost after master restarts.
*/
@Test(timeout = 180000)
public void testSplitAndRestartingMaster() throws Exception {
LOG.info("Starting testSplitAndRestartingMaster");
final TableName tableName = TableName.valueOf("testSplitAndRestartingMaster");
// Create table then get the single region for our new table.
createTableAndWait(tableName.getName(), HConstants.CATALOG_FAMILY);
List<HRegion> regions = cluster.getRegions(tableName);
HRegionInfo hri = getAndCheckSingleTableRegion(regions);
ensureTableRegionNotOnSameServerAsMeta(admin, hri);
int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName());
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
// Turn off balancer so it doesn't cut in and mess up our placements.
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it don't remove parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
try {
MyMasterRpcServices.enabled.set(true);
// find a splittable region. Refresh the regions list
regions = cluster.getRegions(tableName);
final HRegion region = findSplittableRegion(regions);
assertTrue("not able to find a splittable region", region != null);
// Now split.
SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2"));
try {
st.prepare();
st.execute(regionServer, regionServer);
} catch (IOException e) {
fail("Split execution should have succeeded with no exceptions thrown");
}
// Postcondition
List<HRegion> daughters = cluster.getRegions(tableName);
LOG.info("xxx " + regions.size() + AssignmentManager.TEST_SKIP_SPLIT_HANDLING);
assertTrue(daughters.size() == 2);
} finally {
MyMasterRpcServices.enabled.set(false);
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
}
}
@Test(timeout = 180000)
public void testSplitHooksBeforeAndAfterPONR() throws Exception {
String firstTable = "testSplitHooksBeforeAndAfterPONR_1";
@ -962,14 +1014,11 @@ public class TestSplitTransactionOnCluster {
return daughters;
}
private MockMasterWithoutCatalogJanitor abortAndWaitForMaster()
private HMaster abortAndWaitForMaster()
throws IOException, InterruptedException {
cluster.abortMaster(0);
cluster.waitOnMaster(0);
cluster.getConfiguration().setClass(HConstants.MASTER_IMPL,
MockMasterWithoutCatalogJanitor.class, HMaster.class);
MockMasterWithoutCatalogJanitor master = null;
master = (MockMasterWithoutCatalogJanitor) cluster.startMaster().getMaster();
HMaster master = cluster.startMaster().getMaster();
cluster.waitForActiveAndReadyMaster();
return master;
}
@ -1107,15 +1156,6 @@ public class TestSplitTransactionOnCluster {
return t;
}
public static class MockMasterWithoutCatalogJanitor extends HMaster {
public MockMasterWithoutCatalogJanitor(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
}
}
private static class SplittingNodeCreationFailedException extends IOException {
private static final long serialVersionUID = 1652404976265623004L;
@ -1124,6 +1164,47 @@ public class TestSplitTransactionOnCluster {
}
}
// Make it public so that JVMClusterUtil can access it.
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new MyMasterRpcServices(this);
}
}
static class MyMasterRpcServices extends MasterRpcServices {
static AtomicBoolean enabled = new AtomicBoolean(false);
private HMaster myMaster;
public MyMasterRpcServices(HMaster master) throws IOException {
super(master);
myMaster = master;
}
@Override
public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
ReportRegionStateTransitionRequest req) throws ServiceException {
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req);
if (enabled.get() && req.getTransition(0).getTransitionCode().equals(
TransitionCode.READY_TO_SPLIT) && !resp.hasErrorMessage()) {
RegionStates regionStates = myMaster.getAssignmentManager().getRegionStates();
for (RegionState regionState: regionStates.getRegionsInTransition().values()) {
// Find the merging_new region and remove it
if (regionState.isSplittingNew()) {
regionStates.deleteRegion(regionState.getRegion());
}
}
}
return resp;
}
}
public static class MockedRegionObserver extends BaseRegionObserver {
private SplitTransaction st = null;
private PairOfSameType<HRegion> daughterRegions = null;
@ -1177,7 +1258,6 @@ public class TestSplitTransactionOnCluster {
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
st.stepsAfterPONR(rs, rs, daughterRegions);
}
}
}