HBASE-910 Scanner misses columns / rows when the scanner is obtained durring a memcache flush
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@718865 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6c89d36cef
commit
0d735d26a9
|
@ -77,6 +77,8 @@ Release 0.19.0 - Unreleased
|
|||
HBASE-1003 If cell exceeds TTL but not VERSIONs, will not be removed during
|
||||
major compaction
|
||||
HBASE-1005 Regex and string comparison operators for ColumnValueFilter
|
||||
HBASE-910 Scanner misses columns / rows when the scanner is obtained
|
||||
durring a memcache flush
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-901 Add a limit to key length, check key and value length on client side
|
||||
|
|
|
@ -21,13 +21,12 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -36,11 +35,12 @@ import org.apache.hadoop.hbase.HStoreKey;
|
|||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.MapFile;
|
||||
|
||||
/**
|
||||
* Scanner scans both the memcache and the HStore
|
||||
*/
|
||||
class HStoreScanner implements InternalScanner {
|
||||
class HStoreScanner implements InternalScanner, ChangedReadersObserver {
|
||||
static final Log LOG = LogFactory.getLog(HStoreScanner.class);
|
||||
|
||||
private InternalScanner[] scanners;
|
||||
|
@ -50,6 +50,15 @@ class HStoreScanner implements InternalScanner {
|
|||
private boolean multipleMatchers = false;
|
||||
private RowFilterInterface dataFilter;
|
||||
private HStore store;
|
||||
private final long timestamp;
|
||||
private final byte [][] targetCols;
|
||||
|
||||
// Indices for memcache scanner and hstorefile scanner.
|
||||
private static final int MEMS_INDEX = 0;
|
||||
private static final int HSFS_INDEX = MEMS_INDEX + 1;
|
||||
|
||||
// Used around transition from no storefile to the first.
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
/** Create an Scanner with a handle on the memcache and HStore files. */
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -64,51 +73,72 @@ class HStoreScanner implements InternalScanner {
|
|||
this.scanners = new InternalScanner[2];
|
||||
this.resultSets = new TreeMap[scanners.length];
|
||||
this.keys = new HStoreKey[scanners.length];
|
||||
// Save these args in case we need them later handling change in readers
|
||||
// See updateReaders below.
|
||||
this.timestamp = timestamp;
|
||||
this.targetCols = targetCols;
|
||||
|
||||
try {
|
||||
scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
|
||||
scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
if (scanners[i].isWildcardScanner()) {
|
||||
this.wildcardMatch = true;
|
||||
}
|
||||
if (scanners[i].isMultipleMatchScanner()) {
|
||||
this.multipleMatchers = true;
|
||||
}
|
||||
}
|
||||
} catch(IOException e) {
|
||||
for (int i = 0; i < this.scanners.length; i++) {
|
||||
if(scanners[i] != null) {
|
||||
closeScanner(i);
|
||||
}
|
||||
scanners[MEMS_INDEX] =
|
||||
store.memcache.getScanner(timestamp, targetCols, firstRow);
|
||||
scanners[HSFS_INDEX] =
|
||||
new StoreFileScanner(store, timestamp, targetCols, firstRow);
|
||||
for (int i = MEMS_INDEX; i < scanners.length; i++) {
|
||||
checkScannerFlags(i);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
doClose();
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Advance to the first key in each scanner.
|
||||
// All results will match the required column-set and scanTime.
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
keys[i] = new HStoreKey();
|
||||
resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
|
||||
if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
|
||||
closeScanner(i);
|
||||
}
|
||||
for (int i = MEMS_INDEX; i < scanners.length; i++) {
|
||||
setupScanner(i);
|
||||
}
|
||||
|
||||
this.store.addChangedReaderObserver(this);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param i Index.
|
||||
*/
|
||||
private void checkScannerFlags(final int i) {
|
||||
if (this.scanners[i].isWildcardScanner()) {
|
||||
this.wildcardMatch = true;
|
||||
}
|
||||
if (this.scanners[i].isMultipleMatchScanner()) {
|
||||
this.multipleMatchers = true;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Do scanner setup.
|
||||
* @param i
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setupScanner(final int i) throws IOException {
|
||||
this.keys[i] = new HStoreKey();
|
||||
this.resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
|
||||
if (this.scanners[i] != null && !this.scanners[i].next(this.keys[i], this.resultSets[i])) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
|
||||
/** @return true if the scanner is a wild card scanner */
|
||||
public boolean isWildcardScanner() {
|
||||
return wildcardMatch;
|
||||
return this.wildcardMatch;
|
||||
}
|
||||
|
||||
/** @return true if the scanner is a multiple match scanner */
|
||||
public boolean isMultipleMatchScanner() {
|
||||
return multipleMatchers;
|
||||
return this.multipleMatchers;
|
||||
}
|
||||
|
||||
public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
|
||||
throws IOException {
|
||||
|
||||
throws IOException {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
// Filtered flag is set by filters. If a cell has been 'filtered out'
|
||||
// -- i.e. it is not to be returned to the caller -- the flag is 'true'.
|
||||
boolean filtered = true;
|
||||
|
@ -243,6 +273,9 @@ class HStoreScanner implements InternalScanner {
|
|||
}
|
||||
|
||||
return moreToFollow;
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/** Shut down a single scanner */
|
||||
|
@ -261,10 +294,43 @@ class HStoreScanner implements InternalScanner {
|
|||
}
|
||||
|
||||
public void close() {
|
||||
for(int i = 0; i < scanners.length; i++) {
|
||||
if(scanners[i] != null) {
|
||||
this.store.deleteChangedReaderObserver(this);
|
||||
doClose();
|
||||
}
|
||||
|
||||
private void doClose() {
|
||||
for (int i = MEMS_INDEX; i < scanners.length; i++) {
|
||||
if (scanners[i] != null) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Implementation of ChangedReadersObserver
|
||||
|
||||
public void updateReaders() throws IOException {
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
MapFile.Reader [] readers = this.store.getReaders();
|
||||
if (this.scanners[HSFS_INDEX] == null && readers != null &&
|
||||
readers.length > 0) {
|
||||
// Presume that we went from no readers to at least one -- need to put
|
||||
// a HStoreScanner in place.
|
||||
try {
|
||||
// I think its safe getting key from mem at this stage -- it shouldn't have
|
||||
// been flushed yet
|
||||
this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
|
||||
this.timestamp, this. targetCols, this.keys[MEMS_INDEX].getRow());
|
||||
checkScannerFlags(HSFS_INDEX);
|
||||
setupScanner(HSFS_INDEX);
|
||||
LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");
|
||||
} catch (IOException e) {
|
||||
doClose();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,25 +22,30 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestCase;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HStoreKey;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
|
||||
/**
|
||||
* Test of a long-lived scanner validating as we go.
|
||||
*/
|
||||
public class TestScanner extends HBaseTestCase {
|
||||
private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
|
||||
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||
|
||||
private static final byte [] FIRST_ROW =
|
||||
HConstants.EMPTY_START_ROW;
|
||||
private static final byte [][] COLS = {
|
||||
HConstants.COLUMN_FAMILY
|
||||
};
|
||||
|
@ -52,7 +57,8 @@ public class TestScanner extends HBaseTestCase {
|
|||
|
||||
private static final byte [] ROW_KEY =
|
||||
HRegionInfo.ROOT_REGIONINFO.getRegionName();
|
||||
private static final HRegionInfo REGION_INFO = HRegionInfo.ROOT_REGIONINFO;
|
||||
private static final HRegionInfo REGION_INFO =
|
||||
HRegionInfo.ROOT_REGIONINFO;
|
||||
|
||||
private static final long START_CODE = Long.MAX_VALUE;
|
||||
|
||||
|
@ -84,8 +90,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
|
||||
/** Use a scanner to get the region info and then validate the results */
|
||||
private void scan(boolean validateStartcode, String serverName)
|
||||
throws IOException {
|
||||
|
||||
throws IOException {
|
||||
InternalScanner scanner = null;
|
||||
TreeMap<byte [], Cell> results =
|
||||
new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
|
||||
|
@ -140,7 +145,55 @@ public class TestScanner extends HBaseTestCase {
|
|||
byte [] bytes = region.get(ROW_KEY, HConstants.COL_REGIONINFO).getValue();
|
||||
validateRegionInfo(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* HBase-910.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testScanAndConcurrentFlush() throws Exception {
|
||||
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
|
||||
HRegionIncommon hri = new HRegionIncommon(r);
|
||||
try {
|
||||
addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO));
|
||||
int count = count(hri, -1);
|
||||
assertEquals(count, count(hri, 100));
|
||||
assertEquals(count, count(hri, 0));
|
||||
assertEquals(count, count(hri, count - 1));
|
||||
} finally {
|
||||
this.r.close();
|
||||
this.r.getLog().closeAndDelete();
|
||||
shutdownDfs(cluster);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @param hri Region
|
||||
* @param flushIndex At what row we start the flush.
|
||||
* @return Count of rows found.
|
||||
* @throws IOException
|
||||
*/
|
||||
private int count(final HRegionIncommon hri, final int flushIndex)
|
||||
throws IOException {
|
||||
LOG.info("Taking out counting scan");
|
||||
ScannerIncommon s = hri.getScanner(EXPLICIT_COLS,
|
||||
HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
|
||||
HStoreKey key = new HStoreKey();
|
||||
SortedMap<byte [], Cell> values =
|
||||
new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
|
||||
int count = 0;
|
||||
while (s.next(key, values)) {
|
||||
count++;
|
||||
if (flushIndex == count) {
|
||||
LOG.info("Starting flush at flush index " + flushIndex);
|
||||
hri.flushcache();
|
||||
LOG.info("Finishing flush");
|
||||
}
|
||||
}
|
||||
s.close();
|
||||
LOG.info("Found " + count + " items");
|
||||
return count;
|
||||
}
|
||||
|
||||
/** The test!
|
||||
* @throws IOException
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue