HBASE-3845 data loss because lastSeqWritten can miss memstore edits
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1151190 13f79535-47bb-0310-9956-ffa450edef68
commit 4c51d6dc6b
parent c3b24e6c58
CHANGES.txt

@@ -173,6 +173,8 @@ Release 0.91.0 - Unreleased
               Content-Encoding: gzip in parallel
    HBASE-4116  [stargate] StringIndexOutOfBoundsException in row spec parse
                (Allan Yan)
+   HBASE-3845  data loss because lastSeqWritten can miss memstore edits
+               (Prakash Khemani and ramkrishna.s.vasudevan)
 
 IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)
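The bug is easiest to see in miniature. Below is a minimal, runnable sketch — plain Java, not HBase code; the String keys and the standalone class are simplifications — of the pre-patch race, as far as it can be reconstructed from the hunks that follow. lastSeqWritten maps each region to the log sequence number (lsn) of its oldest unflushed edit, and a WAL file may be archived once no entry points into it.

import java.util.concurrent.ConcurrentHashMap;

// Sketch of the pre-patch race (hypothetical names, not HBase code).
public class Hbase3845RaceSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Long> lastSeqWritten =
        new ConcurrentHashMap<String, Long>();
    String region = "1f2e3d4c";              // hypothetical encoded region name

    lastSeqWritten.putIfAbsent(region, 5L);  // oldest unflushed edit has lsn 5
    long flushedUpTo = 8L;                   // a flush covers lsns <= 8

    // An edit with lsn 9 lands after the memstore snapshot was taken;
    // putIfAbsent is a no-op because the old entry (lsn 5) is still present.
    lastSeqWritten.putIfAbsent(region, 9L);

    // Old completeCacheFlush (see the removed lines in the HLog hunk below):
    // the entry is removed because 5 <= 8 -- but the lsn-9 edit exists only
    // in the memstore, and now nothing pins the WAL file that holds it.
    Long seq = lastSeqWritten.get(region);
    if (seq != null && flushedUpTo >= seq.longValue()) {
      lastSeqWritten.remove(region);
    }
    System.out.println(lastSeqWritten.containsKey(region)); // false: lsn 9 missed
  }
}

If the WAL file holding lsn 9 is then archived and the server crashes, that edit is unrecoverable. The patch closes the window by parking the snapshot's oldest lsn under a separate, non-colliding key for the duration of the flush.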
HRegion.java

@@ -1094,7 +1094,8 @@ public class HRegion implements HeapSize { // , Writable{
     final long currentMemStoreSize = this.memstoreSize.get();
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
-      sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
+      sequenceId = (wal == null)? myseqid :
+        wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
 
       for (Store s : stores.values()) {
@@ -1144,7 +1145,9 @@ public class HRegion implements HeapSize { // , Writable{
       // We used to only catch IOEs but its possible that we'd get other
       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
       // all and sundry.
-      if (wal != null) wal.abortCacheFlush();
+      if (wal != null) {
+        wal.abortCacheFlush(this.regionInfo.getEncodedNameAsBytes());
+      }
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);
HLog.java

@@ -1116,6 +1116,18 @@ public class HLog implements Syncable {
     return outputfiles.size();
   }
 
+  private byte[] getSnapshotName(byte[] encodedRegionName) {
+    byte snp[] = new byte[encodedRegionName.length + 3];
+    // an encoded region name has only hex digits. s, n or p are not hex
+    // and therefore snapshot-names will never collide with
+    // encoded-region-names
+    snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
+    for (int i = 0; i < encodedRegionName.length; i++) {
+      snp[i+3] = encodedRegionName[i];
+    }
+    return snp;
+  }
+
   /**
    * By acquiring a log sequence ID, we can allow log messages to continue while
    * we flush the cache.
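The "snp" prefix works because an encoded region name consists only of hex digits (0-9, a-f), so no prefixed key can ever equal a real region key. A standalone sketch of the same scheme — hypothetical region name, and System.arraycopy in place of the hand-rolled loop — illustrates the derived key:

import java.nio.charset.StandardCharsets;

// Standalone sketch of the snapshot-key scheme (not the HBase method itself).
public class SnapshotKeySketch {
  static byte[] getSnapshotName(byte[] encodedRegionName) {
    byte[] snp = new byte[encodedRegionName.length + 3];
    snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';  // 's', 'n', 'p' are not hex digits
    System.arraycopy(encodedRegionName, 0, snp, 3, encodedRegionName.length);
    return snp;
  }

  public static void main(String[] args) {
    byte[] region = "1f2e3d4c".getBytes(StandardCharsets.US_ASCII); // hypothetical
    System.out.println(new String(getSnapshotName(region),
        StandardCharsets.US_ASCII));  // prints: snp1f2e3d4c
  }
}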
@@ -1124,16 +1136,53 @@ public class HLog implements Syncable {
    * completion of a cache-flush. Otherwise the log-seq-id for the flush will
    * not appear in the correct logfile.
    *
-   * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)}
-   * (byte[], byte[], long)}
+   * Ensuring that flushes and log-rolls don't happen concurrently also allows
+   * us to temporarily put a log-seq-number in lastSeqWritten against the region
+   * being flushed that might not be the earliest in-memory log-seq-number for
+   * that region. By the time the flush is completed or aborted, and before the
+   * cacheFlushLock is released, it is ensured that lastSeqWritten again has the
+   * oldest in-memory edit's lsn for the region that was being flushed.
+   *
+   * In this method, by removing the entry in lastSeqWritten for the region
+   * being flushed we ensure that the next edit inserted in this region will be
+   * correctly recorded in {@link #append(HRegionInfo, HLogKey, WALEdit)}. The
+   * lsn of the earliest in-memory edit - which is now in the memstore snapshot -
+   * is saved temporarily in the lastSeqWritten map while the flush is active.
+   *
+   * @return sequence ID to pass
+   * {@link #completeCacheFlush(byte[], byte[], long, boolean)}
    * @see #completeCacheFlush(byte[], byte[], long, boolean)
    * @see #abortCacheFlush()
    */
-  public long startCacheFlush() {
+  public long startCacheFlush(final byte[] encodedRegionName) {
     this.cacheFlushLock.lock();
+    Long seq = this.lastSeqWritten.remove(encodedRegionName);
+    // seq is the lsn of the oldest edit associated with this region. If a
+    // snapshot already exists - because the last flush failed - then seq will
+    // be the lsn of the oldest edit in the snapshot.
+    if (seq != null) {
+      // Keeping the earliest sequence number of the snapshot in
+      // lastSeqWritten maintains the correctness of
+      // getOldestOutstandingSeqNum(). But it doesn't matter really because
+      // everything is being done inside of the cacheFlush lock.
+      Long oldseq =
+        lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
+      if (oldseq != null) {
+        LOG.error("Logic Error: snapshot seq id from an earlier flush is" +
+            " still present for region " + Bytes.toString(encodedRegionName) +
+            "; overwrote oldseq=" + oldseq + " with new seq=" + seq);
+        Runtime.getRuntime().halt(1);
+      }
+    } else {
+      LOG.error("Logic Error - flushing an empty region??? " +
+          Bytes.toString(encodedRegionName));
+      Runtime.getRuntime().halt(1);
+    }
     return obtainSeqNum();
   }
 
 
   /**
    * Complete the cache flush
    *
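Taken together with completeCacheFlush and abortCacheFlush below, this bookkeeping forms a small protocol. A runnable sketch of the happy path — plain Java with String keys, not HBase code; getOldestOutstandingSeqNum is mimicked by taking the minimum over the map:

import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the new start/complete bookkeeping (hypothetical, simplified).
public class FlushLifecycleSketch {
  static final ConcurrentHashMap<String, Long> lastSeqWritten =
      new ConcurrentHashMap<String, Long>();

  public static void main(String[] args) {
    String region = "1f2e3d4c";                // hypothetical encoded name
    lastSeqWritten.putIfAbsent(region, 5L);    // oldest unflushed edit, lsn 5

    // startCacheFlush: move the region's oldest lsn under the snapshot key.
    Long seq = lastSeqWritten.remove(region);
    lastSeqWritten.put("snp" + region, seq);

    // A concurrent append re-registers the region with its own lsn; the
    // putIfAbsent in append() now succeeds because the region key is free.
    lastSeqWritten.putIfAbsent(region, 9L);

    // The snapshot entry still pins the WAL file holding lsn 5.
    System.out.println(Collections.min(lastSeqWritten.values())); // prints 5

    // completeCacheFlush (in its finally clause): drop the snapshot entry.
    lastSeqWritten.remove("snp" + region);
    System.out.println(Collections.min(lastSeqWritten.values())); // prints 9
  }
}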
@@ -1160,15 +1209,15 @@ public class HLog implements Syncable {
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
-        Long seq = this.lastSeqWritten.get(encodedRegionName);
-        if (seq != null && logSeqId >= seq.longValue()) {
-          this.lastSeqWritten.remove(encodedRegionName);
-        }
       }
       // sync txn to file system
       this.sync();
 
     } finally {
+      // updateLock is not needed for removing the snapshot's entry.
+      // Cleaning up lastSeqWritten is done in the finally clause because we
+      // don't want to confuse getOldestOutstandingSeqNum().
+      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
       this.cacheFlushLock.unlock();
     }
   }
@@ -1187,7 +1236,25 @@ public class HLog implements Syncable {
    * currently is a restart of the regionserver so the snapshot content dropped
    * by the failure gets restored to the memstore.
    */
-  public void abortCacheFlush() {
+  public void abortCacheFlush(byte[] encodedRegionName) {
+    Long snapshot_seq =
+      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
+    if (snapshot_seq != null) {
+      // updateLock is not necessary because we are racing against
+      // lastSeqWritten.putIfAbsent() in append() and we will always win.
+      // Before releasing cacheFlushLock, make sure that the region's entry
+      // in lastSeqWritten points to the earliest edit in the region.
+      Long current_memstore_earliest_seq =
+        this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
+      if (current_memstore_earliest_seq != null &&
+          (current_memstore_earliest_seq.longValue() <=
+           snapshot_seq.longValue())) {
+        LOG.error("Logic Error: region " + Bytes.toString(encodedRegionName) +
+            " acquired edits out of order; current memstore seq=" +
+            current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
+        Runtime.getRuntime().halt(1);
+      }
+    }
     this.cacheFlushLock.unlock();
   }
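The halt-guarded check encodes an invariant worth spelling out: while abortCacheFlush holds cacheFlushLock, any region entry a racing append() created via putIfAbsent must carry a strictly newer lsn than the snapshot being restored. A minimal sketch of that check (simplified String-keyed map, hypothetical values, not HBase code):

import java.util.concurrent.ConcurrentHashMap;

// Sketch of the invariant abortCacheFlush enforces (hypothetical, simplified).
public class AbortInvariantSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Long> lastSeqWritten =
        new ConcurrentHashMap<String, Long>();
    String region = "1f2e3d4c";             // hypothetical encoded region name

    lastSeqWritten.putIfAbsent(region, 9L); // racing append during the abort
    Long snapshotSeq = 5L;                  // lsn parked by startCacheFlush

    // Restore the snapshot's oldest lsn; the snapshot content goes back into
    // the memstore, so lsn 5 is once again the region's earliest edit.
    Long current = lastSeqWritten.put(region, snapshotSeq);
    if (current != null && current.longValue() <= snapshotSeq.longValue()) {
      // The real code logs and calls Runtime.getRuntime().halt(1) here.
      throw new IllegalStateException("acquired edits out of order");
    }
    System.out.println(lastSeqWritten.get(region)); // prints 5
  }
}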
TestHLog.java

@@ -469,7 +469,7 @@ public class TestHLog {
     htd.addFamily(new HColumnDescriptor("column"));
 
     log.append(info, tableName, cols, System.currentTimeMillis(), htd);
-    long logSeqId = log.startCacheFlush();
+    long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes());
     log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId,
         info.isMetaRegion());
     log.close();
@@ -540,7 +540,7 @@ public class TestHLog {
     HTableDescriptor htd = new HTableDescriptor();
     htd.addFamily(new HColumnDescriptor("column"));
     log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
-    long logSeqId = log.startCacheFlush();
+    long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
     log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
     log.close();
     Path filename = log.computeFilename();
@@ -651,7 +651,7 @@ public class TestHLog {
 
     // Flush the first region, we expect to see the first two files getting
     // archived
-    long seqId = log.startCacheFlush();
+    long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
     log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false);
     log.rollWriter();
     assertEquals(2, log.getNumLogFiles());
@@ -659,7 +659,7 @@ public class TestHLog {
     // Flush the second region, which removes all the remaining output files
     // since the oldest was completely flushed and the two others only contain
     // flush information
-    seqId = log.startCacheFlush();
+    seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes());
     log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false);
     log.rollWriter();
     assertEquals(0, log.getNumLogFiles());
TestWALReplay.java

@@ -371,7 +371,7 @@ public class TestWALReplay {
     }
 
     // Add a cache flush, shouldn't have any effect
-    long logSeqId = wal.startCacheFlush();
+    long logSeqId = wal.startCacheFlush(regionName);
     wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
 
     // Add an edit to another family, should be skipped.