From 351e75629f32091ce3fa8965739e35fc1aeeaa9f Mon Sep 17 00:00:00 2001
From: Zhihong Yu
Date: Wed, 23 Nov 2011 05:23:45 +0000
Subject: [PATCH] HBASE-4797 [availability] Skip recovered.edits files with edits we know older than what region currently has (Jimmy Jiang)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1205290 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/regionserver/HRegion.java    | 330 ++++++++++--------
 .../hbase/regionserver/TestHRegion.java       | 179 ++++++++--
 2 files changed, 328 insertions(+), 181 deletions(-)

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0b50a7f5763..38bc413aeae 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -168,7 +168,7 @@ public class HRegion implements HeapSize { // , Writable{
   // Members
   //////////////////////////////////////////////////////////////////////////////
-  private final ConcurrentHashMap lockedRows =
+  private final ConcurrentHashMap lockedRows =
     new ConcurrentHashMap();
   private final ConcurrentHashMap lockIds =
     new ConcurrentHashMap();
@@ -487,10 +487,10 @@ public class HRegion implements HeapSize { // , Writable{
    */
   public long initialize(final CancelableProgressable reporter)
   throws IOException {
-
+
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Initializing region " + this);
-
+
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor pre-open hook");
       coprocessorHost.preOpen();
@@ -558,17 +558,17 @@ public class HRegion implements HeapSize { // , Writable{
     this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
     this.writestate.compacting = 0;
-
+
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
-
+
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId + 1; LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid); - + if (coprocessorHost != null) { status.setStatus("Running coprocessor post-open hooks"); coprocessorHost.postOpen(); @@ -605,7 +605,7 @@ public class HRegion implements HeapSize { // , Writable{ } return false; } - + /** * This function will return the HDFS blocks distribution based on the data * captured when HFile is created @@ -642,7 +642,7 @@ public class HRegion implements HeapSize { // , Writable{ Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf), tableDescriptor.getName()); FileSystem fs = tablePath.getFileSystem(conf); - + for (HColumnDescriptor family: tableDescriptor.getFamilies()) { Path storeHomeDir = Store.getStoreHomedir(tablePath, regionEncodedName, family.getName()); @@ -660,27 +660,27 @@ public class HRegion implements HeapSize { // , Writable{ } return hdfsBlocksDistribution; } - + public AtomicLong getMemstoreSize() { return memstoreSize; } - + /** - * Increase the size of mem store in this region and the size of global mem + * Increase the size of mem store in this region and the size of global mem * store * @param memStoreSize * @return the size of memstore in this region */ public long addAndGetGlobalMemstoreSize(long memStoreSize) { if (this.rsServices != null) { - RegionServerAccounting rsAccounting = + RegionServerAccounting rsAccounting = this.rsServices.getRegionServerAccounting(); - + if (rsAccounting != null) { rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize); } } - return this.memstoreSize.getAndAdd(memStoreSize); + return this.memstoreSize.getAndAdd(memStoreSize); } /* @@ -795,7 +795,7 @@ public class HRegion implements HeapSize { // , Writable{ MonitoredTask status = TaskMonitor.get().createStatus( "Closing region " + this + (abort ? " due to abort" : "")); - + status.setStatus("Waiting for close lock"); try { synchronized (closeLock) { @@ -1180,7 +1180,7 @@ public class HRegion implements HeapSize { // , Writable{ } } boolean result = internalFlushcache(status); - + if (coprocessorHost != null) { status.setStatus("Running post-flush coprocessor hooks"); coprocessorHost.postFlush(); @@ -1228,7 +1228,7 @@ public class HRegion implements HeapSize { // , Writable{ * routes. * *

This method may block for some time. - * @param status + * @param status * * @return true if the region needs compacting * @@ -1245,7 +1245,7 @@ public class HRegion implements HeapSize { // , Writable{ * @param wal Null if we're NOT to go via hlog/wal. * @param myseqid The seqid to use if wal is null writing out * flush file. - * @param status + * @param status * @return true if the region needs compacting * @throws IOException * @see #internalFlushcache(MonitoredTask) @@ -1791,7 +1791,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Perform a batch of puts. - * + * * @param putsAndLocks * the list of puts paired with their requested lock IDs. * @return an array of OperationStatus which internally contains the @@ -2015,7 +2015,7 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 7. Sync wal. // ------------------------- if (walEdit.size() > 0 && - (this.regionInfo.isMetaRegion() || + (this.regionInfo.isMetaRegion() || !this.htableDescriptor.isDeferredLogFlush())) { this.log.sync(txid); } @@ -2378,7 +2378,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Remove all the keys listed in the map from the memstore. This method is - * called when a Put has updated memstore but subequently fails to update + * called when a Put has updated memstore but subequently fails to update * the wal. This method is then invoked to rollback the memstore. */ private void rollbackMemstore(BatchOperationInProgress> batchOp, @@ -2392,13 +2392,13 @@ public class HRegion implements HeapSize { // , Writable{ continue; } - // Rollback all the kvs for this row. - Map> familyMap = familyMaps[i]; + // Rollback all the kvs for this row. + Map> familyMap = familyMaps[i]; for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List edits = e.getValue(); - // Remove those keys from the memstore that matches our + // Remove those keys from the memstore that matches our // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is // that even the memstoreTS has to match for keys that will be rolleded-back. 
Store store = getStore(family); @@ -2502,16 +2502,36 @@ public class HRegion implements HeapSize { // , Writable{ protected long replayRecoveredEditsIfAny(final Path regiondir, final long minSeqId, final CancelableProgressable reporter, final MonitoredTask status) - throws UnsupportedEncodingException, IOException { + throws UnsupportedEncodingException, IOException { long seqid = minSeqId; NavigableSet files = HLog.getSplitEditFilesSorted(this.fs, regiondir); if (files == null || files.isEmpty()) return seqid; + boolean checkSafeToSkip = true; for (Path edits: files) { if (edits == null || !this.fs.exists(edits)) { LOG.warn("Null or non-existent edits file: " + edits); continue; } if (isZeroLengthThenDelete(this.fs, edits)) continue; + + if (checkSafeToSkip) { + Path higher = files.higher(edits); + long maxSeqId = Long.MAX_VALUE; + if (higher != null) { + // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+" + String fileName = higher.getName(); + maxSeqId = Math.abs(Long.parseLong(fileName)); + } + if (maxSeqId <= minSeqId) { + String msg = "Maximum possible sequenceid for this log is " + maxSeqId + + ", skipped the whole file, path=" + edits; + LOG.debug(msg); + continue; + } else { + checkSafeToSkip = false; + } + } + try { seqid = replayRecoveredEdits(edits, seqid, reporter); } catch (IOException e) { @@ -2556,139 +2576,139 @@ public class HRegion implements HeapSize { // , Writable{ minSeqId + "; path=" + edits; LOG.info(msg); MonitoredTask status = TaskMonitor.get().createStatus(msg); - + status.setStatus("Opening logs"); HLog.Reader reader = HLog.getReader(this.fs, edits, conf); try { - long currentEditSeqId = minSeqId; - long firstSeqIdInLog = -1; - long skippedEdits = 0; - long editsCount = 0; - long intervalEdits = 0; - HLog.Entry entry; - Store store = null; - boolean reported_once = false; + long currentEditSeqId = minSeqId; + long firstSeqIdInLog = -1; + long skippedEdits = 0; + long editsCount = 0; + long intervalEdits = 0; + HLog.Entry entry; + Store store = null; + boolean reported_once = false; - try { - // How many edits seen before we check elapsed time - int interval = this.conf.getInt("hbase.hstore.report.interval.edits", - 2000); - // How often to send a progress report (default 1/2 master timeout) - int period = this.conf.getInt("hbase.hstore.report.period", - this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", - 180000) / 2); - long lastReport = EnvironmentEdgeManager.currentTimeMillis(); + try { + // How many edits seen before we check elapsed time + int interval = this.conf.getInt("hbase.hstore.report.interval.edits", + 2000); + // How often to send a progress report (default 1/2 master timeout) + int period = this.conf.getInt("hbase.hstore.report.period", + this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", + 180000) / 2); + long lastReport = EnvironmentEdgeManager.currentTimeMillis(); - while ((entry = reader.next()) != null) { - HLogKey key = entry.getKey(); - WALEdit val = entry.getEdit(); + while ((entry = reader.next()) != null) { + HLogKey key = entry.getKey(); + WALEdit val = entry.getEdit(); - if (reporter != null) { - intervalEdits += val.size(); - if (intervalEdits >= interval) { - // Number of edits interval reached - intervalEdits = 0; - long cur = EnvironmentEdgeManager.currentTimeMillis(); - if (lastReport + period <= cur) { - status.setStatus("Replaying edits..." 
+ - " skipped=" + skippedEdits + - " edits=" + editsCount); - // Timeout reached - if(!reporter.progress()) { - msg = "Progressable reporter failed, stopping replay"; - LOG.warn(msg); - status.abort(msg); - throw new IOException(msg); + if (reporter != null) { + intervalEdits += val.size(); + if (intervalEdits >= interval) { + // Number of edits interval reached + intervalEdits = 0; + long cur = EnvironmentEdgeManager.currentTimeMillis(); + if (lastReport + period <= cur) { + status.setStatus("Replaying edits..." + + " skipped=" + skippedEdits + + " edits=" + editsCount); + // Timeout reached + if(!reporter.progress()) { + msg = "Progressable reporter failed, stopping replay"; + LOG.warn(msg); + status.abort(msg); + throw new IOException(msg); + } + reported_once = true; + lastReport = cur; } - reported_once = true; - lastReport = cur; } } - } - // Start coprocessor replay here. The coprocessor is for each WALEdit - // instead of a KeyValue. - if (coprocessorHost != null) { - status.setStatus("Running pre-WAL-restore hook in coprocessors"); - if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) { - // if bypass this log entry, ignore it ... - continue; + // Start coprocessor replay here. The coprocessor is for each WALEdit + // instead of a KeyValue. + if (coprocessorHost != null) { + status.setStatus("Running pre-WAL-restore hook in coprocessors"); + if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) { + // if bypass this log entry, ignore it ... + continue; + } } - } - if (firstSeqIdInLog == -1) { - firstSeqIdInLog = key.getLogSeqNum(); - } - // Now, figure if we should skip this edit. - if (key.getLogSeqNum() <= currentEditSeqId) { - skippedEdits++; - continue; - } - currentEditSeqId = key.getLogSeqNum(); - boolean flush = false; - for (KeyValue kv: val.getKeyValues()) { - // Check this edit is for me. Also, guard against writing the special - // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || - !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) { - skippedEdits++; - continue; - } - // Figure which store the edit is meant for. - if (store == null || !kv.matchingFamily(store.getFamily().getName())) { - store = this.stores.get(kv.getFamily()); + if (firstSeqIdInLog == -1) { + firstSeqIdInLog = key.getLogSeqNum(); } - if (store == null) { - // This should never happen. Perhaps schema was changed between - // crash and redeploy? - LOG.warn("No family for " + kv); + // Now, figure if we should skip this edit. + if (key.getLogSeqNum() <= currentEditSeqId) { skippedEdits++; continue; } - // Once we are over the limit, restoreEdit will keep returning true to - // flush -- but don't flush until we've played all the kvs that make up - // the WALEdit. - flush = restoreEdit(store, kv); - editsCount++; - } - if (flush) internalFlushcache(null, currentEditSeqId, status); + currentEditSeqId = key.getLogSeqNum(); + boolean flush = false; + for (KeyValue kv: val.getKeyValues()) { + // Check this edit is for me. Also, guard against writing the special + // METACOLUMN info such as HBASE::CACHEFLUSH entries + if (kv.matchingFamily(HLog.METAFAMILY) || + !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) { + skippedEdits++; + continue; + } + // Figure which store the edit is meant for. + if (store == null || !kv.matchingFamily(store.getFamily().getName())) { + store = this.stores.get(kv.getFamily()); + } + if (store == null) { + // This should never happen. 
Perhaps schema was changed between + // crash and redeploy? + LOG.warn("No family for " + kv); + skippedEdits++; + continue; + } + // Once we are over the limit, restoreEdit will keep returning true to + // flush -- but don't flush until we've played all the kvs that make up + // the WALEdit. + flush = restoreEdit(store, kv); + editsCount++; + } + if (flush) internalFlushcache(null, currentEditSeqId, status); - if (coprocessorHost != null) { - coprocessorHost.postWALRestore(this.getRegionInfo(), key, val); + if (coprocessorHost != null) { + coprocessorHost.postWALRestore(this.getRegionInfo(), key, val); + } } - } - } catch (EOFException eof) { - Path p = HLog.moveAsideBadEditsFile(fs, edits); - msg = "Encountered EOF. Most likely due to Master failure during " + - "log spliting, so we have this data in another edit. " + - "Continuing, but renaming " + edits + " as " + p; - LOG.warn(msg, eof); - status.abort(msg); - } catch (IOException ioe) { - // If the IOE resulted from bad file format, - // then this problem is idempotent and retrying won't help - if (ioe.getCause() instanceof ParseException) { + } catch (EOFException eof) { Path p = HLog.moveAsideBadEditsFile(fs, edits); - msg = "File corruption encountered! " + + msg = "Encountered EOF. Most likely due to Master failure during " + + "log spliting, so we have this data in another edit. " + "Continuing, but renaming " + edits + " as " + p; - LOG.warn(msg, ioe); - status.setStatus(msg); - } else { - status.abort(StringUtils.stringifyException(ioe)); - // other IO errors may be transient (bad network connection, - // checksum exception on one datanode, etc). throw & retry - throw ioe; + LOG.warn(msg, eof); + status.abort(msg); + } catch (IOException ioe) { + // If the IOE resulted from bad file format, + // then this problem is idempotent and retrying won't help + if (ioe.getCause() instanceof ParseException) { + Path p = HLog.moveAsideBadEditsFile(fs, edits); + msg = "File corruption encountered! " + + "Continuing, but renaming " + edits + " as " + p; + LOG.warn(msg, ioe); + status.setStatus(msg); + } else { + status.abort(StringUtils.stringifyException(ioe)); + // other IO errors may be transient (bad network connection, + // checksum exception on one datanode, etc). 
throw & retry + throw ioe; + } } - } - if (reporter != null && !reported_once) { - reporter.progress(); - } - msg = "Applied " + editsCount + ", skipped " + skippedEdits + - ", firstSequenceidInLog=" + firstSeqIdInLog + - ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits; - status.markComplete(msg); - LOG.debug(msg); - return currentEditSeqId; + if (reporter != null && !reported_once) { + reporter.progress(); + } + msg = "Applied " + editsCount + ", skipped " + skippedEdits + + ", firstSequenceidInLog=" + firstSeqIdInLog + + ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits; + status.markComplete(msg); + LOG.debug(msg); + return currentEditSeqId; } finally { reader.close(); status.cleanup(); @@ -2712,7 +2732,7 @@ public class HRegion implements HeapSize { // , Writable{ * @throws IOException */ private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p) - throws IOException { + throws IOException { FileStatus stat = fs.getFileStatus(p); if (stat.getLen() > 0) return false; LOG.warn("File " + p + " is zero-length, deleting."); @@ -2721,7 +2741,7 @@ public class HRegion implements HeapSize { // , Writable{ } protected Store instantiateHStore(Path tableDir, HColumnDescriptor c) - throws IOException { + throws IOException { return new Store(tableDir, this, c, this.fs, this.conf); } @@ -2801,7 +2821,7 @@ public class HRegion implements HeapSize { // , Writable{ try { HashedBytes rowKey = new HashedBytes(row); CountDownLatch rowLatch = new CountDownLatch(1); - + // loop until we acquire the row lock (unless !waitForLock) while (true) { CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); @@ -2822,7 +2842,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - + // loop until we generate an unused lock id while (true) { Integer lockId = lockIdGenerator.incrementAndGet(); @@ -2848,7 +2868,7 @@ public class HRegion implements HeapSize { // , Writable{ HashedBytes rowKey = lockIds.get(lockid); return rowKey == null ? null : rowKey.getBytes(); } - + /** * Release the row lock! * @param lockId The lock ID to release. @@ -2899,7 +2919,7 @@ public class HRegion implements HeapSize { // , Writable{ } return lid; } - + /** * Determines whether multiple column families are present * Precondition: familyPaths is not null @@ -2994,7 +3014,7 @@ public class HRegion implements HeapSize { // , Writable{ try { store.bulkLoadHFile(path); } catch (IOException ioe) { - // a failure here causes an atomicity violation that we currently + // a failure here causes an atomicity violation that we currently // cannot recover from since it is likely a failed hdfs operation. // TODO Need a better story for reverting partial failures due to HDFS. @@ -3303,9 +3323,9 @@ public class HRegion implements HeapSize { // , Writable{ /** * Convenience method creating new HRegions. Used by createTable. - * The {@link HLog} for the created region needs to be closed explicitly. + * The {@link HLog} for the created region needs to be closed explicitly. * Use {@link HRegion#getLog()} to get access. - * + * * @param info Info for region to create. 
* @param rootDir Root directory for HBase instance * @param conf @@ -3332,14 +3352,14 @@ public class HRegion implements HeapSize { // , Writable{ HLog effectiveHLog = hlog; if (hlog == null) { effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), - new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf); + new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf); } HRegion region = HRegion.newHRegion(tableDir, effectiveHLog, fs, conf, info, hTableDescriptor, null); region.initialize(); return region; } - + /** * Open a Region. * @param info Info for region to be opened. @@ -3932,12 +3952,12 @@ public class HRegion implements HeapSize { // , Writable{ // TODO: There's a lot of boiler plate code identical // to increment... See how to better unify that. /** - * + * * Perform one or more append operations on a row. *

* Appends performed are done under row lock but reads do not take locks out * so this can be seen partially complete by gets and scans. - * + * * @param append * @param lockid * @param writeToWAL @@ -4308,7 +4328,7 @@ public class HRegion implements HeapSize { // , Writable{ public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.OBJECT + // closeLock (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing - ClassSize.ATOMIC_LONG + // memStoreSize + ClassSize.ATOMIC_LONG + // memStoreSize ClassSize.ATOMIC_INTEGER + // lockIdGenerator (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints WriteState.HEAP_SIZE + // writestate @@ -4533,13 +4553,13 @@ public class HRegion implements HeapSize { // , Writable{ if (this.explicitSplitPoint != null) { return this.explicitSplitPoint; } - + if (!splitPolicy.shouldSplit()) { return null; } - + byte[] ret = splitPolicy.getSplitPoint(); - + if (ret != null) { try { checkRow(ret, "calculated split"); @@ -4547,7 +4567,7 @@ public class HRegion implements HeapSize { // , Writable{ LOG.error("Ignoring invalid split", e); return null; } - } + } return ret; } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 5daa02b1dfa..7281d641073 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -34,16 +34,30 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -52,9 +66,13 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import 
org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; @@ -64,9 +82,9 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; import org.junit.Test; +import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; -import org.junit.experimental.categories.Category; /** @@ -1296,7 +1314,7 @@ public class TestHRegion extends HBaseTestCase { LOG.info("" + addContent(region, fam3)); region.flushcache(); region.compactStores(); - byte [] splitRow = region.checkSplit(); + byte [] splitRow = region.checkSplit(); assertNotNull(splitRow); LOG.info("SplitRow: " + Bytes.toString(splitRow)); HRegion [] subregions = splitRegion(region, splitRow); @@ -2170,7 +2188,7 @@ public class TestHRegion extends HBaseTestCase { } catch (Exception exception) { // Expected. } - + assertICV(row1, fam1, qual1, row1Field1); assertICV(row1, fam1, qual2, row1Field2); @@ -2302,7 +2320,7 @@ public class TestHRegion extends HBaseTestCase { LOG.info("" + addContent(region, fam3)); region.flushcache(); region.compactStores(); - byte [] splitRow = region.checkSplit(); + byte [] splitRow = region.checkSplit(); assertNotNull(splitRow); LOG.info("SplitRow: " + Bytes.toString(splitRow)); HRegion [] regions = splitRegion(region, splitRow); @@ -2337,7 +2355,7 @@ public class TestHRegion extends HBaseTestCase { // To make regions splitable force compaction. 
for (int i = 0; i < regions.length; i++) { regions[i].compactStores(); - midkeys[i] = regions[i].checkSplit(); + midkeys[i] = regions[i].checkSplit(); } TreeMap sortedMap = new TreeMap(); @@ -2809,6 +2827,115 @@ public class TestHRegion extends HBaseTestCase { region.get(g, null); } + public void testSkipRecoveredEditsReplay() throws Exception { + String method = "testSkipRecoveredEditsReplay"; + byte[] tableName = Bytes.toBytes(method); + byte[] family = Bytes.toBytes("family"); + Configuration conf = HBaseConfiguration.create(); + initHRegion(tableName, method, conf, family); + Path regiondir = region.getRegionDir(); + FileSystem fs = region.getFilesystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir); + + long maxSeqId = 1050; + long minSeqId = 1000; + + for (long i = minSeqId; i <= maxSeqId; i += 10) { + Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); + HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf); + + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(row, family, Bytes.toBytes(i), + time, KeyValue.Type.Put, Bytes.toBytes(i))); + writer.append(new HLog.Entry(new HLogKey(regionName, tableName, + i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); + + writer.close(); + } + MonitoredTask status = TaskMonitor.get().createStatus(method); + long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId-1, null, status); + assertEquals(maxSeqId, seqId); + Get get = new Get(row); + Result result = region.get(get, null); + for (long i = minSeqId; i <= maxSeqId; i += 10) { + List kvs = result.getColumn(family, Bytes.toBytes(i)); + assertEquals(1, kvs.size()); + assertEquals(Bytes.toBytes(i), kvs.get(0).getValue()); + } + } + + public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception { + String method = "testSkipRecoveredEditsReplaySomeIgnored"; + byte[] tableName = Bytes.toBytes(method); + byte[] family = Bytes.toBytes("family"); + initHRegion(tableName, method, HBaseConfiguration.create(), family); + Path regiondir = region.getRegionDir(); + FileSystem fs = region.getFilesystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir); + + long maxSeqId = 1050; + long minSeqId = 1000; + + for (long i = minSeqId; i <= maxSeqId; i += 10) { + Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); + HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf); + + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(row, family, Bytes.toBytes(i), + time, KeyValue.Type.Put, Bytes.toBytes(i))); + writer.append(new HLog.Entry(new HLogKey(regionName, tableName, + i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); + + writer.close(); + } + long recoverSeqId = 1030; + MonitoredTask status = TaskMonitor.get().createStatus(method); + long seqId = region.replayRecoveredEditsIfAny(regiondir, recoverSeqId-1, null, status); + assertEquals(maxSeqId, seqId); + Get get = new Get(row); + Result result = region.get(get, null); + for (long i = minSeqId; i <= maxSeqId; i += 10) { + List kvs = result.getColumn(family, Bytes.toBytes(i)); + if (i < recoverSeqId) { + assertEquals(0, kvs.size()); + } else { + assertEquals(1, kvs.size()); + assertEquals(Bytes.toBytes(i), kvs.get(0).getValue()); + } + } + } + + public void testSkipRecoveredEditsReplayAllIgnored() throws Exception 
{ + String method = "testSkipRecoveredEditsReplayAllIgnored"; + byte[] tableName = Bytes.toBytes(method); + byte[] family = Bytes.toBytes("family"); + initHRegion(tableName, method, HBaseConfiguration.create(), family); + Path regiondir = region.getRegionDir(); + FileSystem fs = region.getFilesystem(); + + Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir); + for (int i = 1000; i < 1050; i += 10) { + Path recoveredEdits = new Path( + recoveredEditsDir, String.format("%019d", i)); + FSDataOutputStream dos= fs.create(recoveredEdits); + dos.writeInt(i); + dos.close(); + } + long minSeqId = 2000; + Path recoveredEdits = new Path( + recoveredEditsDir, String.format("%019d", minSeqId-1)); + FSDataOutputStream dos= fs.create(recoveredEdits); + dos.close(); + long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId, null, null); + assertEquals(minSeqId, seqId); + } + public void testIndexesScanWithOneDeletedRow() throws IOException { byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow"); byte[] family = Bytes.toBytes("family"); @@ -2864,13 +2991,13 @@ public class TestHRegion extends HBaseTestCase { HColumnDescriptor hcd = new HColumnDescriptor(fam1, Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_COMPRESSION, false, true, HColumnDescriptor.DEFAULT_TTL, "rowcol"); - + HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(hcd); HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); Path path = new Path(DIR + "testBloomFilterSize"); region = HRegion.createHRegion(info, path, conf, htd); - + int num_unique_rows = 10; int duplicate_multiplier =2; int num_storefiles = 4; @@ -2887,7 +3014,7 @@ public class TestHRegion extends HBaseTestCase { } region.flushcache(); } - //before compaction + //before compaction Store store = region.getStore(fam1); List storeFiles = store.getStorefiles(); for (StoreFile storefile : storeFiles) { @@ -2897,10 +3024,10 @@ public class TestHRegion extends HBaseTestCase { assertEquals(num_unique_rows*duplicate_multiplier, reader.getEntries()); assertEquals(num_unique_rows, reader.getFilterEntries()); } - - region.compactStores(true); - - //after compaction + + region.compactStores(true); + + //after compaction storeFiles = store.getStorefiles(); for (StoreFile storefile : storeFiles) { StoreFile.Reader reader = storefile.getReader(); @@ -2909,9 +3036,9 @@ public class TestHRegion extends HBaseTestCase { assertEquals(num_unique_rows*duplicate_multiplier*num_storefiles, reader.getEntries()); assertEquals(num_unique_rows, reader.getFilterEntries()); - } + } } - + public void testAllColumnsWithBloomFilter() throws IOException { byte [] TABLE = Bytes.toBytes("testAllColumnsWithBloomFilter"); byte [] FAMILY = Bytes.toBytes("family"); @@ -3002,13 +3129,13 @@ public class TestHRegion extends HBaseTestCase { final int DEFAULT_BLOCK_SIZE = 1024; htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); htu.getConfiguration().setInt("dfs.replication", 2); - - + + // set up a cluster with 3 nodes MiniHBaseCluster cluster; String dataNodeHosts[] = new String[] { "host1", "host2", "host3" }; int regionServersCount = 3; - + try { cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts); byte [][] families = {fam1, fam2}; @@ -3018,31 +3145,31 @@ public class TestHRegion extends HBaseTestCase { byte row[] = Bytes.toBytes("row1"); byte col[] = Bytes.toBytes("col1"); - Put put = new Put(row); + Put put = new Put(row); put.add(fam1, col, 1, Bytes.toBytes("test1")); put.add(fam2, col, 1, 
Bytes.toBytes("test2")); ht.put(put); - + HRegion firstRegion = htu.getHBaseCluster(). getRegions(Bytes.toBytes(this.getName())).get(0); firstRegion.flushcache(); HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution(); - + // given the default replication factor is 2 and we have 2 HFiles, // we will have total of 4 replica of blocks on 3 datanodes; thus there // must be at least one host that have replica for 2 HFiles. That host's // weight will be equal to the unique block weight. long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight(); - + String topHost = blocksDistribution1.getTopHosts().get(0); long topHostWeight = blocksDistribution1.getWeight(topHost); assertTrue(uniqueBlocksWeight1 == topHostWeight); - + // use the static method to compute the value, it should be the same. // static method is used by load balancer or other components - HDFSBlocksDistribution blocksDistribution2 = + HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(htu.getConfiguration(), firstRegion.getTableDesc(), firstRegion.getRegionInfo().getEncodedName()); @@ -3054,7 +3181,7 @@ public class TestHRegion extends HBaseTestCase { htu.shutdownMiniCluster(); } } - + private void putData(int startRow, int numRows, byte [] qf, byte [] ...families) throws IOException {
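
The heart of this change is the checkSafeToSkip block added to HRegion.replayRecoveredEditsIfAny() in the hunk above. The recovered-edits files come back sorted and their names match HLog.EDITFILES_NAME_PATTERN ("-?[0-9]+"), so the numeric name of the next file is an upper bound on every sequence id the current file can contain; when that bound is at or below minSeqId (the highest sequence id the region has already persisted), the whole file is skipped without ever being opened. Below is a minimal standalone sketch of that decision, assuming plain String file names in place of HDFS Path objects and reusing the zero-padded format the new tests write; the class and method names are illustrative, not HBase API.

import java.util.NavigableSet;
import java.util.TreeSet;

public class SkipRecoveredEditsSketch {

  // True when the file cannot hold any edit newer than minSeqId: the numeric
  // name of the next file bounds every sequence id in the current file.
  static boolean safeToSkip(NavigableSet<String> sortedEditFiles,
                            String editsFileName, long minSeqId) {
    String higher = sortedEditFiles.higher(editsFileName);
    long maxSeqId = Long.MAX_VALUE;   // last file: no bound, it must be replayed
    if (higher != null) {
      // Names match HLog.EDITFILES_NAME_PATTERN "-?[0-9]+", e.g. "0000000000000001030"
      maxSeqId = Math.abs(Long.parseLong(higher));
    }
    return maxSeqId <= minSeqId;
  }

  public static void main(String[] args) {
    NavigableSet<String> files = new TreeSet<String>();
    for (long seq = 1000; seq <= 1050; seq += 10) {
      files.add(String.format("%019d", seq));   // zero-padded, as the tests above write them
    }
    long minSeqId = 1029;   // everything up to 1029 is already in the store files
    for (String f : files) {
      System.out.println(Long.parseLong(f)
          + (safeToSkip(files, f, minSeqId) ? " -> skip whole file" : " -> open and replay"));
    }
  }
}

With minSeqId = 1029 the sketch skips the 1000 and 1010 files outright and opens the rest, matching testSkipRecoveredEditsReplaySomeIgnored: the 1020 file is opened, but its single edit is then dropped by the existing per-edit sequence-id check during replay.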
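
The three new TestHRegion cases above cover the three regimes of that check: testSkipRecoveredEditsReplay replays every file (minSeqId below all edits), testSkipRecoveredEditsReplaySomeIgnored skips a prefix of files, and testSkipRecoveredEditsReplayAllIgnored writes deliberately unreadable edit files plus an empty file named minSeqId-1 and still expects replayRecoveredEditsIfAny to return minSeqId: the zero-length file is deleted up front and every other file is skipped by the new check before it is opened. The toy below, with made-up data standing in for WAL entries, shows the two skip levels working together; note that the real loop stops doing the file-level check after the first file it has to open, since later files can only carry larger sequence ids.

public class ReplaySkipLevels {
  public static void main(String[] args) {
    long minSeqId = 1029;                 // highest sequence id already in the store files
    long[][] filesBySeqId = {             // each inner array = the edits in one recovered.edits file
        {1000}, {1010}, {1020}, {1030}, {1040}, {1050}
    };
    long currentEditSeqId = minSeqId;
    for (int i = 0; i < filesBySeqId.length; i++) {
      // File-level skip (new in this patch): the next file's smallest sequence id
      // stands in for its numeric name and bounds everything in the current file.
      long maxSeqId = (i + 1 < filesBySeqId.length) ? filesBySeqId[i + 1][0] : Long.MAX_VALUE;
      if (maxSeqId <= minSeqId) {
        System.out.println("file " + i + ": skipped without opening");
        continue;
      }
      // Edit-level skip (pre-existing): drop any edit not newer than what was replayed so far.
      for (long seqId : filesBySeqId[i]) {
        if (seqId <= currentEditSeqId) {
          System.out.println("file " + i + ": edit " + seqId + " skipped");
        } else {
          currentEditSeqId = seqId;
          System.out.println("file " + i + ": edit " + seqId + " replayed");
        }
      }
    }
  }
}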
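
The large re-indentation of replayRecoveredEdits above also makes the existing progress protocol easier to see: the CancelableProgressable reporter is consulted only after at least hbase.hstore.report.interval.edits edits (default 2000) have been counted and hbase.hstore.report.period milliseconds (default half of hbase.master.assignment.timeoutmonitor.timeout) have elapsed, and a reporter that returns false aborts the replay with an IOException. A stripped-down sketch of that throttling, using illustrative names and no HBase types:

public class ThrottledProgress {
  private final int interval;   // edits between clock checks (cf. hbase.hstore.report.interval.edits)
  private final long period;    // minimum ms between reports (cf. hbase.hstore.report.period)
  private long editsSinceCheck = 0;
  private long lastReport = System.currentTimeMillis();

  ThrottledProgress(int interval, long period) {
    this.interval = interval;
    this.period = period;
  }

  // Call with the number of edits just processed; returns true when a report was made.
  boolean onEdits(int editCount) {
    editsSinceCheck += editCount;
    if (editsSinceCheck < interval) {
      return false;             // cheap path: no clock read until enough edits have passed
    }
    editsSinceCheck = 0;
    long now = System.currentTimeMillis();
    if (lastReport + period > now) {
      return false;             // edit threshold reached, but too soon to report again
    }
    lastReport = now;
    System.out.println("progress: still replaying edits");
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    ThrottledProgress progress = new ThrottledProgress(2000, 100);
    for (int i = 0; i < 10000; i++) {
      progress.onEdits(1);
      if (i % 3000 == 0) Thread.sleep(120);   // simulate an occasional slow stretch
    }
  }
}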