HBASE-14895 Seek only to the newly flushed file on scanner reset on flush

(Ram)
This commit is contained in:
ramkrishna 2015-12-14 10:09:41 +05:30
parent 676ce01c82
commit 555d9b70bd
9 changed files with 258 additions and 57 deletions

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -33,5 +34,5 @@ public interface ChangedReadersObserver {
* Notify observers.
* @throws IOException e
*/
void updateReaders() throws IOException;
void updateReaders(List<StoreFile> sfs) throws IOException;
}

View File

@ -1042,7 +1042,7 @@ public class HStore implements Store {
this.lock.writeLock().unlock();
}
// notify to be called here - only in case of flushes
notifyChangedReadersObservers();
notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
long totalSize = 0;
for (StoreFile sf : sfs) {
@ -1060,9 +1060,9 @@ public class HStore implements Store {
* Notify all observers that set of Readers has changed.
* @throws IOException
*/
private void notifyChangedReadersObservers() throws IOException {
private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
for (ChangedReadersObserver o : this.changedReaderObservers) {
o.updateReaders();
o.updateReaders(sfs);
}
}
@ -1101,6 +1101,30 @@ public class HStore implements Store {
return scanners;
}
@Override
public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
List<KeyValueScanner> memStoreScanners = null;
if (includeMemstoreScanner) {
this.lock.readLock().lock();
try {
memStoreScanners = this.memstore.getScanners(readPt);
} finally {
this.lock.readLock().unlock();
}
}
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
scanners.addAll(sfScanners);
// Then the memstore scanners
if (memStoreScanners != null) {
scanners.addAll(memStoreScanners);
}
return scanners;
}
@Override
public void addChangedReaderObserver(ChangedReadersObserver o) {
this.changedReaderObservers.add(o);

View File

@ -123,13 +123,15 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
checkReseek();
boolean flushed = checkFlushed();
checkReseek(flushed);
return this.heap.seekToPreviousRow(key);
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
checkReseek();
boolean flushed = checkFlushed();
checkReseek(flushed);
return this.heap.backwardSeek(key);
}
}

View File

@ -106,6 +106,25 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
long readPt
) throws IOException;
/**
* Create scanners on the given files and if needed on the memstore with no filtering based on TTL
* (that happens further down the line).
* @param files the list of files on which the scanners has to be created
* @param cacheBlocks cache the blocks or not
* @param isGet true if it is get, false if not
* @param usePread true to use pread, false if not
* @param isCompaction true if the scanner is created for compaction
* @param matcher the scan query matcher
* @param startRow the start row
* @param stopRow the stop row
* @param readPt the read point of the current scan
* @param includeMemstoreScanner true if memstore has to be included
* @return scanners on the given files and on the memstore if specified
*/
List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, boolean isGet,
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException;
ScanInfo getScanInfo();
/**

View File

@ -48,6 +48,7 @@ public class StoreFileScanner implements KeyValueScanner {
private final StoreFile.Reader reader;
private final HFileScanner hfs;
private Cell cur = null;
private boolean closed = false;
private boolean realSeekDone;
private boolean delayedReseek;
@ -246,11 +247,13 @@ public class StoreFileScanner implements KeyValueScanner {
}
public void close() {
if (closed) return;
cur = null;
this.hfs.close();
if (this.reader != null) {
this.reader.decrementRefCount();
}
closed = true;
}
/**

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -126,6 +127,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private boolean scanUsePread = false;
// Indicates whether there was flush during the course of the scan
protected volatile boolean flushed = false;
// generally we get one file from a flush
protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
// The current list of scanners
protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
// flush update lock
private ReentrantLock flushLock = new ReentrantLock();
protected final long readPt;
@ -170,6 +177,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
}
protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
this.currentScanners.addAll(scanners);
}
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
* are not in a compaction.
@ -207,7 +217,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// set rowOffset
this.storeOffset = scan.getRowOffsetPerColumnFamily();
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@ -264,7 +274,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@ -303,6 +313,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
addCurrentScanners(scanners);
resetKVHeap(scanners, scanInfo.getComparator());
}
@ -403,7 +414,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public Cell peek() {
checkResetHeap();
checkFlushed();
if (this.heap == null) {
return this.lastTop;
}
@ -435,11 +446,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heapsForDelayedClose.clear();
if (this.heap != null) {
this.heap.close();
this.currentScanners.clear();
this.heap = null; // CLOSED!
}
} else {
if (this.heap != null) {
this.heapsForDelayedClose.add(this.heap);
this.currentScanners.clear();
this.heap = null;
}
}
@ -448,9 +461,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean seek(Cell key) throws IOException {
checkResetHeap();
boolean flushed = checkFlushed();
// reset matcher state, in case that underlying store changed
checkReseek();
checkReseek(flushed);
return this.heap.seek(key);
}
@ -470,8 +483,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
checkResetHeap();
if (checkReseek()) {
boolean flushed = checkFlushed();
if (checkReseek(flushed)) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
@ -665,36 +678,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Implementation of ChangedReadersObserver
@Override
public void updateReaders() throws IOException {
public void updateReaders(List<StoreFile> sfs) throws IOException {
flushed = true;
flushLock.lock();
try {
flushedStoreFiles.addAll(sfs);
} finally {
flushLock.unlock();
}
// Let the next() call handle re-creating and seeking
}
protected void nullifyCurrentHeap() {
if (this.closing) return;
// All public synchronized API calls will call 'checkReseek' which will cause
// the scanner stack to reseek if this.heap==null && this.lastTop != null.
// But if two calls to updateReaders() happen without a 'next' or 'peek' then we
// will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
// which is NOT what we want, not to mention could cause an NPE. So we early out here.
if (this.heap == null) return;
// this could be null.
this.lastTop = this.heap.peek();
//DebugPrint.println("SS updateReaders, topKey = " + lastTop);
// close scanners to old obsolete Store files
this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
}
/**
* @param flushed indicates if there was a flush
* @return true if top of heap has changed (and KeyValueHeap has to try the
* next KV)
* @throws IOException
*/
protected boolean checkReseek() throws IOException {
if (this.heap == null && this.lastTop != null) {
protected boolean checkReseek(boolean flushed) throws IOException {
if (flushed && this.lastTop != null) {
resetScannerStack(this.lastTop);
if (this.heap.peek() == null
|| store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
@ -710,21 +712,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
protected void resetScannerStack(Cell lastTopKey) throws IOException {
if (heap != null) {
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
}
/* When we have the scan object, should we not pass it to getScanners()
* to get a limited set of scanners? We did so in the constructor and we
* could have done it now by storing the scan object from the constructor */
List<KeyValueScanner> scanners = getScannersNoCompaction();
* could have done it now by storing the scan object from the constructor
*/
// Seek all scanners to the initial key
final boolean isCompaction = false;
boolean usePread = get || scanUsePread;
List<KeyValueScanner> scanners = null;
try {
flushLock.lock();
scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
} finally {
flushLock.unlock();
}
// Seek the new scanners to the last key
seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
if (!currentScanners.get(i).isFileScanner()) {
currentScanners.remove(i);
break;
}
}
// add the newly created scanners on the flushed files and the current active memstore scanner
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
resetKVHeap(this.currentScanners, store.getComparator());
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the
// query matcher if scanning intra-row.
@ -771,34 +789,36 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean reseek(Cell kv) throws IOException {
checkResetHeap();
boolean flushed = checkFlushed();
// Heap will not be null, if this is called from next() which.
// If called from RegionScanner.reseek(...) make sure the scanner
// stack is reset if needed.
checkReseek();
checkReseek(flushed);
if (explicitColumnQuery && lazySeekEnabledGlobally) {
return heap.requestSeek(kv, true, useRowColBloom);
}
return heap.reseek(kv);
}
protected void checkResetHeap() {
protected boolean checkFlushed() {
// check the var without any lock. Suppose even if we see the old
// value here still it is ok to continue because we will not be resetting
// the heap but will continue with the referenced memstore's snapshot. For compactions
// any way we don't need the updateReaders at all to happen as we still continue with
// the older files
if (flushed) {
// If the 'flushed' is found to be true then there is a need to ensure
// that the current scanner updates the heap that it has and then proceed
// with the scan and ensure to reset the flushed inside the lock
// One thing can be sure that the same store scanner cannot be in reseek and
// next at the same time ie. within the same store scanner it is always single
// threaded
nullifyCurrentHeap();
// If there is a flush and the current scan is notified on the flush ensure that the
// scan's heap gets reset and we do a seek on the newly flushed file.
if(!this.closing) {
this.lastTop = this.heap.peek();
} else {
return false;
}
// reset the flag
flushed = false;
return true;
}
return false;
}
@Override

View File

@ -920,6 +920,138 @@ public class TestBlockEvictionFromClient {
}
}
@Test
public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
throws IOException, InterruptedException {
// do flush and scan in parallel
HTable table = null;
try {
latch = new CountDownLatch(1);
compactionLatch = new CountDownLatch(1);
TableName tableName =
TableName.valueOf("testBlockEvictionAfterHBASE13082WithCompactionAndFlush");
// Create a table with block size as 1024
table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
CustomInnerRegionObserverWrapper.class.getName());
// get the block cache and region
RegionLocator locator = table.getRegionLocator();
String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
regionName);
Store store = region.getStores().iterator().next();
CacheConfig cacheConf = store.getCacheConfig();
cacheConf.setCacheDataOnWrite(true);
cacheConf.setEvictOnClose(true);
BlockCache cache = cacheConf.getBlockCache();
// insert data. 2 Rows are added
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
// Should create one Hfile with 2 blocks
region.flush(true);
// read the data and expect same blocks, one new hit, no misses
int refCount = 0;
// Check how this miss is happening
// insert a second column, read the row, no new blocks, 3 new hits
byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
byte[] data2 = Bytes.add(data, data);
put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER2, data2);
table.put(put);
// flush, one new block
System.out.println("Flushing cache");
region.flush(true);
Iterator<CachedBlock> iterator = cache.iterator();
iterateBlockCache(cache, iterator);
// Create three sets of scan
ScanThread[] scanThreads = initiateScan(table, false);
Thread.sleep(100);
iterator = cache.iterator();
boolean usedBlocksFound = false;
while (iterator.hasNext()) {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
} else {
continue;
}
if (refCount != 0) {
// Blocks will be with count 3
assertEquals(NO_OF_THREADS, refCount);
usedBlocksFound = true;
}
}
// Make a put and do a flush
QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
data2 = Bytes.add(data, data);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER2, data2);
table.put(put);
// flush, one new block
System.out.println("Flushing cache");
region.flush(true);
assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
usedBlocksFound = false;
System.out.println("Compacting");
assertEquals(3, store.getStorefilesCount());
store.triggerMajorCompaction();
region.compact(true);
waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
assertEquals(1, store.getStorefilesCount());
// Even after compaction is done we will have some blocks that cannot
// be evicted this is because the scan is still referencing them
iterator = cache.iterator();
while (iterator.hasNext()) {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
} else {
continue;
}
if (refCount != 0) {
// Blocks will be with count 3 as they are not yet cleared
assertEquals(NO_OF_THREADS, refCount);
usedBlocksFound = true;
}
}
assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
// Should not throw exception
compactionLatch.countDown();
latch.countDown();
for (ScanThread thread : scanThreads) {
thread.join();
}
// by this time all blocks should have been evicted
iterator = cache.iterator();
// Since a flush and compaction happened after a scan started
// we need to ensure that all the original blocks of the compacted file
// is also removed.
iterateBlockCache(cache, iterator);
Result r = table.get(new Get(ROW));
assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
// The gets would be working on new blocks
iterator = cache.iterator();
iterateBlockCache(cache, iterator);
} finally {
if (table != null) {
table.close();
}
}
}
@Test
public void testScanWithException() throws IOException, InterruptedException {
HTable table = null;

View File

@ -454,9 +454,9 @@ public class TestStoreScanner extends TestCase {
// normally cause an NPE because scan.store is null. So as long as we get through these
// two calls we are good and the bug was quashed.
scan.updateReaders();
scan.updateReaders(new ArrayList<StoreFile>());
scan.updateReaders();
scan.updateReaders(new ArrayList<StoreFile>());
scan.peek();
}

View File

@ -130,7 +130,7 @@ public class TestWideScanner extends HBaseTestCase {
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
ss.updateReaders();
ss.updateReaders(new ArrayList<StoreFile>());
}
} while (more);