HBASE-8617: Introducing a new config to disable writes during recovering
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1494804 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
559aa12a09
commit
e75a7e868b
|
@ -724,6 +724,9 @@ public final class HConstants {
|
||||||
/** Conf key that enables unflushed WAL edits directly being replayed to region servers */
|
/** Conf key that enables unflushed WAL edits directly being replayed to region servers */
|
||||||
public static final String DISTRIBUTED_LOG_REPLAY_KEY = "hbase.master.distributed.log.replay";
|
public static final String DISTRIBUTED_LOG_REPLAY_KEY = "hbase.master.distributed.log.replay";
|
||||||
public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false;
|
public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false;
|
||||||
|
public static final String DISALLOW_WRITES_IN_RECOVERING =
|
||||||
|
"hbase.regionserver.disallow.writes.when.recovering";
|
||||||
|
public static final boolean DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG = false;
|
||||||
|
|
||||||
/** Conf key that specifies timeout value to wait for a region ready */
|
/** Conf key that specifies timeout value to wait for a region ready */
|
||||||
public static final String LOG_REPLAY_WAIT_REGION_TIMEOUT =
|
public static final String LOG_REPLAY_WAIT_REGION_TIMEOUT =
|
||||||
|
|
|
@ -209,7 +209,8 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
* startRegionOperation
|
* startRegionOperation
|
||||||
*/
|
*/
|
||||||
protected enum Operation {
|
protected enum Operation {
|
||||||
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION
|
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
|
||||||
|
REPLAY_BATCH_MUTATE
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -300,6 +301,11 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// The following map is populated when opening the region
|
// The following map is populated when opening the region
|
||||||
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
|
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config setting for whether to allow writes when a region is in recovering or not.
|
||||||
|
*/
|
||||||
|
private boolean disallowWritesInRecovering = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The smallest mvcc readPoint across all the scanners in this
|
* @return The smallest mvcc readPoint across all the scanners in this
|
||||||
* region. Writes older than this readPoint, are included in every
|
* region. Writes older than this readPoint, are included in every
|
||||||
|
@ -517,6 +523,11 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// Write out region name as string and its encoded name.
|
// Write out region name as string and its encoded name.
|
||||||
LOG.debug("Instantiated " + this);
|
LOG.debug("Instantiated " + this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// by default, we allow writes against a region when it's in recovering
|
||||||
|
this.disallowWritesInRecovering =
|
||||||
|
conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
|
||||||
|
HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setHTableSpecificConf() {
|
void setHTableSpecificConf() {
|
||||||
|
@ -1918,7 +1929,11 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
checkResources();
|
checkResources();
|
||||||
|
|
||||||
long newSize;
|
long newSize;
|
||||||
startRegionOperation();
|
if (isReplay) {
|
||||||
|
startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
|
||||||
|
} else {
|
||||||
|
startRegionOperation(Operation.BATCH_MUTATE);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!initialized) {
|
if (!initialized) {
|
||||||
|
@ -5029,7 +5044,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
ClassSize.ARRAY +
|
ClassSize.ARRAY +
|
||||||
39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
||||||
(11 * Bytes.SIZEOF_LONG) +
|
(11 * Bytes.SIZEOF_LONG) +
|
||||||
Bytes.SIZEOF_BOOLEAN);
|
2 * Bytes.SIZEOF_BOOLEAN);
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
|
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
|
||||||
ClassSize.OBJECT + // closeLock
|
ClassSize.OBJECT + // closeLock
|
||||||
|
@ -5327,14 +5342,17 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
case SCAN:
|
case SCAN:
|
||||||
case SPLIT_REGION:
|
case SPLIT_REGION:
|
||||||
case MERGE_REGION:
|
case MERGE_REGION:
|
||||||
|
case PUT:
|
||||||
|
case DELETE:
|
||||||
|
case BATCH_MUTATE:
|
||||||
// when a region is in recovering state, no read, split or merge is allowed
|
// when a region is in recovering state, no read, split or merge is allowed
|
||||||
if (this.isRecovering()) {
|
if (this.isRecovering() && (this.disallowWritesInRecovering ||
|
||||||
throw new RegionInRecoveryException(this.getRegionNameAsString()
|
(op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
|
||||||
+ " is recovering");
|
throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
|
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
|
||||||
// split or merge region doesn't need to check the closing/closed state or lock the region
|
// split or merge region doesn't need to check the closing/closed state or lock the region
|
||||||
|
|
|
@ -67,6 +67,8 @@ import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
|
||||||
|
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
|
||||||
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
|
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
@ -89,6 +91,7 @@ import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -677,14 +680,6 @@ public class TestDistributedLogSplitting {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("#regions = " + regions.size());
|
|
||||||
Iterator<HRegionInfo> it = regions.iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
HRegionInfo region = it.next();
|
|
||||||
if (region.isMetaTable()) {
|
|
||||||
it.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
|
this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
|
||||||
String originalCheckSum = TEST_UTIL.checksumRows(ht);
|
String originalCheckSum = TEST_UTIL.checksumRows(ht);
|
||||||
|
|
||||||
|
@ -812,6 +807,91 @@ public class TestDistributedLogSplitting {
|
||||||
ht.close();
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testDisallowWritesInRecovering() throws Exception {
|
||||||
|
LOG.info("testDisallowWritesInRecovering");
|
||||||
|
Configuration curConf = HBaseConfiguration.create();
|
||||||
|
curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
|
||||||
|
curConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||||
|
curConf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
|
||||||
|
startCluster(NUM_RS, curConf);
|
||||||
|
final int NUM_REGIONS_TO_CREATE = 40;
|
||||||
|
final int NUM_LOG_LINES = 20000;
|
||||||
|
// turn off load balancing to prevent regions from moving around otherwise
|
||||||
|
// they will consume recovered.edits
|
||||||
|
master.balanceSwitch(false);
|
||||||
|
|
||||||
|
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
|
||||||
|
final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
|
||||||
|
HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
|
||||||
|
|
||||||
|
List<HRegionInfo> regions = null;
|
||||||
|
HRegionServer hrs = null;
|
||||||
|
for (int i = 0; i < NUM_RS; i++) {
|
||||||
|
boolean isCarryingMeta = false;
|
||||||
|
hrs = rsts.get(i).getRegionServer();
|
||||||
|
regions = ProtobufUtil.getOnlineRegions(hrs);
|
||||||
|
for (HRegionInfo region : regions) {
|
||||||
|
if (region.isMetaRegion()) {
|
||||||
|
isCarryingMeta = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (isCarryingMeta) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("#regions = " + regions.size());
|
||||||
|
Iterator<HRegionInfo> it = regions.iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
HRegionInfo region = it.next();
|
||||||
|
if (region.isMetaTable()) {
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
|
||||||
|
|
||||||
|
// abort RS
|
||||||
|
LOG.info("Aborting region server: " + hrs.getServerName());
|
||||||
|
hrs.abort("testing");
|
||||||
|
|
||||||
|
// wait for abort completes
|
||||||
|
TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait for regions come online
|
||||||
|
TEST_UTIL.waitFor(180000, 100, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
HRegionInfo region = regions.get(0);
|
||||||
|
byte[] key = region.getStartKey();
|
||||||
|
if (key == null || key.length == 0) {
|
||||||
|
key = new byte[] { 0, 0, 0, 0, 1 };
|
||||||
|
}
|
||||||
|
ht.setAutoFlush(true);
|
||||||
|
Put put = new Put(key);
|
||||||
|
put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
|
||||||
|
ht.put(put);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
|
||||||
|
RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
|
||||||
|
Assert.assertTrue(re.getCause(0) instanceof RegionInRecoveryException);
|
||||||
|
}
|
||||||
|
|
||||||
|
ht.close();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The original intention of this test was to force an abort of a region
|
* The original intention of this test was to force an abort of a region
|
||||||
* server and to make sure that the failure path in the region servers is
|
* server and to make sure that the failure path in the region servers is
|
||||||
|
|
Loading…
Reference in New Issue