From dcae5488cd2a69f0ffe39487b1375d9057991dfd Mon Sep 17 00:00:00 2001
From: Jean-Daniel Cryans
Date: Fri, 25 Apr 2014 21:03:11 +0000
Subject: [PATCH] HBASE-10958 [dataloss] Bulk loading with seqids can prevent
 some log entries from being replayed

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1590144 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/regionserver/HRegion.java    | 149 ++++++++++++++----
 .../hadoop/hbase/regionserver/HStore.java     |   4 +-
 .../hbase/regionserver/MemStoreFlusher.java   |   2 +-
 .../hbase/regionserver/RSRpcServices.java     |   2 +-
 .../hadoop/hbase/regionserver/StoreFile.java  |  12 +-
 .../mapreduce/TestLoadIncrementalHFiles.java  |  53 +------
 .../hbase/regionserver/TestHRegion.java       |  38 ++++-
 .../hbase/regionserver/wal/TestWALReplay.java | 102 ++++++++++--
 .../hadoop/hbase/util/HFileTestUtil.java      |  66 ++++++++
 9 files changed, 323 insertions(+), 105 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ed33893e428..90976e24ff2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -387,6 +387,81 @@ public class HRegion implements HeapSize { // , Writable{
       ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
   }
 
+  /**
+   * Objects of this class are created when flushing to describe the different states that the
+   * flush can end up in. The Result enum describes those states. The sequence id should only
+   * be specified if the flush was successful, and the failure message should only be specified
+   * if it didn't flush.
+   */
+  public static class FlushResult {
+    enum Result {
+      FLUSHED_NO_COMPACTION_NEEDED,
+      FLUSHED_COMPACTION_NEEDED,
+      // Special case where a flush didn't run because there's nothing in the memstores. Used when
+      // bulk loading to know when we can still load even if a flush didn't happen.
+      CANNOT_FLUSH_MEMSTORE_EMPTY,
+      CANNOT_FLUSH
+      // Be careful when adding more values to this enum; the convenience methods below assume
+      // only these four exist.
+    }
+
+    final Result result;
+    final String failureReason;
+    final long flushSequenceId;
+
+    /**
+     * Convenience constructor to use when the flush is successful; the failure message is set to
+     * null.
+     * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+     * @param flushSequenceId Generated sequence id that comes right after the edits in the
+     *                        memstores.
+     */
+    FlushResult(Result result, long flushSequenceId) {
+      this(result, flushSequenceId, null);
+      assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+          .FLUSHED_COMPACTION_NEEDED;
+    }
+
+    /**
+     * Convenience constructor to use when we cannot flush.
+     * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
+     * @param failureReason Reason why we couldn't flush.
+     */
+    FlushResult(Result result, String failureReason) {
+      this(result, -1, failureReason);
+      assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
+    }
+
+    /**
+     * Constructor with all the parameters.
+     * @param result Any of the Result values.
+     * @param flushSequenceId Generated sequence id if the memstores were flushed, else -1.
+     * @param failureReason Reason why we couldn't flush, or null.
+     */
+    FlushResult(Result result, long flushSequenceId, String failureReason) {
+      this.result = result;
+      this.flushSequenceId = flushSequenceId;
+      this.failureReason = failureReason;
+    }
+
+    /**
+     * Convenience method, the equivalent of checking if result is
+     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+     * @return true if the memstores were flushed, else false.
+     */
+    public boolean isFlushSucceeded() {
+      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+          .FLUSHED_COMPACTION_NEEDED;
+    }
+
+    /**
+     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
+     * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
+     */
+    public boolean isCompactionNeeded() {
+      return result == Result.FLUSHED_COMPACTION_NEEDED;
+    }
+  }
+
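The enum plus the two convenience methods give callers three outcome families to branch on. A minimal sketch of the intended consumption, assuming code in the same package (the FlushResult fields are package-private) and a hypothetical helper name:

    // Hypothetical caller, not part of this patch.
    long flushAndGetSeqId(HRegion region) throws IOException {
      HRegion.FlushResult fr = region.flushcache();
      if (fr.isFlushSucceeded()) {
        // Every edit below fr.flushSequenceId is now persisted in HFiles.
        return fr.flushSequenceId;
      } else if (fr.result == HRegion.FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
        // Nothing was in the memstores, so there was nothing to persist.
        return -1;
      } else {
        // CANNOT_FLUSH: the region is closing/closed or writes are disabled.
        throw new IOException("Flush did not run: " + fr.failureReason);
      }
    }
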
   final WriteState writestate = new WriteState();
 
   long memstoreFlushSize;
@@ -706,16 +781,13 @@ public class HRegion implements HeapSize { // , Writable{
       for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
         Future<HStore> future = completionService.take();
         HStore store = future.get();
         this.stores.put(store.getColumnFamilyName().getBytes(), store);
-        // Do not include bulk loaded files when determining seqIdForReplay
-        long storeSeqIdForReplay = store.getMaxSequenceId(false);
+
+        long storeMaxSequenceId = store.getMaxSequenceId();
         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
-          storeSeqIdForReplay);
-        // Include bulk loaded files when determining seqIdForAssignment
-        long storeSeqIdForAssignment = store.getMaxSequenceId(true);
-        if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
-          maxSeqId = storeSeqIdForAssignment;
+          storeMaxSequenceId);
+        if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
+          maxSeqId = storeMaxSequenceId;
         }
         long maxStoreMemstoreTS = store.getMaxMemstoreTS();
         if (maxStoreMemstoreTS > maxMemstoreTS) {
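Before this patch the loop computed two maxima: a replay sequence id that skipped bulk loaded files (their stamped seqids had no matching WAL edits) and an assignment sequence id that included them. Because bulk loads now flush first (see the bulkLoadHFiles hunk below), a bulk loaded file's seqid can never shadow an unflushed edit, so one maximum serves both purposes. Simplified, the new per-store bookkeeping is:

    // Simplified restatement of the hunk above, not the exact patch code.
    long storeMaxSequenceId = store.getMaxSequenceId();   // now counts bulk loaded files too
    maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), storeMaxSequenceId);
    maxSeqId = Math.max(maxSeqId, storeMaxSequenceId);    // region-wide maximum used on open
    // During WAL replay, edits with seqid <= the store's recorded maximum are skipped.
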
"already flushing" - : "writes not enabled")); - return false; + : "writes not enabled"); + status.abort(msg); + return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); } } try { - boolean result = internalFlushcache(status); + FlushResult fs = internalFlushcache(status); if (coprocessorHost != null) { status.setStatus("Running post-flush coprocessor hooks"); @@ -1509,7 +1584,7 @@ public class HRegion implements HeapSize { // , Writable{ } status.markComplete("Flush successful"); - return result; + return fs; } finally { synchronized (writestate) { writestate.flushing = false; @@ -1578,13 +1653,13 @@ public class HRegion implements HeapSize { // , Writable{ *

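With the return type now carrying both the seqid and the compaction hint, boolean-style call sites elsewhere in this patch simply chain the convenience methods, for example (mirroring the MemStoreFlusher and RSRpcServices hunks further down):

    // Callers that only care about the compaction hint:
    if (region.flushcache().isCompactionNeeded()) {
      // request a compaction for this region
    }
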
@@ -1578,13 +1653,13 @@ public class HRegion implements HeapSize { // , Writable{
    * <p> This method may block for some time.
    * @param status
    *
-   * @return true if the region needs compacting
+   * @return object describing the flush's state
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  protected boolean internalFlushcache(MonitoredTask status)
+  protected FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
     return internalFlushcache(this.log, -1, status);
   }
@@ -1598,7 +1673,7 @@ public class HRegion implements HeapSize { // , Writable{
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
    */
-  protected boolean internalFlushcache(
+  protected FlushResult internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
       throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
@@ -1609,7 +1684,7 @@ public class HRegion implements HeapSize { // , Writable{
     // Clear flush flag.
     // If nothing to flush, return and avoid logging start/stop flush.
     if (this.memstoreSize.get() <= 0) {
-      return false;
+      return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for " + this +
@@ -1644,9 +1719,10 @@ public class HRegion implements HeapSize { // , Writable{
     // check if it is not closing.
     if (wal != null) {
       if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
-        status.setStatus("Flush will not be started for ["
-          + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.");
-        return false;
+        String msg = "Flush will not be started for ["
+          + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+        status.setStatus(msg);
+        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
       flushSeqId = this.sequenceId.incrementAndGet();
     } else {
@@ -1762,7 +1838,8 @@ public class HRegion implements HeapSize { // , Writable{
     status.setStatus(msg);
     this.recentFlushes.add(new Pair<Long, Long>(time/1000, totalFlushableSize));
 
-    return compactionRequested;
+    return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
+        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -3563,6 +3640,22 @@ public class HRegion implements HeapSize { // , Writable{
       return false;
     }
 
+    long seqId = -1;
+    // We need to assign a sequential ID that's in between two memstores in order to preserve
+    // the guarantee that all the edits lower than the highest sequential ID from all the
+    // HFiles are flushed to disk. See HBASE-10958.
+    if (assignSeqId) {
+      FlushResult fs = this.flushcache();
+      if (fs.isFlushSucceeded()) {
+        seqId = fs.flushSequenceId;
+      } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+        seqId = this.sequenceId.incrementAndGet();
+      } else {
+        throw new IOException("Could not bulk load with an assigned sequential ID because the "
+          + "flush didn't run. Reason for not flushing: " + fs.failureReason);
+      }
+    }
+
     for (Pair<byte[], String> p : familyPaths) {
       byte[] familyName = p.getFirst();
       String path = p.getSecond();
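The guarantee this block restores is easiest to see with concrete, illustrative sequence ids. Without the flush: a put gets WAL seqid 5 and sits only in the memstore; a bulk load stamps its HFile with seqid 6; a later major compaction rewrites that bulk loaded file into a normal HFile, so seqid 6 now counts toward the store's maximum on open; after a crash, log replay sees edit 5 <= 6 and skips the put. That is the HBASE-10958 dataloss. Flushing before assigning seqId means every edit below the newly assigned id is already in an HFile, so no live WAL entry can be shadowed; the CANNOT_FLUSH_MEMSTORE_EMPTY branch keeps bulk loads cheap when there is nothing to flush.
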
@@ -3572,7 +3665,7 @@ public class HRegion implements HeapSize { // , Writable{
       if(bulkLoadListener != null) {
         finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
       }
-      store.bulkLoadHFile(finalPath, assignSeqId ? this.sequenceId.incrementAndGet() : -1);
+      store.bulkLoadHFile(finalPath, seqId);
       if(bulkLoadListener != null) {
         bulkLoadListener.doneBulkLoad(familyName, path);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 2b8bec56340..6de94c8fe27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -433,8 +433,8 @@ public class HStore implements Store {
   /**
    * @return The maximum sequence id in all store files. Used for log replay.
    */
-  long getMaxSequenceId(boolean includeBulkFiles) {
-    return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
+  long getMaxSequenceId() {
+    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 8a3282772bc..c3b2667ca0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -475,7 +475,7 @@ class MemStoreFlusher implements FlushRequester {
     lock.readLock().lock();
     try {
       notifyFlushRequest(region, emergencyFlush);
-      boolean shouldCompact = region.flushcache();
+      boolean shouldCompact = region.flushcache().isCompactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
       if (shouldSplit) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index c30b520cbf8..e7f7a10c06c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -978,7 +978,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
       if (shouldFlush) {
-        boolean result = region.flushcache();
+        boolean result = region.flushcache().isCompactionNeeded();
         if (result) {
           regionServer.compactSplitThread.requestSystemCompaction(region,
             "Compaction through user triggered flush");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index c278792e1ea..af3517c1c07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -300,21 +300,15 @@ public class StoreFile {
 
   /**
    * Return the highest sequence ID found across all storefiles in
-   * the given list. Store files that were created by a mapreduce
-   * bulk load are ignored, as they do not correspond to any edit
-   * log items.
+   * the given list.
    * @param sfs
-   * @param includeBulkLoadedFiles
    * @return 0 if no non-bulk-load files are provided or, this is Store that
    * does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
-      boolean includeBulkLoadedFiles) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
     long max = 0;
     for (StoreFile sf : sfs) {
-      if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
-        max = Math.max(max, sf.getMaxSequenceId());
-      }
+      max = Math.max(max, sf.getMaxSequenceId());
     }
     return max;
   }
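The net effect on getMaxSequenceIdInList is that bulk loaded files now always count. A hypothetical unit-style check (the two StoreFile instances here are invented for illustration; they are not built by this patch):

    // Given a normal file whose max seqid is 10 and a bulk loaded file stamped with 42:
    Collection<StoreFile> sfs = Arrays.asList(normalFile /* maxSeqId 10 */,
        bulkLoadedFile /* maxSeqId 42 */);
    // Before: 10 for replay (includeBulkLoadedFiles == false), 42 for assignment (true).
    // After: 42 unconditionally, for both uses.
    assertEquals(42, StoreFile.getMaxSequenceIdInList(sfs));
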
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 860fcdad56a..bf5936d354d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -32,21 +32,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -69,10 +64,6 @@ public class TestLoadIncrementalHFiles {
     Bytes.toBytes("ppp")
   };
 
-  public static int BLOCKSIZE = 64*1024;
-  public static Algorithm COMPRESSION =
-    Compression.Algorithm.NONE;
-
   static HBaseTestingUtility util = new HBaseTestingUtility();
 
   @BeforeClass
@@ -149,7 +140,7 @@ public class TestLoadIncrementalHFiles {
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
-      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
           + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
     }
     int expectedRows = hfileIdx * 1000;
@@ -189,7 +180,7 @@ public class TestLoadIncrementalHFiles {
     for (byte[][] range : hFileRanges) {
       byte[] from = range[0];
      byte[] to = range[1];
-      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
          + hFileIdx++), FAMILY, QUALIFIER, from, to, 1000);
     }
 
@@ -228,7 +219,7 @@ public class TestLoadIncrementalHFiles {
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
-    createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
@@ -261,40 +252,6 @@ public class TestLoadIncrementalHFiles {
     return count;
   }
 
-  /**
-   * Create an HFile with the given number of rows between a given
-   * start key and end key.
-   * TODO put me in an HFileTestUtil or something?
-   */
-  static void createHFile(
-      Configuration configuration,
-      FileSystem fs, Path path,
-      byte[] family, byte[] qualifier,
-      byte[] startKey, byte[] endKey, int numRows) throws IOException
-  {
-    HFileContext meta = new HFileContextBuilder()
-        .withBlockSize(BLOCKSIZE)
-        .withCompression(COMPRESSION)
-        .build();
-    HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
-        .withPath(fs, path)
-        .withFileContext(meta)
-        .create();
-    long now = System.currentTimeMillis();
-    try {
-      // subtract 2 since iterateOnSplits doesn't include boundary keys
-      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
-        KeyValue kv = new KeyValue(key, family, qualifier, now, key);
-        writer.append(kv);
-      }
-    } finally {
-      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-          Bytes.toBytes(System.currentTimeMillis()));
-      writer.close();
-    }
-  }
-
   private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
     Integer value = map.containsKey(first)?map.get(first):0;
     map.put(first, value+1);
@@ -370,7 +327,7 @@ public class TestLoadIncrementalHFiles {
     byte[] from = Bytes.toBytes("begin");
     byte[] to = Bytes.toBytes("end");
     for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
-      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
          + i), FAMILY, QUALIFIER, from, to, 1000);
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 195834f347c..ae912fa3eea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -3435,7 +3435,7 @@ public class TestHRegion {
 
         @Override
         public void doAnAction() throws Exception {
-          if (region.flushcache()) {
+          if (region.flushcache().isCompactionNeeded()) {
             ++flushesSinceCompact;
           }
           // Compact regularly to avoid creating too many files and exceeding
@@ -4292,6 +4292,42 @@ public class TestHRegion {
     }
   }
 
+  /**
+   * Test that we get the expected flush results back
+   * @throws IOException
+   */
+  @Test
+  public void testFlushResult() throws IOException {
+    String method = name.getMethodName();
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Bytes.toBytes("family");
+
+    this.region = initHRegion(tableName, method, family);
+
+    // empty memstore, flush doesn't run
+    HRegion.FlushResult fr = region.flushcache();
+    assertFalse(fr.isFlushSucceeded());
+    assertFalse(fr.isCompactionNeeded());
+
+    // Flush a couple of times to stay below the compaction threshold; no compaction needed yet
+    for (int i = 0; i < 2; i++) {
+      Put put = new Put(tableName).add(family, family, tableName);
+      region.put(put);
+      fr = region.flushcache();
+      assertTrue(fr.isFlushSucceeded());
+      assertFalse(fr.isCompactionNeeded());
+    }
+
+    // Two more flushes cross the threshold; each one now requests a compaction
+    for (int i = 0; i < 2; i++) {
+      Put put = new Put(tableName).add(family, family, tableName);
+      region.put(put);
+      fr = region.flushcache();
+      assertTrue(fr.isFlushSucceeded());
+      assertTrue(fr.isCompactionNeeded());
+    }
+  }
+
   private Configuration initSplit() {
     // Always compact if there is more than one store file.
     CONF.setInt("hbase.hstore.compactionThreshold", 2);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index c6b0ac4a300..515aef056b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -57,9 +57,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@@ -77,6 +74,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -319,7 +317,7 @@ public class TestWALReplay {
   throws IOException, SecurityException, IllegalArgumentException,
       NoSuchFieldException, IllegalAccessException, InterruptedException {
     final TableName tableName =
-        TableName.valueOf("testReplayEditsWrittenViaHRegion");
+        TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
     deleteDir(basedir);
@@ -329,21 +327,22 @@ public class TestWALReplay {
     HRegion.closeHRegion(region2);
     HLog wal = createWAL(this.conf);
     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
-    Path f = new Path(basedir, "hfile");
-    HFileContext context = new HFileContextBuilder().build();
-    HFile.Writer writer =
-      HFile.getWriterFactoryNoCache(conf).withPath(fs, f)
-        .withFileContext(context).create();
+
     byte [] family = htd.getFamilies().iterator().next().getName();
-    byte [] row = tableName.getName();
-    writer.append(new KeyValue(row, family, family, row));
-    writer.close();
+    Path f = new Path(basedir, "hfile");
+    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
+        Bytes.toBytes("z"), 10);
     List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
     hfs.add(Pair.newPair(family, f.toString()));
     region.bulkLoadHFiles(hfs, true);
+
+    // Add an edit so there is something in the WAL
+    byte [] row = tableName.getName();
     region.put((new Put(row)).add(family, family, family));
     wal.sync();
+    final int rowsInsertedCount = 11;
+
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
 
     // Now 'crash' the region by stealing its wal
     final Configuration newConf = HBaseConfiguration.create(this.conf);
@@ -358,6 +357,7 @@ public class TestWALReplay {
         hbaseRootDir, hri, htd, wal2);
       long seqid2 = region2.getOpenSeqNum();
       assertTrue(seqid2 > -1);
+      assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
 
       // I can't close wal1. Its been appropriated when we split.
       region2.close();
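Both assertions lean on getScannedCount, an existing helper in TestWALReplay that this patch does not touch. For readers without the full file, a minimal equivalent (assuming the RegionScanner/Cell API of this code base) would be:

    private int getScannedCount(RegionScanner scanner) throws IOException {
      int scannedCount = 0;
      List<Cell> results = new ArrayList<Cell>();
      while (true) {
        boolean more = scanner.next(results);   // one call returns one row's cells
        if (!results.isEmpty()) scannedCount++;
        results.clear();
        if (!more) break;
      }
      return scannedCount;
    }
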
@@ -367,6 +367,78 @@ public class TestWALReplay {
     });
   }
 
+  /**
+   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
+   * files) and an edit in the memstore.
+   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
+   * from being replayed"
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testCompactedBulkLoadedFiles()
+      throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName =
+        TableName.valueOf("testCompactedBulkLoadedFiles");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region2 = HRegion.createHRegion(hri,
+        hbaseRootDir, this.conf, htd);
+    HRegion.closeHRegion(region2);
+    HLog wal = createWAL(this.conf);
+    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
+
+    // Add an edit so there is something in the WAL
+    byte [] row = tableName.getName();
+    byte [] family = htd.getFamilies().iterator().next().getName();
+    region.put((new Put(row)).add(family, family, family));
+    wal.sync();
+
+    List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
+    for (int i = 0; i < 3; i++) {
+      Path f = new Path(basedir, "hfile"+i);
+      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
+          Bytes.toBytes(i + "50"), 10);
+      hfs.add(Pair.newPair(family, f.toString()));
+    }
+    region.bulkLoadHFiles(hfs, true);
+    final int rowsInsertedCount = 31;
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // major compact to turn all the bulk loaded files into one normal file
+    region.compactStores(true);
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // Now 'crash' the region by stealing its wal
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf,
+        tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        HLog wal2 = createWAL(newConf);
+
+        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
+          hbaseRootDir, hri, htd, wal2);
+        long seqid2 = region2.getOpenSeqNum();
+        assertTrue(seqid2 > -1);
+        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
+
+        // I can't close wal1. It's been appropriated when we split.
+        region2.close();
+        wal2.closeAndDelete();
+        return null;
+      }
+    });
+  }
+
   /**
    * Test writing edits into an HRegion, closing it, splitting logs, opening
    * Region again. Verify seqids.
@@ -736,14 +808,14 @@ public class TestWALReplay {
     try {
       final HRegion region =
         new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
-        protected boolean internalFlushcache(
+        protected FlushResult internalFlushcache(
             final HLog wal, final long myseqid, MonitoredTask status)
           throws IOException {
           LOG.info("InternalFlushCache Invoked");
-          boolean b = super.internalFlushcache(wal, myseqid,
+          FlushResult fs = super.internalFlushcache(wal, myseqid,
             Mockito.mock(MonitoredTask.class));
           flushcount.incrementAndGet();
-          return b;
+          return fs;
         };
       };
       long seqid = region.initialize();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java
new file mode 100644
index 00000000000..f52837bfc80
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+import java.io.IOException;
+
+/**
+ * Utility class for HFile-related testing.
+ */
+public class HFileTestUtil {
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key.
+   */
+  public static void createHFile(
+      Configuration configuration,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows) throws IOException
+  {
+    HFileContext meta = new HFileContextBuilder().build();
+    HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
+        .withPath(fs, path)
+        .withFileContext(meta)
+        .create();
+    long now = System.currentTimeMillis();
+    try {
+      // subtract 2 since iterateOnSplits doesn't include boundary keys
+      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+        KeyValue kv = new KeyValue(key, family, qualifier, now, key);
+        writer.append(kv);
+      }
+    } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+          Bytes.toBytes(System.currentTimeMillis()));
+      writer.close();
+    }
+  }
+}
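Typical use of the new utility in a test, assuming conf, fs and testDir come from an HBaseTestingUtility setup (placeholder names):

    Path hfile = new Path(testDir, "hfile_0");
    HFileTestUtil.createHFile(conf, fs, hfile,
        Bytes.toBytes("f"), Bytes.toBytes("q"),
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);  // 1000 rows spread over [aaa, zzz]

The writer always stamps BULKLOAD_TIME_KEY into the file info, so files built this way are recognized as bulk load products (StoreFile.isBulkLoadResult()) by the code paths this patch touches.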