From 0d735d26a98a1be2d175f797a36fcab60c3b3ac8 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 19 Nov 2008 06:48:55 +0000 Subject: [PATCH] HBASE-910 Scanner misses columns / rows when the scanner is obtained during a memcache flush git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@718865 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../hbase/regionserver/HStoreScanner.java | 128 +++++++++++++----- .../hbase/regionserver/TestScanner.java | 71 ++++++++-- 3 files changed, 161 insertions(+), 40 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 8e34e33223c..a8e00fdbdaf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -77,6 +77,8 @@ Release 0.19.0 - Unreleased HBASE-1003 If cell exceeds TTL but not VERSIONs, will not be removed during major compaction HBASE-1005 Regex and string comparison operators for ColumnValueFilter + HBASE-910 Scanner misses columns / rows when the scanner is obtained + during a memcache flush IMPROVEMENTS HBASE-901 Add a limit to key length, check key and value length on client side diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java b/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java index ce9f9094c9f..0381aeb50bb 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java @@ -21,13 +21,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,11 +35,12 @@ import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.Cell; import 
org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.MapFile; /** * Scanner scans both the memcache and the HStore */ -class HStoreScanner implements InternalScanner { +class HStoreScanner implements InternalScanner, ChangedReadersObserver { static final Log LOG = LogFactory.getLog(HStoreScanner.class); private InternalScanner[] scanners; @@ -50,6 +50,15 @@ class HStoreScanner implements InternalScanner { private boolean multipleMatchers = false; private RowFilterInterface dataFilter; private HStore store; + private final long timestamp; + private final byte [][] targetCols; + + // Indices for memcache scanner and hstorefile scanner. + private static final int MEMS_INDEX = 0; + private static final int HSFS_INDEX = MEMS_INDEX + 1; + + // Used around transition from no storefile to the first. + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** Create an Scanner with a handle on the memcache and HStore files. */ @SuppressWarnings("unchecked") @@ -64,51 +73,72 @@ class HStoreScanner implements InternalScanner { this.scanners = new InternalScanner[2]; this.resultSets = new TreeMap[scanners.length]; this.keys = new HStoreKey[scanners.length]; + // Save these args in case we need them later handling change in readers + // See updateReaders below. 
+ this.timestamp = timestamp; + this.targetCols = targetCols; try { - scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow); - scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow); - for (int i = 0; i < scanners.length; i++) { - if (scanners[i].isWildcardScanner()) { - this.wildcardMatch = true; - } - if (scanners[i].isMultipleMatchScanner()) { - this.multipleMatchers = true; - } - } - } catch(IOException e) { - for (int i = 0; i < this.scanners.length; i++) { - if(scanners[i] != null) { - closeScanner(i); - } + scanners[MEMS_INDEX] = + store.memcache.getScanner(timestamp, targetCols, firstRow); + scanners[HSFS_INDEX] = + new StoreFileScanner(store, timestamp, targetCols, firstRow); + for (int i = MEMS_INDEX; i < scanners.length; i++) { + checkScannerFlags(i); } + } catch (IOException e) { + doClose(); throw e; } // Advance to the first key in each scanner. // All results will match the required column-set and scanTime. - for (int i = 0; i < scanners.length; i++) { - keys[i] = new HStoreKey(); - resultSets[i] = new TreeMap(Bytes.BYTES_COMPARATOR); - if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) { - closeScanner(i); - } + for (int i = MEMS_INDEX; i < scanners.length; i++) { + setupScanner(i); + } + + this.store.addChangedReaderObserver(this); + } + + /* + * @param i Index. + */ + private void checkScannerFlags(final int i) { + if (this.scanners[i].isWildcardScanner()) { + this.wildcardMatch = true; + } + if (this.scanners[i].isMultipleMatchScanner()) { + this.multipleMatchers = true; + } + } + + /* + * Do scanner setup. 
+ * @param i + * @throws IOException + */ + private void setupScanner(final int i) throws IOException { + this.keys[i] = new HStoreKey(); + this.resultSets[i] = new TreeMap(Bytes.BYTES_COMPARATOR); + if (this.scanners[i] != null && !this.scanners[i].next(this.keys[i], this.resultSets[i])) { + closeScanner(i); } } /** @return true if the scanner is a wild card scanner */ public boolean isWildcardScanner() { - return wildcardMatch; + return this.wildcardMatch; } /** @return true if the scanner is a multiple match scanner */ public boolean isMultipleMatchScanner() { - return multipleMatchers; + return this.multipleMatchers; } public boolean next(HStoreKey key, SortedMap results) - throws IOException { - + throws IOException { + this.lock.readLock().lock(); + try { // Filtered flag is set by filters. If a cell has been 'filtered out' // -- i.e. it is not to be returned to the caller -- the flag is 'true'. boolean filtered = true; @@ -243,6 +273,9 @@ class HStoreScanner implements InternalScanner { } return moreToFollow; + } finally { + this.lock.readLock().unlock(); + } } /** Shut down a single scanner */ @@ -261,10 +294,43 @@ class HStoreScanner implements InternalScanner { } public void close() { - for(int i = 0; i < scanners.length; i++) { - if(scanners[i] != null) { + this.store.deleteChangedReaderObserver(this); + doClose(); + } + + private void doClose() { + for (int i = MEMS_INDEX; i < scanners.length; i++) { + if (scanners[i] != null) { closeScanner(i); } } } -} + + // Implementation of ChangedReadersObserver + + public void updateReaders() throws IOException { + this.lock.writeLock().lock(); + try { + MapFile.Reader [] readers = this.store.getReaders(); + if (this.scanners[HSFS_INDEX] == null && readers != null && + readers.length > 0) { + // Presume that we went from no readers to at least one -- need to put + // a HStoreScanner in place. 
+ try { + // I think it's safe getting key from mem at this stage -- it shouldn't have + // been flushed yet + this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store, + this.timestamp, this.targetCols, this.keys[MEMS_INDEX].getRow()); + checkScannerFlags(HSFS_INDEX); + setupScanner(HSFS_INDEX); + LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner"); + } catch (IOException e) { + doClose(); + throw e; + } + } + } finally { + this.lock.writeLock().unlock(); + } + } +} \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java index 106590fb85e..bf505eb6aaf 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java @@ -22,25 +22,30 @@ package org.apache.hadoop.hbase.regionserver; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.SortedMap; import java.util.TreeMap; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestCase; -import org.apache.hadoop.hbase.HRegionInfo; - import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hdfs.MiniDFSCluster; /** * Test of a long-lived scanner validating as we go. 
*/ public class TestScanner extends HBaseTestCase { - private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW; + private final Log LOG = LogFactory.getLog(this.getClass()); + + private static final byte [] FIRST_ROW = + HConstants.EMPTY_START_ROW; private static final byte [][] COLS = { HConstants.COLUMN_FAMILY }; @@ -52,7 +57,8 @@ public class TestScanner extends HBaseTestCase { private static final byte [] ROW_KEY = HRegionInfo.ROOT_REGIONINFO.getRegionName(); - private static final HRegionInfo REGION_INFO = HRegionInfo.ROOT_REGIONINFO; + private static final HRegionInfo REGION_INFO = + HRegionInfo.ROOT_REGIONINFO; private static final long START_CODE = Long.MAX_VALUE; @@ -84,8 +90,7 @@ public class TestScanner extends HBaseTestCase { /** Use a scanner to get the region info and then validate the results */ private void scan(boolean validateStartcode, String serverName) - throws IOException { - + throws IOException { InternalScanner scanner = null; TreeMap results = new TreeMap(Bytes.BYTES_COMPARATOR); @@ -140,7 +145,55 @@ public class TestScanner extends HBaseTestCase { byte [] bytes = region.get(ROW_KEY, HConstants.COL_REGIONINFO).getValue(); validateRegionInfo(bytes); } + + /** + * HBase-910. + * @throws Exception + */ + public void testScanAndConcurrentFlush() throws Exception { + this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null); + HRegionIncommon hri = new HRegionIncommon(r); + try { + addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO)); + int count = count(hri, -1); + assertEquals(count, count(hri, 100)); + assertEquals(count, count(hri, 0)); + assertEquals(count, count(hri, count - 1)); + } finally { + this.r.close(); + this.r.getLog().closeAndDelete(); + shutdownDfs(cluster); + } + } + /* + * @param hri Region + * @param flushIndex At what row we start the flush. + * @return Count of rows found. 
+ * @throws IOException + */ + private int count(final HRegionIncommon hri, final int flushIndex) + throws IOException { + LOG.info("Taking out counting scan"); + ScannerIncommon s = hri.getScanner(EXPLICIT_COLS, + HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP); + HStoreKey key = new HStoreKey(); + SortedMap values = + new TreeMap(Bytes.BYTES_COMPARATOR); + int count = 0; + while (s.next(key, values)) { + count++; + if (flushIndex == count) { + LOG.info("Starting flush at flush index " + flushIndex); + hri.flushcache(); + LOG.info("Finishing flush"); + } + } + s.close(); + LOG.info("Found " + count + " items"); + return count; + } + /** The test! * @throws IOException */