From 555d9b70bd650a0df0ed9e382de449c337274493 Mon Sep 17 00:00:00 2001
From: ramkrishna
Date: Mon, 14 Dec 2015 10:09:41 +0530
Subject: [PATCH] HBASE-14895 Seek only to the newly flushed file on scanner reset on flush (Ram)

---
 .../regionserver/ChangedReadersObserver.java  |   3 +-
 .../hadoop/hbase/regionserver/HStore.java     |  30 +++-
 .../regionserver/ReversedStoreScanner.java    |   6 +-
 .../hadoop/hbase/regionserver/Store.java      |  19 +++
 .../hbase/regionserver/StoreFileScanner.java  |   3 +
 .../hbase/regionserver/StoreScanner.java      | 116 ++++++++-------
 .../client/TestBlockEvictionFromClient.java   | 132 ++++++++++++++++++
 .../hbase/regionserver/TestStoreScanner.java  |   4 +-
 .../hbase/regionserver/TestWideScanner.java   |   2 +-
 9 files changed, 258 insertions(+), 57 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 36b75597064..0bc75e7d078 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -33,5 +34,5 @@ public interface ChangedReadersObserver {
    * Notify observers.
    * @throws IOException e
    */
-  void updateReaders() throws IOException;
+  void updateReaders(List<StoreFile> sfs) throws IOException;
 }
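
Note: updateReaders now carries the concrete list of store files produced by the flush, so an observer no longer has to rediscover the whole reader set; it can remember just the new files and later reseek only those. Below is a minimal sketch of that collect-then-drain pattern outside HBase, with String standing in for StoreFile; the class and method names (FlushAwareObserver, drainFlushedFiles) are hypothetical, not HBase API.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    final class FlushAwareObserver {
      // a flush usually produces a single file, hence the initial capacity of 1
      private final List<String> flushedFiles = new ArrayList<String>(1);
      private final ReentrantLock flushLock = new ReentrantLock();
      private volatile boolean flushed = false;

      // called by the store after a flush commits; sfs names only the new files
      void updateReaders(List<String> sfs) throws IOException {
        flushed = true;
        flushLock.lock();
        try {
          flushedFiles.addAll(sfs);
        } finally {
          flushLock.unlock();
        }
        // the next read operation sees the flag and reseeks only these files
      }

      // called on the read path: drain the pending files if a flush happened
      List<String> drainFlushedFiles() {
        if (!flushed) {
          return Collections.emptyList();
        }
        flushLock.lock();
        try {
          List<String> pending = new ArrayList<String>(flushedFiles);
          flushedFiles.clear();
          flushed = false; // safe: the read path is single-threaded per scanner
          return pending;
        } finally {
          flushLock.unlock();
        }
      }
    }
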
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 49b6c50b532..badbd65f127 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1042,7 +1042,7 @@ public class HStore implements Store {
       this.lock.writeLock().unlock();
     }
     // notify to be called here - only in case of flushes
-    notifyChangedReadersObservers();
+    notifyChangedReadersObservers(sfs);
     if (LOG.isTraceEnabled()) {
       long totalSize = 0;
       for (StoreFile sf : sfs) {
@@ -1060,9 +1060,9 @@
    * Notify all observers that set of Readers has changed.
    * @throws IOException
    */
-  private void notifyChangedReadersObservers() throws IOException {
+  private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
     for (ChangedReadersObserver o : this.changedReaderObservers) {
-      o.updateReaders();
+      o.updateReaders(sfs);
     }
   }
 
@@ -1101,6 +1101,30 @@ public class HStore implements Store {
     return scanners;
   }
 
+  @Override
+  public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+      boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
+    List<KeyValueScanner> memStoreScanners = null;
+    if (includeMemstoreScanner) {
+      this.lock.readLock().lock();
+      try {
+        memStoreScanners = this.memstore.getScanners(readPt);
+      } finally {
+        this.lock.readLock().unlock();
+      }
+    }
+    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
+      cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
+    scanners.addAll(sfScanners);
+    // Then the memstore scanners
+    if (memStoreScanners != null) {
+      scanners.addAll(memStoreScanners);
+    }
+    return scanners;
+  }
+
   @Override
   public void addChangedReaderObserver(ChangedReadersObserver o) {
     this.changedReaderObservers.add(o);
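
Note: the new HStore#getScanners overload builds scanners over exactly the files it is handed, and takes the store's read lock only for the brief moment it snapshots the memstore; file scanners come first and memstore scanners last, matching the ordering the existing scanner-opening path uses. A rough shape of that composition follows; Scanner, ScannerAssembly and the Supplier parameter are hypothetical stand-ins, not the HBase API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Supplier;

    interface Scanner {}

    final class ScannerAssembly {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      List<Scanner> getScanners(List<Scanner> fileScanners, boolean includeMemstore,
          Supplier<List<Scanner>> memstore) {
        List<Scanner> memScanners = null;
        if (includeMemstore) {
          lock.readLock().lock(); // guard only the memstore snapshot
          try {
            memScanners = memstore.get();
          } finally {
            lock.readLock().unlock();
          }
        }
        // file scanners first, then the memstore scanners
        List<Scanner> scanners = new ArrayList<Scanner>(fileScanners.size() + 1);
        scanners.addAll(fileScanners);
        if (memScanners != null) {
          scanners.addAll(memScanners);
        }
        return scanners;
      }
    }
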
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 0e1d90fb4a8..41c13f58cd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -123,13 +123,15 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
 
   @Override
   public boolean seekToPreviousRow(Cell key) throws IOException {
-    checkReseek();
+    boolean flushed = checkFlushed();
+    checkReseek(flushed);
     return this.heap.seekToPreviousRow(key);
   }
 
   @Override
   public boolean backwardSeek(Cell key) throws IOException {
-    checkReseek();
+    boolean flushed = checkFlushed();
+    checkReseek(flushed);
     return this.heap.backwardSeek(key);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f137a8ec4d3..8bb10f0f300 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -105,6 +105,25 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
       byte[] stopRow,
       long readPt
   ) throws IOException;
+
+  /**
+   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
+   * (that happens further down the line).
+   * @param files the list of files on which the scanners have to be created
+   * @param cacheBlocks cache the blocks or not
+   * @param isGet true if it is a get, false if not
+   * @param usePread true to use pread, false if not
+   * @param isCompaction true if the scanner is created for compaction
+   * @param matcher the scan query matcher
+   * @param startRow the start row
+   * @param stopRow the stop row
+   * @param readPt the read point of the current scan
+   * @param includeMemstoreScanner true if memstore has to be included
+   * @return scanners on the given files and on the memstore if specified
+   */
+  List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, boolean isGet,
+      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+      byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException;
 
   ScanInfo getScanInfo();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c864733c1d4..d752e173791 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -48,6 +48,7 @@ public class StoreFileScanner implements KeyValueScanner {
   private final StoreFile.Reader reader;
   private final HFileScanner hfs;
   private Cell cur = null;
+  private boolean closed = false;
 
   private boolean realSeekDone;
   private boolean delayedReseek;
@@ -246,11 +247,13 @@ public class StoreFileScanner implements KeyValueScanner {
   }
 
   public void close() {
+    if (closed) return;
     cur = null;
     this.hfs.close();
     if (this.reader != null) {
       this.reader.decrementRefCount();
     }
+    closed = true;
   }
 
   /**
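
Note: with the scanner stack now tracking file scanners in both the current scanner list and the delayed-close heaps, the same StoreFileScanner can plausibly be closed more than once; the closed flag above makes the second close a no-op so the reader's ref count is decremented exactly once. The guard in isolation looks like the sketch below, where refCount is a stand-in for StoreFile.Reader ref counting.

    final class GuardedScanner implements AutoCloseable {
      private boolean closed = false;
      private int refCount = 1; // stand-in for StoreFile.Reader ref counting

      @Override
      public void close() {
        if (closed) {
          return; // a second close must not decrement the ref count again
        }
        refCount--;
        closed = true;
      }
    }
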
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 44f07f7344b..987a3f52495 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -126,6 +127,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private boolean scanUsePread = false;
   // Indicates whether there was flush during the course of the scan
   protected volatile boolean flushed = false;
+  // generally we get one file from a flush
+  protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+  // The current list of scanners
+  protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
+  // flush update lock
+  private ReentrantLock flushLock = new ReentrantLock();
 
   protected final long readPt;
 
@@ -170,6 +177,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
   }
 
+  protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
+    this.currentScanners.addAll(scanners);
+  }
   /**
    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
    * are not in a compaction.
@@ -207,7 +217,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     // set rowOffset
     this.storeOffset = scan.getRowOffsetPerColumnFamily();
-
+    addCurrentScanners(scanners);
     // Combine all seeked scanners with a heap
     resetKVHeap(scanners, store.getComparator());
   }
@@ -264,7 +274,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     // Seek all scanners to the initial key
     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
-
+    addCurrentScanners(scanners);
     // Combine all seeked scanners with a heap
     resetKVHeap(scanners, store.getComparator());
   }
@@ -303,6 +313,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     // Seek all scanners to the initial key
     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
+    addCurrentScanners(scanners);
     resetKVHeap(scanners, scanInfo.getComparator());
   }
 
@@ -403,7 +414,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public Cell peek() {
-    checkResetHeap();
+    checkFlushed();
     if (this.heap == null) {
       return this.lastTop;
     }
@@ -435,11 +446,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       this.heapsForDelayedClose.clear();
       if (this.heap != null) {
         this.heap.close();
+        this.currentScanners.clear();
         this.heap = null; // CLOSED!
       }
     } else {
       if (this.heap != null) {
         this.heapsForDelayedClose.add(this.heap);
+        this.currentScanners.clear();
         this.heap = null;
       }
     }
@@ -448,9 +461,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public boolean seek(Cell key) throws IOException {
-    checkResetHeap();
+    boolean flushed = checkFlushed();
     // reset matcher state, in case that underlying store changed
-    checkReseek();
+    checkReseek(flushed);
     return this.heap.seek(key);
   }
 
@@ -470,8 +483,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (scannerContext == null) {
       throw new IllegalArgumentException("Scanner context cannot be null");
     }
-    checkResetHeap();
-    if (checkReseek()) {
+    boolean flushed = checkFlushed();
+    if (checkReseek(flushed)) {
       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
 
@@ -665,36 +678,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   // Implementation of ChangedReadersObserver
   @Override
-  public void updateReaders() throws IOException {
+  public void updateReaders(List<StoreFile> sfs) throws IOException {
     flushed = true;
+    flushLock.lock();
+    try {
+      flushedStoreFiles.addAll(sfs);
+    } finally {
+      flushLock.unlock();
+    }
     // Let the next() call handle re-creating and seeking
   }
 
-  protected void nullifyCurrentHeap() {
-    if (this.closing) return;
-    // All public synchronized API calls will call 'checkReseek' which will cause
-    // the scanner stack to reseek if this.heap==null && this.lastTop != null.
-    // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
-    // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
-    // which is NOT what we want, not to mention could cause an NPE. So we early out here.
-    if (this.heap == null) return;
-    // this could be null.
-    this.lastTop = this.heap.peek();
-
-    //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
-
-    // close scanners to old obsolete Store files
-    this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
-    this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
-  }
-
   /**
+   * @param flushed indicates if there was a flush
    * @return true if top of heap has changed (and KeyValueHeap has to try the
    *         next KV)
    * @throws IOException
    */
-  protected boolean checkReseek() throws IOException {
-    if (this.heap == null && this.lastTop != null) {
+  protected boolean checkReseek(boolean flushed) throws IOException {
+    if (flushed && this.lastTop != null) {
       resetScannerStack(this.lastTop);
       if (this.heap.peek() == null
           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
@@ -710,21 +712,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   protected void resetScannerStack(Cell lastTopKey) throws IOException {
-    if (heap != null) {
-      throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
-    }
-
     /* When we have the scan object, should we not pass it to getScanners()
      * to get a limited set of scanners? We did so in the constructor and we
-     * could have done it now by storing the scan object from the constructor */
-    List<KeyValueScanner> scanners = getScannersNoCompaction();
+     * could have done it now by storing the scan object from the constructor
+     */
 
-    // Seek all scanners to the initial key
+    final boolean isCompaction = false;
+    boolean usePread = get || scanUsePread;
+    List<KeyValueScanner> scanners = null;
+    try {
+      flushLock.lock();
+      scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
+        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+      // Clear the current set of flushed store files so that they don't get added again
+      flushedStoreFiles.clear();
+    } finally {
+      flushLock.unlock();
+    }
+
+    // Seek the new scanners to the last key
     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
-
+    // remove the older memstore scanner
+    for (int i = 0; i < currentScanners.size(); i++) {
+      if (!currentScanners.get(i).isFileScanner()) {
+        currentScanners.remove(i);
+        break;
+      }
+    }
+    // add the newly created scanners on the flushed files and the current active memstore scanner
+    addCurrentScanners(scanners);
     // Combine all seeked scanners with a heap
-    resetKVHeap(scanners, store.getComparator());
-
+    resetKVHeap(this.currentScanners, store.getComparator());
     // Reset the state of the Query Matcher and set to top row.
     // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
@@ -771,34 +789,36 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public boolean reseek(Cell kv) throws IOException {
-    checkResetHeap();
+    boolean flushed = checkFlushed();
     // Heap will not be null, if this is called from next() which.
     // If called from RegionScanner.reseek(...) make sure the scanner
     // stack is reset if needed.
-    checkReseek();
+    checkReseek(flushed);
     if (explicitColumnQuery && lazySeekEnabledGlobally) {
       return heap.requestSeek(kv, true, useRowColBloom);
     }
     return heap.reseek(kv);
   }
 
-  protected void checkResetHeap() {
+  protected boolean checkFlushed() {
     // check the var without any lock. Suppose even if we see the old
     // value here still it is ok to continue because we will not be resetting
     // the heap but will continue with the referenced memstore's snapshot. For compactions
     // any way we don't need the updateReaders at all to happen as we still continue with
     // the older files
     if (flushed) {
-      // If the 'flushed' is found to be true then there is a need to ensure
-      // that the current scanner updates the heap that it has and then proceed
-      // with the scan and ensure to reset the flushed inside the lock
-      // One thing can be sure that the same store scanner cannot be in reseek and
-      // next at the same time ie. within the same store scanner it is always single
-      // threaded
-      nullifyCurrentHeap();
+      // If there is a flush and the current scan is notified on the flush, ensure that the
+      // scan's heap gets reset and we do a seek on the newly flushed file.
+      if (!this.closing) {
+        this.lastTop = this.heap.peek();
+      } else {
+        return false;
+      }
       // reset the flag
       flushed = false;
+      return true;
     }
+    return false;
   }
 
   @Override
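
Note: this hunk is the core of the change. checkFlushed only records the heap's current top key once a flush has been signalled, and resetScannerStack then opens scanners on just the flushed files plus a fresh memstore scanner, drops the stale memstore scanner from currentScanners, and seeks only the newcomers to the remembered key, instead of rebuilding and reseeking the entire stack. The flow in miniature is sketched below with a hypothetical Cursor type in place of KeyValueScanner; ScannerStack and resetAfterFlush are illustrative names only.

    import java.util.ArrayList;
    import java.util.List;

    interface Cursor {
      boolean isFileScanner();
      void seek(String key);
    }

    final class ScannerStack {
      private final List<Cursor> currentScanners = new ArrayList<Cursor>();

      void resetAfterFlush(List<Cursor> newScanners, String lastTopKey) {
        // only the newly created scanners are seeked; the existing file
        // scanners in currentScanners keep their positions
        for (Cursor c : newScanners) {
          c.seek(lastTopKey);
        }
        // the old memstore scanner is stale after the flush: remove it
        for (int i = 0; i < currentScanners.size(); i++) {
          if (!currentScanners.get(i).isFileScanner()) {
            currentScanners.remove(i);
            break;
          }
        }
        // newScanners covers the flushed files plus the live memstore
        currentScanners.addAll(newScanners);
        // a KeyValueHeap would now be rebuilt over currentScanners
      }
    }
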
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index 6dedee23813..a812623f8eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -920,6 +920,138 @@ public class TestBlockEvictionFromClient {
     }
   }
 
+  @Test
+  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
+      throws IOException, InterruptedException {
+    // do flush and scan in parallel
+    HTable table = null;
+    try {
+      latch = new CountDownLatch(1);
+      compactionLatch = new CountDownLatch(1);
+      TableName tableName =
+          TableName.valueOf("testBlockEvictionAfterHBASE13082WithCompactionAndFlush");
+      // Create a table with block size as 1024
+      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
+          CustomInnerRegionObserverWrapper.class.getName());
+      // get the block cache and region
+      RegionLocator locator = table.getRegionLocator();
+      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
+          regionName);
+      Store store = region.getStores().iterator().next();
+      CacheConfig cacheConf = store.getCacheConfig();
+      cacheConf.setCacheDataOnWrite(true);
+      cacheConf.setEvictOnClose(true);
+      BlockCache cache = cacheConf.getBlockCache();
+
+      // insert data. 2 rows are added
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
+      // Should create one Hfile with 2 blocks
+      region.flush(true);
+      // read the data and expect same blocks, one new hit, no misses
+      int refCount = 0;
+      // Check how this miss is happening
+      // insert a second column, read the row, no new blocks, 3 new hits
+      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+      byte[] data2 = Bytes.add(data, data);
+      put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER2, data2);
+      table.put(put);
+      // flush, one new block
+      System.out.println("Flushing cache");
+      region.flush(true);
+      Iterator<CachedBlock> iterator = cache.iterator();
+      iterateBlockCache(cache, iterator);
+      // Create three sets of scans
+      ScanThread[] scanThreads = initiateScan(table, false);
+      Thread.sleep(100);
+      iterator = cache.iterator();
+      boolean usedBlocksFound = false;
+      while (iterator.hasNext()) {
+        CachedBlock next = iterator.next();
+        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+        if (cache instanceof BucketCache) {
+          refCount = ((BucketCache) cache).getRefCount(cacheKey);
+        } else if (cache instanceof CombinedBlockCache) {
+          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+        } else {
+          continue;
+        }
+        if (refCount != 0) {
+          // Blocks will be with count 3
+          assertEquals(NO_OF_THREADS, refCount);
+          usedBlocksFound = true;
+        }
+      }
+      // Make a put and do a flush
+      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+      data2 = Bytes.add(data, data);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER2, data2);
+      table.put(put);
+      // flush, one new block
+      System.out.println("Flushing cache");
+      region.flush(true);
+      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
+      usedBlocksFound = false;
+      System.out.println("Compacting");
+      assertEquals(3, store.getStorefilesCount());
+      store.triggerMajorCompaction();
+      region.compact(true);
+      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
+      assertEquals(1, store.getStorefilesCount());
+      // Even after compaction is done we will have some blocks that cannot
+      // be evicted; this is because the scan is still referencing them
+      iterator = cache.iterator();
+      while (iterator.hasNext()) {
+        CachedBlock next = iterator.next();
+        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+        if (cache instanceof BucketCache) {
+          refCount = ((BucketCache) cache).getRefCount(cacheKey);
+        } else if (cache instanceof CombinedBlockCache) {
+          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+        } else {
+          continue;
+        }
+        if (refCount != 0) {
+          // Blocks will be with count 3 as they are not yet cleared
+          assertEquals(NO_OF_THREADS, refCount);
+          usedBlocksFound = true;
+        }
+      }
+      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
+      // Should not throw exception
+      compactionLatch.countDown();
+      latch.countDown();
+      for (ScanThread thread : scanThreads) {
+        thread.join();
+      }
+      // by this time all blocks should have been evicted
+      iterator = cache.iterator();
+      // Since a flush and compaction happened after a scan started
+      // we need to ensure that all the original blocks of the compacted file
+      // are also removed.
+      iterateBlockCache(cache, iterator);
+      Result r = table.get(new Get(ROW));
+      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
+      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+      // The gets would be working on new blocks
+      iterator = cache.iterator();
+      iterateBlockCache(cache, iterator);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+
   @Test
   public void testScanWithException() throws IOException, InterruptedException {
     HTable table = null;
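
Note: the test's two while loops assert the same invariant: every block referenced by the in-flight scans keeps a ref count of NO_OF_THREADS across both the flush and the major compaction, and only after every scan thread has joined do all blocks become evictable. The bookkeeping being checked, reduced to a toy ledger with hypothetical names (not the BlockCache API), could look like this:

    import java.util.HashMap;
    import java.util.Map;

    final class BlockRefLedger {
      private final Map<String, Integer> refCounts = new HashMap<String, Integer>();

      void retain(String blockKey) { // a scan touches the block
        refCounts.merge(blockKey, 1, Integer::sum);
      }

      void release(String blockKey) { // a scan finishes with the block
        refCounts.computeIfPresent(blockKey, (k, v) -> v > 1 ? v - 1 : null);
      }

      boolean evictable(String blockKey) {
        return !refCounts.containsKey(blockKey); // only unpinned blocks can go
      }
    }
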
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 5b7e9ccea86..728029f6a2f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -454,9 +454,9 @@ public class TestStoreScanner extends TestCase {
     // normally cause an NPE because scan.store is null. So as long as we get through these
     // two calls we are good and the bug was quashed.
 
-    scan.updateReaders();
+    scan.updateReaders(new ArrayList<StoreFile>());
 
-    scan.updateReaders();
+    scan.updateReaders(new ArrayList<StoreFile>());
 
     scan.peek();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index ca7b3b1746a..f598a8dfab7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -130,7 +130,7 @@ public class TestWideScanner extends HBaseTestCase {
           ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
       while (scanners.hasNext()) {
        StoreScanner ss = (StoreScanner)scanners.next();
-        ss.updateReaders();
+        ss.updateReaders(new ArrayList<StoreFile>());
       }
     } while (more);
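
Note: the updated test call sites pass an empty file list. That still flips the flushed flag, so the subsequent peek() walks the reseek path with nothing new to add, which is exactly the back-to-back-notification scenario TestStoreScanner guards against. Driven against the FlushAwareObserver sketch given after the ChangedReadersObserver diff above (a hypothetical class, not an HBase test):

    import java.util.ArrayList;

    public final class UpdateReadersSmoke {
      public static void main(String[] args) throws Exception {
        FlushAwareObserver obs = new FlushAwareObserver();
        obs.updateReaders(new ArrayList<String>()); // empty flush notification
        obs.updateReaders(new ArrayList<String>()); // a second one must be harmless
        System.out.println(obs.drainFlushedFiles()); // prints [] and clears the flag
      }
    }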