diff --git a/CHANGES.txt b/CHANGES.txt index a56820ae0ee..12c4681a633 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -173,6 +173,8 @@ Release 0.91.0 - Unreleased Content-Encoding: gzip in parallel HBASE-4116 [stargate] StringIndexOutOfBoundsException in row spec parse (Allan Yan) + HBASE-3845 data loss because lastSeqWritten can miss memstore edits + (Prakash Khemani and ramkrishna.s.vasudevan) IMPROVEMENTS HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 02c98f51eea..2e745a5feb8 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1094,7 +1094,8 @@ public class HRegion implements HeapSize { // , Writable{ final long currentMemStoreSize = this.memstoreSize.get(); List storeFlushers = new ArrayList(stores.size()); try { - sequenceId = (wal == null)? myseqid: wal.startCacheFlush(); + sequenceId = (wal == null)? myseqid : + wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); for (Store s : stores.values()) { @@ -1144,7 +1145,9 @@ public class HRegion implements HeapSize { // , Writable{ // We used to only catch IOEs but its possible that we'd get other // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. 
- if (wal != null) wal.abortCacheFlush(); + if (wal != null) { + wal.abortCacheFlush(this.regionInfo.getEncodedNameAsBytes()); + } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + Bytes.toStringBinary(getRegionName())); dse.initCause(t); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index ce7c46611a2..d6d9c84f9d5 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -1116,6 +1116,18 @@ public class HLog implements Syncable { return outputfiles.size(); } + private byte[] getSnapshotName(byte[] encodedRegionName) { + byte snp[] = new byte[encodedRegionName.length + 3]; + // an encoded region name has only hex digits. s, n or p are not hex + // and therefore snapshot-names will never collide with + // encoded-region-names + snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p'; + for (int i = 0; i < encodedRegionName.length; i++) { + snp[i+3] = encodedRegionName[i]; + } + return snp; + } + /** * By acquiring a log sequence ID, we can allow log messages to continue while * we flush the cache. @@ -1124,16 +1136,53 @@ public class HLog implements Syncable { * completion of a cache-flush. Otherwise the log-seq-id for the flush will * not appear in the correct logfile. * - * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)} - * (byte[], byte[], long)} + * Ensuring that flushes and log-rolls don't happen concurrently also allows + * us to temporarily put a log-seq-number in lastSeqWritten against the region + * being flushed that might not be the earliest in-memory log-seq-number for + * that region. By the time the flush is completed or aborted and before the + * cacheFlushLock is released it is ensured that lastSeqWritten again has the + * oldest in-memory edit's lsn for the region that was being flushed. 
+ * + * In this method, by removing the entry in lastSeqWritten for the region + * being flushed we ensure that the next edit inserted in this region will be + * correctly recorded in {@link #append(HRegionInfo, HLogKey, WALEdit)}. The + * lsn of the earliest in-memory lsn - which is now in the memstore snapshot - + * is saved temporarily in the lastSeqWritten map while the flush is active. + * + * @return sequence ID to pass + * {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[], + * byte[], long)} + * @see #completeCacheFlush(byte[], byte[], long, boolean) + * @see #abortCacheFlush() */ - public long startCacheFlush() { + public long startCacheFlush(final byte[] encodedRegionName) { this.cacheFlushLock.lock(); + Long seq = this.lastSeqWritten.remove(encodedRegionName); + // seq is the lsn of the oldest edit associated with this region. If a + // snapshot already exists - because the last flush failed - then seq will + // be the lsn of the oldest edit in the snapshot + if (seq != null) { + // keeping the earliest sequence number of the snapshot in + // lastSeqWritten maintains the correctness of + // getOldestOutstandingSeqNum(). But it doesn't matter really because + // everything is being done inside of cacheFlush lock. + Long oldseq = + lastSeqWritten.put(getSnapshotName(encodedRegionName), seq); + if (oldseq != null) { + LOG.error("Logic Error Snapshot seq id from earlier flush still" + + " present! for region " + Bytes.toString(encodedRegionName) + + " overwritten oldseq=" + oldseq + " with new seq=" + seq); + Runtime.getRuntime().halt(1); + } + } else { + LOG.error("Logic Error - flushing an empty region??? 
" + + Bytes.toString(encodedRegionName)); + Runtime.getRuntime().halt(1); + } return obtainSeqNum(); } + /** * Complete the cache flush * @@ -1160,15 +1209,15 @@ public class HLog implements Syncable { writeTime += System.currentTimeMillis() - now; writeOps++; this.numEntries.incrementAndGet(); - Long seq = this.lastSeqWritten.get(encodedRegionName); - if (seq != null && logSeqId >= seq.longValue()) { - this.lastSeqWritten.remove(encodedRegionName); - } } // sync txn to file system this.sync(); } finally { + // updateLock not needed for removing snapshot's entry + // Cleaning up of lastSeqWritten is in the finally clause because we + // don't want to confuse getOldestOutstandingSeqNum() + this.lastSeqWritten.remove(getSnapshotName(encodedRegionName)); this.cacheFlushLock.unlock(); } } @@ -1187,7 +1236,25 @@ public class HLog implements Syncable { * currently is a restart of the regionserver so the snapshot content dropped * by the failure gets restored to the memstore. */ - public void abortCacheFlush() { + public void abortCacheFlush(byte[] encodedRegionName) { + Long snapshot_seq = + this.lastSeqWritten.remove(getSnapshotName(encodedRegionName)); + if (snapshot_seq != null) { + // updateLock not necessary because we are racing against + // lastSeqWritten.putIfAbsent() in append() and we will always win + // before releasing cacheFlushLock make sure that the region's entry in + // lastSeqWritten points to the earliest edit in the region + Long current_memstore_earliest_seq = + this.lastSeqWritten.put(encodedRegionName, snapshot_seq); + if (current_memstore_earliest_seq != null && + (current_memstore_earliest_seq.longValue() <= + snapshot_seq.longValue())) { + LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) + + "acquired edits out of order current memstore seq=" + + current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq); + Runtime.getRuntime().halt(1); + } + } this.cacheFlushLock.unlock(); } diff --git 
a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 2ec749cecc3..81142321117 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -469,7 +469,7 @@ public class TestHLog { htd.addFamily(new HColumnDescriptor("column")); log.append(info, tableName, cols, System.currentTimeMillis(), htd); - long logSeqId = log.startCacheFlush(); + long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes()); log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId, info.isMetaRegion()); log.close(); @@ -540,7 +540,7 @@ public class TestHLog { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); log.append(hri, tableName, cols, System.currentTimeMillis(), htd); - long logSeqId = log.startCacheFlush(); + long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false); log.close(); Path filename = log.computeFilename(); @@ -651,7 +651,7 @@ public class TestHLog { // Flush the first region, we expect to see the first two files getting // archived - long seqId = log.startCacheFlush(); + long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false); log.rollWriter(); assertEquals(2, log.getNumLogFiles()); @@ -659,7 +659,7 @@ public class TestHLog { // Flush the second region, which removes all the remaining output files // since the oldest was completely flushed and the two others only contain // flush information - seqId = log.startCacheFlush(); + seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes()); log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false); log.rollWriter(); assertEquals(0, log.getNumLogFiles()); diff --git 
a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 8986b774855..6706ac801b5 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -371,7 +371,7 @@ public class TestWALReplay { } // Add a cache flush, shouldn't have any effect - long logSeqId = wal.startCacheFlush(); + long logSeqId = wal.startCacheFlush(regionName); wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion()); // Add an edit to another family, should be skipped. @@ -534,4 +534,4 @@ public class TestWALReplay { htd.addFamily(c); return htd; } -} \ No newline at end of file +}