HBASE-20001 cleanIfNoMetaEntry() uses encoded instead of region name to lookup region
Signed-off-by: tedyu <yuzhihong@gmail.com>

commit ece68d3c65
parent a37c91bb41
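
Why the lookup matters: hbase:meta rows are keyed by the full region name, not by the
encoded name, so a MetaTableAccessor.getRegion() call made with
HRegionInfo.getEncodedNameAsBytes() never finds a row, and the cleanup path ends up treating
live regions as if they had no meta entry. A minimal sketch of the difference (illustration
only, not part of the patch; it assumes an open Connection conn and an HRegionInfo hri for a
region that does have a row in hbase:meta):

    // Hypothetical helper, for illustration only; it mirrors the lookup the patch fixes
    // in RegionStates#cleanFailedSplitMergeRegions().
    static void showLookupDifference(Connection conn, HRegionInfo hri) throws IOException {
      // The encoded name is not the hbase:meta row key, so this lookup finds nothing.
      Pair<HRegionInfo, ServerName> byEncodedName =
          MetaTableAccessor.getRegion(conn, hri.getEncodedNameAsBytes());
      // The full region name is the row key, so this finds the region and its location.
      Pair<HRegionInfo, ServerName> byRegionName =
          MetaTableAccessor.getRegion(conn, hri.getRegionName());
      assert byEncodedName == null && byRegionName != null;
    }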
@@ -255,10 +255,10 @@ public class AssignmentManager extends ZooKeeperListener {
   private final RegionStateStore regionStateStore;
 
   /**
-   * For testing only! Set to true to skip handling of split.
+   * For testing only! Set to true to skip handling of split and merge.
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
-  public static boolean TEST_SKIP_SPLIT_HANDLING = false;
+  private static boolean TEST_SKIP_SPLIT_HANDLING = false;
+  private static boolean TEST_SKIP_MERGE_HANDLING = false;
 
   /** Listeners that are called on assignment events. */
   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
@@ -4054,6 +4054,11 @@ public class AssignmentManager extends ZooKeeperListener {
         mergingRegions.put(encodedName,
           new PairOfSameType<HRegionInfo>(a, b));
       } else if (code == TransitionCode.MERGED) {
+
+        if (TEST_SKIP_MERGE_HANDLING) {
+          return "Skipping merge message, TEST_SKIP_MERGE_HANDLING is set for merge parent: " + p;
+        }
+
         mergingRegions.remove(encodedName);
         regionOffline(a, State.MERGED);
         regionOffline(b, State.MERGED);
@@ -4180,6 +4185,11 @@ public class AssignmentManager extends ZooKeeperListener {
       regionStates.updateRegionState(hri_b, State.MERGING);
       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
 
+      if (TEST_SKIP_MERGE_HANDLING) {
+        LOG.warn("Skipping merge message, TEST_SKIP_MERGE_HANDLING is set for merge parent: " + p);
+        return true; // return true so that the merging node stays
+      }
+
       if (et != EventType.RS_ZK_REGION_MERGED) {
         this.mergingRegions.put(encodedName,
           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
@@ -4727,4 +4737,20 @@ public class AssignmentManager extends ZooKeeperListener {
       threadPoolExecutorService.submit(callable);
     }
   }
+
+  /*
+   * This is only used for unit-testing split failures.
+   */
+  @VisibleForTesting
+  public static void setTestSkipSplitHandling(boolean skipSplitHandling) {
+    TEST_SKIP_SPLIT_HANDLING = skipSplitHandling;
+  }
+
+  /*
+   * This is only used for unit-testing merge failures.
+   */
+  @VisibleForTesting
+  public static void setTestSkipMergeHandling(boolean skipMergeHandling) {
+    TEST_SKIP_MERGE_HANDLING = skipMergeHandling;
+  }
 }
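
With the flag now private, tests drive the skip behaviour through the new setters. A minimal
usage sketch (this mirrors the call sites updated in TestSplitTransactionOnCluster and
TestRegionMergeTransactionOnCluster below; the surrounding test scaffolding is assumed):

      // Ask the master to skip split handling, provoke the failure, then always restore
      // the flag so later tests are unaffected.
      AssignmentManager.setTestSkipSplitHandling(true);
      try {
        // ... trigger the split and assert on the rolled-back state ...
      } finally {
        AssignmentManager.setTestSkipSplitHandling(false);
      }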
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -169,6 +170,7 @@ public class RegionStates {
   private final RegionStateStore regionStateStore;
   private final ServerManager serverManager;
   private final MasterServices server;
+  private final boolean useZK; // Is it ZK based assignment?
 
   // The maximum time to keep a log split info in region states map
   static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
@@ -180,6 +182,7 @@ public class RegionStates {
     this.regionStateStore = regionStateStore;
     this.serverManager = serverManager;
     this.server = master;
+    this.useZK = ConfigUtil.useZKForAssignment(server.getConfiguration());
   }
 
   /**
@@ -726,7 +729,7 @@ public class RegionStates {
   public List<HRegionInfo> serverOffline(final ZooKeeperWatcher watcher, final ServerName sn) {
     // Offline all regions on this server not already in transition.
     List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
-    Set<HRegionInfo> regionsToCleanIfNoMetaEntry = new HashSet<HRegionInfo>();
+    Set<HRegionInfo> regionsToClean = new HashSet<HRegionInfo>();
     // Offline regions outside the loop and synchronized block to avoid
     // ConcurrentModificationException and deadlock in case of meta anassigned,
     // but RegionState a blocked.
@@ -773,7 +776,9 @@ public class RegionStates {
             " to be reassigned by ServerCrashProcedure for " + sn);
           rits.add(hri);
         } else if(state.isSplittingNew() || state.isMergingNew()) {
-          regionsToCleanIfNoMetaEntry.add(state.getRegion());
+          LOG.info("Offline/Cleanup region if no meta entry exists, hri: " + hri +
+            " state: " + state);
+          regionsToClean.add(state.getRegion());
         } else {
           LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state);
         }
@@ -786,28 +791,41 @@ public class RegionStates {
       regionOffline(hri);
     }
 
-    cleanIfNoMetaEntry(regionsToCleanIfNoMetaEntry);
+    cleanFailedSplitMergeRegions(regionsToClean);
     return rits;
   }
 
   /**
    * This method does an RPC to hbase:meta. Do not call this method with a lock/synchronize held.
+   * In ZK mode we rollback and hence cleanup daughters/merged region. We also cleanup if
+   * meta doesn't have these regions.
+   *
    * @param hris The hris to check if empty in hbase:meta and if so, clean them up.
    */
-  private void cleanIfNoMetaEntry(Set<HRegionInfo> hris) {
-    if (hris.isEmpty()) return;
-    for (HRegionInfo hri: hris) {
-      try {
+  private void cleanFailedSplitMergeRegions(Set<HRegionInfo> hris) {
+    if (hris.isEmpty()) {
+      return;
+    }
+
+    for (HRegionInfo hri : hris) {
       // This is RPC to meta table. It is done while we have a synchronize on
       // regionstates. No progress will be made if meta is not available at this time.
       // This is a cleanup task. Not critical.
-        if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) ==
-            null) {
+      try {
+        Pair<HRegionInfo, ServerName> regionPair =
+          MetaTableAccessor.getRegion(server.getConnection(), hri.getRegionName());
+        if (regionPair == null || useZK) {
           regionOffline(hri);
+
+          // If we use ZK, then we can cleanup entries from meta, since we roll back.
+          if (regionPair != null) {
+            MetaTableAccessor.deleteRegion(this.server.getConnection(), hri);
+          }
+          LOG.debug("Cleaning up HDFS since no meta entry exists, hri: " + hri);
           FSUtils.deleteRegionDir(server.getConfiguration(), hri);
         }
       } catch (IOException e) {
-        LOG.warn("Got exception while deleting " + hri + " directories from file system.", e);
+        LOG.warn("Got exception while cleaning up region " + hri, e);
       }
     }
   }
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -30,6 +31,7 @@ import java.util.List;
 import org.apache.commons.lang.math.RandomUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -37,12 +39,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -51,17 +54,23 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
+import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -95,11 +104,13 @@ public class TestRegionMergeTransactionOnCluster {
 
   private static HMaster master;
   private static Admin admin;
+  static MiniHBaseCluster cluster;
+  static Configuration conf;
 
   static void setupOnce() throws Exception {
     // Start a cluster
     TEST_UTIL.startMiniCluster(NB_SERVERS);
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    cluster = TEST_UTIL.getHBaseCluster();
     master = cluster.getMaster();
     master.balanceSwitch(false);
     admin = TEST_UTIL.getHBaseAdmin();
@@ -107,8 +118,9 @@ public class TestRegionMergeTransactionOnCluster {
 
   @BeforeClass
   public static void beforeAllTests() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
     // Use ZK for region assignment
-    TEST_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
+    conf.setBoolean("hbase.assignment.usezk", true);
     setupOnce();
   }
 
@@ -469,4 +481,169 @@ public class TestRegionMergeTransactionOnCluster {
     assertEquals(expectedRegionNum, rowCount);
     scanner.close();
   }
+
+  /**
+   * A test that intentionally has master fail the processing of the merge message.
+   * Tests that the regionserver merge ephemeral node gets cleaned up if it
+   * crashes and that after we process server shutdown, the parent regions are online and
+   * merged region is cleaned up.
+   */
+  @Test (timeout = 60000)
+  public void testMergeIsRolledBackOnMergeFailure() throws Exception {
+
+    final RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+    final ZooKeeperWatcher zkw = TEST_UTIL.getZooKeeperWatcher();
+
+    final TableName tableName = TableName.valueOf("testMergeIsRolledBackOnMergeFailure");
+    // Create table with 2 regions as its easy for us to merge.
+    createTableAndLoadData(master, tableName, 2, 1);
+    List<HRegion> regions = cluster.getRegions(tableName);
+
+    assertEquals("Table shudn't have more than 2 regions, " + regions, 2, regions.size());
+    final HRegionInfo regionA = regions.get(0).getRegionInfo();
+    final HRegionInfo regionB = regions.get(1).getRegionInfo();
+
+    // Turn off balancer so it doesn't cut in and mess up our placements.
+    admin.setBalancerRunning(false, true);
+    // Turn off the meta scanner so it don't remove parent on us.
+    master.setCatalogJanitorEnabled(false);
+
+    // Start a server and move both the regions to it. We kill this server later.
+    HRegionServer regionServer = cluster.startRegionServer().getRegionServer();
+    moveRegionToServer(regionA, regionServer);
+    moveRegionToServer(regionB, regionServer);
+
+    int serverIndex = cluster.getServerWith(regionA.getRegionName());
+
+    // This helps with server aborts later.
+    TEST_UTIL.compact(tableName, true);
+
+    try {
+      printOutRegions(regionServer, "Initial regions: ");
+
+      // Now, before we merge, set special flag in master, a flag that has
+      // it FAIL the processing of merge.
+      AssignmentManager.setTestSkipMergeHandling(true);
+      admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false);
+
+      // Lets wait until we have a merge region.
+      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return regionStates.getRegionByStateOfTable(tableName).get(State.MERGING_NEW).size() > 0;
+        }
+      });
+
+      List<HRegionInfo> mergedRegions =
+        regionStates.getRegionByStateOfTable(tableName).get(State.MERGING_NEW);
+      assertEquals("Only one region should be in MERGING_NEW state", 1, mergedRegions.size());
+      final HRegionInfo merge = mergedRegions.get(0);
+
+      // Lets double check if we have the merge Znode with the appr. state.
+      final String path = ZKAssign.getNodeName(zkw, merge.getEncodedName());
+      // Wait till the znode moved to MERGED
+      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          Stat stats = zkw.getRecoverableZooKeeper().exists(path, false);
+          RegionTransition rt =
+            RegionTransition.parseFrom(ZKAssign.getData(zkw, merge.getEncodedName()));
+          return stats != null && rt.getEventType().equals(EventType.RS_ZK_REGION_MERGED);
+        }
+      });
+
+      // Now crash the server
+      abortServerAndWaitForProcessingToComplete(serverIndex);
+      waitUntilRegionServerDead();
+
+      TEST_UTIL.waitUntilNoRegionsInTransition();
+
+      // Lets wait until merge parents are online.
+      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() {
+          return cluster.getRegions(tableName).size() == 2;
+        }
+      });
+
+      // Check if merge regions is cleaned up.
+      List<HRegionInfo> tableRegions = MetaTableAccessor.getTableRegions(zkw,
+        cluster.getMaster().getConnection(), tableName);
+      assertEquals("Only parent regions should be present, but we have: " + tableRegions,
+        2, tableRegions.size());
+      assertTrue("Merge A not present? " + regionA, tableRegions.contains(regionA));
+      assertTrue("Merge B not present? " + regionB, tableRegions.contains(regionB));
+
+      // Are both merge parents online?
+      assertTrue("region should be online, " + regionA, regionStates.isRegionOnline(regionA));
+      assertTrue("region should be online, " + regionB, regionStates.isRegionOnline(regionB));
+
+      // Have HDFS dirs been cleaned up?
+      Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableName);
+      List<Path> regionDirs =
+        FSUtils.getRegionDirs(cluster.getMaster().getFileSystem(), tableDir);
+      assertEquals("Only two region dir should be present, we have, dirs: " + regionDirs,
+        2, regionDirs.size());
+
+      assertTrue("Region dir doesn't belong to region: " + regionA + " dir: " + regionDirs,
+        regionDirs.get(0).getName().endsWith(regionA.getEncodedName())
+          || regionDirs.get(1).getName().endsWith(regionA.getEncodedName()));
+      assertTrue("Region dir doesn't belong to region: " + regionB + " dir: " + regionDirs,
+        regionDirs.get(0).getName().endsWith(regionB.getEncodedName())
+          || regionDirs.get(1).getName().endsWith(regionB.getEncodedName()));
+
+      // The merged Znode should have been cleaned up.
+      Stat stat = zkw.getRecoverableZooKeeper().exists(path, false);
+      assertNull("Merged znode shouldn't exist, but we have stat: " + stat, stat);
+
+    } finally {
+      // Set this flag back.
+      AssignmentManager.setTestSkipMergeHandling(false);
+      admin.setBalancerRunning(true, false);
+      master.setCatalogJanitorEnabled(true);
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  private void moveRegionToServer(final HRegionInfo region, final HRegionServer rs)
+      throws Exception {
+    admin.move(region.getEncodedNameAsBytes(), rs.getServerName().toString().getBytes());
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+    TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return rs.getOnlineRegion(region.getRegionName()) != null;
+      }
+    });
+  }
+
+  private void waitUntilRegionServerDead() throws InterruptedException, IOException {
+    // Wait until the master processes the RS shutdown
+    for (int i=0; cluster.getMaster().getClusterStatus().
+        getServers().size() > NB_SERVERS && i<100; i++) {
+      LOG.info("Waiting on server to go down");
+      Thread.sleep(100);
+    }
+    assertFalse("Waited too long for RS to die", cluster.getMaster().getClusterStatus().
+      getServers().size() > NB_SERVERS);
+  }
+
+  private void printOutRegions(final HRegionServer hrs, final String prefix)
+      throws IOException {
+    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+    for (HRegionInfo region: regions) {
+      LOG.info(prefix + region.getRegionNameAsString());
+    }
+  }
+
+  private void abortServerAndWaitForProcessingToComplete(int serverIndex) throws Exception {
+
+    final HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    cluster.abortRegionServer(serverIndex);
+    TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return master.getServerManager().areDeadServersInProgress();
+      }
+    });
+  }
 }
@@ -126,6 +126,7 @@ public class TestSplitTransactionOnCluster {
     LogFactory.getLog(TestSplitTransactionOnCluster.class);
   private HBaseAdmin admin = null;
   private MiniHBaseCluster cluster = null;
+  private static Configuration conf;
   private static final int NB_SERVERS = 3;
   private static CountDownLatch latch = new CountDownLatch(1);
   private static volatile boolean secondSplit = false;
@@ -143,9 +144,11 @@ public class TestSplitTransactionOnCluster {
     TESTING_UTIL.startMiniCluster(NB_SERVERS);
   }
 
-  @BeforeClass public static void before() throws Exception {
+  @BeforeClass
+  public static void before() throws Exception {
+    conf = TESTING_UTIL.getConfiguration();
     // Use ZK for region assignment
-    TESTING_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
+    conf.setBoolean("hbase.assignment.usezk", true);
     setupOnce();
   }
 
@@ -389,90 +392,95 @@ public class TestSplitTransactionOnCluster {
       }
     }
   }
 
-  /**
+  /*
    * A test that intentionally has master fail the processing of the split message.
    * Tests that the regionserver split ephemeral node gets cleaned up if it
-   * crashes and that after we process server shutdown, the daughters are up on
-   * line.
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws NodeExistsException
-   * @throws KeeperException
-   * @throws DeserializationException
+   * crashes and that after we process server shutdown, the parent region is online and
+   * daughters are cleaned up.
    */
-  @Test (timeout = 300000) public void testRSSplitEphemeralsDisappearButDaughtersAreOnlinedAfterShutdownHandling()
-  throws IOException, InterruptedException, NodeExistsException, KeeperException,
-      DeserializationException, ServiceException {
-    final TableName tableName =
-        TableName.valueOf("testRSSplitEphemeralsDisappearButDaughtersAreOnlinedAfterShutdownHandling");
+  @Test (timeout = 60000)
+  public void testSplitIsRolledBackOnSplitFailure() throws Exception {
+    final TableName tableName = TableName.valueOf("testSplitIsRolledBackOnSplitFailure");
 
     // Create table then get the single region for our new table.
     HTable t = createTableAndWait(tableName, HConstants.CATALOG_FAMILY);
     List<HRegion> regions = cluster.getRegions(tableName);
-    HRegionInfo hri = getAndCheckSingleTableRegion(regions);
+    final HRegionInfo hri = getAndCheckSingleTableRegion(regions);
 
-    int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
-
     // Turn off balancer so it doesn't cut in and mess up our placements.
     this.admin.setBalancerRunning(false, true);
     // Turn off the meta scanner so it don't remove parent on us.
     cluster.getMaster().setCatalogJanitorEnabled(false);
 
+    int serverIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
+
     try {
       // Add a bit of load up into the table so splittable.
       TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY, false);
       // Get region pre-split.
-      HRegionServer server = cluster.getRegionServer(tableRegionIndex);
+      HRegionServer server = cluster.getRegionServer(serverIndex);
       printOutRegions(server, "Initial regions: ");
       int regionCount = ProtobufUtil.getOnlineRegions(server.getRSRpcServices()).size();
       // Now, before we split, set special flag in master, a flag that has
      // it FAIL the processing of split.
-      AssignmentManager.TEST_SKIP_SPLIT_HANDLING = true;
+      AssignmentManager.setTestSkipSplitHandling(true);
       // Now try splitting and it should work.
       split(hri, server, regionCount);
 
-      String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(),
-        hri.getEncodedName());
+      ZooKeeperWatcher zkw = TESTING_UTIL.getZooKeeperWatcher();
+      String path = ZKAssign.getNodeName(zkw, hri.getEncodedName());
       RegionTransition rt = null;
       Stat stats = null;
-      List<HRegion> daughters = null;
-      if (useZKForAssignment) {
-        daughters = checkAndGetDaughters(tableName);
 
-        // Wait till the znode moved to SPLIT
-        for (int i=0; i<100; i++) {
-          stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
-          rt = RegionTransition.parseFrom(ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
-            hri.getEncodedName()));
-          if (rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)) break;
-          Thread.sleep(100);
-        }
+      // Wait till the znode moved to SPLIT
+      for (int i = 0; i < 100; i++) {
+        stats = zkw.getRecoverableZooKeeper().exists(path, false);
+        rt = RegionTransition.parseFrom(ZKAssign.getData(zkw, hri.getEncodedName()));
+        if (rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)) {
+          break;
+        }
+        Thread.sleep(100);
+      }
       LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
-      assertTrue(rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT));
+      assertTrue(rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT));
       // Now crash the server, for ZK-less assignment, the server is auto aborted
-      cluster.abortRegionServer(tableRegionIndex);
-      }
+      abortServerAndWaitForProcessingToComplete(serverIndex);
       waitUntilRegionServerDead();
-      awaitDaughters(tableName, 2);
-      if (useZKForAssignment) {
-        regions = cluster.getRegions(tableName);
-        for (HRegion r: regions) {
-          assertTrue(daughters.contains(r));
-        }
 
-        // Finally assert that the ephemeral SPLIT znode was cleaned up.
-        for (int i=0; i<100; i++) {
-          // wait a bit (10s max) for the node to disappear
-          stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
-          if (stats == null) break;
-          Thread.sleep(100);
-        }
-        LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
-        assertTrue(stats == null);
-      }
+      TESTING_UTIL.waitUntilNoRegionsInTransition();
+
+      // Lets wait until parent region is online.
+      TESTING_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() {
+          for (HRegion region : cluster.getRegions(tableName)) {
+            if (Bytes.equals(region.getRegionInfo().getRegionName(), hri.getRegionName())) {
+              return true;
+            } else {
+              LOG.debug("Wait for some more time, online region: " + region);
+            }
+          }
+          return false;
+        }
+      });
+
+      RegionStates regionStates = cluster.getMaster().getAssignmentManager().getRegionStates();
+      assertTrue("Parent region should be online", regionStates.isRegionOnline(hri));
+      // Check if daughter regions are cleaned up.
+      List<HRegionInfo> tableRegions = MetaTableAccessor.getTableRegions(zkw,
+        cluster.getMaster().getConnection(), tableName);
+      assertEquals("Only parent region should be present, but we have: " + tableRegions,
+        1, tableRegions.size());
+      Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableName);
+      List<Path> regionDirs =
+        FSUtils.getRegionDirs(cluster.getMaster().getFileSystem(), tableDir);
+      assertEquals("Only one region dir should be present, we have, dirs: " + regionDirs,
+        1, regionDirs.size());
+      assertTrue("Region dir doesn't belong to region: " + hri + " dir: " + regionDirs,
+        regionDirs.get(0).getName().endsWith(hri.getEncodedName()));
     } finally {
       // Set this flag back.
-      AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
-      cluster.getMaster().getAssignmentManager().regionOffline(hri);
+      AssignmentManager.setTestSkipSplitHandling(false);
      admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
       cluster.startRegionServer();
@@ -481,6 +489,18 @@ public class TestSplitTransactionOnCluster {
     }
   }
 
+  private void abortServerAndWaitForProcessingToComplete(int serverIndex) throws Exception {
+
+    final HMaster master = TESTING_UTIL.getMiniHBaseCluster().getMaster();
+    cluster.abortRegionServer(serverIndex);
+    TESTING_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return master.getServerManager().areDeadServersInProgress();
+      }
+    });
+  }
+
   @Test (timeout = 300000) public void testExistingZnodeBlocksSplitAndWeRollback()
   throws IOException, InterruptedException, NodeExistsException, KeeperException, ServiceException {
     final TableName tableName =
@@ -749,7 +769,7 @@ public class TestSplitTransactionOnCluster {
       printOutRegions(server, "Initial regions: ");
       // Now, before we split, set special flag in master, a flag that has
       // it FAIL the processing of split.
-      AssignmentManager.TEST_SKIP_SPLIT_HANDLING = true;
+      AssignmentManager.setTestSkipSplitHandling(true);
       // Now try splitting and it should work.
 
       this.admin.split(hri.getRegionNameAsString());
@@ -779,7 +799,7 @@ public class TestSplitTransactionOnCluster {
       assertTrue(regionServerOfRegion != null);
 
       // Remove the block so that split can move ahead.
-      AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
+      AssignmentManager.setTestSkipSplitHandling(false);
       String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
       Stat stat = new Stat();
       byte[] data = ZKUtil.getDataNoWatch(zkw, node, stat);
@@ -796,7 +816,7 @@ public class TestSplitTransactionOnCluster {
       assertTrue(regionServerOfRegion == null);
     } finally {
       // Set this flag back.
-      AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
+      AssignmentManager.setTestSkipSplitHandling(false);
       admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
       t.close();