HBASE-8631 Meta Region First Recovery

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1487939 13f79535-47bb-0310-9956-ffa450edef68
parent 11c861cb91
commit 957f580d83

ServerCallable.java
@@ -199,7 +199,8 @@ public abstract class ServerCallable<T> implements Callable<T> {
         // If the server is dead, we need to wait a little before retrying, to give
         // a chance to the regions to be
-        expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
+        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
+        expectedSleep = ConnectionUtils.getPauseTime(pause, tries + 1);
         if (expectedSleep < MIN_WAIT_DEAD_SERVER
             && (location == null || getConnection().isDeadServer(location.getServerName()))) {
           expectedSleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);

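The `tries + 1` fix matters because the pause grows with the retry slot: `tries` has not been bumped yet at this point, so using it directly undershoots the wait against a dead server. A minimal sketch of the back-off-plus-jitter arithmetic (illustrative only; the multiplier table and method names approximate HBase's `ConnectionUtils`, they are not its actual source):

```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
  // assumed back-off multipliers, in the style of HConstants.RETRY_BACKOFF
  private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200};

  // the pause grows with the retry count; calling this with "tries + 1" makes
  // the first dead-server wait use the second slot instead of the first
  static long getPauseTime(long basePause, int tries) {
    int idx = Math.min(tries, RETRY_BACKOFF.length - 1);
    return basePause * RETRY_BACKOFF[idx];
  }

  // spread the sleeps of concurrent retries so they do not stampede one server
  static long addJitter(long pause, float jitterFraction) {
    long jitter = (long) (ThreadLocalRandom.current().nextFloat() * jitterFraction * pause);
    return pause + jitter;
  }

  public static void main(String[] args) {
    for (int tries = 0; tries < 5; tries++) {
      System.out.println("tries=" + tries + " sleep=" + addJitter(getPauseTime(100, tries), 0.10f));
    }
  }
}
```
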
IngestIntegrationTestBase.java
@@ -45,7 +45,7 @@ public abstract class IngestIntegrationTestBase {
   protected void setUp(int numSlavesBase, Configuration conf) throws Exception {
     tableName = this.getClass().getSimpleName();
-    util = (conf == null) ? new IntegrationTestingUtility() : new IntegrationTestingUtility(conf);
+    util = getTestingUtil(conf);
     LOG.info("Initializing cluster with " + numSlavesBase + " servers");
     util.initializeCluster(numSlavesBase);
     LOG.info("Done initializing cluster");

@@ -59,6 +59,17 @@ public abstract class IngestIntegrationTestBase {
     Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
   }
 
+  protected IntegrationTestingUtility getTestingUtil(Configuration conf) {
+    if (this.util == null) {
+      if (conf == null) {
+        this.util = new IntegrationTestingUtility();
+      } else {
+        this.util = new IntegrationTestingUtility(conf);
+      }
+    }
+    return util;
+  }
+
   protected void setUp(int numSlavesBase) throws Exception {
     setUp(numSlavesBase, null);
   }

IntegrationTestDataIngestWithChaosMonkey.java
@@ -33,7 +33,7 @@ import org.junit.experimental.categories.Category;
 @Category(IntegrationTests.class)
 public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationTestBase {
 
-  private static final int NUM_SLAVES_BASE = 4; // number of slaves for the smallest cluster
+  private static int NUM_SLAVES_BASE = 4; // number of slaves for the smallest cluster
 
   // run for 5 min by default
   private static final long DEFAULT_RUN_TIME = 5 * 60 * 1000;

@@ -42,6 +42,11 @@ public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationTestBase {
 
   @Before
   public void setUp() throws Exception {
+    util = getTestingUtil(null);
+    if (!util.isDistributedCluster()) {
+      // In MiniCluster mode, we increase the number of region servers a little to speed up the test
+      NUM_SLAVES_BASE = 5;
+    }
     super.setUp(NUM_SLAVES_BASE);
     monkey = new ChaosMonkey(util, ChaosMonkey.EVERY_MINUTE_RANDOM_ACTION_POLICY);
     monkey.start();

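For context, the policy named above fires one randomly chosen disruptive action per minute against the cluster under test. A toy sketch of that pattern; the actions, period, and class name here are invented stand-ins, not ChaosMonkey's real API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicRandomActionSketch {
  public static void main(String[] args) throws InterruptedException {
    final List<Runnable> actions = Arrays.asList(
        new Runnable() { public void run() { System.out.println("restart a random region server"); } },
        new Runnable() { public void run() { System.out.println("restart the active master"); } });
    final Random rnd = new Random();
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    // every period (shortened here), pick one action at random and run it
    pool.scheduleAtFixedRate(new Runnable() {
      public void run() { actions.get(rnd.nextInt(actions.size())).run(); }
    }, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(3000);
    pool.shutdownNow();
  }
}
```
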
MasterFileSystem.java
@@ -93,13 +93,13 @@ public class MasterFileSystem {
   final SplitLogManager splitLogManager;
   private final MasterServices services;
 
-  private final static PathFilter META_FILTER = new PathFilter() {
+  final static PathFilter META_FILTER = new PathFilter() {
     public boolean accept(Path p) {
       return HLogUtil.isMetaFile(p);
     }
   };
 
-  private final static PathFilter NON_META_FILTER = new PathFilter() {
+  final static PathFilter NON_META_FILTER = new PathFilter() {
     public boolean accept(Path p) {
       return !HLogUtil.isMetaFile(p);
     }

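The point of dropping `private` here is that `SplitLogManager` can now compare against `META_FILTER` by reference and use the filter pair to split a dead server's meta WAL before its user-region WALs. A sketch of how such a filter pair partitions a log directory (assumes a `.meta` file suffix; `listStatus(Path, PathFilter)` is the standard Hadoop FileSystem call):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class WalPartitionSketch {
  static final PathFilter META_FILTER = new PathFilter() {
    public boolean accept(Path p) { return p.getName().endsWith(".meta"); }
  };
  static final PathFilter NON_META_FILTER = new PathFilter() {
    public boolean accept(Path p) { return !META_FILTER.accept(p); }
  };

  static void splitMetaFirst(FileSystem fs, Path logDir) throws IOException {
    FileStatus[] metaLogs = fs.listStatus(logDir, META_FILTER);     // split these first
    FileStatus[] userLogs = fs.listStatus(logDir, NON_META_FILTER); // then the rest
    // hand metaLogs, then userLogs, to the split-log machinery
  }
}
```
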
SplitLogManager.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;

@@ -127,8 +128,8 @@ public class SplitLogManager extends ZooKeeperListener {
   // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
   // whether to GC stale recovering znodes
   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
-  private final Set<ServerName> failedRecoveringRegionDeletions = Collections
-      .synchronizedSet(new HashSet<ServerName>());
+  private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
+      .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
 
   /**
    * In distributedLogReplay mode, we need touch both splitlog and recovering-regions znodes in one

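Switching the retry ledger from `Set<ServerName>` to `List<Pair<Set<ServerName>, Boolean>>` keeps each failed cleanup together with its `isMetaRecovery` flag, so a later retry can replay the exact call that failed. A self-contained sketch of the pattern (generic names; this is not the SplitLogManager code itself):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RetryLedgerSketch {
  static final class Pair<A, B> {
    final A first; final B second;
    Pair(A first, B second) { this.first = first; this.second = second; }
  }

  private final List<Pair<Set<String>, Boolean>> failedDeletions =
      Collections.synchronizedList(new ArrayList<Pair<Set<String>, Boolean>>());

  void recordFailure(Set<String> servers, Boolean isMetaRecovery) {
    // remember the arguments of the failed call, not just the servers
    failedDeletions.add(new Pair<Set<String>, Boolean>(servers, isMetaRecovery));
  }

  void retryAll() {
    // drain a snapshot first so a concurrent failure is not retried twice
    List<Pair<Set<String>, Boolean>> snapshot =
        new ArrayList<Pair<Set<String>, Boolean>>(failedDeletions);
    failedDeletions.removeAll(snapshot);
    for (Pair<Set<String>, Boolean> p : snapshot) {
      // in SplitLogManager: removeRecoveringRegionsFromZK(p.first, p.second)
      System.out.println("retrying cleanup for " + p.first + ", meta=" + p.second);
    }
  }

  public static void main(String[] args) {
    RetryLedgerSketch ledger = new RetryLedgerSketch();
    ledger.recordFailure(new HashSet<String>(Collections.singleton("rs1,60020,1")), Boolean.TRUE);
    ledger.retryAll();
  }
}
```
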
@@ -307,6 +308,7 @@ public class SplitLogManager extends ZooKeeperListener {
     long t = EnvironmentEdgeManager.currentTimeMillis();
     long totalSize = 0;
     TaskBatch batch = new TaskBatch();
+    Boolean isMetaRecovery = (filter == null) ? null : false;
     for (FileStatus lf : logfiles) {
       // TODO If the log file is still being written to - which is most likely
       // the case for the last log file - then its length will show up here

@@ -321,7 +323,12 @@ public class SplitLogManager extends ZooKeeperListener {
     }
     waitForSplittingCompletion(batch, status);
     // remove recovering regions from ZK
-    this.removeRecoveringRegionsFromZK(serverNames);
+    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
+      // we split meta regions and user regions separately, so the log files are either all for
+      // meta or all for user regions, never both (tests can produce mixed situations)
+      isMetaRecovery = true;
+    }
+    this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
 
     if (batch.done != batch.installed) {
       batch.isDead = true;

@@ -453,14 +460,18 @@ public class SplitLogManager extends ZooKeeperListener {
    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
    * region server hosting the region can allow reads to the recovered region
    * @param serverNames servers which are just recovered
+   * @param isMetaRecovery whether the current recovery is for the meta region on
+   *          <code>serverNames</code>
    */
-  private void removeRecoveringRegionsFromZK(final Set<ServerName> serverNames) {
+  private void
+      removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
 
     if (!this.distributedLogReplay) {
       // the function is only used in WALEdit direct replay mode
       return;
     }
 
+    final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
     int count = 0;
     Set<String> recoveredServerNameSet = new HashSet<String>();
     if (serverNames != null) {

@@ -492,12 +503,20 @@ public class SplitLogManager extends ZooKeeperListener {
       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
       if (regions != null) {
         for (String region : regions) {
+          if (isMetaRecovery != null) {
+            if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
+                || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
+              // skip non-meta regions when recovering the meta region or
+              // skip the meta region when recovering user regions
+              continue;
+            }
+          }
           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
           List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
           if (failedServers == null || failedServers.isEmpty()) {
             ZKUtil.deleteNode(watcher, nodePath);
             continue;
           }
           if (recoveredServerNameSet.containsAll(failedServers)) {
             ZKUtil.deleteNodeRecursively(watcher, nodePath);
           } else {

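The nested conditional added above implements a three-valued pass filter: `null` means clean up every recovering region, `true` means touch only the meta region, `false` means touch only user regions. Condensed into a predicate (an equivalent rewrite for illustration, not the committed code):

```java
public class PassFilterSketch {
  // true => skip this region in the current cleanup pass
  static boolean skip(String region, Boolean isMetaRecovery, String metaEncodedName) {
    if (isMetaRecovery == null) {
      return false;                               // mixed pass: clean everything
    }
    boolean isMeta = region.equalsIgnoreCase(metaEncodedName);
    return isMetaRecovery ? !isMeta : isMeta;     // keep only the matching kind
  }

  public static void main(String[] args) {
    String meta = "1588230740";                   // the well-known encoded name of the meta region
    System.out.println(skip("abc123", true, meta));   // true: user region during a meta pass
    System.out.println(skip(meta, false, meta));      // true: meta region during a user pass
    System.out.println(skip("abc123", null, meta));   // false: mixed pass keeps all
  }
}
```
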
@@ -514,7 +533,8 @@ public class SplitLogManager extends ZooKeeperListener {
     } catch (KeeperException ke) {
       LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
       if (serverNames != null && !serverNames.isEmpty()) {
-        this.failedRecoveringRegionDeletions.addAll(serverNames);
+        this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
+          isMetaRecovery));
       }
     } finally {
       this.recoveringRegionLock.unlock();

@@ -588,7 +608,7 @@ public class SplitLogManager extends ZooKeeperListener {
         }
       }
       if (!needMoreRecovery) {
-        ZKUtil.deleteNode(watcher, nodePath);
+        ZKUtil.deleteNodeRecursively(watcher, nodePath);
       }
     }
   }

@@ -1384,12 +1404,16 @@ public class SplitLogManager extends ZooKeeperListener {
       if (!failedRecoveringRegionDeletions.isEmpty()
           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
         // the function does more checks internally before GC'ing anything
-        Set<ServerName> previouslyFailedDeletoins = null;
         if (!failedRecoveringRegionDeletions.isEmpty()) {
-          previouslyFailedDeletoins = new HashSet<ServerName>(failedRecoveringRegionDeletions);
-          failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletoins);
+          List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
+              new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
+          failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
+          for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
+            removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
+          }
+        } else {
+          removeRecoveringRegionsFromZK(null, null);
         }
-        removeRecoveringRegionsFromZK(previouslyFailedDeletoins);
       }
     }
   }

SplitLogWorker.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;

@@ -217,7 +218,14 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
             this.watcher.splitLogZNode + " ... worker thread exiting.");
         return;
       }
-      int offset = (int)(Math.random() * paths.size());
+      // pick the meta WAL first
+      int offset = (int) (Math.random() * paths.size());
+      for (int i = 0; i < paths.size(); i++) {
+        if (HLogUtil.isMetaFile(paths.get(i))) {
+          offset = i;
+          break;
+        }
+      }
       for (int i = 0; i < paths.size(); i++) {
         int idx = (i + offset) % paths.size();
         // don't call ZKSplitLog.getNodeName() because that will lead to

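The loop added above biases the worker's task scan: it still starts at a random offset so that concurrent workers do not contend for the same task znode, but any meta WAL in the list overrides the random start. A compact sketch of that selection order (the `.meta` suffix test stands in for `HLogUtil.isMetaFile`):

```java
import java.util.Arrays;
import java.util.List;

public class MetaFirstPickSketch {
  static boolean isMetaFile(String name) {
    return name.endsWith(".meta");                     // stand-in for HLogUtil.isMetaFile
  }

  static int chooseStart(List<String> paths) {
    int offset = (int) (Math.random() * paths.size()); // random start avoids worker contention
    for (int i = 0; i < paths.size(); i++) {
      if (isMetaFile(paths.get(i))) {
        return i;                                      // a meta WAL overrides the random start
      }
    }
    return offset;
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("wal.1", "wal.2.meta", "wal.3");
    int start = chooseStart(paths);
    for (int i = 0; i < paths.size(); i++) {
      System.out.println(paths.get((start + i) % paths.size())); // prints wal.2.meta first
    }
  }
}
```
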
HLogSplitter.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;

@@ -1628,8 +1629,9 @@ public class HLogSplitter {
     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
     private final Map<String, RegionServerWriter> writers =
         new ConcurrentHashMap<String, RegionServerWriter>();
-    // online encoded region name map
-    private final Set<String> onlineRegions = Collections.synchronizedSet(new HashSet<String>());
+    // online encoded region name -> region location map
+    private final Map<String, HRegionLocation> onlineRegions =
+        new ConcurrentHashMap<String, HRegionLocation>();
 
     private Map<byte[], HConnection> tableNameToHConnectionMap = Collections
         .synchronizedMap(new TreeMap<byte[], HConnection>(Bytes.BYTES_COMPARATOR));

@@ -1648,8 +1650,17 @@ public class HLogSplitter {
     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
     private boolean hasEditsInDisablingOrDisabledTables = false;
 
+    private Configuration sinkConf;
     public LogReplayOutputSink(int numWriters) {
       super(numWriters);
+      // set a smaller number of retries to fail fast, otherwise the splitlogworker could be
+      // blocked for quite a while inside the HConnection layer. The worker won't be available
+      // for other tasks even after the current task is preempted when a split task times out.
+      sinkConf = HBaseConfiguration.create(conf);
+      sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
+      sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT / 2);
+      sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
       this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
         SplitLogManager.DEFAULT_TIMEOUT);

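The constructor change clones the server's configuration and tightens the client knobs on the copy only, so the replay sink fails fast without affecting other users of `conf`. Roughly the same derivation in isolation (this mirrors the hunk above; `HBaseConfiguration.create(Configuration)` returns a copy of the passed-in configuration):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class SinkConfSketch {
  static Configuration makeSinkConf(Configuration serverConf) {
    Configuration sinkConf = HBaseConfiguration.create(serverConf);  // copy, not a view
    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);         // fewer retries -> fail fast
    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT / 2);                   // shorter RPC timeout
    sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
    return sinkConf;
  }
}
```
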
@@ -1763,7 +1774,8 @@ public class HLogSplitter {
       }
 
       try {
-        loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow());
+        loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
+          encodeRegionNameStr);
       } catch (TableNotFoundException ex) {
         // table has been deleted so skip edits of the table
         LOG.info("Table " + Bytes.toString(table)

@@ -1848,22 +1860,22 @@ public class HLogSplitter {
      * @throws IOException
      */
     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
-        byte[] table, byte[] row) throws IOException {
-      HRegionLocation loc = hconn.getRegionLocation(table, row, false);
+        byte[] table, byte[] row, String originalEncodedRegionName) throws IOException {
+      HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
+      if (loc != null) return loc;
+
+      loc = hconn.getRegionLocation(table, row, false);
       if (loc == null) {
         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
           + " of table:" + Bytes.toString(table));
       }
-      if (onlineRegions.contains(loc.getRegionInfo().getEncodedName())) {
-        return loc;
-      }
 
       Long lastFlushedSequenceId = -1l;
       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
       Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
         .getEncodedName());
 
-      onlineRegions.add(loc.getRegionInfo().getEncodedName());
+      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
       // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
       // update the value for the region
       RegionStoreSequenceIds ids =

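The refactor turns `onlineRegions` into a memo table: the first edit of a region pays for the meta lookup and the wait-until-online, while later edits of the same region hit the cache. The shape of that pattern, reduced to plain types (the locator interface is an invented stand-in for `HConnection.getRegionLocation`; the real code stores under the resolved region's encoded name, so the cache only hits while the original region is still the one online):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LocationCacheSketch {
  /** Invented stand-in for the HConnection.getRegionLocation round trip. */
  interface RegionLocator {
    String locate(String row);
  }

  private final Map<String, String> onlineRegions = new ConcurrentHashMap<String, String>();

  String locateWithCache(RegionLocator locator, String encodedRegionName, String row) {
    String loc = onlineRegions.get(encodedRegionName);
    if (loc != null) {
      return loc;                                  // cache hit: no lookup RPC
    }
    loc = locator.locate(row);                     // cache miss: one lookup
    onlineRegions.put(encodedRegionName, loc);     // remember for later edits
    return loc;
  }
}
```
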
@@ -2116,7 +2128,7 @@ public class HLogSplitter {
       synchronized (this.tableNameToHConnectionMap) {
         hconn = this.tableNameToHConnectionMap.get(tableName);
         if (hconn == null) {
-          hconn = HConnectionManager.createConnection(conf);
+          hconn = HConnectionManager.createConnection(sinkConf);
           this.tableNameToHConnectionMap.put(tableName, hconn);
         }
       }

HLogUtil.java
@@ -246,7 +246,11 @@ public class HLogUtil {
   }
 
   public static boolean isMetaFile(Path p) {
-    if (p.getName().endsWith(HLog.META_HLOG_FILE_EXTN)) {
+    return isMetaFile(p.getName());
+  }
+
+  public static boolean isMetaFile(String p) {
+    if (p != null && p.endsWith(HLog.META_HLOG_FILE_EXTN)) {
       return true;
     }
     return false;

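After the refactor both overloads share one suffix test, so callers holding only a file or znode name (as `SplitLogWorker` does) no longer need to build a `Path`. Expected behaviour, assuming `.meta` is the value of `HLog.META_HLOG_FILE_EXTN` (the sample path is illustrative):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;

public class IsMetaFileDemo {
  public static void main(String[] args) {
    System.out.println(
        HLogUtil.isMetaFile(new Path("/hbase/.logs/rs1-splitting/rs1%2C60020%2C1.1234.meta"))); // true
    System.out.println(HLogUtil.isMetaFile("rs1%2C60020%2C1.1234"));  // false
    System.out.println(HLogUtil.isMetaFile((String) null));           // false: the String overload is null-safe
  }
}
```
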
TestDistributedLogSplitting.java
@@ -26,6 +26,7 @@ import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 

@@ -995,6 +996,72 @@ public class TestDistributedLogSplitting {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testMetaRecoveryInZK() throws Exception {
+    LOG.info("testMetaRecoveryInZK");
+    Configuration curConf = HBaseConfiguration.create();
+    curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS, curConf);
+
+    // turn off load balancing to prevent regions from moving around, otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(curConf, "table-creation", null);
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+
+    installTable(zkw, "table", "family", 40);
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (!isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
+    tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
+    master.getMasterFileSystem().prepareMetaLogReplay(hrs.getServerName(), tmpRegions);
+    Set<ServerName> failedServers = new HashSet<ServerName>();
+    failedServers.add(hrs.getServerName());
+    master.getMasterFileSystem().prepareLogReplay(failedServers);
+    boolean isMetaRegionInRecovery = false;
+    List<String> recoveringRegions =
+        zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
+    for (String curEncodedRegionName : recoveringRegions) {
+      if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        isMetaRegionInRecovery = true;
+        break;
+      }
+    }
+    assertTrue(isMetaRegionInRecovery);
+
+    master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
+
+    isMetaRegionInRecovery = false;
+    recoveringRegions =
+        zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
+    for (String curEncodedRegionName : recoveringRegions) {
+      if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        isMetaRegionInRecovery = true;
+        break;
+      }
+    }
+    // meta region should be recovered
+    assertFalse(isMetaRegionInRecovery);
+  }
+
   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
     return installTable(zkw, tname, fname, nrs, 0);
   }