diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index a6179d2c070..1b8074e2da3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -724,6 +724,9 @@ public final class HConstants { /** Conf key that enables unflushed WAL edits directly being replayed to region servers */ public static final String DISTRIBUTED_LOG_REPLAY_KEY = "hbase.master.distributed.log.replay"; public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false; + public static final String DISALLOW_WRITES_IN_RECOVERING = + "hbase.regionserver.disallow.writes.when.recovering"; + public static final boolean DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG = false; /** Conf key that specifies timeout value to wait for a region ready */ public static final String LOG_REPLAY_WAIT_REGION_TIMEOUT = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fa345764312..7f21e3a8dd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -209,7 +209,8 @@ public class HRegion implements HeapSize { // , Writable{ * startRegionOperation */ protected enum Operation { - ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION + ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, + REPLAY_BATCH_MUTATE } ////////////////////////////////////////////////////////////////////////////// @@ -300,6 +301,11 @@ public class HRegion implements HeapSize { // , Writable{ // The following map is populated when opening the region Map maxSeqIdInStores = new TreeMap(Bytes.BYTES_COMPARATOR); + /** + * Config setting for whether to allow writes when a region is in recovering or not. + */ + private boolean disallowWritesInRecovering = false; + /** * @return The smallest mvcc readPoint across all the scanners in this * region. Writes older than this readPoint, are included in every @@ -517,6 +523,11 @@ public class HRegion implements HeapSize { // , Writable{ // Write out region name as string and its encoded name. LOG.debug("Instantiated " + this); } + + // by default, we allow writes against a region when it's in recovering + this.disallowWritesInRecovering = + conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, + HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG); } void setHTableSpecificConf() { @@ -1918,7 +1929,11 @@ public class HRegion implements HeapSize { // , Writable{ checkResources(); long newSize; - startRegionOperation(); + if (isReplay) { + startRegionOperation(Operation.REPLAY_BATCH_MUTATE); + } else { + startRegionOperation(Operation.BATCH_MUTATE); + } try { if (!initialized) { @@ -5029,7 +5044,7 @@ public class HRegion implements HeapSize { // , Writable{ ClassSize.ARRAY + 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (11 * Bytes.SIZEOF_LONG) + - Bytes.SIZEOF_BOOLEAN); + 2 * Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.OBJECT + // closeLock @@ -5327,14 +5342,17 @@ public class HRegion implements HeapSize { // , Writable{ case SCAN: case SPLIT_REGION: case MERGE_REGION: + case PUT: + case DELETE: + case BATCH_MUTATE: // when a region is in recovering state, no read, split or merge is allowed - if (this.isRecovering()) { - throw new RegionInRecoveryException(this.getRegionNameAsString() - + " is recovering"); + if (this.isRecovering() && (this.disallowWritesInRecovering || + (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { + throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering"); } break; - default: - break; + default: + break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) { // split or merge region doesn't need to check the closing/closed state or lock the region diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 181529360f6..53bb9d1c481 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -67,6 +67,8 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -89,6 +91,7 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -677,14 +680,6 @@ public class TestDistributedLogSplitting { break; } - LOG.info("#regions = " + regions.size()); - Iterator it = regions.iterator(); - while (it.hasNext()) { - HRegionInfo region = it.next(); - if (region.isMetaTable()) { - it.remove(); - } - } this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1")); String originalCheckSum = TEST_UTIL.checksumRows(ht); @@ -812,6 +807,91 @@ public class TestDistributedLogSplitting { ht.close(); } + @Test(timeout = 300000) + public void testDisallowWritesInRecovering() throws Exception { + LOG.info("testDisallowWritesInRecovering"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + curConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + curConf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); + startCluster(NUM_RS, curConf); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 20000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs); + for (HRegionInfo region : regions) { + if (region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable()) { + it.remove(); + } + } + makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); + + // abort RS + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); + } + }); + + // wait for regions come online + TEST_UTIL.waitFor(180000, 100, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1)); + } + }); + + try { + HRegionInfo region = regions.get(0); + byte[] key = region.getStartKey(); + if (key == null || key.length == 0) { + key = new byte[] { 0, 0, 0, 0, 1 }; + } + ht.setAutoFlush(true); + Put put = new Put(key); + put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); + ht.put(put); + } catch (IOException ioe) { + Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException); + RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe; + Assert.assertTrue(re.getCause(0) instanceof RegionInRecoveryException); + } + + ht.close(); + } + /** * The original intention of this test was to force an abort of a region * server and to make sure that the failure path in the region servers is