From 2baf3bfc9fbe50327dc09389efdffe8c6e71af5f Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 8 Jun 2015 10:39:08 -0700 Subject: [PATCH] HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS --- .../ZKSplitLogManagerCoordination.java | 2 +- .../hadoop/hbase/master/RegionStates.java | 7 +- .../hadoop/hbase/master/ServerManager.java | 14 + .../regionserver/FlushLargeStoresPolicy.java | 6 +- .../hadoop/hbase/regionserver/HRegion.java | 137 ++++--- .../hbase/regionserver/HRegionServer.java | 22 +- .../hadoop/hbase/regionserver/HStore.java | 13 +- .../hadoop/hbase/regionserver/Region.java | 14 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 354 +++-------------- .../wal/SequenceIdAccounting.java | 363 ++++++++++++++++++ .../hadoop/hbase/wal/DisabledWALProvider.java | 11 +- .../java/org/apache/hadoop/hbase/wal/WAL.java | 49 ++- .../apache/hadoop/hbase/wal/WALSplitter.java | 43 ++- .../hadoop/hbase/ipc/TestCallRunner.java | 1 - .../master/TestGetLastFlushedSequenceId.java | 1 + .../hbase/regionserver/TestHRegion.java | 29 ++ .../regionserver/TestSplitWalDataLoss.java | 149 +++++++ .../hbase/regionserver/wal/TestFSHLog.java | 52 +-- .../wal/TestSequenceIdAccounting.java | 132 +++++++ 19 files changed, 912 insertions(+), 487 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index 556a143b22a..0ae3a004f42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -645,7 +645,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements lastSequenceId = lastRecordedFlushedSequenceId; } ZKUtil.createSetData(this.watcher, nodePath, - ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null)); + ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null)); if (LOG.isDebugEnabled()) { LOG.debug("Marked " + regionEncodeName + " recovering from " + serverName + ": " + nodePath); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 58a826056b3..c658475ed80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -453,7 +453,7 @@ public class RegionStates { ServerName oldServerName = regionAssignments.put(hri, serverName); if (!serverName.equals(oldServerName)) { if (LOG.isDebugEnabled()) { - LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName + " " + hri); + LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); } else { LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); } @@ -644,7 +644,7 @@ public class RegionStates { // Region is open on this region server, but in transition. // This region must be moving away from this server, or splitting/merging. // SSH will handle it, either skip assigning, or re-assign. - LOG.info("Transitioning " + state + " will be handled by SSH for " + sn); + LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn); } else if (sn.equals(state.getServerName())) { // Region is in transition on this region server, and this // region is not open on this server. So the region must be @@ -654,7 +654,8 @@ public class RegionStates { // transition. 
The region could be in failed_close state too if we have // tried several times to open it while this region server is not reachable) if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) { - LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn); + LOG.info("Found region in " + state + + " to be reassigned by ServerCrashProcedure for " + sn); rits.add(hri); } else if(state.isSplittingNew()) { regionsToCleanIfNoMetaEntry.add(state.getRegion()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index bdc73585904..25144b8155b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -122,9 +122,15 @@ public class ServerManager { // Set if we are to shutdown the cluster. private volatile boolean clusterShutdown = false; + /** + * The last flushed sequence id for a region. + */ private final ConcurrentNavigableMap flushedSequenceIdByRegion = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + /** + * The last flushed sequence id for a store in a region. + */ private final ConcurrentNavigableMap> storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap>(Bytes.BYTES_COMPARATOR); @@ -293,6 +299,10 @@ public class ServerManager { Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName); long l = entry.getValue().getCompleteSequenceId(); // Don't let smaller sequence ids override greater sequence ids. 
+ if (LOG.isTraceEnabled()) { + LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue + + ", completeSequenceId=" + l); + } if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) { flushedSequenceIdByRegion.put(encodedRegionName, l); } else if (l != HConstants.NO_SEQNUM && l < existingValue) { @@ -306,6 +316,10 @@ public class ServerManager { byte[] family = storeSeqId.getFamilyName().toByteArray(); existingValue = storeFlushedSequenceId.get(family); l = storeSeqId.getSequenceId(); + if (LOG.isTraceEnabled()) { + LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) + + ", existingValue=" + existingValue + ", completeSequenceId=" + l); + } // Don't let smaller sequence ids override greater sequence ids. if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) { storeFlushedSequenceId.put(family, l); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java index 7e0e54c08a5..328e890a3ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java @@ -76,9 +76,9 @@ public class FlushLargeStoresPolicy extends FlushPolicy { private boolean shouldFlush(Store store) { if (store.getMemStoreSize() > this.flushSizeLowerBound) { if (LOG.isDebugEnabled()) { - LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region - + " will be flushed because of memstoreSize(" + store.getMemStoreSize() - + ") is larger than lower bound(" + this.flushSizeLowerBound + ")"); + LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " + + region.getRegionInfo().getEncodedName() + " because memstoreSize=" + + store.getMemStoreSize() + " > lower bound=" + 
this.flushSizeLowerBound); } return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 94a193f5fac..271a6eb4a52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -22,7 +22,6 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; -import java.io.UnsupportedEncodingException; import java.lang.reflect.Constructor; import java.text.ParseException; import java.util.AbstractList; @@ -217,13 +216,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final AtomicBoolean closing = new AtomicBoolean(false); /** - * The max sequence id of flushed data on this region. Used doing some rough calculations on - * whether time to flush or not. + * The max sequence id of flushed data on this region. There is no edit in memory that is + * less that this sequence id. */ private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM; /** - * Record the sequence id of last flush operation. + * Record the sequence id of last flush operation. Can be in advance of + * {@link #maxFlushedSeqId} when flushing a single column family. In this case, + * {@link #maxFlushedSeqId} will be older than the oldest edit in memory. */ private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM; /** @@ -608,6 +609,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * is new), then read them from the supplied path. * @param htd the table descriptor * @param rsServices reference to {@link RegionServerServices} or null + * @deprecated Use other constructors. 
*/ @Deprecated public HRegion(final Path tableDir, final WAL wal, final FileSystem fs, @@ -1610,16 +1612,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes(); regionLoadBldr.clearStoreCompleteSequenceId(); for (byte[] familyName : this.stores.keySet()) { - long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName); - // no oldestUnflushedSeqId means no data has written to the store after last flush, so we use - // lastFlushOpSeqId as complete sequence id for the store. - regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId - .newBuilder() - .setFamilyName(ByteString.copyFrom(familyName)) - .setSequenceId( - oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build()); + long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName); + // Subtract - 1 to go earlier than the current oldest, unflushed edit in memstore; this will + // give us a sequence id that is for sure flushed. We want edit replay to start after this + // sequence id in this region. If NO_SEQNUM, use the regions maximum flush id. + long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1; + regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId. + newBuilder().setFamilyName(ByteString.copyFrom(familyName)).setSequenceId(csid).build()); } - return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId); + return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId()); } ////////////////////////////////////////////////////////////////////////////// @@ -1912,27 +1913,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * returns true which will make a lot of flush requests. 
*/ boolean shouldFlushStore(Store store) { - long maxFlushedSeqId = - this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store - .getFamily().getName()) - 1; - if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) { + long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), + store.getFamily().getName()) - 1; + if (earliest > 0 && earliest + flushPerChanges < sequenceId.get()) { if (LOG.isDebugEnabled()) { - LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this - + " will be flushed because its max flushed seqId(" + maxFlushedSeqId - + ") is far away from current(" + sequenceId.get() + "), max allowed is " - + flushPerChanges); + LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " + + getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest + + " is > " + this.flushPerChanges + " from current=" + sequenceId.get()); } return true; } - if (flushCheckInterval <= 0) { + if (this.flushCheckInterval <= 0) { return false; } long now = EnvironmentEdgeManager.currentTime(); - if (store.timeOfOldestEdit() < now - flushCheckInterval) { + if (store.timeOfOldestEdit() < now - this.flushCheckInterval) { if (LOG.isDebugEnabled()) { - LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this - + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit() - + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval); + LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " + + getRegionInfo().getEncodedName() + " because time of oldest edit=" + + store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now); } return true; } @@ -2086,18 +2085,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (LOG.isInfoEnabled()) { - LOG.info("Started memstore flush for " + this + ", current region 
memstore size " - + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/" - + stores.size() + " column families' memstores are being flushed." - + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid)); - // only log when we are not flushing all stores. - if (this.stores.size() > storesToFlush.size()) { + // Log a fat line detailing what is being flushed. + StringBuilder perCfExtras = null; + if (!isAllFamilies(storesToFlush)) { + perCfExtras = new StringBuilder(); for (Store store: storesToFlush) { - LOG.info("Flushing Column Family: " + store.getColumnFamilyName() - + " which was occupying " - + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore."); + perCfExtras.append("; "); + perCfExtras.append(store.getColumnFamilyName()); + perCfExtras.append("="); + perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize())); } } + LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + + " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) + + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + + ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid)); } // Stop updates while we snapshot the memstore of all of these regions' stores. We only have // to do this for a moment. It is quick. We also set the memstore size to zero here before we @@ -2123,10 +2125,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi TreeMap> committedFiles = new TreeMap>( Bytes.BYTES_COMPARATOR); // The sequence id of this flush operation which is used to log FlushMarker and pass to - // createFlushContext to use as the store file's sequence id. + // createFlushContext to use as the store file's sequence id. It can be in advance of edits + // still in the memstore, edits that are in other column families yet to be flushed. 
long flushOpSeqId = HConstants.NO_SEQNUM; - // The max flushed sequence id after this flush operation. Used as completeSequenceId which is - // passed to HMaster. + // The max flushed sequence id after this flush operation completes. All edits in memstore + // will be in advance of this sequence id. long flushedSeqId = HConstants.NO_SEQNUM; byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); @@ -2135,21 +2138,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { w = mvcc.beginMemstoreInsert(); if (wal != null) { - if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) { - // This should never happen. - String msg = "Flush will not be started for [" - + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; + Long earliestUnflushedSequenceIdForTheRegion = + wal.startCacheFlush(encodedRegionName, flushedFamilyNames); + if (earliestUnflushedSequenceIdForTheRegion == null) { + // This should never happen. This is how startCacheFlush signals flush cannot proceed. + String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; status.setStatus(msg); return new PrepareFlushResult( new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false), myseqid); } flushOpSeqId = getNextSequenceId(wal); - long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName); - // no oldestUnflushedSeqId means we flushed all stores. - // or the unflushed stores are all empty. - flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId - : oldestUnflushedSeqId - 1; + // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit + flushedSeqId = + earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM? + flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1; } else { // use the provided sequence Id as WAL is not being used for this flush. 
flushedSeqId = flushOpSeqId = myseqid; @@ -2229,6 +2232,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi flushedSeqId, totalFlushableSizeOfFlushableStores); } + /** + * @param families + * @return True if passed Set is all families in the region. + */ + private boolean isAllFamilies(final Collection families) { + return families == null || this.stores.size() == families.size(); + } + /** * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various * reasons. Ignores exceptions from WAL. Returns whether the write succeeded. @@ -2344,10 +2355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.lastStoreFlushTimeMap.put(store, startTime); } - // Update the oldest unflushed sequence id for region. this.maxFlushedSeqId = flushedSeqId; - - // Record flush operation sequence id. this.lastFlushOpSeqId = flushOpSeqId; // C. Finally notify anyone waiting on memstore to clear: @@ -3704,7 +3712,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Make request outside of synchronize block; HBASE-818. 
this.rsServices.getFlushRequester().requestFlush(this, false); if (LOG.isDebugEnabled()) { - LOG.debug("Flush requested on " + this); + LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName()); } } @@ -4438,7 +4446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + seqId + " is greater than current seqId:" + currentSeqId); // Prepare flush (take a snapshot) and then abort (drop the snapshot) - if (store == null ) { + if (store == null) { for (Store s : stores.values()) { totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId); } @@ -5432,11 +5440,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); } else if (scannerContext.checkSizeLimit(limitScope)) { ScannerContext.NextState state = - moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; + moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED; return scannerContext.setScannerState(state).hasMoreValues(); } else if (scannerContext.checkTimeLimit(limitScope)) { ScannerContext.NextState state = - moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + moreCellsInRow? 
NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED; return scannerContext.setScannerState(state).hasMoreValues(); } } while (moreCellsInRow); @@ -5815,7 +5823,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.error(msg); LOG.error("unable to refresh store files", e); abortRegionServer(msg); - return new NotServingRegionException(getRegionInfo().getRegionNameAsString() +" is closing"); + return new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " closing"); } } @@ -5968,12 +5976,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return new HRegion * @throws IOException */ - public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir, - final Configuration conf, - final HTableDescriptor hTableDescriptor, - final WAL wal, - final boolean initialize, final boolean ignoreWAL) - throws IOException { + public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, + final Path tableDir, final Configuration conf, final HTableDescriptor hTableDescriptor, + final WAL wal, final boolean initialize, final boolean ignoreWAL) + throws IOException { LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor + " RootDir = " + rootDir + " Table name == " + info.getTable().getNameAsString()); @@ -6338,6 +6344,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param tabledir qualified path for table * @param name ENCODED region name * @return Path of HRegion directory + * @deprecated For tests only; to be removed. 
*/ @Deprecated public static Path getRegionDir(final Path tabledir, final String name) { @@ -6350,6 +6357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param rootdir qualified path of HBase root directory * @param info HRegionInfo for the region * @return qualified path of region directory + * @deprecated For tests only; to be removed. */ @Deprecated @VisibleForTesting @@ -7170,7 +7178,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi newTags.add(itr.next()); } } - if (i < ( edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) + if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) idx++; } @@ -7752,6 +7760,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // sync the WAL edit (SYNC and FSYNC treated the same for now) this.wal.sync(txid); break; + default: + throw new RuntimeException("Unknown durability " + durability); } } } @@ -7840,8 +7850,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getOldestSeqIdOfStore(byte[] familyName) { - return wal.getEarliestMemstoreSeqNum(getRegionInfo() - .getEncodedNameAsBytes(), familyName); + return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 843c0a7ab30..a70456a3b0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2835,24 +2835,14 @@ public class HRegionServer extends HasThread implements @Override public boolean removeFromOnlineRegions(final Region r, ServerName destination) { Region toReturn = 
this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); - if (destination != null) { - try { - WAL wal = getWAL(r.getRegionInfo()); - long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes()); - if (closeSeqNum == HConstants.NO_SEQNUM) { - // No edits in WAL for this region; get the sequence number when the region was opened. - closeSeqNum = r.getOpenSeqNum(); - if (closeSeqNum == HConstants.NO_SEQNUM) { - closeSeqNum = 0; - } - } - addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum); - } catch (IOException exception) { - LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() + - "; not adding to moved regions."); - LOG.debug("Exception details for failure to get wal", exception); + long closeSeqNum = r.getMaxFlushedSeqId(); + if (closeSeqNum == HConstants.NO_SEQNUM) { + // No edits in WAL for this region; get the sequence number when the region was opened. + closeSeqNum = r.getOpenSeqNum(); + if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0; } + addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum); } this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); return toReturn != null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 98dab42dc9d..0c6b2f0bb29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -111,16 +111,6 @@ import com.google.common.collect.Sets; * services is compaction services where files are aggregated once they pass * a configurable threshold. * - *

The only thing having to do with logs that Store needs to deal with is - * the reconstructionLog. This is a segment of an HRegion's log that might - * NOT be present upon startup. If the param is NULL, there's nothing to do. - * If the param is non-NULL, we need to process the log to reconstruct - * a TreeMap that might not have been written to disk before the process - * died. - * - *

It's assumed that after this constructor returns, the reconstructionLog - * file will be deleted (by whoever has instantiated the Store). - * *

Locking and transactions are handled at a higher level. This API should * not be called directly but by an HRegion manager. */ @@ -898,8 +888,7 @@ public class HStore implements Store { } /** - * Write out current snapshot. Presumes {@link #snapshot()} has been called - * previously. + * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. * @param logCacheFlushId flush sequence number * @param snapshot * @param status diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 891004286f7..da642ca24ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Message; import com.google.protobuf.RpcController; import com.google.protobuf.Service; @@ -123,10 +124,19 @@ public interface Region extends ConfigurationObserver { /** @return the latest sequence number that was read from storage when this region was opened */ long getOpenSeqNum(); - /** @return the max sequence id of flushed data on this region */ + /** @return the max sequence id of flushed data on this region; no edit in memory will have + * a sequence id that is less that what is returned here. + */ long getMaxFlushedSeqId(); - /** @return the oldest sequence id found in the store for the given family */ + /** @return the oldest flushed sequence id for the given family; can be beyond + * {@link #getMaxFlushedSeqId()} in case where we've flushed a subset of a regions column + * families + * @deprecated Since version 1.2.0. 
Exposes too much about our internals; shutting it down. + * Do not use. + */ + @VisibleForTesting + @Deprecated public long getOldestSeqIdOfStore(byte[] familyName); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index aa722a0c8bb..b118ecd967d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -29,16 +29,13 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -66,7 +63,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -91,7 +87,6 @@ import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; @@ -156,7 +151,7 @@ public class FSHLog implements WAL { private static final Log LOG = LogFactory.getLog(FSHLog.class); private static final int 
DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms - + /** * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. * Appends and syncs are each put on the ring which means handlers need to @@ -168,7 +163,7 @@ public class FSHLog implements WAL { private final Disruptor disruptor; /** - * An executorservice that runs the disrutpor AppendEventHandler append executor. + * An executorservice that runs the disruptor AppendEventHandler append executor. */ private final ExecutorService appendExecutor; @@ -210,6 +205,7 @@ public class FSHLog implements WAL { * WAL directory, where all WAL files would be placed. */ private final Path fullPathLogDir; + /** * dir path where old logs are kept. */ @@ -241,6 +237,7 @@ public class FSHLog implements WAL { * conf object */ protected final Configuration conf; + /** Listeners that are called on WAL events. */ private final List listeners = new CopyOnWriteArrayList(); @@ -258,6 +255,7 @@ public class FSHLog implements WAL { public WALCoprocessorHost getCoprocessorHost() { return coprocessorHost; } + /** * FSDataOutputStream associated with the current SequenceFile.writer */ @@ -288,6 +286,13 @@ public class FSHLog implements WAL { // Enable it if the replications recover. private volatile boolean lowReplicationRollEnabled = true; + /** + * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding + * sequence id as yet not flushed as well as the most recent edit sequence id appended to the + * WAL. Has facility for answering questions such as "Is it safe to GC a WAL?". + */ + private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); + /** * Current log file. */ @@ -334,52 +339,6 @@ public class FSHLog implements WAL { private final AtomicInteger closeErrorCount = new AtomicInteger(); - // Region sequence id accounting across flushes and for knowing when we can GC a WAL. 
These - // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting - // done above in failedSequence, highest sequence, etc. - /** - * This lock ties all operations on lowestFlushingStoreSequenceIds and - * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into - * oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions - * sequence id, or to find regions with old sequence ids to force flush; we are interested in - * old stuff not the new additions (TODO: IS THIS SAFE? CHECK!). - */ - private final Object regionSequenceIdLock = new Object(); - - /** - * Map of encoded region names and family names to their OLDEST -- i.e. their first, - * the longest-lived -- sequence id in memstore. Note that this sequence id is the region - * sequence id. This is not related to the id we use above for {@link #highestSyncedSequence} - * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor - * ring buffer. - */ - private final ConcurrentMap> oldestUnflushedStoreSequenceIds - = new ConcurrentSkipListMap>( - Bytes.BYTES_COMPARATOR); - - /** - * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in - * memstore currently being flushed out to hfiles. Entries are moved here from - * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held - * (so movement between the Maps is atomic). This is not related to the id we use above for - * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from - * the disruptor ring buffer, an internal detail. - */ - private final Map> lowestFlushingStoreSequenceIds = - new TreeMap>(Bytes.BYTES_COMPARATOR); - - /** - * Map of region encoded names to the latest region sequence id. Updated on each append of - * WALEdits to the WAL. We create one map for each WAL file at the time it is rolled. - *

When deciding whether to archive a WAL file, we compare the sequence IDs in this map to - * {@link #lowestFlushingRegionSequenceIds} and {@link #oldestUnflushedRegionSequenceIds}. - * See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info. - *

- * This map uses byte[] as the key, and uses reference equality. It works in our use case as we - * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns - * the same array. - */ - private Map highestRegionSequenceIds = new HashMap(); /** * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. @@ -396,7 +355,7 @@ public class FSHLog implements WAL { }; /** - * Map of wal log file to the latest sequence ids of all regions it has entries of. + * Map of WAL log file to the latest sequence ids of all regions it has entries of. * The map is sorted by the log file creation timestamp (contained in the log file name). */ private NavigableMap> byWalRegionSequenceIds = @@ -542,7 +501,7 @@ public class FSHLog implements WAL { (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); - this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication", + this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); @@ -745,128 +704,37 @@ public class FSHLog implements WAL { return DefaultWALProvider.createWriter(conf, fs, path, false); } - private long getLowestSeqId(Map seqIdMap) { - long result = HConstants.NO_SEQNUM; - for (Long seqNum: seqIdMap.values()) { - if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) { - result = seqNum.longValue(); - } - } - return result; - } - - private > Map copyMapWithLowestSeqId( - Map mapToCopy) { - Map copied = Maps.newHashMap(); - for (Map.Entry entry: mapToCopy.entrySet()) { - long lowestSeqId = getLowestSeqId(entry.getValue()); - if (lowestSeqId != HConstants.NO_SEQNUM) { - copied.put(entry.getKey(), lowestSeqId); - } - } - return copied; - } - /** - 
* Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits - * have been flushed to hfiles. - *

- * For each log file, it compares its region to sequenceId map - * (@link {@link FSHLog#highestRegionSequenceIds} with corresponding region entries in - * {@link FSHLog#lowestFlushingRegionSequenceIds} and - * {@link FSHLog#oldestUnflushedRegionSequenceIds}. If all the regions in the map are flushed - * past of their value, then the wal is eligible for archiving. + * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. * @throws IOException */ private void cleanOldLogs() throws IOException { - Map lowestFlushingRegionSequenceIdsLocal = null; - Map oldestUnflushedRegionSequenceIdsLocal = null; - List logsToArchive = new ArrayList(); - // make a local copy so as to avoid locking when we iterate over these maps. - synchronized (regionSequenceIdLock) { - lowestFlushingRegionSequenceIdsLocal = - copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds); - oldestUnflushedRegionSequenceIdsLocal = - copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds); - } - for (Map.Entry> e : byWalRegionSequenceIds.entrySet()) { - // iterate over the log file. + List logsToArchive = null; + // For each log file, look at its Map of regions to highest sequence id; if all sequence ids + // are older than what is currently in memory, the WAL can be GC'd. + for (Map.Entry> e : this.byWalRegionSequenceIds.entrySet()) { Path log = e.getKey(); Map sequenceNums = e.getValue(); - // iterate over the map for this log file, and tell whether it should be archive or not. 
- if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal, - oldestUnflushedRegionSequenceIdsLocal)) { + if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { + if (logsToArchive == null) logsToArchive = new ArrayList(); logsToArchive.add(log); - LOG.debug("WAL file ready for archiving " + log); + if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log); } } - for (Path p : logsToArchive) { - this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen()); - archiveLogFile(p); - this.byWalRegionSequenceIds.remove(p); - } - } - - /** - * Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived. - * It compares the region entries present in the passed sequenceNums map with the local copy of - * {@link #oldestUnflushedRegionSequenceIds} and {@link #lowestFlushingRegionSequenceIds}. If, - * for all regions, the value is lesser than the minimum of values present in the - * oldestFlushing/UnflushedSeqNums, then the wal file is eligible for archiving. - * @param sequenceNums for a WAL, at the time when it was rolled. - * @param oldestFlushingMap - * @param oldestUnflushedMap - * @return true if wal is eligible for archiving, false otherwise. - */ - static boolean areAllRegionsFlushed(Map sequenceNums, - Map oldestFlushingMap, Map oldestUnflushedMap) { - for (Map.Entry regionSeqIdEntry : sequenceNums.entrySet()) { - // find region entries in the flushing/unflushed map. If there is no entry, it meansj - // a region doesn't have any unflushed entry. - long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ? - oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE; - long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ? 
- oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE; - // do a minimum to be sure to contain oldest sequence Id - long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed); - if (minSeqNum <= regionSeqIdEntry.getValue()) return false;// can't archive - } - return true; - } - - /** - * Iterates over the given map of regions, and compares their sequence numbers with corresponding - * entries in {@link #oldestUnflushedRegionSequenceIds}. If the sequence number is greater or - * equal, the region is eligible to flush, otherwise, there is no benefit to flush (from the - * perspective of passed regionsSequenceNums map), because the region has already flushed the - * entries present in the WAL file for which this method is called for (typically, the oldest - * wal file). - * @param regionsSequenceNums - * @return regions which should be flushed (whose sequence numbers are larger than their - * corresponding un-flushed entries. - */ - private byte[][] findEligibleMemstoresToFlush(Map regionsSequenceNums) { - List regionsToFlush = null; - // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. - synchronized (regionSequenceIdLock) { - for (Map.Entry e: regionsSequenceNums.entrySet()) { - long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey()); - if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) { - if (regionsToFlush == null) - regionsToFlush = new ArrayList(); - regionsToFlush.add(e.getKey()); - } + if (logsToArchive != null) { + for (Path p : logsToArchive) { + this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen()); + archiveLogFile(p); + this.byWalRegionSequenceIds.remove(p); } } - return regionsToFlush == null ? 
null : regionsToFlush - .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); } /** - * If the number of un-archived WAL files is greater than maximum allowed, it checks - * the first (oldest) WAL file, and returns the regions which should be flushed so that it could + * If the number of un-archived WAL files is greater than maximum allowed, check the first + * (oldest) WAL file, and returns those regions which should be flushed so that it can * be archived. - * @return regions to flush in order to archive oldest wal file. + * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file. * @throws IOException */ byte[][] findRegionsToForceFlush() throws IOException { @@ -875,7 +743,7 @@ public class FSHLog implements WAL { if (logCount > this.maxLogs && logCount > 0) { Map.Entry> firstWALEntry = this.byWalRegionSequenceIds.firstEntry(); - regions = findEligibleMemstoresToFlush(firstWALEntry.getValue()); + regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue()); } if (regions != null) { StringBuilder sb = new StringBuilder(); @@ -883,9 +751,8 @@ public class FSHLog implements WAL { if (i > 0) sb.append(", "); sb.append(Bytes.toStringBinary(regions[i])); } - LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" + - this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " + - sb.toString()); + LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + + "; forcing flush of " + regions.length + " regions(s): " + sb.toString()); } return regions; } @@ -963,8 +830,7 @@ public class FSHLog implements WAL { this.numEntries.set(0); final String newPathString = (null == newPath ? 
null : FSUtils.getPath(newPath)); if (oldPath != null) { - this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds); - this.highestRegionSequenceIds = new HashMap(); + this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest()); long oldFileLen = this.fs.getFileStatus(oldPath).getLen(); this.totalLogSize.addAndGet(oldFileLen); LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + @@ -1108,7 +974,7 @@ public class FSHLog implements WAL { LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.fullPathArchiveDir)); } - LOG.info("Closed WAL: " + toString() ); + LOG.info("Closed WAL: " + toString()); } @Override @@ -1631,108 +1497,24 @@ public class FSHLog implements WAL { } @Override - public boolean startCacheFlush(final byte[] encodedRegionName, - Set flushedFamilyNames) { - Map oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + public Long startCacheFlush(final byte[] encodedRegionName, Set families) { if (!closeBarrier.beginOp()) { - LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) + - " - because the server is closing."); - return false; + LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); + return null; } - synchronized (regionSequenceIdLock) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - oldestUnflushedStoreSequenceIds.get(encodedRegionName); - if (oldestUnflushedStoreSequenceIdsOfRegion != null) { - for (byte[] familyName: flushedFamilyNames) { - Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName); - if (seqId != null) { - oldStoreSeqNum.put(familyName, seqId); - } - } - if (!oldStoreSeqNum.isEmpty()) { - Map oldValue = this.lowestFlushingStoreSequenceIds.put( - encodedRegionName, oldStoreSeqNum); - assert oldValue == null: "Flushing map not cleaned up for " - + Bytes.toString(encodedRegionName); - } - if 
(oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) { - // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever - // even if the region is already moved to other server. - // Do not worry about data racing, we held write lock of region when calling - // startCacheFlush, so no one can add value to the map we removed. - oldestUnflushedStoreSequenceIds.remove(encodedRegionName); - } - } - } - if (oldStoreSeqNum.isEmpty()) { - // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either - // the region is already flushing (which would make this call invalid), or there - // were no appends after last flush, so why are we starting flush? Maybe we should - // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop. - // For now preserve old logic. - LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" - + Bytes.toString(encodedRegionName) + "]"); - } - return true; + return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); } @Override public void completeCacheFlush(final byte [] encodedRegionName) { - synchronized (regionSequenceIdLock) { - this.lowestFlushingStoreSequenceIds.remove(encodedRegionName); - } + this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); closeBarrier.endOp(); } - private ConcurrentMap getOrCreateOldestUnflushedStoreSequenceIdsOfRegion( - byte[] encodedRegionName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - oldestUnflushedStoreSequenceIds.get(encodedRegionName); - if (oldestUnflushedStoreSequenceIdsOfRegion != null) { - return oldestUnflushedStoreSequenceIdsOfRegion; - } - oldestUnflushedStoreSequenceIdsOfRegion = - new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - ConcurrentMap alreadyPut = - oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName, - oldestUnflushedStoreSequenceIdsOfRegion); - return alreadyPut == null ? 
oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut; - } - @Override public void abortCacheFlush(byte[] encodedRegionName) { - Map storeSeqNumsBeforeFlushStarts; - Map currentStoreSeqNums = new TreeMap(Bytes.BYTES_COMPARATOR); - synchronized (regionSequenceIdLock) { - storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove( - encodedRegionName); - if (storeSeqNumsBeforeFlushStarts != null) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); - for (Map.Entry familyNameAndSeqId: storeSeqNumsBeforeFlushStarts - .entrySet()) { - currentStoreSeqNums.put(familyNameAndSeqId.getKey(), - oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(), - familyNameAndSeqId.getValue())); - } - } - } + this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); closeBarrier.endOp(); - if (storeSeqNumsBeforeFlushStarts != null) { - for (Map.Entry familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) { - Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey()); - if (currentSeqNum != null - && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) { - String errorStr = - "Region " + Bytes.toString(encodedRegionName) + " family " - + Bytes.toString(familyNameAndSeqId.getKey()) - + " acquired edits out of order current memstore seq=" + currentSeqNum - + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue(); - LOG.error(errorStr); - Runtime.getRuntime().halt(1); - } - } - } } @VisibleForTesting @@ -1762,23 +1544,21 @@ public class FSHLog implements WAL { @Override public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); - return oldestUnflushedStoreSequenceIdsOfRegion != null ? - getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM; + // Used by tests. 
Deprecated as too subtle for general usage. + return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); } @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, - byte[] familyName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); - if (oldestUnflushedStoreSequenceIdsOfRegion != null) { - Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); - return result != null ? result.longValue() : HConstants.NO_SEQNUM; - } else { - return HConstants.NO_SEQNUM; - } + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + // This method is used by tests and for figuring if we should flush or not because our + // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use + // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId + // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the + // currently flushing sequence ids, and if anything found there, it is returning these. This is + // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if + // we crash during the flush. For figuring what to flush, we might get requeued if our sequence + // id is old even though we are currently flushing. This may mean we do too much flushing. + return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); } /** @@ -1820,10 +1600,10 @@ public class FSHLog implements WAL { /** * For Thread A to call when it is ready to wait on the 'safe point' to be attained. * Thread A will be held in here until Thread B calls {@link #safePointAttained()} - * @throws InterruptedException - * @throws ExecutionException * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with * an exception, then something is up w/ our syncing. 
+ * @throws InterruptedException + * @throws ExecutionException * @return The passed syncFuture * @throws FailedSyncBeforeLogCloseException */ @@ -2014,15 +1794,6 @@ public class FSHLog implements WAL { } } - private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName, - Set familyNameSet, Long lRegionSequenceId) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); - for (byte[] familyName : familyNameSet) { - oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId); - } - } - /** * Append to the WAL. Does all CP and WAL listener calls. * @param entry @@ -2040,14 +1811,14 @@ public class FSHLog implements WAL { // here inside this single appending/writing thread. Events are ordered on the ringbuffer // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); - + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. if (entry.getEdit().isEmpty()) { return; } - + // Coprocessor hook. 
if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) { @@ -2067,13 +1838,8 @@ public class FSHLog implements WAL { writer.append(entry); assert highestUnsyncedSequence < entry.getSequence(); highestUnsyncedSequence = entry.getSequence(); - Long lRegionSequenceId = Long.valueOf(regionSequenceId); - highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); - if (entry.isInMemstore()) { - updateOldestUnflushedSequenceIds(encodedRegionName, - entry.getFamilyNames(), lRegionSequenceId); - } - + sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, + entry.isInMemstore()); coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. postAppend(entry, EnvironmentEdgeManager.currentTime() - start); @@ -2203,4 +1969,4 @@ public class FSHLog implements WAL { } return new DatanodeInfo[0]; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java new file mode 100644 index 00000000000..6e10f3ca3e7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.common.collect.Maps; + +/** + * Accounting of sequence ids per region and then by column family. So we can keep our accounting + * current, call startCacheFlush and then completeCacheFlush or abortCacheFlush so this instance + * can keep abreast of the state of sequence id persistence. Also call update per append. + */ +class SequenceIdAccounting { + private static final Log LOG = LogFactory.getLog(SequenceIdAccounting.class); + /** + * This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and + * {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the + * lowest outstanding sequence ids EXCEPT when flushing. When we flush, the current + * lowest set for the region/column family are moved (atomically because of this lock) to + * {@link #flushingSequenceIds}. + * + *

The two Maps are tied by this locking object EXCEPT when we go to update the lowest + * entry; see {@link #update(byte[], Set, long, boolean)}. In here is a putIfAbsent call on + * {@link #lowestUnflushedSequenceIds}. In this latter case, we will add this lowest + * sequence id if we find that there is no entry for the current column family. There will be no + * entry only if we just came up OR we have moved aside current set of lowest sequence ids + * because the current set are being flushed (by putting them into {@link #flushingSequenceIds}). + * This is how we pick up the next 'lowest' sequence id per region per column family to be used + * figuring what is in the next flush. + */ + private final Object tieLock = new Object(); + + /** + * Map of encoded region names and family names to their OLDEST -- i.e. their first, + * the longest-lived, their 'earliest', the 'lowest' -- sequence id. + * + *

When we flush, the current lowest sequence ids get cleared and added to + * {@link #flushingSequenceIds}. The next append that comes in, is then added + * here to {@link #lowestUnflushedSequenceIds} as the next lowest sequenceid. + * + *

If flush fails, currently server is aborted so no need to restore previous sequence ids. + *

Needs to be concurrent Maps because we use putIfAbsent updating oldest. + */ + private final ConcurrentMap> lowestUnflushedSequenceIds + = new ConcurrentSkipListMap>( + Bytes.BYTES_COMPARATOR); + + /** + * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id + * currently being flushed out to hfiles. Entries are moved here from + * {@link #lowestUnflushedSequenceIds} while the lock {@link #tieLock} is held + * (so movement between the Maps is atomic). + */ + private final Map> flushingSequenceIds = + new TreeMap>(Bytes.BYTES_COMPARATOR); + + /** + * Map of region encoded names to the latest/highest region sequence id. Updated on each + * call to append. + *

+ * This map uses byte[] as the key, and uses reference equality. It works in our use case as we + * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns + * the same array. + */ + private Map highestSequenceIds = new HashMap(); + + /** + * Returns the lowest unflushed sequence id for the region. + * @param encodedRegionName + * @return Lowest outstanding unflushed sequenceid for encodedRegionName. Will + * return {@link HConstants#NO_SEQNUM} when none. + */ + long getLowestSequenceId(final byte [] encodedRegionName) { + synchronized (this.tieLock) { + Map m = this.flushingSequenceIds.get(encodedRegionName); + long flushingLowest = m != null? getLowestSequenceId(m): Long.MAX_VALUE; + m = this.lowestUnflushedSequenceIds.get(encodedRegionName); + long unflushedLowest = m != null? getLowestSequenceId(m): HConstants.NO_SEQNUM; + return Math.min(flushingLowest, unflushedLowest); + } + } + + /** + * @param encodedRegionName + * @param familyName + * @return Lowest outstanding unflushed sequenceid for encodedRegionname and + * familyName. Returned sequenceid may be for an edit currently being flushed. + */ + long getLowestSequenceId(final byte [] encodedRegionName, final byte [] familyName) { + synchronized (this.tieLock) { + Map m = this.flushingSequenceIds.get(encodedRegionName); + if (m != null) { + Long lowest = m.get(familyName); + if (lowest != null) return lowest; + } + m = this.lowestUnflushedSequenceIds.get(encodedRegionName); + if (m != null) { + Long lowest = m.get(familyName); + if (lowest != null) return lowest; + } + } + return HConstants.NO_SEQNUM; + } + + /** + * Reset the accounting of highest sequenceid by regionname. + * @return Return the previous accounting Map of regions to the last sequence id written into + * each. + */ + Map resetHighest() { + Map old = this.highestSequenceIds; + this.highestSequenceIds = new HashMap(); + return old; + } + + /** + * We've been passed a new sequenceid for the region. 
Set it as highest seen for this region and + * if we are to record oldest, or lowest sequenceids, save it as oldest seen if nothing + * currently older. + * @param encodedRegionName + * @param families + * @param sequenceid + * @param lowest Whether to keep running account of oldest sequence id. + */ + void update(byte[] encodedRegionName, Set families, long sequenceid, + final boolean lowest) { + Long l = Long.valueOf(sequenceid); + this.highestSequenceIds.put(encodedRegionName, l); + if (lowest) { + ConcurrentMap m = getOrCreateLowestSequenceIds(encodedRegionName); + for (byte[] familyName : families) { + m.putIfAbsent(familyName, l); + } + } + } + + ConcurrentMap getOrCreateLowestSequenceIds(byte[] encodedRegionName) { + // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append. + ConcurrentMap m = this.lowestUnflushedSequenceIds.get(encodedRegionName); + if (m != null) return m; + m = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + // Another thread may have added it ahead of us. + ConcurrentMap alreadyPut = + this.lowestUnflushedSequenceIds.putIfAbsent(encodedRegionName, m); + return alreadyPut == null? m : alreadyPut; + } + + /** + * @param sequenceids Map to search for lowest value. + * @return Lowest value found in sequenceids. + */ + static long getLowestSequenceId(Map sequenceids) { + long lowest = HConstants.NO_SEQNUM; + for (Long sid: sequenceids.values()) { + if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) { + lowest = sid.longValue(); + } + } + return lowest; + } + + /** + * @param src + * @return New Map that has same keys as src but instead of a Map for a value, it + * instead has found the smallest sequence id and it returns that as the value instead. 
+ */ + private > Map flattenToLowestSequenceId( + Map src) { + if (src == null || src.isEmpty()) return null; + Map tgt = Maps.newHashMap(); + for (Map.Entry entry: src.entrySet()) { + long lowestSeqId = getLowestSequenceId(entry.getValue()); + if (lowestSeqId != HConstants.NO_SEQNUM) { + tgt.put(entry.getKey(), lowestSeqId); + } + } + return tgt; + } + + /** + * @param encodedRegionName Region to flush. + * @param families Families to flush. May be a subset of all families in the region. + * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if + * we are flushing a subset of all families but there are no edits in those families not + * being flushed; in other words, this is effectively same as a flush of all of the region + * though we were passed a subset of families. Otherwise, it returns the sequence id of the + * oldest/lowest outstanding edit. + */ + Long startCacheFlush(final byte[] encodedRegionName, final Set families) { + Map oldSequenceIds = null; + Long lowestUnflushedInRegion = HConstants.NO_SEQNUM; + synchronized (tieLock) { + Map m = this.lowestUnflushedSequenceIds.get(encodedRegionName); + if (m != null) { + // NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled + // circumstance because another concurrent thread now may add sequenceids for this family + // (see above in getOrCreateLowestSequenceIds). Make sure you are ok with this. Usually it + // is fine because updates are blocked when this method is called. Make sure!!!
+ for (byte[] familyName: families) { + Long seqId = m.remove(familyName); + if (seqId != null) { + if (oldSequenceIds == null) oldSequenceIds = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + oldSequenceIds.put(familyName, seqId); + } + } + if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) { + if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) { + LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) + + ", sequenceid=" + oldSequenceIds); + } + } + if (m.isEmpty()) { + // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever + // even if the region is already moved to other server. + // Do not worry about data racing, we held write lock of region when calling + // startCacheFlush, so no one can add value to the map we removed. + this.lowestUnflushedSequenceIds.remove(encodedRegionName); + } else { + // Flushing a subset of the region families. Return the sequence id of the oldest entry. + lowestUnflushedInRegion = Collections.min(m.values()); + } + } + } + // Do this check outside lock. + if (oldSequenceIds != null && oldSequenceIds.isEmpty()) { + // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either + // the region is already flushing (which would make this call invalid), or there + // were no appends after last flush, so why are we starting flush? Maybe we should + // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop. + // For now preserve old logic. + LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName)); + } + return lowestUnflushedInRegion; + } + + void completeCacheFlush(final byte [] encodedRegionName) { + synchronized (tieLock) { + this.flushingSequenceIds.remove(encodedRegionName); + } + } + + void abortCacheFlush(final byte[] encodedRegionName) { + // Method is called when we are crashing down because failed write flush AND it is called + // if we fail prepare. 
The below is for the fail prepare case; we restore the old sequence ids. + Map flushing = null; + Map tmpMap = new TreeMap(Bytes.BYTES_COMPARATOR); + // Here we are moving sequenceids from flushing back to unflushed; doing opposite of what + // happened in startCacheFlush. During prepare phase, we have update lock on the region so + // no edits should be coming in via append. + synchronized (tieLock) { + flushing = this.flushingSequenceIds.remove(encodedRegionName); + if (flushing != null) { + Map unflushed = getOrCreateLowestSequenceIds(encodedRegionName); + for (Map.Entry e: flushing.entrySet()) { + // Set into unflushed the 'old' oldest sequenceid and if any value in flushed with this + // value, it will now be in tmpMap. + tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue())); + } + } + } + + // Here we are doing some 'test' to see if edits are going in out of order. What is it for? + // Carried over from old code. + if (flushing != null) { + for (Map.Entry e : flushing.entrySet()) { + Long currentId = tmpMap.get(e.getKey()); + if (currentId != null && currentId.longValue() <= e.getValue().longValue()) { + String errorStr = Bytes.toString(encodedRegionName) + " family " + + Bytes.toString(e.getKey()) + " acquired edits out of order current memstore seq=" + + currentId + ", previous oldest unflushed id=" + e.getValue(); + LOG.error(errorStr); + Runtime.getRuntime().halt(1); + } + } + } + } + + /** + * See if passed sequenceids are lower -- i.e. earlier -- than any outstanding + * sequenceids, sequenceids we are holding on to in this accounting instance. + * @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make + * sense for it to be null). + * @return true if all sequenceids are lower, older than, the old sequenceids in this instance. 
+ */ + boolean areAllLower(Map sequenceids) { + Map flushing = null; + Map unflushed = null; + synchronized (this.tieLock) { + // Get a flattened -- only the oldest sequenceid -- copy of current flushing and unflushed + // data structures to use in tests below. + flushing = flattenToLowestSequenceId(this.flushingSequenceIds); + unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds); + } + for (Map.Entry e : sequenceids.entrySet()) { + long oldestFlushing = Long.MAX_VALUE; + long oldestUnflushed = Long.MAX_VALUE; + if (flushing != null) { + if (flushing.containsKey(e.getKey())) oldestFlushing = flushing.get(e.getKey()); + } + if (unflushed != null) { + if (unflushed.containsKey(e.getKey())) oldestUnflushed = unflushed.get(e.getKey()); + } + long min = Math.min(oldestFlushing, oldestUnflushed); + if (min <= e.getValue()) return false; + } + return true; + } + + /** + * Iterates over the given Map and compares sequence ids with corresponding + * entries in {@link #oldestUnflushedRegionSequenceIds}. If a region in + * {@link #oldestUnflushedRegionSequenceIds} has a sequence id less than that passed + * in sequenceids then return it. + * @param sequenceids Sequenceids keyed by encoded region name. + * @return regions found in this instance with sequence ids less than those passed in. + */ + byte[][] findLower(Map sequenceids) { + List toFlush = null; + // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. + synchronized (tieLock) { + for (Map.Entry e: sequenceids.entrySet()) { + Map m = this.lowestUnflushedSequenceIds.get(e.getKey()); + if (m == null) continue; + // The lowest sequence id outstanding for this region. + long lowest = getLowestSequenceId(m); + if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) { + if (toFlush == null) toFlush = new ArrayList(); + toFlush.add(e.getKey()); + } + } + } + return toFlush == null? 
null: toFlush.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 7254ad1b998..56d17a2d735 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -26,21 +26,19 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.util.FSUtils; - +import org.apache.hadoop.hbase.classification.InterfaceAudience; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.FSUtils; /** * No-op implementation of {@link WALProvider} used when the WAL is disabled. 
@@ -187,8 +185,9 @@ class DisabledWALProvider implements WALProvider { } @Override - public boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { - return !(closed.get()); + public Long startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { + if (closed.get()) return null; + return HConstants.NO_SEQNUM; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08dfb66..4844487cc99 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -39,6 +40,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import com.google.common.annotations.VisibleForTesting; + /** * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides * APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc). @@ -140,31 +143,36 @@ public interface WAL { void sync(long txid) throws IOException; /** - * WAL keeps track of the sequence numbers that were not yet flushed from memstores - * in order to be able to do cleanup. This method tells WAL that some region is about - * to flush memstore. + * WAL keeps track of the sequence numbers that are as yet not flushed im memstores + * in order to be able to do accounting to figure which WALs can be let go. 
This method tells WAL + * that some region is about to flush. The flush can be the whole region or for a column family + * of the region only. * - *

We stash the oldest seqNum for the region, and let the the next edit inserted in this - * region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit, - * AtomicLong, boolean, List)} as new oldest seqnum. - * In case of flush being aborted, we put the stashed value back; in case of flush succeeding, - * the seqNum of that first edit after start becomes the valid oldest seqNum for this region. - * - * @return true if the flush can proceed, false in case wal is closing (ususally, when server is - * closing) and flush couldn't be started. + *

Currently, it is expected that the update lock is held for the region; i.e. no + * concurrent appends while we set up cache flush. + * @param families Families to flush. May be a subset of all families in the region. + * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if + * we are flushing a subset of all families but there are no edits in those families not + * being flushed; in other words, this is effectively same as a flush of all of the region + * though we were passed a subset of regions. Otherwise, it returns the sequence id of the + * oldest/lowest outstanding edit. + * @see #completeCacheFlush(byte[]) + * @see #abortCacheFlush(byte[]) */ - boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames); + Long startCacheFlush(final byte[] encodedRegionName, Set families); /** * Complete the cache flush. * @param encodedRegionName Encoded region name. + * @see #startCacheFlush(byte[], Set) + * @see #abortCacheFlush(byte[]) */ void completeCacheFlush(final byte[] encodedRegionName); /** * Abort a cache flush. Call if the flush fails. Note that the only recovery * for an aborted flush currently is a restart of the regionserver so the - * snapshot content dropped by the failure gets restored to the memstore.v + * snapshot content dropped by the failure gets restored to the memstore. * @param encodedRegionName Encoded region name. */ void abortCacheFlush(byte[] encodedRegionName); @@ -174,19 +182,22 @@ public interface WAL { */ WALCoprocessorHost getCoprocessorHost(); - - /** Gets the earliest sequence number in the memstore for this particular region. - * This can serve as best-effort "recent" WAL number for this region. + /** + * Gets the earliest unflushed sequence id in the memstore for the region. * @param encodedRegionName The region to get the number for. - * @return The number if present, HConstants.NO_SEQNUM if absent. 
+ * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent. + * @deprecated Since version 1.2.0. Removing because not used and exposes subtle internal + * workings. Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])} */ + @VisibleForTesting + @Deprecated long getEarliestMemstoreSeqNum(byte[] encodedRegionName); /** - * Gets the earliest sequence number in the memstore for this particular region and store. + * Gets the earliest unflushed sequence id in the memstore for the store. * @param encodedRegionName The region to get the number for. * @param familyName The family to get the number for. - * @return The number if present, HConstants.NO_SEQNUM if absent. + * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent. */ long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index c55280b8169..7fbb2855a5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -124,6 +124,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; /** * This class is responsible for splitting up a bunch of regionserver commit log @@ -324,15 +325,19 @@ public class WALSplitter { failedServerName = (serverName == null) ? 
"" : serverName.getServerName(); while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) { byte[] region = entry.getKey().getEncodedRegionName(); - String key = Bytes.toString(region); - lastFlushedSequenceId = lastFlushedSequenceIds.get(key); + String encodedRegionNameAsStr = Bytes.toString(region); + lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); if (lastFlushedSequenceId == null) { if (this.distributedLogReplay) { RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, - key); + encodedRegionNameAsStr); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + if (LOG.isDebugEnabled()) { + LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); + } } } else if (sequenceIdChecker != null) { RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); @@ -341,13 +346,17 @@ public class WALSplitter { maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), storeSeqId.getSequenceId()); } - regionMaxSeqIdInStores.put(key, maxSeqIdInStores); + regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + if (LOG.isDebugEnabled()) { + LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); + } } if (lastFlushedSequenceId == null) { lastFlushedSequenceId = -1L; } - lastFlushedSequenceIds.put(key, lastFlushedSequenceId); + lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); } if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { editsSkipped++; @@ -1071,7 +1080,7 @@ public class WALSplitter { } private void doRun() throws IOException { - LOG.debug("Writer thread " + this + ": starting"); + if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting"); while (true) { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if 
(buffer == null) { @@ -1226,7 +1235,8 @@ public class WALSplitter { } } controller.checkForErrors(); - LOG.info("Split writers finished"); + LOG.info((this.writerThreads == null? 0: this.writerThreads.size()) + + " split writers finished; closing..."); return (!progress_failed); } @@ -1317,12 +1327,14 @@ public class WALSplitter { CompletionService completionService = new ExecutorCompletionService(closeThreadPool); for (final Map.Entry writersEntry : writers.entrySet()) { - LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); + if (LOG.isTraceEnabled()) { + LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); + } completionService.submit(new Callable() { @Override public Void call() throws Exception { WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - LOG.debug("Closing " + wap.p); + if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p); try { wap.w.close(); } catch (IOException ioe) { @@ -1330,8 +1342,8 @@ public class WALSplitter { thrown.add(ioe); return null; } - LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms)"); + LOG.info("Closed " + wap.p + "; wrote " + wap.editsWritten + " edit(s) in " + + (wap.nanosSpent / 1000 / 1000) + "ms"); if (wap.editsWritten == 0) { // just remove the empty recovered.edits file @@ -1490,8 +1502,8 @@ public class WALSplitter { } } Writer w = createWriter(regionedits); - LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region)); - return (new WriterAndPath(regionedits, w)); + LOG.debug("Creating writer path=" + regionedits); + return new WriterAndPath(regionedits, w); } private void filterCellByStore(Entry logEntry) { @@ -1505,6 +1517,7 @@ public class WALSplitter { if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { byte[] family = CellUtil.cloneFamily(cell); Long maxSeqId = maxSeqIdInStores.get(family); + LOG.info("CHANGE REMOVE " + Bytes.toString(family) 
+ ", max=" + maxSeqId); // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, // or the master was crashed before and we can not get the information. if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) { @@ -1544,9 +1557,9 @@ public class WALSplitter { filterCellByStore(logEntry); if (!logEntry.getEdit().isEmpty()) { wap.w.append(logEntry); + this.updateRegionMaximumEditLogSeqNum(logEntry); + editsCount++; } - this.updateRegionMaximumEditLogSeqNum(logEntry); - editsCount++; } // Pass along summary statistics wap.incrementEdits(editsCount); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java index 9a9d784d9c2..b328e576eec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.security.UserProvider; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java index 5e6bff809ca..abb65203746 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java @@ -91,6 +91,7 @@ public class TestGetLastFlushedSequenceId { testUtil.getHBaseCluster().getMaster() .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()); assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId()); + // This will be the sequenceid just before that of the earliest edit in memstore. 
long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId(); assertTrue(storeSequenceId > 0); testUtil.getHBaseAdmin().flush(tableName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index b61416ce75d..6b342d7ab96 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -230,6 +230,35 @@ public class TestHRegion { return name.getMethodName(); } + /** + * Test that I can use the max flushed sequence id after the close. + * @throws IOException + */ + @Test (timeout = 100000) + public void testSequenceId() throws IOException { + HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES); + assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); + // Weird. This returns 0 if no store files or no edits. Afraid to change it. + assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); + region.close(); + assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); + assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); + // Open region again. + region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES); + byte [] value = Bytes.toBytes(name.getMethodName()); + // Make a random put against our cf. + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, null, value); + region.put(put); + // No flush yet so init numbers should still be in place. + assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); + assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); + region.flush(true); + long max = region.getMaxFlushedSeqId(); + region.close(); + assertEquals(max, region.getMaxFlushedSeqId()); + } + /** * Test for Bug 2 of HBASE-10466. 
* "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java new file mode 100644 index 00000000000..92e0558f0a2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.hbase.DroppedSnapshotException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mortbay.log.Log; + +/** + * Testcase for https://issues.apache.org/jira/browse/HBASE-13811 + */ +@Category({ MediumTests.class }) +public class TestSplitWalDataLoss { + + private final HBaseTestingUtility testUtil = new HBaseTestingUtility(); + + private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName()) + 
.build(); + + private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss"); + + private byte[] family = Bytes.toBytes("f"); + + private byte[] qualifier = Bytes.toBytes("q"); + + @Before + public void setUp() throws Exception { + testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000); + testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + testUtil.startMiniCluster(2); + HBaseAdmin admin = testUtil.getHBaseAdmin(); + admin.createNamespace(namespace); + admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family))); + testUtil.waitTableAvailable(tableName); + } + + @After + public void tearDown() throws Exception { + testUtil.shutdownMiniCluster(); + } + + @Test + public void test() throws IOException, InterruptedException { + final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName); + final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0); + HRegion spiedRegion = spy(region); + final MutableBoolean flushed = new MutableBoolean(false); + final MutableBoolean reported = new MutableBoolean(false); + doAnswer(new Answer() { + @Override + public FlushResult answer(InvocationOnMock invocation) throws Throwable { + synchronized (flushed) { + flushed.setValue(true); + flushed.notifyAll(); + } + synchronized (reported) { + while (!reported.booleanValue()) { + reported.wait(); + } + } + rs.getWAL(region.getRegionInfo()).abortCacheFlush( + region.getRegionInfo().getEncodedNameAsBytes()); + throw new DroppedSnapshotException("testcase"); + } + }).when(spiedRegion).internalFlushCacheAndCommit(Matchers. any(), + Matchers. any(), Matchers. 
any(), + Matchers.> any()); + rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion); + Connection conn = testUtil.getConnection(); + + try (Table table = conn.getTable(tableName)) { + table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0"))); + } + long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); + Log.info("CHANGE OLDEST " + oldestSeqIdOfStore); + assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); + rs.cacheFlusher.requestFlush(spiedRegion, false); + synchronized (flushed) { + while (!flushed.booleanValue()) { + flushed.wait(); + } + } + try (Table table = conn.getTable(tableName)) { + table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1"))); + } + long now = EnvironmentEdgeManager.currentTime(); + rs.tryRegionServerReport(now - 500, now); + synchronized (reported) { + reported.setValue(true); + reported.notifyAll(); + } + while (testUtil.getRSForFirstRegionInTable(tableName) == rs) { + Thread.sleep(100); + } + try (Table table = conn.getTable(tableName)) { + Result result = table.get(new Get(Bytes.toBytes("row0"))); + assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier)); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index cc5191c8830..b3b520aed4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -27,9 +26,7 @@ import static org.junit.Assert.fail; import 
java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -50,7 +47,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -58,6 +54,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -320,53 +317,6 @@ public class TestFSHLog { } } - /** - * Simulates WAL append ops for a region and tests - * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API. - * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries. - * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the - * region should be flushed before archiving this WAL. 
- */ - @Test - public void testAllRegionsFlushed() { - LOG.debug("testAllRegionsFlushed"); - Map oldestFlushingSeqNo = new HashMap(); - Map oldestUnFlushedSeqNo = new HashMap(); - Map seqNo = new HashMap(); - // create a table - TableName t1 = TableName.valueOf("t1"); - // create a region - HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - // variables to mock region sequenceIds - final AtomicLong sequenceId1 = new AtomicLong(1); - // test empty map - assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // add entries in the region - seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet()); - oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get()); - // should say region1 is not flushed. - assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // test with entries in oldestFlushing map. - oldestUnFlushedSeqNo.clear(); - oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get()); - assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps - oldestFlushingSeqNo.clear(); - oldestUnFlushedSeqNo.clear(); - assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // insert some large values for region1 - oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l); - seqNo.put(hri1.getEncodedNameAsBytes(), 1500l); - assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - - // tests when oldestUnFlushed/oldestFlushing contains larger value. - // It means region is flushed. 
- oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l); - oldestUnFlushedSeqNo.clear(); - seqNo.put(hri1.getEncodedNameAsBytes(), 1199l); - assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - } - @Test(expected=IOException.class) public void testFailedToCreateWALIfParentRenamed() throws IOException { final String name = "testFailedToCreateWALIfParentRenamed"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java new file mode 100644 index 00000000000..9fd0cb11059 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSequenceIdAccounting { + private static final byte [] ENCODED_REGION_NAME = Bytes.toBytes("r"); + private static final byte [] FAMILY_NAME = Bytes.toBytes("cf"); + private static final Set FAMILIES; + static { + FAMILIES = new HashSet(); + FAMILIES.add(FAMILY_NAME); + } + + @Test + public void testStartCacheFlush() { + SequenceIdAccounting sida = new SequenceIdAccounting(); + sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); + Map m = new HashMap(); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + long sequenceid = 1; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + // Only one family so should return NO_SEQNUM still. + assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + long currentSequenceId = sequenceid; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + final Set otherFamily = new HashSet(1); + otherFamily.add(Bytes.toBytes("otherCf")); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + // Should return oldest sequence id in the region. 
+ assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + } + + @Test + public void testAreAllLower() { + SequenceIdAccounting sida = new SequenceIdAccounting(); + sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); + Map m = new HashMap(); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + assertTrue(sida.areAllLower(m)); + long sequenceid = 1; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + assertTrue(sida.areAllLower(m)); + m.put(ENCODED_REGION_NAME, sequenceid); + assertFalse(sida.areAllLower(m)); + long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); + assertEquals("Lowest should be first sequence id inserted", 1, lowest); + m.put(ENCODED_REGION_NAME, lowest); + assertFalse(sida.areAllLower(m)); + // Now make sure above works when flushing. + sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); + assertFalse(sida.areAllLower(m)); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + assertTrue(sida.areAllLower(m)); + // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits + sida.completeCacheFlush(ENCODED_REGION_NAME); + m.put(ENCODED_REGION_NAME, sequenceid); + assertTrue(sida.areAllLower(m)); + // Flush again but add sequenceids while we are flushing. + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); + m.put(ENCODED_REGION_NAME, lowest); + assertFalse(sida.areAllLower(m)); + sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); + // The cache flush will clear out all sequenceid accounting by region. 
+ assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + // No new edits have gone in so no sequenceid to work with. + assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + // Make an edit behind all we'll put now into sida. + m.put(ENCODED_REGION_NAME, sequenceid); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + assertTrue(sida.areAllLower(m)); + } + + @Test + public void testFindLower() { + SequenceIdAccounting sida = new SequenceIdAccounting(); + sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); + Map m = new HashMap(); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + long sequenceid = 1; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + assertTrue(sida.findLower(m) == null); + m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + assertTrue(sida.findLower(m).length == 1); + m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1); + assertTrue(sida.findLower(m) == null); + } +} \ No newline at end of file