From a01ff38061886c99876cb74637e26a1a3d8fc918 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 7 Nov 2013 18:10:38 +0000 Subject: [PATCH] HBASE-8741 Scope sequenceid to the region rather than regionserver (WAS: Mutations on Regions in recovery mode might have same sequenceIDs) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1539743 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HRegion.java | 66 ++-- .../hadoop/hbase/regionserver/HStore.java | 2 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 286 ++++++++------ .../hadoop/hbase/regionserver/wal/HLog.java | 46 +-- .../hbase/regionserver/wal/HLogUtil.java | 6 +- .../hbase/coprocessor/TestWALObserver.java | 13 +- .../hbase/mapreduce/TestHLogRecordReader.java | 15 +- .../master/TestDistributedLogSplitting.java | 6 +- .../hbase/regionserver/TestHRegion.java | 11 +- .../wal/HLogPerformanceEvaluation.java | 27 +- .../hbase/regionserver/wal/TestHLog.java | 351 +++++++++++++++--- .../hbase/regionserver/wal/TestHLogSplit.java | 3 +- .../wal/TestLogRollingNoCluster.java | 4 +- .../wal/TestWALActionsListener.java | 4 +- .../hbase/regionserver/wal/TestWALReplay.java | 68 ++-- .../TestReplicationHLogReaderManager.java | 4 +- .../TestReplicationSourceManager.java | 9 +- 17 files changed, 623 insertions(+), 298 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c2c8b1fe802..4ef7a96f2c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -208,6 +208,13 @@ public class HRegion implements HeapSize { // , Writable{ protected long completeSequenceId = -1L; + /** + * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1, + * as a marker that the region hasn't opened yet. Once it is opened, it is set to + * {@link #openSeqNum}. + */ + private final AtomicLong sequenceId = new AtomicLong(-1L); + /** * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for * startRegionOperation to possibly invoke different checks before any region operations. Not all @@ -1518,16 +1525,16 @@ public class HRegion implements HeapSize { // , Writable{ // Record the mvcc for all transactions in progress. w = mvcc.beginMemstoreInsert(); mvcc.advanceMemstore(w); - + // check if it is not closing. if (wal != null) { - Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); - if (startSeqId == null) { - status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName() - + "] - WAL is going away"); + if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + status.setStatus("Flush will not be started for [" + + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."); return false; } - flushSeqId = startSeqId; + flushSeqId = this.sequenceId.incrementAndGet(); } else { + // use the provided sequence Id as WAL is not being used for this flush. 
flushSeqId = myseqid; } @@ -2221,7 +2228,7 @@ public class HRegion implements HeapSize { // , Writable{ Mutation mutation = batchOp.operations[firstIndex]; if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, mutation.getClusterIds(), now, this.htableDescriptor); + walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId); } // ------------------------------- @@ -3312,7 +3319,7 @@ public class HRegion implements HeapSize { // , Writable{ if(bulkLoadListener != null) { finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); } - store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1); + store.bulkLoadHFile(finalPath, assignSeqId ? this.sequenceId.incrementAndGet() : -1); if(bulkLoadListener != null) { bulkLoadListener.doneBulkLoad(familyName, path); } @@ -3906,7 +3913,9 @@ public class HRegion implements HeapSize { // , Writable{ HRegion region = HRegion.newHRegion(tableDir, effectiveHLog, fs, conf, info, hTableDescriptor, null); if (initialize) { - region.initialize(); + // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when + // verifying the WALEdits. + region.setSequenceId(region.initialize()); } return region; } @@ -4087,10 +4096,7 @@ public class HRegion implements HeapSize { // , Writable{ checkCompressionCodecs(); this.openSeqNum = initialize(reporter); - if (this.log != null) { - this.log.setSequenceNumber(this.openSeqNum); - } - + this.setSequenceId(openSeqNum); return this; } @@ -4498,8 +4504,9 @@ public class HRegion implements HeapSize { // , Writable{ long txid = 0; // 7. Append no sync if (!walEdit.isEmpty()) { - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, processor.getClusterIds(), now, this.htableDescriptor); + txid = this.log.appendNoSync(this.getRegionInfo(), + this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now, + this.htableDescriptor, this.sequenceId); } // 8. Release region lock if (locked) { @@ -4740,9 +4747,9 @@ public class HRegion implements HeapSize { // , Writable{ // Using default cluster id, as this can only happen in the orginating // cluster. A slave cluster receives the final value (not the delta) // as a Put. - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); + txid = this.log.appendNoSync(this.getRegionInfo(), + this.htableDescriptor.getTableName(), walEdits, new ArrayList(), + EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId); } else { recordMutationWithoutWal(append.getFamilyCellMap()); } @@ -4914,9 +4921,9 @@ public class HRegion implements HeapSize { // , Writable{ // Using default cluster id, as this can only happen in the orginating // cluster. A slave cluster receives the final value (not the delta) // as a Put. 
- txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); + txid = this.log.appendNoSync(this.getRegionInfo(), + this.htableDescriptor.getTableName(), walEdits, new ArrayList(), + EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); } @@ -4981,7 +4988,7 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (11 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); @@ -5565,6 +5572,21 @@ public class HRegion implements HeapSize { // , Writable{ assert newValue >= 0; } + /** + * @return sequenceId. + */ + public AtomicLong getSequenceId() { + return this.sequenceId; + } + + /** + * sets this region's sequenceId. + * @param value new value + */ + private void setSequenceId(long value) { + this.sequenceId.set(value); + } + /** * Listener class to enable callers of * bulkLoadHFile() to perform any necessary diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 9cf8c8d1600..a7c1bc3d64b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1074,7 +1074,7 @@ public class HStore implements Store { CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString())); HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(), - this.region.getRegionInfo(), compactionDescriptor); + this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId()); } private void replaceStoreFiles(final Collection compactedFiles, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 4d0ab78cad5..c9b78cab1df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -26,13 +26,13 @@ import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.SortedMap; +import java.util.NavigableMap; import java.util.TreeMap; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -142,13 +142,6 @@ class FSHLog implements HLog, Syncable { */ Writer writer; - /** - * Map of all log files but the current one. - */ - final SortedMap outputfiles = - Collections.synchronizedSortedMap(new TreeMap()); - - /** * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums, * with the exception of append's putIfAbsent into oldestUnflushedSeqNums. 
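The gist of the FSHLog changes that follow: the WAL-global logSeqNum counter is deleted, and every append instead increments an AtomicLong owned by the region, handed in through the new method parameters. A minimal sketch of that contract, with hypothetical class names (the committed code paths are HRegion.getSequenceId() and FSHLog.append(), shown in the hunks below):

import java.util.concurrent.atomic.AtomicLong;

// Sketch of region-scoped sequence ids; RegionSketch/WalSketch are illustrative stand-ins.
class RegionSketch {
  // One counter per region. The patch initializes it to -1 ("not opened yet")
  // and sets it to openSeqNum when the region opens.
  private final AtomicLong sequenceId = new AtomicLong(-1L);

  long appendToWal(WalSketch wal, byte[] encodedRegionName, String edit) {
    // The region lends its counter to the WAL; the WAL assigns the id.
    return wal.append(encodedRegionName, edit, sequenceId);
  }
}

class WalSketch {
  private final Object updateLock = new Object();

  long append(byte[] encodedRegionName, String edit, AtomicLong regionSequenceId) {
    synchronized (updateLock) {
      // Ids are now unique and increasing per region, not across the regionserver.
      long seqNum = regionSequenceId.incrementAndGet();
      // ... write (encodedRegionName, seqNum, edit) to the log here ...
      return seqNum;
    }
  }
}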
@@ -177,8 +170,6 @@ class FSHLog implements HLog, Syncable { private volatile boolean closed = false; - private final AtomicLong logSeqNum = new AtomicLong(0); - private boolean forMeta = false; // The timestamp (in ms) when the log file was created. @@ -228,6 +219,39 @@ class FSHLog implements HLog, Syncable { private final AtomicInteger closeErrorCount = new AtomicInteger(); private final MetricsWAL metrics; +/** + * Map of region encoded names to the latest sequence num obtained from them while appending + * WALEdits to the wal. We create one map for each WAL file at the time it is rolled. + *
+ * <p>
+ * When deciding whether to archive a WAL file, we compare the sequence IDs in this map to + * {@link #oldestFlushingSeqNums} and {@link #oldestUnflushedSeqNums}. + * See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info. + *
+ * <p>
+ * This map uses byte[] as the key, and uses reference equality. It works in our use case as we + * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns + * the same array. + */ + private Map latestSequenceNums = new HashMap(); + + /** + * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. + */ + public final Comparator LOG_NAME_COMPARATOR = new Comparator() { + @Override + public int compare(Path o1, Path o2) { + long t1 = getFileNumFromFileName(o1); + long t2 = getFileNumFromFileName(o2); + if (t1 == t2) return 0; + return (t1 > t2) ? 1 : -1; + } + }; + + /** + * Map of log file to the latest sequence nums of all regions it has entries of. + * The map is sorted by the log file creation timestamp (contained in the log file name). + */ + private NavigableMap> hlogSequenceNums = + new ConcurrentSkipListMap>(LOG_NAME_COMPARATOR); /** * Constructor. @@ -436,21 +460,6 @@ class FSHLog implements HLog, Syncable { return this.filenum; } - @Override - public void setSequenceNumber(final long newvalue) { - for (long id = this.logSeqNum.get(); id < newvalue && - !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) { - // This could spin on occasion but better the occasional spin than locking - // every increment of sequence number. - LOG.debug("Changed sequenceid from " + id + " to " + newvalue); - } - } - - @Override - public long getSequenceNumber() { - return logSeqNum.get(); - } - /** * Method used internal to this class and for tests only. * @return The wrapped stream our writer is using; its not the @@ -524,11 +533,17 @@ class FSHLog implements HLog, Syncable { this.writer = nextWriter; this.hdfs_out = nextHdfsOut; this.numEntries.set(0); + if (oldFile != null) { + this.hlogSequenceNums.put(oldFile, this.latestSequenceNums); + this.latestSequenceNums = new HashMap(); + } } if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath)); - else LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries + + else { + LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries + ", filesize=" + StringUtils.humanReadableInt(this.fs.getFileStatus(oldFile).getLen()) + "; new WAL " + FSUtils.getPath(newPath)); + } // Tell our listeners that a new log was created if (!this.listeners.isEmpty()) { @@ -540,7 +555,7 @@ class FSHLog implements HLog, Syncable { // Can we delete any of the old log files? if (getNumLogFiles() > 0) { cleanOldLogs(); - regionsToFlush = getRegionsToForceFlush(); + regionsToFlush = findRegionsToForceFlush(); } } finally { this.logRollRunning = false; @@ -568,87 +583,126 @@ class FSHLog implements HLog, Syncable { return HLogFactory.createWALWriter(fs, path, conf); } - /* - * Clean up old commit logs. - * @return If lots of logs, flush the returned region so next time through - * we can clean logs. Returns null if nothing to flush. Returns array of - * encoded region names to flush. + /** + * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits + * are already flushed by the corresponding regions. + *
+ * <p>
+ * For each log file, it compares its region to sequenceId map + * (@link {@link FSHLog#latestSequenceNums} with corresponding region entries in + * {@link FSHLog#oldestFlushingSeqNums} and {@link FSHLog#oldestUnflushedSeqNums}. + * If all the regions in the map are flushed past of their value, then the wal is eligible for + * archiving. * @throws IOException */ private void cleanOldLogs() throws IOException { - long oldestOutstandingSeqNum = Long.MAX_VALUE; + Map oldestFlushingSeqNumsLocal = null; + Map oldestUnflushedSeqNumsLocal = null; + List logsToArchive = new ArrayList(); + // make a local copy so as to avoid locking when we iterate over these maps. synchronized (oldestSeqNumsLock) { - Long oldestFlushing = (oldestFlushingSeqNums.size() > 0) - ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE; - Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0) - ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE; - oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed); + oldestFlushingSeqNumsLocal = new HashMap(this.oldestFlushingSeqNums); + oldestUnflushedSeqNumsLocal = new HashMap(this.oldestUnflushedSeqNums); } - - // Get the set of all log files whose last sequence number is smaller than - // the oldest edit's sequence number. - TreeSet sequenceNumbers = new TreeSet(this.outputfiles.headMap( - oldestOutstandingSeqNum).keySet()); - // Now remove old log files (if any) - if (LOG.isDebugEnabled()) { - if (sequenceNumbers.size() > 0) { - LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" + - " out of total " + this.outputfiles.size() + ";" + - " oldest outstanding sequenceid is " + oldestOutstandingSeqNum); + for (Map.Entry> e : hlogSequenceNums.entrySet()) { + // iterate over the log file. + Path log = e.getKey(); + Map sequenceNums = e.getValue(); + // iterate over the map for this log file, and tell whether it should be archive or not. + if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal, + oldestUnflushedSeqNumsLocal)) { + logsToArchive.add(log); + LOG.debug("log file is ready for archiving " + log); } } - for (Long seq : sequenceNumbers) { - archiveLogFile(this.outputfiles.remove(seq), seq); + for (Path p : logsToArchive) { + archiveLogFile(p); + this.hlogSequenceNums.remove(p); } } /** - * Return regions that have edits that are equal or less than a certain sequence number. - * Static due to some old unit test. - * @param walSeqNum The sequence number to compare with. - * @param regionsToSeqNums Encoded region names to sequence ids - * @return All regions whose seqNum <= walSeqNum. Null if no regions found. + * Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived. + * It compares the region entries present in the passed sequenceNums map with the local copy of + * {@link #oldestUnflushedSeqNums} and {@link #oldestFlushingSeqNums}. If, for all regions, + * the value is lesser than the minimum of values present in the oldestFlushing/UnflushedSeqNums, + * then the wal file is eligible for archiving. + * @param sequenceNums for a HLog, at the time when it was rolled. + * @param oldestFlushingMap + * @param oldestUnflushedMap + * @return true if wal is eligible for archiving, false otherwise. 
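+ * <p>
+ * Worked example (hypothetical numbers): if this wal's map is {r1=150, r2=88}, r1's oldest
+ * unflushed seqNum is 160, and r2 has no flushing/unflushed entry (treated as Long.MAX_VALUE),
+ * then every region has flushed past its newest edit in this file and the method returns true.
+ * Were r1's oldest unflushed seqNum 140 (i.e. <= 150), it would return false.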
*/ - static byte[][] findMemstoresWithEditsEqualOrOlderThan( - final long walSeqNum, final Map regionsToSeqNums) { - List regions = null; - for (Map.Entry e : regionsToSeqNums.entrySet()) { - if (e.getValue().longValue() <= walSeqNum) { - if (regions == null) regions = new ArrayList(); - regions.add(e.getKey()); + static boolean areAllRegionsFlushed(Map sequenceNums, + Map oldestFlushingMap, Map oldestUnflushedMap) { + for (Map.Entry regionSeqIdEntry : sequenceNums.entrySet()) { + // find region entries in the flushing/unflushed map. If there is no entry, it means + // a region doesn't have any unflushed entry. + long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ? + oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE; + long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ? + oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE; + // do a minimum to be sure to contain oldest sequence Id + long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed); + if (minSeqNum <= regionSeqIdEntry.getValue()) return false;// can't archive + } + return true; + } + + /** + * Iterates over the given map of regions, and compares their sequence numbers with corresponding + * entries in {@link #oldestUnflushedSeqNums}. If the sequence number is greater or equal, the + * region is eligible to flush, otherwise, there is no benefit to flush (from the perspective of + * passed regionsSequenceNums map), because the region has already flushed the entries present + * in the WAL file for which this method is called for (typically, the oldest wal file). + * @param regionsSequenceNums + * @return regions which should be flushed (whose sequence numbers are larger than their + * corresponding un-flushed entries. + */ + private byte[][] findEligibleMemstoresToFlush(Map regionsSequenceNums) { + List regionsToFlush = null; + // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. + synchronized (oldestSeqNumsLock) { + for (Map.Entry e : regionsSequenceNums.entrySet()) { + Long unFlushedVal = this.oldestUnflushedSeqNums.get(e.getKey()); + if (unFlushedVal != null && unFlushedVal <= e.getValue()) { + if (regionsToFlush == null) regionsToFlush = new ArrayList(); + regionsToFlush.add(e.getKey()); + } } } - return regions == null ? null : regions + return regionsToFlush == null ? null : regionsToFlush .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); } - private byte[][] getRegionsToForceFlush() throws IOException { - // If too many log files, figure which regions we need to flush. - // Array is an array of encoded region names. + /** + * If the number of un-archived WAL files is greater than maximum allowed, it checks + * the first (oldest) WAL file, and returns the regions which should be flushed so that it could + * be archived. + * @return regions to flush in order to archive oldest wal file. + * @throws IOException + */ + byte[][] findRegionsToForceFlush() throws IOException { byte [][] regions = null; int logCount = getNumLogFiles(); if (logCount > this.maxLogs && logCount > 0) { - // This is an array of encoded region names. 
- synchronized (oldestSeqNumsLock) { - regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(), - this.oldestUnflushedSeqNums); - } - if (regions != null) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < regions.length; i++) { - if (i > 0) sb.append(", "); - sb.append(Bytes.toStringBinary(regions[i])); - } - LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" + - this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " + - sb.toString()); + Map.Entry> firstWALEntry = + this.hlogSequenceNums.firstEntry(); + regions = findEligibleMemstoresToFlush(firstWALEntry.getValue()); + } + if (regions != null) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < regions.length; i++) { + if (i > 0) sb.append(", "); + sb.append(Bytes.toStringBinary(regions[i])); } + LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" + + this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " + + sb.toString()); } return regions; } /* - * Cleans up current writer closing and adding to outputfiles. + * Cleans up current writer closing. * Presumes we're operating inside an updateLock scope. * @return Path to current writer or null if none. * @throws IOException @@ -690,18 +744,13 @@ class FSHLog implements HLog, Syncable { } if (currentfilenum >= 0) { oldFile = computeFilename(currentfilenum); - this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile); } } return oldFile; } - private void archiveLogFile(final Path p, final Long seqno) throws IOException { + private void archiveLogFile(final Path p) throws IOException { Path newPath = getHLogArchivePath(this.oldLogDir, p); - LOG.info("moving old hlog file " + FSUtils.getPath(p) + - " whose highest sequenceid is " + seqno + " to " + - FSUtils.getPath(newPath)); - // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { @@ -745,6 +794,26 @@ class FSHLog implements HLog, Syncable { return new Path(dir, child); } +/** + * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. + * This helper method returns the creation timestamp from a given log file. + * It extracts the timestamp assuming the filename is created with the + * {@link #computeFilename(long filenum)} method. + * @param fileName + * @return timestamp, as in the log file name. + */ + protected long getFileNumFromFileName(Path fileName) { + if (fileName == null) throw new IllegalArgumentException("file name can't be null"); + // The path should start with dir/. 
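+ // e.g. <logdir>/myhost%2C60020%2C1382036222666.1382036900000 parses to 1382036900000
+ // (hypothetical file name; the prefix is the URL-encoded server name).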
+ String prefixPathStr = new Path(dir, prefix + ".").toString(); + if (!fileName.toString().startsWith(prefixPathStr)) { + throw new IllegalArgumentException("The log doesn't belong to this regionserver"); + } + String chompedPath = fileName.toString().substring(prefixPathStr.length()); + if (forMeta) chompedPath = chompedPath.substring(0, chompedPath.indexOf(META_HLOG_FILE_EXTN)); + return Long.parseLong(chompedPath); + } + @Override public void closeAndDelete() throws IOException { close(); @@ -835,15 +904,15 @@ class FSHLog implements HLog, Syncable { @Override public void append(HRegionInfo info, TableName tableName, WALEdit edits, - final long now, HTableDescriptor htd) + final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException { - append(info, tableName, edits, now, htd, true); + append(info, tableName, edits, now, htd, true, sequenceId); } @Override - public void append(HRegionInfo info, TableName tableName, WALEdit edits, - final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException { - append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore); + public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, + HTableDescriptor htd, boolean isInMemstore, AtomicLong sequenceId) throws IOException { + append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore, sequenceId); } /** @@ -869,12 +938,13 @@ class FSHLog implements HLog, Syncable { * @param clusterIds that have consumed the change (for replication) * @param now * @param doSync shall we sync? + * @param sequenceId of the region. * @return txid of this transaction * @throws IOException */ @SuppressWarnings("deprecation") private long append(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, - final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore) + final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, AtomicLong sequenceId) throws IOException { if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { @@ -884,7 +954,9 @@ class FSHLog implements HLog, Syncable { try { long txid = 0; synchronized (this.updateLock) { - long seqNum = obtainSeqNum(); + // get the sequence number from the passed Long. In normal flow, it is coming from the + // region. + long seqNum = sequenceId.incrementAndGet(); // The 'lastSeqWritten' map holds the sequence number of the oldest // write for each region (i.e. the first edit added to the particular // memstore). . 
When the cache is flushed, the entry for the @@ -901,6 +973,7 @@ class FSHLog implements HLog, Syncable { if (htd.isDeferredLogFlush()) { lastDeferredTxid = txid; } + this.latestSequenceNums.put(encodedRegionName, seqNum); } // Sync if catalog region, and if not then check if that table supports // deferred log flushing @@ -918,9 +991,9 @@ class FSHLog implements HLog, Syncable { @Override public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd) + List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException { - return append(info, tableName, edits, clusterIds, now, htd, false, true); + return append(info, tableName, edits, clusterIds, now, htd, false, true, sequenceId); } /** @@ -1246,21 +1319,18 @@ class FSHLog implements HLog, Syncable { return numEntries.get(); } - @Override - public long obtainSeqNum() { - return this.logSeqNum.incrementAndGet(); - } - /** @return the number of log files in use */ int getNumLogFiles() { - return outputfiles.size(); + return hlogSequenceNums.size(); } @Override - public Long startCacheFlush(final byte[] encodedRegionName) { + public boolean startCacheFlush(final byte[] encodedRegionName) { Long oldRegionSeqNum = null; if (!closeBarrier.beginOp()) { - return null; + LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) + + " - because the server is closing."); + return false; } synchronized (oldestSeqNumsLock) { oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName); @@ -1279,7 +1349,7 @@ class FSHLog implements HLog, Syncable { LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" + Bytes.toString(encodedRegionName) + "]"); } - return obtainSeqNum(); + return true; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 9fff26b4816..cd1970e4c96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -24,6 +24,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -205,19 +206,6 @@ public interface HLog { // TODO: Remove. Implementation detail. long getFilenum(); - /** - * Called to ensure that log sequence numbers are always greater - * - * @param newvalue We'll set log edit/sequence number to this value if it is greater - * than the current value. - */ - void setSequenceNumber(final long newvalue); - - /** - * @return log sequence number - */ - long getSequenceNumber(); - // TODO: Log rolling should not be in this interface. /** * Roll the log writer. That is, start writing log messages to a new file. @@ -270,9 +258,10 @@ public interface HLog { /** * Same as appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor), * except it causes a sync on the log + * @param sequenceId of the region. */ public void append(HRegionInfo info, TableName tableName, WALEdit edits, - final long now, HTableDescriptor htd) throws IOException; + final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException; /** * Append a set of edits to the log. 
Log edits are keyed by (encoded) @@ -284,9 +273,10 @@ public interface HLog { * @param now * @param htd * @param isInMemstore Whether the record is in memstore. False for system records. + * @param sequenceId of the region. */ - public void append(HRegionInfo info, TableName tableName, WALEdit edits, - final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException; + public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, + HTableDescriptor htd, boolean isInMemstore, AtomicLong sequenceId) throws IOException; /** * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and @@ -297,11 +287,12 @@ public interface HLog { * @param clusterIds The clusters that have consumed the change (for replication) * @param now * @param htd + * @param sequenceId of the region * @return txid of this transaction * @throws IOException */ public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd) throws IOException; + List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException; // TODO: Do we need all these versions of sync? void hsync() throws IOException; @@ -312,28 +303,21 @@ public interface HLog { void sync(long txid) throws IOException; - /** - * Obtain a log sequence number. - */ - // TODO: Name better to differentiate from getSequenceNumber. - long obtainSeqNum(); - /** * WAL keeps track of the sequence numbers that were not yet flushed from memstores * in order to be able to do cleanup. This method tells WAL that some region is about * to flush memstore. * * We stash the oldest seqNum for the region, and let the the next edit inserted in this - * region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor)} - * as new oldest seqnum. In case of flush being aborted, we put the stashed value back; - * in case of flush succeeding, the seqNum of that first edit after start becomes the - * valid oldest seqNum for this region. + * region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor, + * AtomicLong)} as new oldest seqnum. + * In case of flush being aborted, we put the stashed value back; in case of flush succeeding, + * the seqNum of that first edit after start becomes the valid oldest seqNum for this region. * - * @return current seqNum, to pass on to flushers (who will put it into the metadata of - * the resulting file as an upper-bound seqNum for that file), or NULL if flush - * should not be started. + * @return true if the flush can proceed, false in case wal is closing (ususally, when server is + * closing) and flush couldn't be started. */ - Long startCacheFlush(final byte[] encodedRegionName); + boolean startCacheFlush(final byte[] encodedRegionName); /** * Complete the cache flush. 
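Because startCacheFlush now answers only "may the flush proceed?", the flush sequence id itself comes from the region's own counter, as the HRegion hunk earlier in this patch shows. A caller-side sketch under that assumption (HLogStub is a hypothetical stand-in for the HLog interface; not the committed code):

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the flush handshake after this patch.
class FlushSketch {
  interface HLogStub {
    // Stashes the region's oldest unflushed seqNum; returns false when the wal is closing.
    boolean startCacheFlush(byte[] encodedRegionName);
  }

  private final AtomicLong sequenceId = new AtomicLong(100); // the region's counter

  long startFlush(HLogStub wal, byte[] encodedRegionName) {
    if (!wal.startCacheFlush(encodedRegionName)) {
      return -1L; // wal is going away; the flush must not start
    }
    // The flush id is simply the next region-scoped sequence number.
    return sequenceId.incrementAndGet();
  }
}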
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 2bb9d437060..f629484e57e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -256,12 +257,13 @@ public class HLogUtil { * This provides info to the HMaster to allow it to recover the compaction if * this regionserver dies in the middle (This part is not yet implemented). It also prevents * the compaction from finishing if this regionserver has already lost its lease on the log. + * @param sequenceId Used by HLog to get sequence Id for the waledit. */ public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, - final CompactionDescriptor c) throws IOException { + final CompactionDescriptor c, AtomicLong sequenceId) throws IOException { WALEdit e = WALEdit.createCompaction(c); log.append(info, TableName.valueOf(c.getTableName().toByteArray()), e, - EnvironmentEdgeManager.currentTimeMillis(), htd, false); + EnvironmentEdgeManager.currentTimeMillis(), htd, false, sequenceId); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index dc45bda1b47..0264d763f72 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -49,6 +49,7 @@ import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.*; @@ -138,6 +139,7 @@ public class TestWALObserver { Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); deleteDir(basedir); fs.mkdirs(new Path(basedir, hri.getEncodedName())); + final AtomicLong sequenceId = new AtomicLong(0); HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir, TestWALObserver.class.getName(), this.conf); @@ -186,7 +188,7 @@ public class TestWALObserver { // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTimeMillis(); - log.append(hri, hri.getTable(), edit, now, htd); + log.append(hri, hri.getTable(), edit, now, htd, sequenceId); // the edit shall have been change now by the coprocessor. 
foundFamily0 = false; @@ -222,6 +224,7 @@ public class TestWALObserver { // ultimately called by HRegion::initialize() TableName tableName = TableName.valueOf("testWALCoprocessorReplay"); final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName); + final AtomicLong sequenceId = new AtomicLong(0); // final HRegionInfo hri = // createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); // final HRegionInfo hri1 = @@ -247,9 +250,9 @@ public class TestWALObserver { // addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, // EnvironmentEdgeManager.getDelegate(), wal); addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, - EnvironmentEdgeManager.getDelegate(), wal, htd); + EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId); } - wal.append(hri, tableName, edit, now, htd); + wal.append(hri, tableName, edit, now, htd, sequenceId); // sync to fs. wal.sync(); @@ -369,7 +372,7 @@ public class TestWALObserver { private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, - EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd) + EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException { String familyStr = Bytes.toString(family); for (int j = 0; j < count; j++) { @@ -378,7 +381,7 @@ public class TestWALObserver { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee .currentTimeMillis(), columnBytes)); - wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd); + wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java index 1b3fec24b75..ed8551ca742 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -114,11 +115,12 @@ public class TestHLogRecordReader { HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf); long ts = System.currentTimeMillis(); WALEdit edit = new WALEdit(); + final AtomicLong sequenceId = new AtomicLong(0); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(info, tableName, edit, ts, htd); + log.append(info, tableName, edit, ts, htd, sequenceId); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(info, tableName, edit, ts+1, htd); + log.append(info, tableName, edit, ts+1, htd, sequenceId); log.rollWriter(); Thread.sleep(1); @@ -126,10 +128,10 @@ public class TestHLogRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(info, tableName, edit, ts1+1, htd); + log.append(info, tableName, edit, ts1+1, htd, sequenceId); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(info, tableName, edit, ts1+2, htd); + log.append(info, tableName, edit, ts1+2, htd, sequenceId); log.close(); HLogInputFormat input = new HLogInputFormat(); @@ -161,11 
+163,12 @@ public class TestHLogRecordReader { public void testHLogRecordReader() throws Exception { HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf); byte [] value = Bytes.toBytes("value"); + final AtomicLong sequenceId = new AtomicLong(0); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); log.append(info, tableName, edit, - System.currentTimeMillis(), htd); + System.currentTimeMillis(), htd, sequenceId); Thread.sleep(1); // make sure 2nd log gets a later timestamp long secondTs = System.currentTimeMillis(); @@ -175,7 +178,7 @@ public class TestHLogRecordReader { edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); log.append(info, tableName, edit, - System.currentTimeMillis(), htd); + System.currentTimeMillis(), htd, sequenceId); log.close(); long thirdTs = System.currentTimeMillis(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 55425a4a7c5..eca14ea052f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -252,7 +252,6 @@ public class TestDistributedLogSplitting { @Test(timeout = 300000) public void testLogReplayWithNonMetaRSDown() throws Exception { LOG.info("testLogReplayWithNonMetaRSDown"); - conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); startCluster(NUM_RS); final int NUM_REGIONS_TO_CREATE = 40; @@ -1144,6 +1143,9 @@ public class TestDistributedLogSplitting { TableName fullTName = TableName.valueOf(tname); // remove root and meta region regions.remove(HRegionInfo.FIRST_META_REGIONINFO); + // using one sequenceId for edits across all regions is ok. 
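+ // (sequence ids only need to be increasing within a region, so one shared
+ // counter across regions satisfies the per-region check for every region at once.)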
+ final AtomicLong sequenceId = new AtomicLong(10); + for(Iterator iter = regions.iterator(); iter.hasNext(); ) { HRegionInfo regionInfo = iter.next(); @@ -1183,7 +1185,7 @@ public class TestDistributedLogSplitting { // key byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value)); - log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd); + log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId); counts[i % n] += 1; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ebd5f15f838..09f2e82daa7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -53,6 +53,7 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -462,7 +463,7 @@ public class TestHRegion { .getRegionFileSystem().getStoreDir(Bytes.toString(family))); HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(), - this.region.getRegionInfo(), compactionDescriptor); + this.region.getRegionInfo(), compactionDescriptor, new AtomicLong(1)); Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); @@ -3799,10 +3800,10 @@ public class TestHRegion { put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); put.setDurability(mutationDurability); region.put(put); - - // verify append called or not - verify(log, expectAppend ? times(1) : never()).appendNoSync((HRegionInfo) any(), eq(tableName), - (WALEdit) any(), (List) any(), anyLong(), (HTableDescriptor) any()); + //verify append called or not + verify(log, expectAppend ? 
times(1) : never()) + .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List)any(), + anyLong(), (HTableDescriptor)any(), (AtomicLong)any()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index c9e3a64f927..70ddbaab447 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -20,10 +20,12 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -106,9 +108,10 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); HRegionInfo hri = region.getRegionInfo(); if (this.noSync) { - hlog.appendNoSync(hri, hri.getTable(), walEdit, new ArrayList(), now, htd); + hlog.appendNoSync(hri, hri.getTable(), walEdit, new ArrayList(), now, htd, + region.getSequenceId()); } else { - hlog.append(hri, hri.getTable(), walEdit, now, htd); + hlog.append(hri, hri.getTable(), walEdit, now, htd, region.getSequenceId()); } } long totalTime = (System.currentTimeMillis() - startTime); @@ -251,16 +254,15 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool /** * Verify the content of the WAL file. - * Verify that sequenceids are ascending and that the file has expected number - * of edits. + * Verify that the file has expected number of edits. * @param wal * @return Count of edits. 
* @throws IOException */ private long verify(final Path wal, final boolean verbose) throws IOException { HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf()); - long previousSeqid = -1; long count = 0; + Map sequenceIds = new HashMap(); try { while (true) { Entry e = reader.next(); @@ -270,12 +272,17 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool } count++; long seqid = e.getKey().getLogSeqNum(); - if (verbose) LOG.info("seqid=" + seqid); - if (previousSeqid >= seqid) { - throw new IllegalStateException("wal=" + wal.getName() + - ", previousSeqid=" + previousSeqid + ", seqid=" + seqid); + if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) { + // sequenceIds should be increasing for every regions + if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) { + throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = " + + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) + + ", current seqid = " + seqid); + } + } else { + sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid); } - previousSeqid = seqid; + if (verbose) LOG.info("seqid=" + seqid); } } finally { reader.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 0654752c654..1780928145e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -23,9 +23,14 @@ import static org.junit.Assert.*; import java.io.IOException; import java.lang.reflect.Method; import java.net.BindException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.TreeMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,6 +63,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** JUnit test case for HLog */ @Category(LargeTests.class) @@ -136,15 +144,13 @@ public class TestHLog { } /** - * Test that with three concurrent threads we still write edits in sequence - * edit id order. + * Write to a log file with three concurrent threads and verifying all data is written. * @throws Exception */ @Test - public void testMaintainOrderWithConcurrentWrites() throws Exception { + public void testConcurrentWrites() throws Exception { // Run the HPE tool with three threads writing 3000 edits each concurrently. - // When done, verify that all edits were written and that the order in the - // WALs is of ascending edit sequence ids. + // When done, verify that all edits were written. int errCode = HLogPerformanceEvaluation. innerMain(new Configuration(TEST_UTIL.getConfiguration()), new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); @@ -179,6 +185,7 @@ public class TestHLog { htd.addFamily(new HColumnDescriptor("column")); // Add edits for three regions. 
+ final AtomicLong sequenceId = new AtomicLong(1); try { for (int ii = 0; ii < howmany; ii++) { for (int i = 0; i < howmany; i++) { @@ -192,7 +199,7 @@ public class TestHLog { System.currentTimeMillis(), column)); LOG.info("Region " + i + ": " + edit); log.append(infos[i], tableName, edit, - System.currentTimeMillis(), htd); + System.currentTimeMillis(), htd, sequenceId); } } log.rollWriter(); @@ -242,7 +249,7 @@ public class TestHLog { in.close(); HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf); - + final AtomicLong sequenceId = new AtomicLong(1); final int total = 20; HLog.Reader reader = null; @@ -255,7 +262,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, tableName, kvs, System.currentTimeMillis(), htd); + wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId); } // Now call sync and try reading. Opening a Reader before you sync just // gives you EOFE. @@ -273,7 +280,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, tableName, kvs, System.currentTimeMillis(), htd); + wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId); } reader = HLogFactory.createReader(fs, walPath, conf); count = 0; @@ -292,7 +299,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); - wal.append(info, tableName, kvs, System.currentTimeMillis(), htd); + wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId); } // Now I should have written out lots of blocks. Sync then read. wal.sync(); @@ -314,34 +321,6 @@ public class TestHLog { } } - /** - * Test the findMemstoresWithEditsEqualOrOlderThan method. - * @throws IOException - */ - @Test - public void testFindMemstoresWithEditsEqualOrOlderThan() throws IOException { - Map regionsToSeqids = new TreeMap(Bytes.BYTES_COMPARATOR); - for (int i = 0; i < 10; i++) { - Long l = Long.valueOf(i); - regionsToSeqids.put(l.toString().getBytes(), l); - } - byte [][] regions = - FSHLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids); - assertEquals(2, regions.length); - assertTrue(Bytes.equals(regions[0], "0".getBytes()) || - Bytes.equals(regions[0], "1".getBytes())); - regions = FSHLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids); - int count = 4; - assertEquals(count, regions.length); - // Regions returned are not ordered. 
- for (int i = 0; i < count; i++) { - assertTrue(Bytes.equals(regions[i], "0".getBytes()) || - Bytes.equals(regions[i], "1".getBytes()) || - Bytes.equals(regions[i], "2".getBytes()) || - Bytes.equals(regions[i], "3".getBytes())); - } - } - private void verifySplits(List splits, final int howmany) throws IOException { assertEquals(howmany * howmany, splits.size()); @@ -391,6 +370,7 @@ public class TestHLog { HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", "hlogdir_archive", conf); + final AtomicLong sequenceId = new AtomicLong(1); final int total = 20; HTableDescriptor htd = new HTableDescriptor(); @@ -399,7 +379,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd); + wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId); } // Now call sync to send the data to HDFS datanodes wal.sync(); @@ -513,6 +493,7 @@ public class TestHLog { HLog log = null; try { log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf); + final AtomicLong sequenceId = new AtomicLong(1); // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... @@ -528,7 +509,7 @@ public class TestHLog { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); - log.append(info, tableName, cols, System.currentTimeMillis(), htd); + log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId); log.startCacheFlush(info.getEncodedNameAsBytes()); log.completeCacheFlush(info.getEncodedNameAsBytes()); log.close(); @@ -571,6 +552,7 @@ public class TestHLog { final byte [] row = Bytes.toBytes("row"); Reader reader = null; HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf); + final AtomicLong sequenceId = new AtomicLong(1); try { // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... 
@@ -585,7 +567,7 @@ public class TestHLog { HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); - log.append(hri, tableName, cols, System.currentTimeMillis(), htd); + log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId); log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.close(); @@ -626,6 +608,7 @@ public class TestHLog { TableName.valueOf("tablename"); final byte [] row = Bytes.toBytes("row"); HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf); + final AtomicLong sequenceId = new AtomicLong(1); try { DumbWALActionsListener visitor = new DumbWALActionsListener(); log.registerWALActionsListener(visitor); @@ -640,7 +623,7 @@ public class TestHLog { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), timestamp, new byte[]{(byte) (i + '0')})); - log.append(hri, tableName, cols, System.currentTimeMillis(), htd); + log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId); } assertEquals(COL_COUNT, visitor.increments); log.unregisterWALActionsListener(visitor); @@ -648,7 +631,7 @@ public class TestHLog { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), timestamp, new byte[]{(byte) (11 + '0')})); - log.append(hri, tableName, cols, System.currentTimeMillis(), htd); + log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId); assertEquals(COL_COUNT, visitor.increments); } finally { if (log != null) log.closeAndDelete(); @@ -665,6 +648,7 @@ public class TestHLog { HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf); + final AtomicLong sequenceId = new AtomicLong(1); try { HRegionInfo hri = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); @@ -673,26 +657,26 @@ public class TestHLog { // Add a single edit and make sure that rolling won't remove the file // Before HBASE-3198 it used to delete it - addEdits(log, hri, tableName, 1); + addEdits(log, hri, tableName, 1, sequenceId); log.rollWriter(); assertEquals(1, ((FSHLog) log).getNumLogFiles()); // See if there's anything wrong with more than 1 edit - addEdits(log, hri, tableName, 2); + addEdits(log, hri, tableName, 2, sequenceId); log.rollWriter(); assertEquals(2, ((FSHLog) log).getNumLogFiles()); // Now mix edits from 2 regions, still no flushing - addEdits(log, hri, tableName, 1); - addEdits(log, hri2, tableName2, 1); - addEdits(log, hri, tableName, 1); - addEdits(log, hri2, tableName2, 1); + addEdits(log, hri, tableName, 1, sequenceId); + addEdits(log, hri2, tableName2, 1, sequenceId); + addEdits(log, hri, tableName, 1, sequenceId); + addEdits(log, hri2, tableName2, 1, sequenceId); log.rollWriter(); assertEquals(3, ((FSHLog) log).getNumLogFiles()); // Flush the first region, we expect to see the first two files getting // archived. We need to append something or writer won't be rolled. 
- addEdits(log, hri2, tableName2, 1); + addEdits(log, hri2, tableName2, 1, sequenceId); log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.rollWriter(); @@ -701,7 +685,7 @@ public class TestHLog { // Flush the second region, which removes all the remaining output files // since the oldest was completely flushed and the two others only contain // flush information - addEdits(log, hri2, tableName2, 1); + addEdits(log, hri2, tableName2, 1, sequenceId); log.startCacheFlush(hri2.getEncodedNameAsBytes()); log.completeCacheFlush(hri2.getEncodedNameAsBytes()); log.rollWriter(); @@ -778,7 +762,7 @@ public class TestHLog { } private void addEdits(HLog log, HRegionInfo hri, TableName tableName, - int times) throws IOException { + int times, AtomicLong sequenceId) throws IOException { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("row")); @@ -787,7 +771,7 @@ public class TestHLog { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(hri, tableName, cols, timestamp, htd); + log.append(hri, tableName, cols, timestamp, htd, sequenceId); } } @@ -949,6 +933,267 @@ public class TestHLog { } } + /** + * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws + * exception if we do). Comparison is based on the timestamp present in the wal name. + * @throws Exception + */ + @Test + public void testHLogComparator() throws Exception { + HLog hlog1 = null; + HLog hlogMeta = null; + try { + hlog1 = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf); + LOG.debug("Log obtained is: " + hlog1); + Comparator comp = ((FSHLog) hlog1).LOG_NAME_COMPARATOR; + Path p1 = ((FSHLog) hlog1).computeFilename(11); + Path p2 = ((FSHLog) hlog1).computeFilename(12); + // comparing with itself returns 0 + assertTrue(comp.compare(p1, p1) == 0); + // comparing with different filenum. + assertTrue(comp.compare(p1, p2) < 0); + hlogMeta = HLogFactory.createMetaHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf, + null, null); + Comparator compMeta = ((FSHLog) hlogMeta).LOG_NAME_COMPARATOR; + + Path p1WithMeta = ((FSHLog) hlogMeta).computeFilename(11); + Path p2WithMeta = ((FSHLog) hlogMeta).computeFilename(12); + assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0); + assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0); + // mixing meta and non-meta logs gives error + boolean ex = false; + try { + comp.compare(p1WithMeta, p2); + } catch (Exception e) { + ex = true; + } + assertTrue("Comparator doesn't complain while checking meta log files", ex); + boolean exMeta = false; + try { + compMeta.compare(p1WithMeta, p2); + } catch (Exception e) { + exMeta = true; + } + assertTrue("Meta comparator doesn't complain while checking log files", exMeta); + } finally { + if (hlog1 != null) hlog1.close(); + if (hlogMeta != null) hlogMeta.close(); + } + } + + /** + * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs + * and also don't archive "live logs" (that is, a log with un-flushed entries). + *
+ * <p>
+ * This is what it does: it creates two regions and does a series of inserts along with log
+ * rolling. Whenever a WAL is rolled, FSHLog checks the previous wals for archiving. A wal is
+ * eligible for archiving if, for every region that has entries in that wal file, the region
+ * has already flushed past its maximum sequence id in that wal file.
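+ * In other words, the archiving decision reduces to a predicate along the lines of the sketch
+ * below. The method and map names are illustrative only, not FSHLog's actual fields; in
+ * practice such maps are keyed by encoded region name through a comparator-based map, since
+ * byte[] has no value-based equality:
+ * <pre>
+ * static boolean canArchive(Map<byte[], Long> maxSeqIdWrittenToWal,
+ *     Map<byte[], Long> highestFlushedSeqId) {
+ *   for (Map.Entry<byte[], Long> e : maxSeqIdWrittenToWal.entrySet()) {
+ *     Long flushed = highestFlushedSeqId.get(e.getKey());
+ *     // a single region with un-flushed edits keeps the whole file alive
+ *     if (flushed == null || flushed < e.getValue()) return false;
+ *   }
+ *   return true;
+ * }
+ * </pre>
+ * <p>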
+ * @throws IOException + */ + @Test + public void testWALArchiving() throws IOException { + LOG.debug("testWALArchiving"); + TableName table1 = TableName.valueOf("t1"); + TableName table2 = TableName.valueOf("t2"); + HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf); + try { + assertEquals(0, ((FSHLog) hlog).getNumLogFiles()); + HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + // ensure that we don't split the regions. + hri1.setSplit(false); + hri2.setSplit(false); + // variables to mock region sequenceIds. + final AtomicLong sequenceId1 = new AtomicLong(1); + final AtomicLong sequenceId2 = new AtomicLong(1); + // start with the testing logic: insert a waledit, and roll writer + addEdits(hlog, hri1, table1, 1, sequenceId1); + hlog.rollWriter(); + // assert that the wal is rolled + assertEquals(1, ((FSHLog) hlog).getNumLogFiles()); + // add edits in the second wal file, and roll writer. + addEdits(hlog, hri1, table1, 1, sequenceId1); + hlog.rollWriter(); + // assert that the wal is rolled + assertEquals(2, ((FSHLog) hlog).getNumLogFiles()); + // add a waledit to table1, and flush the region. + addEdits(hlog, hri1, table1, 3, sequenceId1); + flushRegion(hlog, hri1.getEncodedNameAsBytes()); + // roll log; all old logs should be archived. + hlog.rollWriter(); + assertEquals(0, ((FSHLog) hlog).getNumLogFiles()); + // add an edit to table2, and roll writer + addEdits(hlog, hri2, table2, 1, sequenceId2); + hlog.rollWriter(); + assertEquals(1, ((FSHLog) hlog).getNumLogFiles()); + // add edits for table1, and roll writer + addEdits(hlog, hri1, table1, 2, sequenceId1); + hlog.rollWriter(); + assertEquals(2, ((FSHLog) hlog).getNumLogFiles()); + // add edits for table2, and flush hri1. + addEdits(hlog, hri2, table2, 2, sequenceId2); + flushRegion(hlog, hri1.getEncodedNameAsBytes()); + // the log : region-sequenceId map is + // log1: region2 (unflushed) + // log2: region1 (flushed) + // log3: region2 (unflushed) + // roll the writer; log2 should be archived. + hlog.rollWriter(); + assertEquals(2, ((FSHLog) hlog).getNumLogFiles()); + // flush region2, and all logs should be archived. + addEdits(hlog, hri2, table2, 2, sequenceId2); + flushRegion(hlog, hri2.getEncodedNameAsBytes()); + hlog.rollWriter(); + assertEquals(0, ((FSHLog) hlog).getNumLogFiles()); + } finally { + if (hlog != null) hlog.close(); + } + } + + /** + * On rolling a wal after reaching the threshold, {@link HLog#rollWriter()} returns the list of + * regions which should be flushed in order to archive the oldest wal file. + *
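+ * The threshold comes from the "hbase.regionserver.maxlogs" setting (set to 1 below, so the
+ * second roll already crosses it), and {@link FSHLog#findRegionsToForceFlush()} names the
+ * regions whose un-flushed edits pin the oldest file.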
+ * <p>
+ * This method tests this behavior by inserting edits and rolling the wal enough times to reach + * the max number of logs threshold. It checks whether we get the "right regions" for flush on + * rolling the wal. + * @throws Exception + */ + @Test + public void testFindMemStoresEligibleForFlush() throws Exception { + LOG.debug("testFindMemStoresEligibleForFlush"); + Configuration conf1 = HBaseConfiguration.create(conf); + conf1.setInt("hbase.regionserver.maxlogs", 1); + HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1); + TableName t1 = TableName.valueOf("t1"); + TableName t2 = TableName.valueOf("t2"); + HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + // variables to mock region sequenceIds + final AtomicLong sequenceId1 = new AtomicLong(1); + final AtomicLong sequenceId2 = new AtomicLong(1); + // add edits and roll the wal + try { + addEdits(hlog, hri1, t1, 2, sequenceId1); + hlog.rollWriter(); + // add some more edits and roll the wal. This would reach the log number threshold + addEdits(hlog, hri1, t1, 2, sequenceId1); + hlog.rollWriter(); + // with above rollWriter call, the max logs limit is reached. + assertTrue(((FSHLog) hlog).getNumLogFiles() == 2); + + // get the regions to flush; since there is only one region in the oldest wal, it should + // return only one region. + byte[][] regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush(); + assertEquals(regionsToFlush.length, 1); + assertEquals(regionsToFlush[0], hri1.getEncodedNameAsBytes()); + // insert edits in second region + addEdits(hlog, hri2, t2, 2, sequenceId2); + // get the regions to flush, it should still read region1. + regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush(); + assertEquals(regionsToFlush.length, 1); + assertEquals(regionsToFlush[0], hri1.getEncodedNameAsBytes()); + // flush region 1, and roll the wal file. Only last wal which has entries for region1 should + // remain. + flushRegion(hlog, hri1.getEncodedNameAsBytes()); + hlog.rollWriter(); + // only one wal should remain now (that is for the second region). + assertEquals(1, ((FSHLog) hlog).getNumLogFiles()); + // flush the second region + flushRegion(hlog, hri2.getEncodedNameAsBytes()); + hlog.rollWriter(true); + // no wal should remain now. + assertEquals(0, ((FSHLog) hlog).getNumLogFiles()); + // add edits both to region 1 and region 2, and roll. + addEdits(hlog, hri1, t1, 2, sequenceId1); + addEdits(hlog, hri2, t2, 2, sequenceId2); + hlog.rollWriter(); + // add edits and roll the writer, to reach the max logs limit. + assertEquals(1, ((FSHLog) hlog).getNumLogFiles()); + addEdits(hlog, hri1, t1, 2, sequenceId1); + hlog.rollWriter(); + // it should return two regions to flush, as the oldest wal file has entries + // for both regions. + regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush(); + assertEquals(regionsToFlush.length, 2); + // flush both regions + flushRegion(hlog, hri1.getEncodedNameAsBytes()); + flushRegion(hlog, hri2.getEncodedNameAsBytes()); + hlog.rollWriter(true); + assertEquals(0, ((FSHLog) hlog).getNumLogFiles()); + // Add an edit to region1, and roll the wal. + addEdits(hlog, hri1, t1, 2, sequenceId1); + // tests partial flush: roll on a partial flush, and ensure that wal is not archived. 
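+ // (A flush is "partial" here because rollWriter() runs after startCacheFlush() but before
+ // completeCacheFlush(): the region's edits are still in flight, so the wal holding them must
+ // survive the roll in case the flush aborts and the edits have to be replayed.)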
+ hlog.startCacheFlush(hri1.getEncodedNameAsBytes()); + hlog.rollWriter(); + hlog.completeCacheFlush(hri1.getEncodedNameAsBytes()); + assertEquals(1, ((FSHLog) hlog).getNumLogFiles()); + } finally { + if (hlog != null) hlog.close(); + } + } + + /** + * Simulates HLog append ops for a region and tests + * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API. + * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries. + * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the + * region should be flushed before archiving this WAL. + */ + @Test + public void testAllRegionsFlushed() { + Map oldestFlushingSeqNo = new HashMap(); + Map oldestUnFlushedSeqNo = new HashMap(); + Map seqNo = new HashMap(); + // create a table + TableName t1 = TableName.valueOf("t1"); + // create a region + HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + // variables to mock region sequenceIds + final AtomicLong sequenceId1 = new AtomicLong(1); + // test empty map + assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); + // add entries in the region + seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet()); + oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get()); + // should say region1 is not flushed. + assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); + // test with entries in oldestFlushing map. + oldestUnFlushedSeqNo.clear(); + oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get()); + assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); + // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps + oldestFlushingSeqNo.clear(); + oldestUnFlushedSeqNo.clear(); + assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); + // insert some large values for region1 + oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l); + seqNo.put(hri1.getEncodedNameAsBytes(), 1500l); + assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); + + // tests when oldestUnFlushed/oldestFlushing contains larger value. + // It means region is flushed. + oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l); + oldestUnFlushedSeqNo.clear(); + seqNo.put(hri1.getEncodedNameAsBytes(), 1199l); + assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); + } + + /** + * helper method to simulate region flush for a WAL. 
+ * @param hlog + * @param regionEncodedName + */ + private void flushRegion(HLog hlog, byte[] regionEncodedName) { + hlog.startCacheFlush(regionEncodedName); + hlog.completeCacheFlush(regionEncodedName); + } + static class DumbWALActionsListener implements WALActionsListener { int increments = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index 05ca1fd07de..d13e594544f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -1089,6 +1089,7 @@ public class TestHLogSplit { HRegionInfo regioninfo = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); log = HLogFactory.createHLog(fs, HBASEDIR, logName, conf); + final AtomicLong sequenceId = new AtomicLong(1); final int total = 20; for (int i = 0; i < total; i++) { @@ -1096,7 +1097,7 @@ public class TestHLogSplit { kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); - log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd); + log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 65baff56909..6831869a975 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertFalse; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -110,6 +111,7 @@ public class TestLogRollingNoCluster { @Override public void run() { this.log.info(getName() +" started"); + final AtomicLong sequenceId = new AtomicLong(1); try { for (int i = 0; i < this.count; i++) { long now = System.currentTimeMillis(); @@ -123,7 +125,7 @@ public class TestLogRollingNoCluster { this.wal.append(HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC.getTableName(), - edit, now, HTableDescriptor.META_TABLEDESC); + edit, now, HTableDescriptor.META_TABLEDESC, sequenceId); } String msg = getName() + " finished"; if (isException()) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 14d09df8992..0a31d9fadc6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -89,6 +90,7 @@ public class 
TestWALActionsListener { DummyWALActionsListener laterobserver = new DummyWALActionsListener(); HLog hlog = HLogFactory.createHLog(fs, TEST_UTIL.getDataTestDir(), logName, conf, list, null); + final AtomicLong sequenceId = new AtomicLong(1); HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES), SOME_BYTES, SOME_BYTES, false); @@ -100,7 +102,7 @@ public class TestWALActionsListener { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor(b)); - hlog.append(hri, TableName.valueOf(b), edit, 0, htd); + hlog.append(hri, TableName.valueOf(b), edit, 0, htd, sequenceId); if (i == 10) { hlog.registerWALActionsListener(laterobserver); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index b42b2f808c4..5762ef48621 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -270,31 +270,33 @@ public class TestWALReplay { HLog wal1 = createWAL(this.conf); // Add 1k to each family. final int countPerFamily = 1000; + final AtomicLong sequenceId = new AtomicLong(1); for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, - wal1, htd); + wal1, htd, sequenceId); } wal1.close(); runWALSplit(this.conf); HLog wal2 = createWAL(this.conf); - // Up the sequenceid so that these edits are after the ones added above. - wal2.setSequenceNumber(wal1.getSequenceNumber()); // Add 1k to each family. for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, - ee, wal2, htd); + ee, wal2, htd, sequenceId); } wal2.close(); runWALSplit(this.conf); HLog wal3 = createWAL(this.conf); - wal3.setSequenceNumber(wal2.getSequenceNumber()); try { - long wal3SeqId = wal3.getSequenceNumber(); HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3); long seqid = region.getOpenSeqNum(); - assertTrue(seqid > wal3SeqId); + // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. + // When opened, this region would apply 6k edits, and increment the sequenceId by 1 + assertTrue(seqid > sequenceId.get()); + assertEquals(seqid - 1, sequenceId.get()); + LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + + sequenceId.get()); // TODO: Scan all. region.close(); @@ -395,8 +397,6 @@ public class TestWALReplay { HLog wal = createWAL(this.conf); HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); long seqid = region.getOpenSeqNum(); - // HRegionServer usually does this. It knows the largest seqid across all regions. - wal.setSequenceNumber(seqid); boolean first = true; for (HColumnDescriptor hcd: htd.getFamilies()) { addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); @@ -420,8 +420,6 @@ public class TestWALReplay { HLog wal2 = createWAL(this.conf); HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); long seqid2 = region2.getOpenSeqNum(); - // HRegionServer usually does this. It knows the largest seqid across all regions. 
- wal2.setSequenceNumber(seqid2); assertTrue(seqid + result.size() < seqid2); final Result result1b = region2.get(g); assertEquals(result.size(), result1b.size()); @@ -458,8 +456,6 @@ public class TestWALReplay { } }; long seqid3 = region3.initialize(); - // HRegionServer usually does this. It knows the largest seqid across all regions. - wal3.setSequenceNumber(seqid3); Result result3 = region3.get(g); // Assert that count of cells is same as before crash. assertEquals(result2.size(), result3.size()); @@ -513,8 +509,6 @@ public class TestWALReplay { HLog wal = createWAL(this.conf); HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); long seqid = region.getOpenSeqNum(); - // HRegionServer usually does this. It knows the largest seqid across all regions. - wal.setSequenceNumber(seqid); for (HColumnDescriptor hcd: htd.getFamilies()) { addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); } @@ -548,8 +542,6 @@ public class TestWALReplay { HLog wal2 = createWAL(this.conf); HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); long seqid2 = region2.getOpenSeqNum(); - // HRegionServer usually does this. It knows the largest seqid across all regions. - wal2.setSequenceNumber(seqid2); assertTrue(seqid + result.size() < seqid2); final Result result1b = region2.get(g); @@ -605,12 +597,8 @@ public class TestWALReplay { Configuration customConf = new Configuration(this.conf); customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, CustomStoreFlusher.class.getName()); - HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd, rsServices); - long seqid = region.initialize(); - // HRegionServer usually does this. It knows the largest seqid across all - // regions. - wal.setSequenceNumber(seqid); - + HRegion region = + HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); int writtenRowCount = 10; List families = new ArrayList( htd.getFamilies()); @@ -661,13 +649,8 @@ public class TestWALReplay { runWALSplit(this.conf); HLog wal2 = createWAL(this.conf); Mockito.doReturn(false).when(rsServices).isAborted(); - HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd, - rsServices); - long seqid2 = region2.initialize(); - // HRegionServer usually does this. It knows the largest seqid across all - // regions. - wal2.setSequenceNumber(seqid2); - + HRegion region2 = + HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); scanner = region2.getScanner(new Scan()); assertEquals(writtenRowCount, getScannedCount(scanner)); } @@ -706,12 +689,13 @@ public class TestWALReplay { final HLog wal = createWAL(this.conf); final byte[] rowName = tableName.getName(); final byte[] regionName = hri.getEncodedNameAsBytes(); + final AtomicLong sequenceId = new AtomicLong(1); // Add 1k to each family. final int countPerFamily = 1000; for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, - ee, wal, htd); + ee, wal, htd, sequenceId); } // Add a cache flush, shouldn't have any effect @@ -723,14 +707,14 @@ public class TestWALReplay { long now = ee.currentTimeMillis(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); - wal.append(hri, tableName, edit, now, htd); + wal.append(hri, tableName, edit, now, htd, sequenceId); // Delete the c family to verify deletes make it over. 
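// (A KeyValue of Type.DeleteFamily with a null qualifier is a family-wide tombstone: when the
// edit is replayed it masks every cell of family "c" at or before its timestamp.)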
edit = new WALEdit(); now = ee.currentTimeMillis(); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(hri, tableName, edit, now, htd); + wal.append(hri, tableName, edit, now, htd, sequenceId); // Sync. wal.sync(); @@ -767,7 +751,7 @@ public class TestWALReplay { long seqid = region.initialize(); // We flushed during init. assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); - assertTrue(seqid > wal.getSequenceNumber()); + assertTrue(seqid - 1 == sequenceId.get()); Get get = new Get(rowName); Result result = region.get(get); @@ -800,15 +784,9 @@ public class TestWALReplay { MockHLog wal = createMockWAL(this.conf); HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); - long seqid = region.getOpenSeqNum(); - // HRegionServer usually does this. It knows the largest seqid across all - // regions. - wal.setSequenceNumber(seqid); for (HColumnDescriptor hcd : htd.getFamilies()) { addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); } - // get the seq no after first set of entries. - long sequenceNumber = wal.getSequenceNumber(); // Let us flush the region // But this time completeflushcache is not yet done @@ -816,7 +794,7 @@ public class TestWALReplay { for (HColumnDescriptor hcd : htd.getFamilies()) { addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); } - long lastestSeqNumber = wal.getSequenceNumber(); + long lastestSeqNumber = region.getSequenceId().get(); // get the current seq no wal.doCompleteCacheFlush = true; // allow complete cache flush with the previous seq number got after first @@ -891,9 +869,9 @@ public class TestWALReplay { } } - private void addWALEdits (final TableName tableName, final HRegionInfo hri, - final byte [] rowName, final byte [] family, - final int count, EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd) + private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, + final byte[] family, final int count, EnvironmentEdge ee, final HLog wal, + final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException { String familyStr = Bytes.toString(family); for (int j = 0; j < count; j++) { @@ -902,7 +880,7 @@ public class TestWALReplay { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTimeMillis(), columnBytes)); - wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd); + wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java index 5cf614c80bb..98a87dbc269 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; @Category(LargeTests.class) @RunWith(Parameterized.class) @@ -74,6 +75,7 @@ public class TestReplicationHLogReaderManager { private PathWatcher pathWatcher; private int nbRows; private int walEditKVs; + private final AtomicLong sequenceId = new AtomicLong(1); @Parameters public static 
Collection parameters() { @@ -206,7 +208,7 @@ public class TestReplicationHLogReaderManager { } private void appendToLogPlus(int count) throws IOException { - log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd); + log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId); } private WALEdit getWALEdits(int count) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 223f943421d..602539e9909 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.SortedMap; import java.util.SortedSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -188,7 +189,7 @@ public class TestReplicationSourceManager { listeners.add(replication); HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), logName, conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8")); - + final AtomicLong sequenceId = new AtomicLong(1); manager.init(); HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor(f1)); @@ -200,7 +201,7 @@ public class TestReplicationSourceManager { LOG.info(i); HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); - hlog.append(hri, test, edit, System.currentTimeMillis(), htd); + hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId); } // Simulate a rapid insert that's followed @@ -211,7 +212,7 @@ public class TestReplicationSourceManager { LOG.info(baseline + " and " + time); for (int i = 0; i < 3; i++) { - hlog.append(hri, test, edit, System.currentTimeMillis(), htd); + hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId); } assertEquals(6, manager.getHLogs().get(slaveId).size()); @@ -221,7 +222,7 @@ public class TestReplicationSourceManager { manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false, false); - hlog.append(hri, test, edit, System.currentTimeMillis(), htd); + hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId); assertEquals(1, manager.getHLogs().size());