HBASE-4485 Eliminate window of missing Data
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1203466 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d116edef93
commit
619d1a327e
|
@ -673,6 +673,10 @@ public class MemStore implements HeapSize {
|
|||
private Iterator<KeyValue> kvsetIt;
|
||||
private Iterator<KeyValue> snapshotIt;
|
||||
|
||||
// The kvset and snapshot at the time of creating this scanner
|
||||
volatile KeyValueSkipListSet kvsetAtCreation;
|
||||
volatile KeyValueSkipListSet snapshotAtCreation;
|
||||
|
||||
// Sub lists on which we're iterating
|
||||
private SortedSet<KeyValue> kvTail;
|
||||
private SortedSet<KeyValue> snapshotTail;
|
||||
|
@ -703,6 +707,9 @@ public class MemStore implements HeapSize {
|
|||
|
||||
MemStoreScanner() {
|
||||
super();
|
||||
|
||||
kvsetAtCreation = kvset;
|
||||
snapshotAtCreation = snapshot;
|
||||
}
|
||||
|
||||
protected KeyValue getNext(Iterator<KeyValue> it) {
|
||||
|
@ -734,8 +741,8 @@ public class MemStore implements HeapSize {
|
|||
|
||||
// kvset and snapshot will never be null.
|
||||
// if tailSet can't find anything, SortedSet is empty (not null).
|
||||
kvTail = kvset.tailSet(key);
|
||||
snapshotTail = snapshot.tailSet(key);
|
||||
kvTail = kvsetAtCreation.tailSet(key);
|
||||
snapshotTail = snapshotAtCreation.tailSet(key);
|
||||
|
||||
return seekInSubLists(key);
|
||||
}
|
||||
|
|
|
@ -424,10 +424,15 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
|
||||
newFiles.add(sf);
|
||||
this.storefiles = sortAndClone(newFiles);
|
||||
notifyChangedReadersObservers();
|
||||
} finally {
|
||||
// We need the lock, as long as we are updating the storefiles
|
||||
// or changing the memstore. Let us release it before calling
|
||||
// notifyChangeReadersObservers. See HBASE-4485 for a possible
|
||||
// deadlock scenario that could have happened if continue to hold
|
||||
// the lock.
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
notifyChangedReadersObservers();
|
||||
LOG.info("Successfully loaded store file " + srcPath
|
||||
+ " into store " + this + " (new location: " + dstPath + ")");
|
||||
}
|
||||
|
@ -671,15 +676,21 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
|
||||
newList.add(sf);
|
||||
storefiles = sortAndClone(newList);
|
||||
|
||||
this.memstore.clearSnapshot(set);
|
||||
|
||||
// Tell listeners of the change in readers.
|
||||
notifyChangedReadersObservers();
|
||||
|
||||
return needsCompaction();
|
||||
} finally {
|
||||
// We need the lock, as long as we are updating the storefiles
|
||||
// or changing the memstore. Let us release it before calling
|
||||
// notifyChangeReadersObservers. See HBASE-4485 for a possible
|
||||
// deadlock scenario that could have happened if continue to hold
|
||||
// the lock.
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
// Tell listeners of the change in readers.
|
||||
notifyChangedReadersObservers();
|
||||
|
||||
return needsCompaction();
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -692,6 +703,35 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
}
|
||||
}
|
||||
|
||||
protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
|
||||
boolean isGet,
|
||||
boolean isCompaction,
|
||||
ScanQueryMatcher matcher) throws IOException {
|
||||
List<StoreFile> storeFiles;
|
||||
List<KeyValueScanner> memStoreScanners;
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
storeFiles = this.getStorefiles();
|
||||
memStoreScanners = this.memstore.getScanners();
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
||||
// First the store file scanners
|
||||
|
||||
// TODO this used to get the store files in descending order,
|
||||
// but now we get them in ascending order, which I think is
|
||||
// actually more correct, since memstore get put at the end.
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner
|
||||
.getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
|
||||
List<KeyValueScanner> scanners =
|
||||
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
|
||||
scanners.addAll(sfScanners);
|
||||
// Then the memstore scanners
|
||||
scanners.addAll(memStoreScanners);
|
||||
return scanners;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param o Observer who wants to know about changes in set of Readers
|
||||
*/
|
||||
|
@ -1381,8 +1421,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
this.family.getBloomFilterType());
|
||||
result.createReader();
|
||||
}
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
// Change this.storefiles so it reflects new state but do not
|
||||
// delete old store files until we have sent out notification of
|
||||
|
@ -1398,34 +1438,40 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
}
|
||||
|
||||
this.storefiles = sortAndClone(newStoreFiles);
|
||||
} finally {
|
||||
// We need the lock, as long as we are updating the storefiles
|
||||
// or changing the memstore. Let us release it before calling
|
||||
// notifyChangeReadersObservers. See HBASE-4485 for a possible
|
||||
// deadlock scenario that could have happened if continue to hold
|
||||
// the lock.
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
// Tell observers that list of StoreFiles has changed.
|
||||
notifyChangedReadersObservers();
|
||||
// Finally, delete old store files.
|
||||
for (StoreFile hsf: compactedFiles) {
|
||||
hsf.deleteReader();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
LOG.error("Failed replacing compacted files in " + this.storeNameStr +
|
||||
". Compacted file is " + (result == null? "none": result.toString()) +
|
||||
". Files replaced " + compactedFiles.toString() +
|
||||
" some of which may have been already removed", e);
|
||||
// Tell observers that list of StoreFiles has changed.
|
||||
notifyChangedReadersObservers();
|
||||
// Finally, delete old store files.
|
||||
for (StoreFile hsf: compactedFiles) {
|
||||
hsf.deleteReader();
|
||||
}
|
||||
// 4. Compute new store size
|
||||
this.storeSize = 0L;
|
||||
this.totalUncompressedBytes = 0L;
|
||||
for (StoreFile hsf : this.storefiles) {
|
||||
StoreFile.Reader r = hsf.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + hsf + " has a null Reader");
|
||||
continue;
|
||||
}
|
||||
this.storeSize += r.length();
|
||||
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
|
||||
} catch (IOException e) {
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
LOG.error("Failed replacing compacted files in " + this.storeNameStr +
|
||||
". Compacted file is " + (result == null? "none": result.toString()) +
|
||||
". Files replaced " + compactedFiles.toString() +
|
||||
" some of which may have been already removed", e);
|
||||
}
|
||||
|
||||
// 4. Compute new store size
|
||||
this.storeSize = 0L;
|
||||
this.totalUncompressedBytes = 0L;
|
||||
for (StoreFile hsf : this.storefiles) {
|
||||
StoreFile.Reader r = hsf.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + hsf + " has a null Reader");
|
||||
continue;
|
||||
}
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
this.storeSize += r.length();
|
||||
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -172,20 +172,7 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
* @return List of scanners ordered properly.
|
||||
*/
|
||||
private List<KeyValueScanner> getScanners() throws IOException {
|
||||
// First the store file scanners
|
||||
|
||||
// TODO this used to get the store files in descending order,
|
||||
// but now we get them in ascending order, which I think is
|
||||
// actually more correct, since memstore get put at the end.
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner
|
||||
.getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet,
|
||||
false);
|
||||
List<KeyValueScanner> scanners =
|
||||
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
|
||||
scanners.addAll(sfScanners);
|
||||
// Then the memstore scanners
|
||||
scanners.addAll(this.store.memstore.getScanners());
|
||||
return scanners;
|
||||
return this.store.getScanners(cacheBlocks, isGet, false, null);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -203,25 +190,25 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
memOnly = false;
|
||||
filesOnly = false;
|
||||
}
|
||||
List<KeyValueScanner> scanners = new LinkedList<KeyValueScanner>();
|
||||
// First the store file scanners
|
||||
if (memOnly == false) {
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner
|
||||
.getScannersForStoreFiles(store.getStorefiles(), cacheBlocks,
|
||||
isGet, false, this.matcher);
|
||||
List<KeyValueScanner> allStoreScanners =
|
||||
this.store.getScanners(cacheBlocks, isGet, false, this.matcher);
|
||||
|
||||
// include only those scan files which pass all filters
|
||||
for (StoreFileScanner sfs : sfScanners) {
|
||||
if (sfs.shouldSeek(scan, columns)) {
|
||||
scanners.add(sfs);
|
||||
}
|
||||
List<KeyValueScanner> scanners =
|
||||
new ArrayList<KeyValueScanner>(allStoreScanners.size());
|
||||
|
||||
// include only those scan files which pass all filters
|
||||
for (KeyValueScanner kvs : allStoreScanners) {
|
||||
if (kvs instanceof StoreFileScanner) {
|
||||
if (memOnly == false && ((StoreFileScanner)kvs).shouldSeek(scan, columns))
|
||||
scanners.add(kvs);
|
||||
}
|
||||
else {
|
||||
// kvs is a MemStoreScanner
|
||||
if (filesOnly == false && this.store.memstore.shouldSeek(scan))
|
||||
scanners.add(kvs);
|
||||
}
|
||||
}
|
||||
|
||||
// Then the memstore scanners
|
||||
if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) {
|
||||
scanners.addAll(this.store.memstore.getScanners());
|
||||
}
|
||||
return scanners;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue