HBASE-9754 Eliminate threadlocal from MVCC code (Ted Yu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1533633 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-10-18 21:09:48 +00:00
parent 6d0eade8b6
commit 09ec361d3a
23 changed files with 127 additions and 163 deletions

View File

@ -27,11 +27,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -229,6 +231,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
// take default action // take default action
return null; return null;
} }
return new StoreScanner(store, scanInfo, scan, targetCols); return new StoreScanner(store, scanInfo, scan, targetCols,
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
} }
} }

View File

@ -872,6 +872,17 @@ public class HRegion implements HeapSize { // , Writable{
public MultiVersionConsistencyControl getMVCC() { public MultiVersionConsistencyControl getMVCC() {
return mvcc; return mvcc;
} }
/*
* Returns readpoint considering given IsolationLevel
*/
public long getReadpoint(IsolationLevel isolationLevel) {
if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
// This scan can read even uncommitted transactions
return Long.MAX_VALUE;
}
return mvcc.memstoreReadPoint();
}
public boolean isLoadingCfsOnDemandDefault() { public boolean isLoadingCfsOnDemandDefault() {
return this.isLoadingCfsOnDemandDefault; return this.isLoadingCfsOnDemandDefault;
@ -3392,13 +3403,7 @@ public class HRegion implements HeapSize { // , Writable{
// getSmallestReadPoint, before scannerReadPoints is updated. // getSmallestReadPoint, before scannerReadPoints is updated.
IsolationLevel isolationLevel = scan.getIsolationLevel(); IsolationLevel isolationLevel = scan.getIsolationLevel();
synchronized(scannerReadPoints) { synchronized(scannerReadPoints) {
if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { this.readPt = getReadpoint(isolationLevel);
// This scan can read even uncommitted transactions
this.readPt = Long.MAX_VALUE;
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
} else {
this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
}
scannerReadPoints.put(this, this.readPt); scannerReadPoints.put(this, this.readPt);
} }
@ -3413,7 +3418,7 @@ public class HRegion implements HeapSize { // , Writable{
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) { scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey()); Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue()); KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) { || this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner); scanners.add(scanner);
@ -3469,10 +3474,6 @@ public class HRegion implements HeapSize { // , Writable{
startRegionOperation(Operation.SCAN); startRegionOperation(Operation.SCAN);
readRequestsCount.increment(); readRequestsCount.increment();
try { try {
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
return nextRaw(outResults, limit); return nextRaw(outResults, limit);
} finally { } finally {
closeRegionOperation(); closeRegionOperation();
@ -3740,8 +3741,6 @@ public class HRegion implements HeapSize { // , Writable{
boolean result = false; boolean result = false;
startRegionOperation(); startRegionOperation();
try { try {
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
KeyValue kv = KeyValue.createFirstOnRow(row); KeyValue kv = KeyValue.createFirstOnRow(row);
// use request seek to make use of the lazy seek option. See HBASE-5520 // use request seek to make use of the lazy seek option. See HBASE-5520
result = this.storeHeap.requestSeek(kv, true, true); result = this.storeHeap.requestSeek(kv, true, true);

View File

@ -3066,7 +3066,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
maxResultSize = maxScannerResultSize; maxResultSize = maxScannerResultSize;
} }
List<Cell> values = new ArrayList<Cell>(); List<Cell> values = new ArrayList<Cell>();
MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
region.startRegionOperation(Operation.SCAN); region.startRegionOperation(Operation.SCAN);
try { try {
int i = 0; int i = 0;

View File

@ -880,14 +880,14 @@ public class HStore implements Store {
@Override @Override
public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
byte[] stopRow) throws IOException { byte[] stopRow, long readPt) throws IOException {
Collection<StoreFile> storeFilesToScan; Collection<StoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners; List<KeyValueScanner> memStoreScanners;
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
storeFilesToScan = storeFilesToScan =
this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
memStoreScanners = this.memstore.getScanners(); memStoreScanners = this.memstore.getScanners(readPt);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
@ -898,7 +898,8 @@ public class HStore implements Store {
// but now we get them in ascending order, which I think is // but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end. // actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher); .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
readPt);
List<KeyValueScanner> scanners = List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1); new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners); scanners.addAll(sfScanners);
@ -1648,7 +1649,7 @@ public class HStore implements Store {
@Override @Override
public KeyValueScanner getScanner(Scan scan, public KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols) throws IOException { final NavigableSet<byte []> targetCols, long readPt) throws IOException {
lock.readLock().lock(); lock.readLock().lock();
try { try {
KeyValueScanner scanner = null; KeyValueScanner scanner = null;
@ -1656,7 +1657,7 @@ public class HStore implements Store {
scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols); scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
} }
if (scanner == null) { if (scanner == null) {
scanner = new StoreScanner(this, getScanInfo(), scan, targetCols); scanner = new StoreScanner(this, getScanInfo(), scan, targetCols, readPt);
} }
return scanner; return scanner;
} finally { } finally {

View File

@ -59,7 +59,7 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
private KeyValueScanner current = null; private KeyValueScanner current = null;
private KVScannerComparator comparator; private KVScannerComparator comparator;
/** /**
* Constructor. This KeyValueHeap will handle closing of passed in * Constructor. This KeyValueHeap will handle closing of passed in
* KeyValueScanners. * KeyValueScanners.

View File

@ -674,11 +674,11 @@ public class MemStore implements HeapSize {
/** /**
* @return scanner on memstore and snapshot in this order. * @return scanner on memstore and snapshot in this order.
*/ */
List<KeyValueScanner> getScanners() { List<KeyValueScanner> getScanners(long readPt) {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
return Collections.<KeyValueScanner>singletonList( return Collections.<KeyValueScanner>singletonList(
new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint())); new MemStoreScanner(readPt));
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }

View File

@ -24,9 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
/** /**
* Manages the read/write consistency within memstore. This provides * Manages the read/write consistency within memstore. This provides
* an interface for readers to determine what entries to ignore, and * an interface for readers to determine what entries to ignore, and
@ -44,15 +41,6 @@ public class MultiVersionConsistencyControl {
private final LinkedList<WriteEntry> writeQueue = private final LinkedList<WriteEntry> writeQueue =
new LinkedList<WriteEntry>(); new LinkedList<WriteEntry>();
private static final ThreadLocal<Long> perThreadReadPoint =
new ThreadLocal<Long>() {
@Override
protected
Long initialValue() {
return Long.MAX_VALUE;
}
};
/** /**
* Default constructor. Initializes the memstoreRead/Write points to 0. * Default constructor. Initializes the memstoreRead/Write points to 0.
*/ */
@ -74,40 +62,6 @@ public class MultiVersionConsistencyControl {
} }
} }
/**
* Get this thread's read point. Used primarily by the memstore scanner to
* know which values to skip (ie: have not been completed/committed to
* memstore).
*/
public static long getThreadReadPoint() {
return perThreadReadPoint.get();
}
/**
* Set the thread read point to the given value. The thread MVCC
* is used by the Memstore scanner so it knows which values to skip.
* Give it a value of 0 if you want everything.
*/
public static void setThreadReadPoint(long readPoint) {
perThreadReadPoint.set(readPoint);
}
/**
* Set the thread MVCC read point to whatever the current read point is in
* this particular instance of MVCC. Returns the new thread read point value.
*/
public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
perThreadReadPoint.set(mvcc.memstoreReadPoint());
return getThreadReadPoint();
}
/**
* Set the thread MVCC read point to 0 (include everything).
*/
public static void resetThreadReadPoint() {
perThreadReadPoint.set(0L);
}
/** /**
* Generate and return a {@link WriteEntry} with a new write number. * Generate and return a {@link WriteEntry} with a new write number.
* To complete the WriteEntry and wait for it to be visible, * To complete the WriteEntry and wait for it to be visible,

View File

@ -75,7 +75,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
* @return a scanner over the current key values * @return a scanner over the current key values
* @throws IOException on failure * @throws IOException on failure
*/ */
KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols) KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
throws IOException; throws IOException;
/** /**
@ -88,6 +88,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
* @param matcher * @param matcher
* @param startRow * @param startRow
* @param stopRow * @param stopRow
* @param readPt
* @return all scanners for this store * @return all scanners for this store
*/ */
List<KeyValueScanner> getScanners( List<KeyValueScanner> getScanners(
@ -97,7 +98,8 @@ public interface Store extends HeapSize, StoreConfigInformation {
boolean isCompaction, boolean isCompaction,
ScanQueryMatcher matcher, ScanQueryMatcher matcher,
byte[] startRow, byte[] startRow,
byte[] stopRow byte[] stopRow,
long readPt
) throws IOException; ) throws IOException;
ScanInfo getScanInfo(); ScanInfo getScanInfo();

View File

@ -1054,7 +1054,10 @@ public class StoreFile {
*/ */
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread) { boolean pread) {
return getStoreFileScanner(cacheBlocks, pread, false); return getStoreFileScanner(cacheBlocks, pread, false,
// 0 is passed as readpoint because this method is only used by test
// where StoreFile is directly operated upon
0);
} }
/** /**
@ -1067,10 +1070,10 @@ public class StoreFile {
*/ */
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread, boolean pread,
boolean isCompaction) { boolean isCompaction, long readPt) {
return new StoreFileScanner(this, return new StoreFileScanner(this,
getScanner(cacheBlocks, pread, getScanner(cacheBlocks, pread, isCompaction),
isCompaction), !isCompaction, reader.hasMVCCInfo()); !isCompaction, reader.hasMVCCInfo(), readPt);
} }
/** /**

View File

@ -58,12 +58,16 @@ public class StoreFileScanner implements KeyValueScanner {
private static AtomicLong seekCount; private static AtomicLong seekCount;
private ScanQueryMatcher matcher; private ScanQueryMatcher matcher;
private long readPt;
/** /**
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param hfs HFile scanner * @param hfs HFile scanner
*/ */
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, boolean hasMVCC) { public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
boolean hasMVCC, long readPt) {
this.readPt = readPt;
this.reader = reader; this.reader = reader;
this.hfs = hfs; this.hfs = hfs;
this.enforceMVCC = useMVCC; this.enforceMVCC = useMVCC;
@ -77,9 +81,9 @@ public class StoreFileScanner implements KeyValueScanner {
public static List<StoreFileScanner> getScannersForStoreFiles( public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files, Collection<StoreFile> files,
boolean cacheBlocks, boolean cacheBlocks,
boolean usePread) throws IOException { boolean usePread, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks, return getScannersForStoreFiles(files, cacheBlocks,
usePread, false); usePread, false, readPt);
} }
/** /**
@ -87,9 +91,9 @@ public class StoreFileScanner implements KeyValueScanner {
*/ */
public static List<StoreFileScanner> getScannersForStoreFiles( public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files, boolean cacheBlocks, boolean usePread, Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
boolean isCompaction) throws IOException { boolean isCompaction, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
null); null, readPt);
} }
/** /**
@ -99,13 +103,13 @@ public class StoreFileScanner implements KeyValueScanner {
*/ */
public static List<StoreFileScanner> getScannersForStoreFiles( public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files, boolean cacheBlocks, boolean usePread, Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
boolean isCompaction, ScanQueryMatcher matcher) throws IOException { boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>( List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
files.size()); files.size());
for (StoreFile file : files) { for (StoreFile file : files) {
StoreFile.Reader r = file.createReader(); StoreFile.Reader r = file.createReader();
StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
isCompaction); isCompaction, readPt);
scanner.setScanQueryMatcher(matcher); scanner.setScanQueryMatcher(matcher);
scanners.add(scanner); scanners.add(scanner);
} }
@ -180,13 +184,11 @@ public class StoreFileScanner implements KeyValueScanner {
} }
protected boolean skipKVsNewerThanReadpoint() throws IOException { protected boolean skipKVsNewerThanReadpoint() throws IOException {
long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
// We want to ignore all key-values that are newer than our current // We want to ignore all key-values that are newer than our current
// readPoint // readPoint
while(enforceMVCC while(enforceMVCC
&& cur != null && cur != null
&& (cur.getMvccVersion() > readPoint)) { && (cur.getMvccVersion() > readPt)) {
hfs.next(); hfs.next();
cur = hfs.getKeyValue(); cur = hfs.getKeyValue();
} }
@ -202,7 +204,7 @@ public class StoreFileScanner implements KeyValueScanner {
// older KV which was not reset to 0 (because it was // older KV which was not reset to 0 (because it was
// not old enough during flush). Make sure that we set it correctly now, // not old enough during flush). Make sure that we set it correctly now,
// so that the comparision order does not change. // so that the comparision order does not change.
if (cur.getMvccVersion() <= readPoint) { if (cur.getMvccVersion() <= readPt) {
cur.setMvccVersion(0); cur.setMvccVersion(0);
} }
return true; return true;

View File

@ -95,10 +95,13 @@ public class StoreScanner extends NonLazyKeyValueScanner
// A flag whether use pread for scan // A flag whether use pread for scan
private boolean scanUsePread = false; private boolean scanUsePread = false;
private final long readPt;
/** An internal constructor. */ /** An internal constructor. */
protected StoreScanner(Store store, boolean cacheBlocks, Scan scan, protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
final NavigableSet<byte[]> columns, long ttl, int minVersions) { final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
this.readPt = readPt;
this.store = store; this.store = store;
this.cacheBlocks = cacheBlocks; this.cacheBlocks = cacheBlocks;
isGet = scan.isGetScan(); isGet = scan.isGetScan();
@ -137,10 +140,11 @@ public class StoreScanner extends NonLazyKeyValueScanner
* @param columns which columns we are scanning * @param columns which columns we are scanning
* @throws IOException * @throws IOException
*/ */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns) public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
long readPt)
throws IOException { throws IOException {
this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions()); scanInfo.getMinVersions(), readPt);
if (columns != null && scan.isRaw()) { if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException( throw new DoNotRetryIOException(
"Cannot specify any column for a raw scan"); "Cannot specify any column for a raw scan");
@ -220,7 +224,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint, List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, false, scan, null, scanInfo.getTtl(), this(store, false, scan, null, scanInfo.getTtl(),
scanInfo.getMinVersions()); scanInfo.getMinVersions(), smallestReadPoint);
if (dropDeletesFromRow == null) { if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
smallestReadPoint, earliestPutTs, oldestUnexpiredTS); smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
@ -250,16 +254,27 @@ public class StoreScanner extends NonLazyKeyValueScanner
ScanType scanType, final NavigableSet<byte[]> columns, ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners) throws IOException { final List<KeyValueScanner> scanners) throws IOException {
this(scan, scanInfo, scanType, columns, scanners, this(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP); HConstants.LATEST_TIMESTAMP,
// 0 is passed as readpoint because the test bypasses Store
0);
} }
// Constructor for testing. // Constructor for testing.
StoreScanner(final Scan scan, ScanInfo scanInfo, StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs)
throws IOException {
this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
// 0 is passed as readpoint because the test bypasses Store
0);
}
private StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns, ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs) final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
throws IOException { throws IOException {
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions()); scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
@ -282,7 +297,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
final boolean isCompaction = false; final boolean isCompaction = false;
boolean usePread = isGet || scanUsePread; boolean usePread = isGet || scanUsePread;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread, return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow())); isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
} }
/** /**
@ -626,7 +641,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
for (KeyValueScanner scanner : scanners) { for (KeyValueScanner scanner : scanners) {
if (scanner instanceof StoreFileScanner) { if (scanner instanceof StoreFileScanner) {
ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
MultiVersionConsistencyControl.getThreadReadPoint(), latch); this.readPt, latch);
executor.submit(seekHandler); executor.submit(seekHandler);
handlers.add(seekHandler); handlers.add(seekHandler);
} else { } else {

View File

@ -178,14 +178,13 @@ public abstract class Compactor {
} }
protected List<StoreFileScanner> createFileScanners( protected List<StoreFileScanner> createFileScanners(
final Collection<StoreFile> filesToCompact) throws IOException { final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true); return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
smallestReadPoint);
} }
protected long setSmallestReadPoint() { protected long getSmallestReadPoint() {
long smallestReadPoint = store.getSmallestReadPoint(); return store.getSmallestReadPoint();
MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
return smallestReadPoint;
} }
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,

View File

@ -47,12 +47,12 @@ public class DefaultCompactor extends Compactor {
FileDetails fd = getFileDetails(request.getFiles(), request.isMajor()); FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount); this.progress = new CompactionProgress(fd.maxKeyCount);
List<StoreFileScanner> scanners = createFileScanners(request.getFiles()); // Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners = createFileScanners(request.getFiles(), smallestReadPoint);
StoreFile.Writer writer = null; StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>(); List<Path> newFiles = new ArrayList<Path>();
// Find the smallest read point across all the Scanners.
long smallestReadPoint = setSmallestReadPoint();
try { try {
InternalScanner scanner = null; InternalScanner scanner = null;
try { try {

View File

@ -54,7 +54,6 @@ public class ParallelSeekHandler extends EventHandler {
@Override @Override
public void process() { public void process() {
try { try {
MultiVersionConsistencyControl.setThreadReadPoint(readPoint);
scanner.seek(keyValue); scanner.seek(keyValue);
} catch (IOException e) { } catch (IOException e) {
LOG.error("", e); LOG.error("", e);

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
@ -2772,10 +2773,12 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/ */
public static List<Cell> getFromStoreFile(HStore store, public static List<Cell> getFromStoreFile(HStore store,
Get get) throws IOException { Get get) throws IOException {
MultiVersionConsistencyControl.resetThreadReadPoint();
Scan scan = new Scan(get); Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan, InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName())); scan.getFamilyMap().get(store.getFamily().getName()),
// originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set
// readpoint 0.
0);
List<Cell> result = new ArrayList<Cell>(); List<Cell> result = new ArrayList<Cell>();
scanner.next(result); scanner.next(result);

View File

@ -23,11 +23,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -44,12 +42,14 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@ -99,7 +99,8 @@ public class TestRegionObserverScannerOpenHook {
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s) Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
throws IOException { throws IOException {
scan.setFilter(new NoDataFilter()); scan.setFilter(new NoDataFilter());
return new StoreScanner(store, store.getScanInfo(), scan, targetCols); return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
} }
} }
@ -265,4 +266,4 @@ public class TestRegionObserverScannerOpenHook {
table.close(); table.close();
UTIL.shutdownMiniCluster(); UTIL.shutdownMiniCluster();
} }
} }

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@ -335,10 +336,6 @@ public class HFileReadWriteTest {
public void runMergeWorkload() throws IOException { public void runMergeWorkload() throws IOException {
long maxKeyCount = prepareForMerge(); long maxKeyCount = prepareForMerge();
List<StoreFileScanner> scanners =
StoreFileScanner.getScannersForStoreFiles(inputStoreFiles, false,
false);
HColumnDescriptor columnDescriptor = new HColumnDescriptor( HColumnDescriptor columnDescriptor = new HColumnDescriptor(
HFileReadWriteTest.class.getSimpleName()); HFileReadWriteTest.class.getSimpleName());
columnDescriptor.setBlocksize(blockSize); columnDescriptor.setBlocksize(blockSize);
@ -350,6 +347,10 @@ public class HFileReadWriteTest {
HRegion region = new HRegion(outputDir, null, fs, conf, regionInfo, htd, null); HRegion region = new HRegion(outputDir, null, fs, conf, regionInfo, htd, null);
HStore store = new HStore(region, columnDescriptor, conf); HStore store = new HStore(region, columnDescriptor, conf);
List<StoreFileScanner> scanners =
StoreFileScanner.getScannersForStoreFiles(inputStoreFiles, false,
false, region.getReadpoint(IsolationLevel.READ_COMMITTED));
StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false, true, false); StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false, true, false);
StatisticsPrinter statsPrinter = new StatisticsPrinter(); StatisticsPrinter statsPrinter = new StatisticsPrinter();

View File

@ -74,6 +74,8 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s) Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s)
throws IOException { throws IOException {
return new StoreScanner(store, store.getScanInfo(), scan, targetCols); HRegion r = c.getEnvironment().getRegion();
return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
r.getReadpoint(scan.getIsolationLevel()));
} }
} }

View File

@ -144,7 +144,9 @@ public class TestFSErrorsExposed {
cacheConf, BloomType.NONE, NoOpDataBlockEncoder.INSTANCE); cacheConf, BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles( List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
Collections.singletonList(sf), false, true, false); Collections.singletonList(sf), false, true, false,
// 0 is passed as readpoint because this test operates on StoreFile directly
0);
KeyValueScanner scanner = scanners.get(0); KeyValueScanner scanner = scanners.get(0);
FaultyInputStream inStream = faultyfs.inStreams.get(0).get(); FaultyInputStream inStream = faultyfs.inStreams.get(0).get();

View File

@ -1861,12 +1861,10 @@ public class TestHRegion {
scan.addFamily(fam2); scan.addFamily(fam2);
scan.addFamily(fam4); scan.addFamily(fam4);
is = (RegionScannerImpl) region.getScanner(scan); is = (RegionScannerImpl) region.getScanner(scan);
MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
assertEquals(1, ((RegionScannerImpl) is).storeHeap.getHeap().size()); assertEquals(1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
scan = new Scan(); scan = new Scan();
is = (RegionScannerImpl) region.getScanner(scan); is = (RegionScannerImpl) region.getScanner(scan);
MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
assertEquals(families.length - 1, ((RegionScannerImpl) is).storeHeap.getHeap().size()); assertEquals(families.length - 1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
} finally { } finally {
HRegion.closeHRegion(this.region); HRegion.closeHRegion(this.region);

View File

@ -90,10 +90,9 @@ public class TestMemStore extends TestCase {
*/ */
public void testScanAcrossSnapshot() throws IOException { public void testScanAcrossSnapshot() throws IOException {
int rowCount = addRows(this.memstore); int rowCount = addRows(this.memstore);
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(); List<KeyValueScanner> memstorescanners = this.memstore.getScanners(0);
Scan scan = new Scan(); Scan scan = new Scan();
List<Cell> result = new ArrayList<Cell>(); List<Cell> result = new ArrayList<Cell>();
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false, ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false,
0, this.memstore.comparator); 0, this.memstore.comparator);
ScanType scanType = ScanType.USER_SCAN; ScanType scanType = ScanType.USER_SCAN;
@ -115,8 +114,7 @@ public class TestMemStore extends TestCase {
scanner.close(); scanner.close();
} }
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan. // Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0; count = 0;
@ -141,7 +139,7 @@ public class TestMemStore extends TestCase {
for (KeyValueScanner scanner : memstorescanners) { for (KeyValueScanner scanner : memstorescanners) {
scanner.close(); scanner.close();
} }
memstorescanners = this.memstore.getScanners(); memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
// Assert that new values are seen in kvset as we scan. // Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
@ -206,8 +204,7 @@ public class TestMemStore extends TestCase {
private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
throws IOException { throws IOException {
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
List<KeyValueScanner> memstorescanners = this.memstore.getScanners();
assertEquals(1, memstorescanners.size()); assertEquals(1, memstorescanners.size());
final KeyValueScanner scanner = memstorescanners.get(0); final KeyValueScanner scanner = memstorescanners.get(0);
scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW)); scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
@ -248,14 +245,12 @@ public class TestMemStore extends TestCase {
kv1.setMvccVersion(w.getWriteNumber()); kv1.setMvccVersion(w.getWriteNumber());
memstore.add(kv1); memstore.add(kv1);
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{}); assertScannerResults(s, new KeyValue[]{});
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1}); assertScannerResults(s, new KeyValue[]{kv1});
w = mvcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
@ -263,14 +258,12 @@ public class TestMemStore extends TestCase {
kv2.setMvccVersion(w.getWriteNumber()); kv2.setMvccVersion(w.getWriteNumber());
memstore.add(kv2); memstore.add(kv2);
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1}); assertScannerResults(s, new KeyValue[]{kv1});
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1, kv2}); assertScannerResults(s, new KeyValue[]{kv1, kv2});
} }
@ -302,8 +295,7 @@ public class TestMemStore extends TestCase {
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS // BEFORE STARTING INSERT 2, SEE FIRST KVS
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START INSERT 2: Write both columns val2 // START INSERT 2: Write both columns val2
@ -317,8 +309,7 @@ public class TestMemStore extends TestCase {
memstore.add(kv22); memstore.add(kv22);
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS // BEFORE COMPLETING INSERT 2, SEE FIRST KVS
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE INSERT 2 // COMPLETE INSERT 2
@ -327,8 +318,7 @@ public class TestMemStore extends TestCase {
// NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
// See HBASE-1485 for discussion about what we should do with // See HBASE-1485 for discussion about what we should do with
// the duplicate-TS inserts // the duplicate-TS inserts
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
} }
@ -357,8 +347,7 @@ public class TestMemStore extends TestCase {
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS // BEFORE STARTING INSERT 2, SEE FIRST KVS
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START DELETE: Insert delete for one of the columns // START DELETE: Insert delete for one of the columns
@ -369,16 +358,14 @@ public class TestMemStore extends TestCase {
memstore.add(kvDel); memstore.add(kvDel);
// BEFORE COMPLETING DELETE, SEE FIRST KVS // BEFORE COMPLETING DELETE, SEE FIRST KVS
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE DELETE // COMPLETE DELETE
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// NOW WE SHOULD SEE DELETE // NOW WE SHOULD SEE DELETE
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
} }
@ -430,9 +417,7 @@ public class TestMemStore extends TestCase {
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// Assert that we can read back // Assert that we can read back
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
KeyValueScanner s = this.memstore.getScanners().get(0);
s.seek(kv); s.seek(kv);
KeyValue ret = s.next(); KeyValue ret = s.next();
@ -507,7 +492,6 @@ public class TestMemStore extends TestCase {
* @throws InterruptedException * @throws InterruptedException
*/ */
public void testGetNextRow() throws Exception { public void testGetNextRow() throws Exception {
MultiVersionConsistencyControl.resetThreadReadPoint();
addRows(this.memstore); addRows(this.memstore);
// Add more versions to make it a little more interesting. // Add more versions to make it a little more interesting.
Thread.sleep(1); Thread.sleep(1);
@ -532,7 +516,7 @@ public class TestMemStore extends TestCase {
ScanType scanType = ScanType.USER_SCAN; ScanType scanType = ScanType.USER_SCAN;
InternalScanner scanner = new StoreScanner(new Scan( InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null, Bytes.toBytes(startRowId)), scanInfo, scanType, null,
memstore.getScanners()); memstore.getScanners(0));
List<Cell> results = new ArrayList<Cell>(); List<Cell> results = new ArrayList<Cell>();
for (int i = 0; scanner.next(results); i++) { for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i; int rowId = startRowId + i;
@ -1036,7 +1020,7 @@ public class TestMemStore extends TestCase {
static void doScan(MemStore ms, int iteration) throws IOException { static void doScan(MemStore ms, int iteration) throws IOException {
long nanos = System.nanoTime(); long nanos = System.nanoTime();
KeyValueScanner s = ms.getScanners().get(0); KeyValueScanner s = ms.getScanners(0).get(0);
s.seek(KeyValue.createFirstOnRow(new byte[]{})); s.seek(KeyValue.createFirstOnRow(new byte[]{}));
System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000); System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
@ -1055,17 +1039,11 @@ public class TestMemStore extends TestCase {
addRows(25000, ms); addRows(25000, ms);
System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000); System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000);
System.out.println("foo"); System.out.println("foo");
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
for (int i = 0 ; i < 50 ; i++) for (int i = 0 ; i < 50 ; i++)
doScan(ms, i); doScan(ms, i);
} }
} }

View File

@ -164,7 +164,7 @@ public class TestMemStoreChunkPool {
assertEquals(2, memstore.kvset.size()); assertEquals(2, memstore.kvset.size());
// opening scanner before clear the snapshot // opening scanner before clear the snapshot
List<KeyValueScanner> scanners = memstore.getScanners(); List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening // Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data // based on their data
memstore.clearSnapshot(snapshot); memstore.clearSnapshot(snapshot);
@ -187,7 +187,7 @@ public class TestMemStoreChunkPool {
memstore.add(new KeyValue(row, fam, qf6, val)); memstore.add(new KeyValue(row, fam, qf6, val));
memstore.add(new KeyValue(row, fam, qf7, val)); memstore.add(new KeyValue(row, fam, qf7, val));
// opening scanners // opening scanners
scanners = memstore.getScanners(); scanners = memstore.getScanners(0);
// close scanners before clear the snapshot // close scanners before clear the snapshot
for (KeyValueScanner scanner : scanners) { for (KeyValueScanner scanner : scanners) {
scanner.close(); scanner.close();
@ -198,4 +198,4 @@ public class TestMemStoreChunkPool {
assertTrue(chunkPool.getPoolSize() > 0); assertTrue(chunkPool.getPoolSize() > 0);
} }
} }

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
@ -287,7 +289,8 @@ public class TestCoprocessorScanPolicy {
newVersions == null ? family.getMaxVersions() : newVersions, newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(store, scanInfo, scan, targetCols); return new StoreScanner(store, scanInfo, scan, targetCols,
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
} }
} }