HBASE-1883 HRegion passes the wrong minSequenceNumber to doReconstructionLog
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@821247 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2a37e705f7
commit
f7dad3dccc
|
@ -54,6 +54,8 @@ Release 0.21.0 - Unreleased
|
|||
Andrew Purtell)
|
||||
HBASE-1871 Wrong type used in TableMapReduceUtil.initTableReduceJob()
|
||||
(Lars George via Stack)
|
||||
HBASE-1883 HRegion passes the wrong minSequenceNumber to
|
||||
doReconstructionLog (Clint Morgan via Stack)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
|
@ -298,7 +299,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
|
||||
// Load in all the HStores.
|
||||
long maxSeqId = -1;
|
||||
long minSeqId = Integer.MAX_VALUE;
|
||||
long minSeqIdToRecover = Integer.MAX_VALUE;
|
||||
|
||||
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
|
||||
Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
|
||||
this.stores.put(c.getName(), store);
|
||||
|
@ -306,13 +308,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
if (storeSeqId > maxSeqId) {
|
||||
maxSeqId = storeSeqId;
|
||||
}
|
||||
if (storeSeqId < minSeqId) {
|
||||
minSeqId = storeSeqId;
|
||||
|
||||
long storeSeqIdBeforeRecovery = store.getMaxSeqIdBeforeLogRecovery();
|
||||
if (storeSeqIdBeforeRecovery < minSeqIdToRecover) {
|
||||
minSeqIdToRecover = storeSeqIdBeforeRecovery;
|
||||
}
|
||||
}
|
||||
|
||||
// Play log if one. Delete when done.
|
||||
doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
|
||||
doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter);
|
||||
if (fs.exists(oldLogFile)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Deleting old log file: " + oldLogFile);
|
||||
|
@ -1133,14 +1137,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
this.updatesLock.readLock().lock();
|
||||
|
||||
try {
|
||||
if (writeToWAL) {
|
||||
this.log.append(regionInfo.getRegionName(),
|
||||
regionInfo.getTableDesc().getName(), kvs,
|
||||
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
|
||||
}
|
||||
long size = 0;
|
||||
Store store = getStore(family);
|
||||
for (KeyValue kv: kvs) {
|
||||
Iterator<KeyValue> kvIterator = kvs.iterator();
|
||||
while(kvIterator.hasNext()) {
|
||||
KeyValue kv = kvIterator.next();
|
||||
// Check if time is LATEST, change to time of most recent addition if so
|
||||
// This is expensive.
|
||||
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
|
||||
|
@ -1154,6 +1155,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
get(store, g, qualifiers, result);
|
||||
if (result.isEmpty()) {
|
||||
// Nothing to delete
|
||||
kvIterator.remove();
|
||||
continue;
|
||||
}
|
||||
if (result.size() > 1) {
|
||||
|
@ -1166,8 +1168,19 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
kv.updateLatestStamp(byteNow);
|
||||
}
|
||||
|
||||
// We must do this in this loop because it could affect
|
||||
// the above get to find the next timestamp to remove.
|
||||
// This is the case when there are multiple deletes for the same column.
|
||||
size = this.memstoreSize.addAndGet(store.delete(kv));
|
||||
}
|
||||
|
||||
if (writeToWAL) {
|
||||
this.log.append(regionInfo.getRegionName(),
|
||||
regionInfo.getTableDesc().getName(), kvs,
|
||||
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
|
||||
}
|
||||
|
||||
|
||||
flush = isFlushSize(size);
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
|
|
|
@ -129,6 +129,9 @@ public class Store implements HConstants, HeapSize {
|
|||
// reflected in the TreeMaps).
|
||||
private volatile long maxSeqId = -1;
|
||||
|
||||
// The most-recent log-seq-id before we recovered from the LOG.
|
||||
private long maxSeqIdBeforeLogRecovery = -1;
|
||||
|
||||
private final Path regionCompactionDir;
|
||||
private final Object compactLock = new Object();
|
||||
private final int compactionThreshold;
|
||||
|
@ -216,6 +219,8 @@ public class Store implements HConstants, HeapSize {
|
|||
// loadStoreFiles calculates this.maxSeqId. as side-effect.
|
||||
this.storefiles.putAll(loadStoreFiles());
|
||||
|
||||
this.maxSeqIdBeforeLogRecovery = this.maxSeqId;
|
||||
|
||||
// Do reconstruction log.
|
||||
long newId = runReconstructionLog(reconstructionLog, this.maxSeqId, reporter);
|
||||
if (newId != -1) {
|
||||
|
@ -231,6 +236,10 @@ public class Store implements HConstants, HeapSize {
|
|||
return this.maxSeqId;
|
||||
}
|
||||
|
||||
long getMaxSeqIdBeforeLogRecovery() {
|
||||
return maxSeqIdBeforeLogRecovery;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tabledir
|
||||
* @param encodedName Encoded region name.
|
||||
|
@ -276,8 +285,7 @@ public class Store implements HConstants, HeapSize {
|
|||
}
|
||||
|
||||
/*
|
||||
* Read the reconstructionLog to see whether we need to build a brand-new
|
||||
* file out of non-flushed log entries.
|
||||
* Read the reconstructionLog and put into memstore.
|
||||
*
|
||||
* We can ignore any log message that has a sequence ID that's equal to or
|
||||
* lower than maxSeqID. (Because we know such log messages are already
|
||||
|
@ -303,9 +311,6 @@ public class Store implements HConstants, HeapSize {
|
|||
// general memory usage accounting.
|
||||
long maxSeqIdInLog = -1;
|
||||
long firstSeqIdInLog = -1;
|
||||
// TODO: Move this memstoring over into MemStore.
|
||||
KeyValueSkipListSet reconstructedCache =
|
||||
new KeyValueSkipListSet(this.comparator);
|
||||
SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
|
||||
reconstructionLog, this.conf);
|
||||
try {
|
||||
|
@ -332,8 +337,12 @@ public class Store implements HConstants, HeapSize {
|
|||
!val.matchingFamily(family.getName())) {
|
||||
continue;
|
||||
}
|
||||
// Add anything as value as long as we use same instance each time.
|
||||
reconstructedCache.add(val);
|
||||
|
||||
if (val.isDelete()) {
|
||||
this.memstore.delete(val);
|
||||
} else {
|
||||
this.memstore.add(val);
|
||||
}
|
||||
editsCount++;
|
||||
// Every 2k edits, tell the reporter we're making progress.
|
||||
// Have seen 60k edits taking 3minutes to complete.
|
||||
|
@ -353,26 +362,15 @@ public class Store implements HConstants, HeapSize {
|
|||
logReader.close();
|
||||
}
|
||||
|
||||
if (reconstructedCache.size() > 0) {
|
||||
// We create a "virtual flush" at maxSeqIdInLog+1.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("flushing reconstructionCache");
|
||||
if (maxSeqIdInLog > -1) {
|
||||
// We read some edits, so we should flush the memstore
|
||||
this.snapshot();
|
||||
boolean needCompaction = this.flushCache(maxSeqIdInLog);
|
||||
if (needCompaction) {
|
||||
this.compact(false);
|
||||
}
|
||||
|
||||
long newFileSeqNo = maxSeqIdInLog + 1;
|
||||
StoreFile sf = internalFlushCache(reconstructedCache, newFileSeqNo);
|
||||
// add it to the list of store files with maxSeqIdInLog+1
|
||||
if (sf == null) {
|
||||
throw new IOException("Flush failed with a null store file");
|
||||
}
|
||||
// Add new file to store files. Clear snapshot too while we have the
|
||||
// Store write lock.
|
||||
this.storefiles.put(newFileSeqNo, sf);
|
||||
notifyChangedReadersObservers();
|
||||
|
||||
return newFileSeqNo;
|
||||
}
|
||||
return -1; // the reconstructed cache was 0 sized
|
||||
return maxSeqIdInLog;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
Loading…
Reference in New Issue