From 731e2f6541793b476e43d52315169c7425e056e9 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 13 Mar 2014 22:35:26 +0000 Subject: [PATCH] HBASE-10514 Forward port HBASE-10466, possible data loss when failed flushes git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577353 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HRegion.java | 50 ++- .../hadoop/hbase/regionserver/HStore.java | 8 +- .../hadoop/hbase/regionserver/MemStore.java | 18 +- .../hadoop/hbase/regionserver/Store.java | 7 + .../regionserver/StoreConfigInformation.java | 2 + .../hbase/regionserver/TestHRegion.java | 372 ++++++++++++------ .../regionserver/TestHRegionBusyWait.java | 4 +- .../hadoop/hbase/regionserver/TestStore.java | 268 ++++++++----- 8 files changed, 510 insertions(+), 219 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d833ae7d03f..d5f0839a576 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1030,22 +1030,25 @@ public class HRegion implements HeapSize { // , Writable{ } status.setStatus("Disabling compacts and flushes for region"); - boolean wasFlushing; synchronized (writestate) { // Disable compacting and flushing by background threads for this // region. writestate.writesEnabled = false; - wasFlushing = writestate.flushing; LOG.debug("Closing " + this + ": disabling compactions & flushes"); waitForFlushesAndCompactions(); } // If we were not just flushing, is it worth doing a preflush...one // that will clear out of the bulk of the memstore before we put up // the close flag? - if (!abort && !wasFlushing && worthPreFlushing()) { + if (!abort && worthPreFlushing()) { status.setStatus("Pre-flushing region before close"); LOG.info("Running close preflush of " + this.getRegionNameAsString()); - internalFlushcache(status); + try { + internalFlushcache(status); + } catch (IOException ioe) { + // Failed to flush the region. Keep going. + status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage()); + } } this.closing.set(true); @@ -1061,7 +1064,30 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Updates disabled for region " + this); // Don't flush the cache if we are aborting if (!abort) { - internalFlushcache(status); + int flushCount = 0; + while (this.getMemstoreSize().get() > 0) { + try { + if (flushCount++ > 0) { + int actualFlushes = flushCount - 1; + if (actualFlushes > 5) { + // If we tried 5 times and are unable to clear memory, abort + // so we do not lose data + throw new DroppedSnapshotException("Failed clearing memory after " + + actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName())); + } + LOG.info("Running extra flush, " + actualFlushes + + " (carrying snapshot?) " + this); + } + internalFlushcache(status); + } catch (IOException ioe) { + status.setStatus("Failed flush " + this + ", putting online again"); + synchronized (writestate) { + writestate.writesEnabled = true; + } + // Have to throw to upper layers. I can't abort server from here. + throw ioe; + } + } } Map> result = @@ -1075,6 +1101,7 @@ public class HRegion implements HeapSize { // , Writable{ // close each store in parallel for (final Store store : stores.values()) { + assert abort? true: store.getFlushableSize() == 0; completionService .submit(new Callable>>() { @Override @@ -1104,7 +1131,7 @@ public class HRegion implements HeapSize { // , Writable{ } } this.closed.set(true); - + if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get()); if (coprocessorHost != null) { status.setStatus("Running coprocessor post-close hooks"); this.coprocessorHost.postClose(abort); @@ -1608,7 +1635,7 @@ public class HRegion implements HeapSize { // , Writable{ status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - long flushsize = this.memstoreSize.get(); + long totalFlushableSize = 0; status.setStatus("Preparing to flush by snapshotting stores"); List storeFlushCtxs = new ArrayList(stores.size()); long flushSeqId = -1L; @@ -1630,6 +1657,7 @@ public class HRegion implements HeapSize { // , Writable{ } for (Store s : stores.values()) { + totalFlushableSize += s.getFlushableSize(); storeFlushCtxs.add(s.createFlushContext(flushSeqId)); } @@ -1641,7 +1669,7 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.writeLock().unlock(); } String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + flushsize; + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); @@ -1688,7 +1716,7 @@ public class HRegion implements HeapSize { // , Writable{ storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. - this.addAndGetGlobalMemstoreSize(-flushsize); + this.addAndGetGlobalMemstoreSize(-totalFlushableSize); } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. @@ -1726,7 +1754,7 @@ public class HRegion implements HeapSize { // , Writable{ long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + - StringUtils.humanReadableInt(flushsize) + "/" + flushsize + + StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize + ", currentsize=" + StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + @@ -1734,7 +1762,7 @@ public class HRegion implements HeapSize { // , Writable{ ((wal == null)? "; wal=null": ""); LOG.info(msg); status.setStatus(msg); - this.recentFlushes.add(new Pair(time/1000, flushsize)); + this.recentFlushes.add(new Pair(time/1000, totalFlushableSize)); return compactionRequested; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index bd9d7918ad7..a5eb7019c4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -368,9 +368,15 @@ public class HStore implements Store { @Override public long getMemstoreFlushSize() { + // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack return this.region.memstoreFlushSize; } + @Override + public long getFlushableSize() { + return this.memstore.getFlushableSize(); + } + @Override public long getCompactionCheckMultiplier() { return this.compactionCheckMultiplier; @@ -801,7 +807,7 @@ public class HStore implements Store { } } } catch (IOException e) { - LOG.warn("Failed flushing store file, retring num=" + i, e); + LOG.warn("Failed flushing store file, retrying num=" + i, e); lastException = e; } if (lastException != null && i < (flushRetriesNumber - 1)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 8da61fdc8ac..f5c3071bdc8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -86,6 +86,7 @@ public class MemStore implements HeapSize { // Used to track own heapSize final AtomicLong size; + private volatile long snapshotSize; // Used to track when to flush volatile long timeOfOldestEdit = Long.MAX_VALUE; @@ -117,6 +118,7 @@ public class MemStore implements HeapSize { timeRangeTracker = new TimeRangeTracker(); snapshotTimeRangeTracker = new TimeRangeTracker(); this.size = new AtomicLong(DEEP_OVERHEAD); + this.snapshotSize = 0; if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { this.chunkPool = MemStoreChunkPool.getPool(conf); this.allocator = new MemStoreLAB(conf, chunkPool); @@ -148,6 +150,7 @@ public class MemStore implements HeapSize { "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { if (!this.kvset.isEmpty()) { + this.snapshotSize = keySize(); this.snapshot = this.kvset; this.kvset = new KeyValueSkipListSet(this.comparator); this.snapshotTimeRangeTracker = this.timeRangeTracker; @@ -176,6 +179,18 @@ public class MemStore implements HeapSize { return this.snapshot; } + /** + * On flush, how much memory we will clear. + * Flush will first clear out the data in snapshot if any (It will take a second flush + * invocation to clear the current Cell set). If snapshot is empty, current + * Cell set will be flushed. + * + * @return size of data that is going to be flushed + */ + long getFlushableSize() { + return this.snapshotSize > 0 ? this.snapshotSize : keySize(); + } + /** * The passed snapshot was successfully persisted; it can be let go. * @param ss The snapshot to clean out. @@ -195,6 +210,7 @@ public class MemStore implements HeapSize { this.snapshot = new KeyValueSkipListSet(this.comparator); this.snapshotTimeRangeTracker = new TimeRangeTracker(); } + this.snapshotSize = 0; if (this.snapshotAllocator != null) { tmpAllocator = this.snapshotAllocator; this.snapshotAllocator = null; @@ -983,7 +999,7 @@ public class MemStore implements HeapSize { } public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 798979b471d..8923769befa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -254,6 +254,13 @@ public interface Store extends HeapSize, StoreConfigInformation { */ long getMemStoreSize(); + /** + * @return The amount of memory we could flush from this memstore; usually this is equal to + * {@link #getMemStoreSize()} unless we are carrying snapshots and then it will be the size of + * outstanding snapshots. + */ + long getFlushableSize(); + HColumnDescriptor getFamily(); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java index 768b6cff013..5de00d8dc24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java @@ -33,6 +33,8 @@ public interface StoreConfigInformation { /** * @return Gets the Memstore flush size for the region that this store works with. */ + // TODO: Why is this in here? It should be in Store and it should return the Store flush size, + // not the Regions. St.Ack long getMemstoreFlushSize(); /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ca13d0d851b..1936725803f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -33,7 +33,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -41,6 +44,7 @@ import static org.mockito.Mockito.verify; import java.io.IOException; import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -65,6 +69,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -96,7 +101,6 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterList; @@ -111,12 +115,14 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; +import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; @@ -136,7 +142,7 @@ import org.mockito.Mockito; import com.google.common.collect.Lists; /** - * Basic stand-alone testing of HRegion. + * Basic stand-alone testing of HRegion. No clusters! * * A lot of the meta information for an HRegion now lives inside other HRegions * or in the HBaseMaster, so only basic testing is possible. @@ -150,12 +156,14 @@ public class TestHRegion { @Rule public TestName name = new TestName(); private static final String COLUMN_FAMILY = "MyCF"; + private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); HRegion region = null; - private static HBaseTestingUtility TEST_UTIL; // do not run unit tests in parallel - public static Configuration conf ; - private String DIR; - private static FileSystem fs; + // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack) + private static HBaseTestingUtility TEST_UTIL; + public static Configuration CONF ; + private String dir; + private static FileSystem FILESYSTEM; private final int MAX_VERSIONS = 2; // Test names @@ -174,10 +182,10 @@ public class TestHRegion { @Before public void setup() throws IOException { - this.TEST_UTIL = HBaseTestingUtility.createLocalHTU(); - this.fs = TEST_UTIL.getTestFileSystem(); - this.conf = TEST_UTIL.getConfiguration(); - this.DIR = TEST_UTIL.getDataTestDir("TestHRegion").toString(); + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + FILESYSTEM = TEST_UTIL.getTestFileSystem(); + CONF = TEST_UTIL.getConfiguration(); + dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); method = name.getMethodName(); tableName = Bytes.toBytes(name.getMethodName()); } @@ -192,17 +200,169 @@ public class TestHRegion { String getName() { return name.getMethodName(); } - - // //////////////////////////////////////////////////////////////////////////// - // New tests that doesn't spin up a mini cluster but rather just test the - // individual code pieces in the HRegion. Putting files locally in - // /tmp/testtable - // //////////////////////////////////////////////////////////////////////////// + + /** + * Test for Bug 2 of HBASE-10466. + * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize + * is smaller than a certain value, or when region close starts a flush is ongoing, the first + * flush is skipped and only the second flush takes place. However, two flushes are required in + * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data + * in current memstore. The fix is removing all conditions except abort check so we ensure 2 + * flushes for region close." + * @throws IOException + */ + @Test (timeout=60000) + public void testCloseCarryingSnapshot() throws IOException { + HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES); + Store store = region.getStore(COLUMN_FAMILY_BYTES); + // Get some random bytes. + byte [] value = Bytes.toBytes(name.getMethodName()); + // Make a random put against our cf. + Put put = new Put(value); + put.add(COLUMN_FAMILY_BYTES, null, value); + // First put something in current memstore, which will be in snapshot after flusher.prepare() + region.put(put); + StoreFlushContext storeFlushCtx = store.createFlushContext(12345); + storeFlushCtx.prepare(); + // Second put something in current memstore + put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); + region.put(put); + // Close with something in memstore and something in the snapshot. Make sure all is cleared. + region.close(); + assertEquals(0, region.getMemstoreSize().get()); + HRegion.closeHRegion(region); + } + + /** + * Test we do not lose data if we fail a flush and then close. + * Part of HBase-10466. Tests the following from the issue description: + * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is + * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when + * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by + * the sum of current memstore sizes instead of snapshots left from previous failed flush. This + * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize + * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size + * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize + * much smaller than expected. In extreme case, if the error accumulates to even bigger than + * HRegion's memstore size limit, any further flush is skipped because flush does not do anything + * if memstoreSize is not larger than 0." + * @throws Exception + */ + @Test (timeout=60000) + public void testFlushSizeAccounting() throws Exception { + final Configuration conf = HBaseConfiguration.create(CONF); + // Only retry once. + conf.setInt("hbase.hstore.flush.retries.number", 1); + final User user = + User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"}); + // Inject our faulty LocalFileSystem + conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); + user.runAs(new PrivilegedExceptionAction() { + public Object run() throws Exception { + // Make sure it worked (above is sensitive to caching details in hadoop core) + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); + FaultyFileSystem ffs = (FaultyFileSystem)fs; + HRegion region = null; + try { + // Initialize region + region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES); + long size = region.getMemstoreSize().get(); + Assert.assertEquals(0, size); + // Put one item into memstore. Measure the size of one item in memstore. + Put p1 = new Put(row); + p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null)); + region.put(p1); + final long sizeOfOnePut = region.getMemstoreSize().get(); + // Fail a flush which means the current memstore will hang out as memstore 'snapshot'. + try { + LOG.info("Flushing"); + region.flushcache(); + Assert.fail("Didn't bubble up IOE!"); + } catch (DroppedSnapshotException dse) { + // What we are expecting + } + // Make it so all writes succeed from here on out + ffs.fault.set(false); + // Check sizes. Should still be the one entry. + Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize().get()); + // Now add two entries so that on this next flush that fails, we can see if we + // subtract the right amount, the snapshot size only. + Put p2 = new Put(row); + p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); + p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); + region.put(p2); + Assert.assertEquals(sizeOfOnePut * 3, region.getMemstoreSize().get()); + // Do a successful flush. It will clear the snapshot only. Thats how flushes work. + // If already a snapshot, we clear it else we move the memstore to be snapshot and flush + // it + region.flushcache(); + // Make sure our memory accounting is right. + Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize().get()); + } finally { + HRegion.closeHRegion(region); + } + return null; + } + }); + FileSystem.closeAllForUGI(user.getUGI()); + } + + @Test (timeout=60000) + public void testCloseWithFailingFlush() throws Exception { + final Configuration conf = HBaseConfiguration.create(CONF); + // Only retry once. + conf.setInt("hbase.hstore.flush.retries.number", 1); + final User user = + User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"}); + // Inject our faulty LocalFileSystem + conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); + user.runAs(new PrivilegedExceptionAction() { + public Object run() throws Exception { + // Make sure it worked (above is sensitive to caching details in hadoop core) + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); + FaultyFileSystem ffs = (FaultyFileSystem)fs; + HRegion region = null; + try { + // Initialize region + region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES); + long size = region.getMemstoreSize().get(); + Assert.assertEquals(0, size); + // Put one item into memstore. Measure the size of one item in memstore. + Put p1 = new Put(row); + p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null)); + region.put(p1); + // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only. + Store store = region.getStore(COLUMN_FAMILY_BYTES); + StoreFlushContext storeFlushCtx = store.createFlushContext(12345); + storeFlushCtx.prepare(); + // Now add two entries to the foreground memstore. + Put p2 = new Put(row); + p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); + p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); + region.put(p2); + // Now try close on top of a failing flush. + region.close(); + fail(); + } catch (DroppedSnapshotException dse) { + // Expected + LOG.info("Expected DroppedSnapshotException"); + } finally { + // Make it so all writes succeed from here on out so can close clean + ffs.fault.set(false); + HRegion.closeHRegion(region); + } + return null; + } + }); + FileSystem.closeAllForUGI(user.getUGI()); + } @Test public void testCompactionAffectedByScanners() throws Exception { byte[] family = Bytes.toBytes("family"); - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); Put put = new Put(Bytes.toBytes("r1")); put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); @@ -250,7 +410,7 @@ public class TestHRegion { @Test public void testToShowNPEOnRegionScannerReseek() throws Exception { byte[] family = Bytes.toBytes("family"); - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); Put put = new Put(Bytes.toBytes("r1")); put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); @@ -282,7 +442,7 @@ public class TestHRegion { String method = "testSkipRecoveredEditsReplay"; TableName tableName = TableName.valueOf(method); byte[] family = Bytes.toBytes("family"); - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -296,7 +456,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf); + HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF); long time = System.nanoTime(); WALEdit edit = new WALEdit(); @@ -332,7 +492,7 @@ public class TestHRegion { String method = "testSkipRecoveredEditsReplaySomeIgnored"; TableName tableName = TableName.valueOf(method); byte[] family = Bytes.toBytes("family"); - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -346,7 +506,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf); + HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF); long time = System.nanoTime(); WALEdit edit = new WALEdit(); @@ -385,7 +545,7 @@ public class TestHRegion { @Test public void testSkipRecoveredEditsReplayAllIgnored() throws Exception { byte[] family = Bytes.toBytes("family"); - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -419,7 +579,7 @@ public class TestHRegion { String method = name.getMethodName(); TableName tableName = TableName.valueOf(method); byte[] family = Bytes.toBytes("family"); - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -443,7 +603,7 @@ public class TestHRegion { } // disable compaction completion - conf.setBoolean("hbase.hstore.compaction.complete", false); + CONF.setBoolean("hbase.hstore.compaction.complete", false); region.compactStores(); // ensure that nothing changed @@ -471,7 +631,7 @@ public class TestHRegion { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); fs.create(recoveredEdits); - HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf); + HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF); long time = System.nanoTime(); @@ -480,8 +640,8 @@ public class TestHRegion { writer.close(); // close the region now, and reopen again - HTableDescriptor htd = region.getTableDesc(); - HRegionInfo info = region.getRegionInfo(); + region.getTableDesc(); + region.getRegionInfo(); region.close(); region = HRegion.openHRegion(region, null); @@ -602,7 +762,7 @@ public class TestHRegion { byte[] TABLE = Bytes.toBytes("testWeirdCacheBehaviour"); byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"), Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") }; - this.region = initHRegion(TABLE, getName(), conf, FAMILIES); + this.region = initHRegion(TABLE, getName(), CONF, FAMILIES); try { String value = "this is the value"; String value2 = "this is some other value"; @@ -643,7 +803,7 @@ public class TestHRegion { @Test public void testAppendWithReadOnlyTable() throws Exception { byte[] TABLE = Bytes.toBytes("readOnlyTable"); - this.region = initHRegion(TABLE, getName(), conf, true, Bytes.toBytes("somefamily")); + this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily")); boolean exceptionCaught = false; Append append = new Append(Bytes.toBytes("somerow")); append.setDurability(Durability.SKIP_WAL); @@ -663,7 +823,7 @@ public class TestHRegion { @Test public void testIncrWithReadOnlyTable() throws Exception { byte[] TABLE = Bytes.toBytes("readOnlyTable"); - this.region = initHRegion(TABLE, getName(), conf, true, Bytes.toBytes("somefamily")); + this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily")); boolean exceptionCaught = false; Increment inc = new Increment(Bytes.toBytes("somerow")); inc.setDurability(Durability.SKIP_WAL); @@ -756,7 +916,7 @@ public class TestHRegion { public void testFamilyWithAndWithoutColon() throws Exception { byte[] b = Bytes.toBytes(getName()); byte[] cf = Bytes.toBytes(COLUMN_FAMILY); - this.region = initHRegion(b, getName(), conf, cf); + this.region = initHRegion(b, getName(), CONF, cf); try { Put p = new Put(b); byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":"); @@ -780,7 +940,7 @@ public class TestHRegion { byte[] cf = Bytes.toBytes(COLUMN_FAMILY); byte[] qual = Bytes.toBytes("qual"); byte[] val = Bytes.toBytes("val"); - this.region = initHRegion(b, getName(), conf, cf); + this.region = initHRegion(b, getName(), CONF, cf); MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); try { long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); @@ -814,7 +974,7 @@ public class TestHRegion { LOG.info("Next a batch put that has to break into two batches to avoid a lock"); RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2")); - MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); final AtomicReference retFromThread = new AtomicReference(); TestThread putter = new TestThread(ctx) { @Override @@ -860,8 +1020,8 @@ public class TestHRegion { byte[] val = Bytes.toBytes("val"); // add data with a timestamp that is too recent for range. Ensure assert - conf.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); - this.region = initHRegion(b, getName(), conf, cf); + CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); + this.region = initHRegion(b, getName(), CONF, cf); try { MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); @@ -902,7 +1062,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Putting empty data in key Put put = new Put(row1); @@ -976,7 +1136,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Putting data in key Put put = new Put(row1); @@ -1009,7 +1169,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Putting data in key Put put = new Put(row1); @@ -1045,7 +1205,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Putting val3 in key Put put = new Put(row1); @@ -1141,7 +1301,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Putting data in the key to check Put put = new Put(row1); @@ -1182,12 +1342,12 @@ public class TestHRegion { @Test public void testCheckAndPut_wrongRowInPut() throws IOException { TableName tableName = TableName.valueOf(name.getMethodName()); - this.region = initHRegion(tableName, this.getName(), conf, COLUMNS); + this.region = initHRegion(tableName, this.getName(), CONF, COLUMNS); try { Put put = new Put(row2); put.add(fam1, qual1, value1); try { - boolean res = region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, + region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(value2), put, false); fail(); } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) { @@ -1216,7 +1376,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Put content Put put = new Put(row1); @@ -1291,7 +1451,7 @@ public class TestHRegion { put.add(fam1, qual, 2, value); String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { region.put(put); @@ -1321,7 +1481,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1, fam2, fam3); + this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3); try { List kvs = new ArrayList(); kvs.add(new KeyValue(row1, fam4, null, null)); @@ -1360,7 +1520,7 @@ public class TestHRegion { byte[] fam = Bytes.toBytes("info"); byte[][] families = { fam }; String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); @@ -1428,7 +1588,7 @@ public class TestHRegion { byte[] fam = Bytes.toBytes("info"); byte[][] families = { fam }; String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { byte[] row = Bytes.toBytes("table_name"); // column names @@ -1471,7 +1631,7 @@ public class TestHRegion { byte[] fam = Bytes.toBytes("info"); byte[][] families = { fam }; String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { byte[] row = Bytes.toBytes("row1"); // column names @@ -1525,8 +1685,8 @@ public class TestHRegion { String method = this.getName(); // add data with a timestamp that is too recent for range. Ensure assert - conf.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); - this.region = initHRegion(tableName, method, conf, families); + CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); + this.region = initHRegion(tableName, method, CONF, families); boolean caughtExcep = false; try { try { @@ -1551,7 +1711,7 @@ public class TestHRegion { public void testScanner_DeleteOneFamilyNotAnother() throws IOException { byte[] fam1 = Bytes.toBytes("columnA"); byte[] fam2 = Bytes.toBytes("columnB"); - this.region = initHRegion(tableName, getName(), conf, fam1, fam2); + this.region = initHRegion(tableName, getName(), CONF, fam1, fam2); try { byte[] rowA = Bytes.toBytes("rowA"); byte[] rowB = Bytes.toBytes("rowB"); @@ -1605,7 +1765,7 @@ public class TestHRegion { public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException { TableName tableName = TableName.valueOf(name.getMethodName()); - this.region = initHRegion(tableName, getName(), conf, fam1); + this.region = initHRegion(tableName, getName(), CONF, fam1); try { EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); Put put = new Put(row); @@ -1658,7 +1818,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Building checkerList List kvs = new ArrayList(); @@ -1699,7 +1859,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { Get get = new Get(row1); get.addColumn(fam2, col1); @@ -1730,7 +1890,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Add to memstore Put put = new Put(row1); @@ -1776,7 +1936,7 @@ public class TestHRegion { byte[] fam = Bytes.toBytes("fam"); String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam); + this.region = initHRegion(tableName, method, CONF, fam); try { Get get = new Get(row); get.addFamily(fam); @@ -1820,9 +1980,9 @@ public class TestHRegion { region = HRegion.mergeAdjacent(subregions[0], subregions[1]); LOG.info("Merge regions elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); - fs.delete(oldRegion1, true); - fs.delete(oldRegion2, true); - fs.delete(oldRegionPath, true); + FILESYSTEM.delete(oldRegion1, true); + FILESYSTEM.delete(oldRegion2, true); + FILESYSTEM.delete(oldRegionPath, true); LOG.info("splitAndMerge completed."); } finally { for (int i = 0; i < subregions.length; i++) { @@ -1884,7 +2044,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { Scan scan = new Scan(); scan.addFamily(fam1); @@ -1909,7 +2069,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { Scan scan = new Scan(); scan.addFamily(fam2); @@ -1938,7 +2098,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Putting data in Region @@ -1985,7 +2145,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); try { - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); } catch (IOException e) { e.printStackTrace(); fail("Got IOException during initHRegion, " + e.getMessage()); @@ -2021,7 +2181,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Putting data in Region Put put = null; @@ -2087,7 +2247,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Putting data in Region Put put = null; @@ -2146,7 +2306,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Putting data in Region Put put = null; @@ -2210,7 +2370,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Putting data in Region KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); @@ -2291,7 +2451,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { // Putting data in Region Put put = null; @@ -2351,7 +2511,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Putting data in Region Put put = null; @@ -2402,7 +2562,7 @@ public class TestHRegion { @Test public void testScanner_StopRow1542() throws IOException { byte[] family = Bytes.toBytes("testFamily"); - this.region = initHRegion(tableName, getName(), conf, family); + this.region = initHRegion(tableName, getName(), CONF, family); try { byte[] row1 = Bytes.toBytes("row111"); byte[] row2 = Bytes.toBytes("row222"); @@ -2447,19 +2607,6 @@ public class TestHRegion { } } - private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, int amount) - throws IOException { - // run a get and see? - Get get = new Get(row); - get.addColumn(familiy, qualifier); - Result result = region.get(get); - assertEquals(1, result.size()); - - Cell kv = result.rawCells()[0]; - int r = Bytes.toInt(CellUtil.cloneValue(kv)); - assertEquals(amount, r); - } - @Test public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException { byte[] row1 = Bytes.toBytes("row1"); @@ -2474,7 +2621,7 @@ public class TestHRegion { // Setting up region String method = this.getName(); - this.region = initHRegion(tableName, method, conf, fam1); + this.region = initHRegion(tableName, method, CONF, fam1); try { // Putting data in Region KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); @@ -2550,7 +2697,7 @@ public class TestHRegion { byte[] cf_essential = Bytes.toBytes("essential"); byte[] cf_joined = Bytes.toBytes("joined"); byte[] cf_alpha = Bytes.toBytes("alpha"); - this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha); + this.region = initHRegion(tableName, getName(), CONF, cf_essential, cf_joined, cf_alpha); try { byte[] row1 = Bytes.toBytes("row1"); byte[] row2 = Bytes.toBytes("row2"); @@ -2618,7 +2765,7 @@ public class TestHRegion { final byte[] cf_first = Bytes.toBytes("first"); final byte[] cf_second = Bytes.toBytes("second"); - this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second); + this.region = initHRegion(tableName, getName(), CONF, cf_first, cf_second); try { final byte[] col_a = Bytes.toBytes("a"); final byte[] col_b = Bytes.toBytes("b"); @@ -2854,7 +3001,7 @@ public class TestHRegion { int compactInterval = 10 * flushAndScanInterval; String method = "testFlushCacheWhileScanning"; - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); try { FlushThread flushThread = new FlushThread(); flushThread.start(); @@ -2986,7 +3133,7 @@ public class TestHRegion { } String method = "testWritesWhileScanning"; - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); try { PutThread putThread = new PutThread(numRows, families, qualifiers); putThread.start(); @@ -3149,6 +3296,7 @@ public class TestHRegion { // extending over the ulimit. Make sure compactions are aggressive in // reducing // the number of HFiles created. + Configuration conf = HBaseConfiguration.create(CONF); conf.setInt("hbase.hstore.compaction.min", 1); conf.setInt("hbase.hstore.compaction.max", 1000); this.region = initHRegion(tableName, method, conf, families); @@ -3238,7 +3386,7 @@ public class TestHRegion { @Test public void testHolesInMeta() throws Exception { byte[] family = Bytes.toBytes("family"); - this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, conf, + this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF, false, family); try { byte[] rowNotServed = Bytes.toBytes("a"); @@ -3264,7 +3412,7 @@ public class TestHRegion { // Setting up region String method = "testIndexesScanWithOneDeletedRow"; - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); try { Put put = new Put(Bytes.toBytes(1L)); put.add(family, qual1, 1L, Bytes.toBytes(1L)); @@ -3302,7 +3450,6 @@ public class TestHRegion { // //////////////////////////////////////////////////////////////////////////// @Test public void testBloomFilterSize() throws IOException { - byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); byte[] qf1 = Bytes.toBytes("col"); byte[] val1 = Bytes.toBytes("value1"); @@ -3519,8 +3666,8 @@ public class TestHRegion { htd.addFamily(new HColumnDescriptor("cf")); info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false); - Path path = new Path(DIR + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization"); - region = HRegion.newHRegion(path, null, fs, conf, info, htd, null); + Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization"); + region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null); // region initialization throws IOException and set task state to ABORTED. region.initialize(); fail("Region initialization should fail due to IOException"); @@ -3545,7 +3692,7 @@ public class TestHRegion { */ @Test public void testRegionInfoFileCreation() throws IOException { - Path rootDir = new Path(DIR + "testRegionInfoFileCreation"); + Path rootDir = new Path(dir + "testRegionInfoFileCreation"); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testtb")); htd.addFamily(new HColumnDescriptor("cf")); @@ -3553,7 +3700,7 @@ public class TestHRegion { HRegionInfo hri = new HRegionInfo(htd.getTableName()); // Create a region and skip the initialization (like CreateTableHandler) - HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true); + HRegion region = HRegion.createHRegion(hri, rootDir, CONF, htd, null, false, true); // HRegion region = TEST_UTIL.createLocalHRegion(hri, htd); Path regionDir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -3566,7 +3713,7 @@ public class TestHRegion { fs.exists(regionInfoFile)); // Try to open the region - region = HRegion.openHRegion(rootDir, hri, htd, null, conf); + region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); HRegion.closeHRegion(region); @@ -3579,7 +3726,7 @@ public class TestHRegion { assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir", fs.exists(regionInfoFile)); - region = HRegion.openHRegion(rootDir, hri, htd, null, conf); + region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); // region = TEST_UTIL.openHRegion(hri, htd); assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); HRegion.closeHRegion(region); @@ -3629,7 +3776,7 @@ public class TestHRegion { @Test public void testParallelIncrementWithMemStoreFlush() throws Exception { byte[] family = Incrementer.family; - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); final HRegion region = this.region; final AtomicBoolean incrementDone = new AtomicBoolean(false); Runnable flusher = new Runnable() { @@ -3716,7 +3863,7 @@ public class TestHRegion { @Test public void testParallelAppendWithMemStoreFlush() throws Exception { byte[] family = Appender.family; - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); final HRegion region = this.region; final AtomicBoolean appendDone = new AtomicBoolean(false); Runnable flusher = new Runnable() { @@ -3780,7 +3927,7 @@ public class TestHRegion { byte[] qualifier = Bytes.toBytes("qualifier"); byte[] row = Bytes.toBytes("putRow"); byte[] value = null; - this.region = initHRegion(tableName, method, conf, family); + this.region = initHRegion(tableName, method, CONF, family); Put put = null; Get get = null; List kvs = null; @@ -3883,11 +4030,12 @@ public class TestHRegion { private void durabilityTest(String method, Durability tableDurability, Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync, final boolean expectSyncFromLogSyncer) throws Exception { + Configuration conf = HBaseConfiguration.create(CONF); method = method + "_" + tableDurability.name() + "_" + mutationDurability.name(); TableName tableName = TableName.valueOf(method); byte[] family = Bytes.toBytes("family"); - Path logDir = new Path(new Path(DIR + method), "log"); - HLog hlog = HLogFactory.createHLog(fs, logDir, UUID.randomUUID().toString(), conf); + Path logDir = new Path(new Path(dir + method), "log"); + HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), conf); final HLog log = spy(hlog); this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, log, @@ -3924,8 +4072,8 @@ public class TestHRegion { verify(log, never()).sync(); } - hlog.close(); - region.close(); + HRegion.closeHRegion(this.region); + this.region = null; } private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { @@ -4010,20 +4158,20 @@ public class TestHRegion { private Configuration initSplit() { // Always compact if there is more than one store file. - conf.setInt("hbase.hstore.compactionThreshold", 2); + CONF.setInt("hbase.hstore.compactionThreshold", 2); // Make lease timeout longer, lease checks less frequent - conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + CONF.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000); + CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000); // Increase the amount of time between client retries - conf.setLong("hbase.client.pause", 15 * 1000); + CONF.setLong("hbase.client.pause", 15 * 1000); // This size should make it so we always split using the addContent // below. After adding all data, the first region is 1.3M - conf.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128); - return conf; + CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128); + return CONF; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java index f1fffa3fecb..d641bdb12de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java @@ -42,7 +42,7 @@ public class TestHRegionBusyWait extends TestHRegion { @Before public void setup() throws IOException { super.setup(); - conf.set("hbase.busy.wait.duration", "1000"); + CONF.set("hbase.busy.wait.duration", "1000"); } /** @@ -53,7 +53,7 @@ public class TestHRegionBusyWait extends TestHRegion { String method = "testRegionTooBusy"; byte[] tableName = Bytes.toBytes(method); byte[] family = Bytes.toBytes("family"); - region = initHRegion(tableName, method, conf, family); + region = initHRegion(tableName, method, CONF, family); final AtomicBoolean stopped = new AtomicBoolean(true); Thread t = new Thread(new Runnable() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 793b839d898..20d57b5bef8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -29,8 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ConcurrentSkipListSet; - -import junit.framework.TestCase; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,15 +74,22 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.util.Progressable; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import org.mockito.Mockito; /** * Test class for the Store */ @Category(MediumTests.class) -public class TestStore extends TestCase { +public class TestStore { public static final Log LOG = LogFactory.getLog(TestStore.class); + @Rule public TestName name = new TestName(); HStore store; byte [] table = Bytes.toBytes("table"); @@ -115,7 +121,7 @@ public class TestStore extends TestCase { * Setup * @throws IOException */ - @Override + @Before public void setUp() throws IOException { qualifiers.add(qf1); qualifiers.add(qf3); @@ -149,7 +155,7 @@ public class TestStore extends TestCase { } @SuppressWarnings("deprecation") - private void init(String methodName, Configuration conf, HTableDescriptor htd, + private Store init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd) throws IOException { //Setting up a Store Path basedir = new Path(DIR+methodName); @@ -167,12 +173,73 @@ public class TestStore extends TestCase { HRegion region = new HRegion(tableDir, hlog, fs, conf, info, htd, null); store = new HStore(region, hcd, conf); + return store; + } + + /** + * Test we do not lose data if we fail a flush and then close. + * Part of HBase-10466 + * @throws Exception + */ + @Test + public void testFlushSizeAccounting() throws Exception { + LOG.info("Setting up a faulty file system that cannot write in " + + this.name.getMethodName()); + final Configuration conf = HBaseConfiguration.create(); + // Only retry once. + conf.setInt("hbase.hstore.flush.retries.number", 1); + User user = User.createUserForTesting(conf, this.name.getMethodName(), + new String[]{"foo"}); + // Inject our faulty LocalFileSystem + conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); + user.runAs(new PrivilegedExceptionAction() { + public Object run() throws Exception { + // Make sure it worked (above is sensitive to caching details in hadoop core) + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); + FaultyFileSystem ffs = (FaultyFileSystem)fs; + + // Initialize region + init(name.getMethodName(), conf); + + long size = store.memstore.getFlushableSize(); + Assert.assertEquals(0, size); + LOG.info("Adding some data"); + long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); + size = store.memstore.getFlushableSize(); + Assert.assertEquals(kvSize, size); + // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. + try { + LOG.info("Flushing"); + flushStore(store, id++); + Assert.fail("Didn't bubble up IOE!"); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getMessage().contains("Fault injected")); + } + size = store.memstore.getFlushableSize(); + Assert.assertEquals(kvSize, size); + store.add(new KeyValue(row, family, qf2, 2, (byte[])null)); + // Even though we add a new kv, we expect the flushable size to be 'same' since we have + // not yet cleared the snapshot -- the above flush failed. + Assert.assertEquals(kvSize, size); + ffs.fault.set(false); + flushStore(store, id++); + size = store.memstore.getFlushableSize(); + // Size should be the foreground kv size. + Assert.assertEquals(kvSize, size); + flushStore(store, id++); + size = store.memstore.getFlushableSize(); + Assert.assertEquals(0, size); + return null; + } + }); } /** * Verify that compression and data block encoding are respected by the * Store.createWriterInTmp() method, used on store flush. */ + @Test public void testCreateWriter() throws Exception { Configuration conf = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(conf); @@ -180,7 +247,7 @@ public class TestStore extends TestCase { HColumnDescriptor hcd = new HColumnDescriptor(family); hcd.setCompressionType(Compression.Algorithm.GZ); hcd.setDataBlockEncoding(DataBlockEncoding.DIFF); - init(getName(), conf, hcd); + init(name.getMethodName(), conf, hcd); // Test createWriterInTmp() StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false); @@ -193,11 +260,12 @@ public class TestStore extends TestCase { // Verify that compression and encoding settings are respected HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf); - assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm()); - assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); + Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm()); + Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); reader.close(); } + @Test public void testDeleteExpiredStoreFiles() throws Exception { int storeFileNum = 4; int ttl = 4; @@ -209,7 +277,7 @@ public class TestStore extends TestCase { conf.setBoolean("hbase.store.delete.expired.storefile", true); HColumnDescriptor hcd = new HColumnDescriptor(family); hcd.setTimeToLive(ttl); - init(getName(), conf, hcd); + init(name.getMethodName(), conf, hcd); long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum; long timeStamp; @@ -226,7 +294,7 @@ public class TestStore extends TestCase { } // Verify the total number of store files - assertEquals(storeFileNum, this.store.getStorefiles().size()); + Assert.assertEquals(storeFileNum, this.store.getStorefiles().size()); // Each compaction request will find one expired store file and delete it // by the compaction. @@ -237,27 +305,28 @@ public class TestStore extends TestCase { // the first is expired normally. // If not the first compaction, there is another empty store file, List files = new ArrayList(cr.getFiles()); - assertEquals(Math.min(i, 2), cr.getFiles().size()); + Assert.assertEquals(Math.min(i, 2), cr.getFiles().size()); for (int j = 0; j < files.size(); j++) { - assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge + Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge .currentTimeMillis() - this.store.getScanInfo().getTtl())); } // Verify that the expired store file is compacted to an empty store file. // Default compaction policy creates just one and only one compacted file. StoreFile compactedFile = this.store.compact(compaction).get(0); // It is an empty store file. - assertEquals(0, compactedFile.getReader().getEntries()); + Assert.assertEquals(0, compactedFile.getReader().getEntries()); // Let the next store file expired. edge.incrementTime(sleepTime); } } + @Test public void testLowestModificationTime() throws Exception { Configuration conf = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(conf); // Initialize region - init(getName(), conf); + init(name.getMethodName(), conf); int storeFileNum = 4; for (int i = 1; i <= storeFileNum; i++) { @@ -270,13 +339,13 @@ public class TestStore extends TestCase { // after flush; check the lowest time stamp long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); - assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); + Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); // after compact; check the lowest time stamp store.compact(store.requestCompaction()); lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); - assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); + Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); } private static long getLowestTimeStampFromFS(FileSystem fs, @@ -311,8 +380,9 @@ public class TestStore extends TestCase { * Test for hbase-1686. * @throws IOException */ + @Test public void testEmptyStoreFile() throws IOException { - init(this.getName()); + init(this.name.getMethodName()); // Write a store file. this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); @@ -335,20 +405,21 @@ public class TestStore extends TestCase { this.store.close(); // Reopen it... should pick up two files this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c); - assertEquals(2, this.store.getStorefilesCount()); + Assert.assertEquals(2, this.store.getStorefilesCount()); result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers); - assertEquals(1, result.size()); + Assert.assertEquals(1, result.size()); } /** * Getting data from memstore only * @throws IOException */ + @Test public void testGet_FromMemStoreOnly() throws IOException { - init(this.getName()); + init(this.name.getMethodName()); //Put data in memstore this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); @@ -370,8 +441,9 @@ public class TestStore extends TestCase { * Getting data from files only * @throws IOException */ + @Test public void testGet_FromFilesOnly() throws IOException { - init(this.getName()); + init(this.name.getMethodName()); //Put data in memstore this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); @@ -408,8 +480,9 @@ public class TestStore extends TestCase { * Getting data from memstore and files * @throws IOException */ + @Test public void testGet_FromMemStoreAndFiles() throws IOException { - init(this.getName()); + init(this.name.getMethodName()); //Put data in memstore this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); @@ -441,14 +514,14 @@ public class TestStore extends TestCase { private void flush(int storeFilessize) throws IOException{ this.store.snapshot(); flushStore(store, id++); - assertEquals(storeFilessize, this.store.getStorefiles().size()); - assertEquals(0, this.store.memstore.kvset.size()); + Assert.assertEquals(storeFilessize, this.store.getStorefiles().size()); + Assert.assertEquals(0, this.store.memstore.kvset.size()); } private void assertCheck() { - assertEquals(expected.size(), result.size()); + Assert.assertEquals(expected.size(), result.size()); for(int i=0; i 0); + Assert.assertTrue(ret > 0); // then flush. flushStore(store, id++); - assertEquals(1, this.store.getStorefiles().size()); + Assert.assertEquals(1, this.store.getStorefiles().size()); // from the one we inserted up there, and a new one - assertEquals(2, this.store.memstore.kvset.size()); + Assert.assertEquals(2, this.store.memstore.kvset.size()); // how many key/values for this row are there? Get get = new Get(row); @@ -495,25 +569,25 @@ public class TestStore extends TestCase { List results = new ArrayList(); results = HBaseTestingUtility.getFromStoreFile(store, get); - assertEquals(2, results.size()); + Assert.assertEquals(2, results.size()); long ts1 = results.get(0).getTimestamp(); long ts2 = results.get(1).getTimestamp(); - assertTrue(ts1 > ts2); + Assert.assertTrue(ts1 > ts2); - assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); - assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); + Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); + Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); } - @Override - protected void tearDown() throws Exception { - super.tearDown(); + @After + public void tearDown() throws Exception { EnvironmentEdgeManagerTestHelper.reset(); } + @Test public void testICV_negMemstoreSize() throws IOException { - init(this.getName()); + init(this.name.getMethodName()); long time = 100; ManualEnvironmentEdge ee = new ManualEnvironmentEdge(); @@ -549,9 +623,9 @@ public class TestStore extends TestCase { if (ret != 0) System.out.println("ret: " + ret); if (ret2 != 0) System.out.println("ret2: " + ret2); - assertTrue("ret: " + ret, ret >= 0); + Assert.assertTrue("ret: " + ret, ret >= 0); size += ret; - assertTrue("ret2: " + ret2, ret2 >= 0); + Assert.assertTrue("ret2: " + ret2, ret2 >= 0); size += ret2; @@ -565,13 +639,14 @@ public class TestStore extends TestCase { //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize()); computedSize += kvsize; } - assertEquals(computedSize, size); + Assert.assertEquals(computedSize, size); } + @Test public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception { ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); EnvironmentEdgeManagerTestHelper.injectEdge(mee); - init(this.getName()); + init(this.name.getMethodName()); long oldValue = 1L; long newValue = 3L; @@ -586,12 +661,12 @@ public class TestStore extends TestCase { long ret = this.store.updateColumnValue(row, family, qf1, newValue); // memstore should have grown by some amount. - assertTrue(ret > 0); + Assert.assertTrue(ret > 0); // then flush. flushStore(store, id++); - assertEquals(1, this.store.getStorefiles().size()); - assertEquals(1, this.store.memstore.kvset.size()); + Assert.assertEquals(1, this.store.getStorefiles().size()); + Assert.assertEquals(1, this.store.memstore.kvset.size()); // now increment again: newValue += 1; @@ -611,30 +686,31 @@ public class TestStore extends TestCase { List results = new ArrayList(); results = HBaseTestingUtility.getFromStoreFile(store, get); - assertEquals(2, results.size()); + Assert.assertEquals(2, results.size()); long ts1 = results.get(0).getTimestamp(); long ts2 = results.get(1).getTimestamp(); - assertTrue(ts1 > ts2); - assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); - assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); + Assert.assertTrue(ts1 > ts2); + Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); + Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); mee.setValue(2); // time goes up slightly newValue += 1; this.store.updateColumnValue(row, family, qf1, newValue); results = HBaseTestingUtility.getFromStoreFile(store, get); - assertEquals(2, results.size()); + Assert.assertEquals(2, results.size()); ts1 = results.get(0).getTimestamp(); ts2 = results.get(1).getTimestamp(); - assertTrue(ts1 > ts2); - assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); - assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); + Assert.assertTrue(ts1 > ts2); + Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); + Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); } + @Test public void testHandleErrorsInFlush() throws Exception { LOG.info("Setting up a faulty file system that cannot write"); @@ -648,10 +724,10 @@ public class TestStore extends TestCase { public Object run() throws Exception { // Make sure it worked (above is sensitive to caching details in hadoop core) FileSystem fs = FileSystem.get(conf); - assertEquals(FaultyFileSystem.class, fs.getClass()); + Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); // Initialize region - init(getName(), conf); + init(name.getMethodName(), conf); LOG.info("Adding some data"); store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); @@ -662,30 +738,36 @@ public class TestStore extends TestCase { Collection files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); - assertEquals(0, files != null ? files.size() : 0); + Assert.assertEquals(0, files != null ? files.size() : 0); //flush try { LOG.info("Flushing"); flush(1); - fail("Didn't bubble up IOE!"); + Assert.fail("Didn't bubble up IOE!"); } catch (IOException ioe) { - assertTrue(ioe.getMessage().contains("Fault injected")); + Assert.assertTrue(ioe.getMessage().contains("Fault injected")); } LOG.info("After failed flush, we should still have no files!"); files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); - assertEquals(0, files != null ? files.size() : 0); + Assert.assertEquals(0, files != null ? files.size() : 0); + store.getHRegion().getLog().closeAndDelete(); return null; } }); + FileSystem.closeAllForUGI(user.getUGI()); } - + /** + * Faulty file system that will fail if you write past its fault position the FIRST TIME + * only; thereafter it will succeed. Used by {@link TestHRegion} too. + */ static class FaultyFileSystem extends FilterFileSystem { List> outStreams = new ArrayList>(); private long faultPos = 200; + AtomicBoolean fault = new AtomicBoolean(true); public FaultyFileSystem() { super(new LocalFileSystem()); @@ -694,7 +776,7 @@ public class TestStore extends TestCase { @Override public FSDataOutputStream create(Path p) throws IOException { - return new FaultyOutputStream(super.create(p), faultPos); + return new FaultyOutputStream(super.create(p), faultPos, fault); } @Override @@ -702,7 +784,7 @@ public class TestStore extends TestCase { boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { return new FaultyOutputStream(super.create(f, permission, - overwrite, bufferSize, replication, blockSize, progress), faultPos); + overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); } public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, @@ -716,11 +798,13 @@ public class TestStore extends TestCase { static class FaultyOutputStream extends FSDataOutputStream { volatile long faultPos = Long.MAX_VALUE; + private final AtomicBoolean fault; - public FaultyOutputStream(FSDataOutputStream out, - long faultPos) throws IOException { + public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) + throws IOException { super(out, null); this.faultPos = faultPos; + this.fault = fault; } @Override @@ -731,14 +815,12 @@ public class TestStore extends TestCase { } private void injectFault() throws IOException { - if (getPos() >= faultPos) { + if (this.fault.get() && getPos() >= faultPos) { throw new IOException("Fault injected"); } } } - - private static void flushStore(HStore store, long id) throws IOException { StoreFlushContext storeFlushCtx = store.createFlushContext(id); storeFlushCtx.prepare(); @@ -746,8 +828,6 @@ public class TestStore extends TestCase { storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); } - - /** * Generate a list of KeyValues for testing based on given parameters * @param timestamps @@ -772,12 +852,13 @@ public class TestStore extends TestCase { * Test to ensure correctness when using Stores with multiple timestamps * @throws IOException */ + @Test public void testMultipleTimestamps() throws IOException { int numRows = 1; long[] timestamps1 = new long[] {1,5,10,20}; long[] timestamps2 = new long[] {30,80}; - init(this.getName()); + init(this.name.getMethodName()); List kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); for (Cell kv : kvList1) { @@ -798,27 +879,27 @@ public class TestStore extends TestCase { get.setTimeRange(0,15); result = HBaseTestingUtility.getFromStoreFile(store, get); - assertTrue(result.size()>0); + Assert.assertTrue(result.size()>0); get.setTimeRange(40,90); result = HBaseTestingUtility.getFromStoreFile(store, get); - assertTrue(result.size()>0); + Assert.assertTrue(result.size()>0); get.setTimeRange(10,45); result = HBaseTestingUtility.getFromStoreFile(store, get); - assertTrue(result.size()>0); + Assert.assertTrue(result.size()>0); get.setTimeRange(80,145); result = HBaseTestingUtility.getFromStoreFile(store, get); - assertTrue(result.size()>0); + Assert.assertTrue(result.size()>0); get.setTimeRange(1,2); result = HBaseTestingUtility.getFromStoreFile(store, get); - assertTrue(result.size()>0); + Assert.assertTrue(result.size()>0); get.setTimeRange(90,200); result = HBaseTestingUtility.getFromStoreFile(store, get); - assertTrue(result.size()==0); + Assert.assertTrue(result.size()==0); } /** @@ -826,14 +907,16 @@ public class TestStore extends TestCase { * * @throws IOException When the IO operations fail. */ + @Test public void testSplitWithEmptyColFam() throws IOException { - init(this.getName()); - assertNull(store.getSplitPoint()); + init(this.name.getMethodName()); + Assert.assertNull(store.getSplitPoint()); store.getHRegion().forceSplit(null); - assertNull(store.getSplitPoint()); + Assert.assertNull(store.getSplitPoint()); store.getHRegion().clearSplit_TESTS_ONLY(); } + @Test public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; long anyValue = 10; @@ -843,25 +926,25 @@ public class TestStore extends TestCase { // a number we pass in is higher than some config value, inside compactionPolicy. Configuration conf = HBaseConfiguration.create(); conf.setLong(CONFIG_KEY, anyValue); - init(getName() + "-xml", conf); - assertTrue(store.throttleCompaction(anyValue + 1)); - assertFalse(store.throttleCompaction(anyValue)); + init(name.getMethodName() + "-xml", conf); + Assert.assertTrue(store.throttleCompaction(anyValue + 1)); + Assert.assertFalse(store.throttleCompaction(anyValue)); // HTD overrides XML. --anyValue; HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HColumnDescriptor hcd = new HColumnDescriptor(family); htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue)); - init(getName() + "-htd", conf, htd, hcd); - assertTrue(store.throttleCompaction(anyValue + 1)); - assertFalse(store.throttleCompaction(anyValue)); + init(name.getMethodName() + "-htd", conf, htd, hcd); + Assert.assertTrue(store.throttleCompaction(anyValue + 1)); + Assert.assertFalse(store.throttleCompaction(anyValue)); // HCD overrides them both. --anyValue; hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue)); - init(getName() + "-hcd", conf, htd, hcd); - assertTrue(store.throttleCompaction(anyValue + 1)); - assertFalse(store.throttleCompaction(anyValue)); + init(name.getMethodName() + "-hcd", conf, htd, hcd); + Assert.assertTrue(store.throttleCompaction(anyValue + 1)); + Assert.assertFalse(store.throttleCompaction(anyValue)); } public static class DummyStoreEngine extends DefaultStoreEngine { @@ -874,11 +957,12 @@ public class TestStore extends TestCase { } } + @Test public void testStoreUsesSearchEngineOverride() throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); - init(this.getName(), conf); - assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); + init(this.name.getMethodName(), conf); + Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor, + this.store.storeEngine.getCompactor()); } -} - +} \ No newline at end of file