HBASE-4594 Fix maxVersions when scanning a file with a TS > read point

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1203467 13f79535-47bb-0310-9956-ffa450edef68
Nicolas Spiegelberg 2011-11-18 02:19:33 +00:00
parent 619d1a327e
commit 1e68440431
15 changed files with 115 additions and 83 deletions
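In short: a compaction must retain KeyValues whose memstoreTS is newer than the smallest read point across all open scanners, and those KVs must not count toward maxVersions; otherwise older versions still visible to in-flight scanners could be compacted away. To that end, the patch threads a read point into ScanQueryMatcher (maxReadPointToTrackVersions), adds an ignoreCount flag to ColumnTracker.checkColumn(), moves read-point filtering for reads out of ScanQueryMatcher and into StoreFileScanner.skipKVsNewerThanReadpoint(), and retires the StoreScanner.useRWCC() switch.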

HFileReaderV2.java

@ -606,12 +606,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
blockBuffer.reset();
if (this.reader.shouldIncludeMemstoreTS()) {
try {
ByteArrayInputStream byte_input = new ByteArrayInputStream(blockBuffer.array());
byte_input.skip(blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen);
DataInputStream data_input = new DataInputStream(byte_input);
currMemstoreTS = WritableUtils.readVLong(data_input);
int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
currMemstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
} catch (Exception e) {
throw new RuntimeException("Error reading memstoreTS. " + e);
@ -654,12 +651,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
blockBuffer.reset();
if (this.reader.shouldIncludeMemstoreTS()) {
try {
ByteArrayInputStream byte_input = new ByteArrayInputStream(blockBuffer.array());
byte_input.skip(blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + klen + vlen);
DataInputStream data_input = new DataInputStream(byte_input);
memstoreTS = WritableUtils.readVLong(data_input);
int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + klen + vlen;
memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
} catch (Exception e) {
throw new RuntimeException("Error reading memstoreTS. " + e);
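
The two HFileReaderV2 hunks above replace a per-KeyValue ByteArrayInputStream/DataInputStream pair, used only to skip to the memstoreTS trailing the key/value bytes, with an in-place varint decode at a computed array offset. A minimal sketch of the equivalence, assuming hadoop-common on the classpath; readVLongAt is a hypothetical stand-in mirroring what Bytes.readVLong(byte[], int) does:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

public class VLongAtOffsetDemo {
  public static void main(String[] args) throws IOException {
    // Build a buffer: 8 bytes of padding (standing in for serialized
    // key/value bytes) followed by a vlong-encoded memstoreTS.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.write(new byte[8]);
    WritableUtils.writeVLong(out, 123456789L);
    byte[] buf = baos.toByteArray();
    int offset = 8;

    // Old approach: two stream objects allocated per KeyValue.
    ByteArrayInputStream bis = new ByteArrayInputStream(buf);
    bis.skip(offset);
    long viaStreams = WritableUtils.readVLong(new DataInputStream(bis));

    // New approach: decode in place at the offset, no per-KV allocation.
    long viaOffset = readVLongAt(buf, offset);

    System.out.println(viaStreams + " == " + viaOffset);
  }

  // Hypothetical equivalent of Bytes.readVLong(byte[], int).
  static long readVLongAt(byte[] buffer, int offset) {
    byte firstByte = buffer[offset];
    int len = WritableUtils.decodeVIntSize(firstByte);
    if (len == 1) {
      return firstByte;
    }
    long value = 0;
    for (int i = 0; i < len - 1; i++) {
      value = (value << 8) | (buffer[offset + 1 + i] & 0xFF);
    }
    return WritableUtils.isNegativeVInt(firstByte) ? ~value : value;
  }
}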

HFileWriterV2.java

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@ -394,10 +395,6 @@ public class HFileWriterV2 extends AbstractHFileWriter {
entryCount++;
}
public static int getEncodedLength(long value) {
return WritableUtils.getVIntSize(value);
}
@Override
public void close() throws IOException {
if (outputStream == null) {

ColumnTracker.java

@ -48,12 +48,15 @@ public interface ColumnTracker {
* @param length
* @param ttl The timeToLive to enforce.
* @param type The type of the KeyValue
* @param ignoreCount indicates if the KV needs to be excluded while counting
* (used during compactions. We only count KV's that are older than all the
* scanners' read points.)
* @return The match code instance.
* @throws IOException in case there is an internal consistency problem
* caused by a data corruption.
*/
public ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset,
int length, long ttl, byte type)
int length, long ttl, byte type, boolean ignoreCount)
throws IOException;
/**
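
The new ignoreCount flag lets a caller include a KeyValue in the scan output without letting it consume one of the maxVersions slots; the trackers below short-circuit to INCLUDE when it is set. A toy illustration of that contract, with hypothetical names rather than the real tracker:

// Toy model of the ignoreCount contract; not HBase code.
class ToyVersionTracker {
  private final int maxVersions;
  private int versionCount = 0;

  ToyVersionTracker(int maxVersions) {
    this.maxVersions = maxVersions;
  }

  // Mirrors the spirit of checkColumn(..., ignoreCount).
  String check(boolean ignoreCount) {
    if (ignoreCount) {
      // KV is newer than some scanner's read point: a compaction must keep
      // it, but it must not push older, still-visible versions over the limit.
      return "INCLUDE";
    }
    versionCount++;
    return versionCount <= maxVersions ? "INCLUDE" : "SEEK_NEXT_COL";
  }
}

With maxVersions = 1, a KV flagged ignoreCount and an older, still-visible KV behind it are both kept; if the newer KV were counted, the older version that some scanner can still see would be compacted away, which is the bug in the commit title.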

ExplicitColumnTracker.java

@ -102,7 +102,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
*/
@Override
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, long timestamp, byte type) {
int length, long timestamp, byte type, boolean ignoreCount) {
// delete markers should never be passed to an
// *Explicit*ColumnTracker
assert !KeyValue.isDelete(type);
@ -124,6 +124,8 @@ public class ExplicitColumnTracker implements ColumnTracker {
// Column Matches. If it is not a duplicate key, increment the version count
// and include.
if(ret == 0) {
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
//If column matches, check if it is a duplicate timestamp
if (sameAsPreviousTS(timestamp)) {
//If duplicate, skip this Key

HRegion.java

@ -3026,7 +3026,6 @@ public class HRegion implements HeapSize { // , Writable{
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
StoreScanner scanner = store.getScanner(scan, entry.getValue());
scanner.useRWCC(true);
scanners.add(scanner);
}
this.storeHeap = new KeyValueHeap(scanners, comparator);

ReadWriteConsistencyControl.java

@ -44,7 +44,13 @@ public class ReadWriteConsistencyControl {
new LinkedList<WriteEntry>();
private static final ThreadLocal<Long> perThreadReadPoint =
new ThreadLocal<Long>();
new ThreadLocal<Long>() {
@Override
protected Long initialValue() {
return Long.MAX_VALUE;
}
};
/**
* Default constructor. Initializes the memstoreRead/Write points to 0.
@ -63,10 +69,6 @@ public class ReadWriteConsistencyControl {
throw new RuntimeException("Already used this rwcc. Too late to initialize");
}
if (this.memstoreWrite > startPoint) {
throw new RuntimeException("Cannot decrease RWCC timestamp");
}
this.memstoreRead = this.memstoreWrite = startPoint;
}
}
@ -179,7 +181,6 @@ public class ReadWriteConsistencyControl {
}
}
if (interrupted) Thread.currentThread().interrupt();
}
public long memstoreReadPoint() {
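
The ThreadLocal change gives threads that never called setThreadReadPoint() a default read point of Long.MAX_VALUE, i.e. they see every KV, instead of an unset value; StoreFileScanner below consults this thread-local on every seek. The pattern in isolation, as a self-contained sketch:

public class ThreadLocalDefaultDemo {
  // Same pattern as perThreadReadPoint above: supply an explicit default
  // instead of ThreadLocal's implicit null.
  private static final ThreadLocal<Long> readPoint =
      new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
          return Long.MAX_VALUE; // a thread with no read point sees everything
        }
      };

  public static void main(String[] args) {
    System.out.println(readPoint.get()); // 9223372036854775807, never null
    readPoint.set(42L);
    System.out.println(readPoint.get()); // 42
  }
}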

ScanQueryMatcher.java

@ -92,11 +92,8 @@ public class ScanQueryMatcher {
*/
private final long earliestPutTs;
/** Should we ignore KV's with a newer RWCC timestamp **/
private boolean enforceRWCC = false;
public void useRWCC(boolean flag) {
this.enforceRWCC = flag;
}
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
/**
* This variable shows whether there is an null column in the query. There
@ -116,6 +113,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
long readPointToUse,
long earliestPutTs) {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator().getRawComparator();
@ -125,6 +123,7 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
this.maxReadPointToTrackVersions = readPointToUse;
/* how to deal with deletes */
// keep deleted cells: if compaction or raw scan
@ -159,6 +158,7 @@ public class ScanQueryMatcher {
ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
NavigableSet<byte[]> columns) {
this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
HConstants.LATEST_TIMESTAMP);
}
@ -234,13 +234,6 @@ public class ScanQueryMatcher {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
// The compaction thread has no readPoint set. For other operations, we
// will ignore updates that are done after the read operation has started.
if (this.enforceRWCC &&
kv.getMemstoreTS() > ReadWriteConsistencyControl.getThreadReadPoint()) {
return MatchCode.SKIP;
}
/*
* The delete logic is pretty complicated now.
* This is corroborated by the following:
@ -328,7 +321,7 @@ public class ScanQueryMatcher {
}
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength,
timestamp, type);
timestamp, type, kv.getMemstoreTS() > maxReadPointToTrackVersions);
/*
* According to current implementation, colChecker can only be
* SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return
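
The checkColumn call above is where the flag is derived: a KV is exempt from version counting exactly when its memstoreTS is above maxReadPointToTrackVersions. A small illustration of why the two constructors pass different read points (the helper and values are illustrative only):

class ReadPointChoiceDemo {
  // Mirrors the expression passed to checkColumn above.
  static boolean ignoreCount(long memstoreTS, long maxReadPoint) {
    return memstoreTS > maxReadPoint;
  }

  public static void main(String[] args) {
    long smallestReadPoint = 17L; // illustrative value
    // User scan: Long.MAX_VALUE means no KV is ever exempt from counting,
    // so version tracking behaves exactly as before.
    System.out.println(ignoreCount(25L, Long.MAX_VALUE)); // false
    // Compaction: KVs newer than the smallest read point are written out
    // but not counted, so older still-visible versions survive.
    System.out.println(ignoreCount(25L, smallestReadPoint)); // true
  }
}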

ScanWildcardColumnTracker.java

@ -64,17 +64,20 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
*/
@Override
public MatchCode checkColumn(byte[] bytes, int offset, int length,
long timestamp, byte type) throws IOException {
long timestamp, byte type, boolean ignoreCount) throws IOException {
if (columnBuffer == null) {
// first iteration.
resetBuffer(bytes, offset, length);
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
// do not count a delete marker as another version
return checkVersion(type, timestamp);
}
int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength);
if (cmp == 0) {
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
//If column matches, check if it is a duplicate timestamp
if (sameAsPreviousTSAndType(timestamp, type)) {
return ScanQueryMatcher.MatchCode.SKIP;
@ -88,6 +91,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
if (cmp > 0) {
// switched columns, lets do something
resetBuffer(bytes, offset, length);
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
return checkVersion(type, timestamp);
}

Store.java

@ -535,7 +535,8 @@ public class Store extends SchemaConfigured implements HeapSize {
// treat this as a minor compaction.
InternalScanner scanner = new StoreScanner(this, scan, Collections
.singletonList(new CollectionBackedScanner(set, this.comparator)),
ScanType.MINOR_COMPACT, HConstants.OLDEST_TIMESTAMP);
ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
@ -556,6 +557,9 @@ public class Store extends SchemaConfigured implements HeapSize {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to disk.
if (kv.getMemstoreTS() <= smallestReadPoint) {
// let us not change the original KV. It could be in the memstore
// changing its memstoreTS could affect other threads/scanners.
kv = kv.shallowCopy();
kv.setMemstoreTS(0);
}
writer.append(kv);
@ -1281,6 +1285,7 @@ public class Store extends SchemaConfigured implements HeapSize {
StoreFile.Writer writer = null;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint();
ReadWriteConsistencyControl.setThreadReadPoint(smallestReadPoint);
try {
InternalScanner scanner = null;
try {
@ -1289,7 +1294,7 @@ public class Store extends SchemaConfigured implements HeapSize {
/* include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(this, scan, scanners,
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
earliestPutTs);
smallestReadPoint, earliestPutTs);
if (region.getCoprocessorHost() != null) {
InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
this, scanner);
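
The shallowCopy() added above is a defensive copy: the KV being flushed may still be referenced by concurrent memstore scanners, and zeroing its memstoreTS in place would change its comparison order under them. A generic sketch of the copy-before-mutate pattern, using a hypothetical Cell type rather than HBase's KeyValue:

class FlushCopyDemo {
  // Hypothetical cell standing in for KeyValue; shallowCopy shares the
  // underlying bytes, so copying is cheap.
  static class Cell {
    final byte[] bytes;
    long memstoreTS;

    Cell(byte[] bytes, long memstoreTS) {
      this.bytes = bytes;
      this.memstoreTS = memstoreTS;
    }

    Cell shallowCopy() {
      return new Cell(bytes, memstoreTS);
    }
  }

  static Cell prepareForDisk(Cell kv, long smallestReadPoint) {
    if (kv.memstoreTS <= smallestReadPoint) {
      // Visible to every scanner: persist memstoreTS 0 to save disk space,
      // but mutate a copy -- other threads may still read the original.
      Cell copy = kv.shallowCopy();
      copy.memstoreTS = 0;
      return copy;
    }
    return kv;
  }
}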

StoreFile.java

@ -1155,7 +1155,7 @@ public class StoreFile {
boolean isCompaction) {
return new StoreFileScanner(this,
getScanner(cacheBlocks, pread,
isCompaction));
isCompaction), !isCompaction);
}
/**

StoreFileScanner.java

@ -52,6 +52,8 @@ class StoreFileScanner implements KeyValueScanner {
private boolean delayedReseek;
private KeyValue delayedSeekKV;
private boolean enforceRWCC = false;
//The variable, realSeekDone, may cheat on store file scanner for the
// multi-column bloom-filter optimization.
// So this flag shows whether this storeFileScanner could do a reseek.
@ -65,9 +67,10 @@ class StoreFileScanner implements KeyValueScanner {
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param hfs HFile scanner
*/
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) {
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useRWCC) {
this.reader = reader;
this.hfs = hfs;
this.enforceRWCC = useRWCC;
}
/**
@ -122,11 +125,13 @@ class StoreFileScanner implements KeyValueScanner {
public KeyValue next() throws IOException {
KeyValue retKey = cur;
try {
// only seek if we aren't at the end. cur == null implies 'end'.
if (cur != null) {
hfs.next();
cur = hfs.getKeyValue();
skipKVsNewerThanReadpoint();
}
} catch(IOException e) {
throw new IOException("Could not iterate " + this, e);
@ -136,6 +141,7 @@ class StoreFileScanner implements KeyValueScanner {
public boolean seek(KeyValue key) throws IOException {
seekCount.incrementAndGet();
try {
try {
if(!seekAtOrAfter(hfs, key)) {
@ -145,7 +151,8 @@ class StoreFileScanner implements KeyValueScanner {
this.isReseekable = true;
cur = hfs.getKeyValue();
return true;
return skipKVsNewerThanReadpoint();
} finally {
realSeekDone = true;
}
@ -156,6 +163,7 @@ class StoreFileScanner implements KeyValueScanner {
public boolean reseek(KeyValue key) throws IOException {
seekCount.incrementAndGet();
try {
try {
if (!reseekAtOrAfter(hfs, key)) {
@ -163,7 +171,8 @@ class StoreFileScanner implements KeyValueScanner {
return false;
}
cur = hfs.getKeyValue();
return true;
return skipKVsNewerThanReadpoint();
} finally {
realSeekDone = true;
}
@ -172,6 +181,35 @@ class StoreFileScanner implements KeyValueScanner {
}
}
protected boolean skipKVsNewerThanReadpoint() throws IOException {
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
// We want to ignore all key-values that are newer than our current
// readPoint
while(enforceRWCC
&& cur != null
&& (cur.getMemstoreTS() > readPoint)) {
hfs.next();
cur = hfs.getKeyValue();
}
if (cur == null) {
close();
return false;
}
// For the optimisation in HBASE-4346, we set the KV's memstoreTS to
// 0, if it is older than all the scanners' read points. It is possible
// that a newer KV's memstoreTS was reset to 0. But, there is an
// older KV which was not reset to 0 (because it was
// not old enough during flush). Make sure that we set it correctly now,
// so that the comparison order does not change.
if (cur.getMemstoreTS() <= readPoint) {
cur.setMemstoreTS(0);
}
return true;
}
public void close() {
// Nothing to close on HFileScanner?
cur = null;
@ -329,5 +367,4 @@ class StoreFileScanner implements KeyValueScanner {
static final long getSeekCount() {
return seekCount.get();
}
}
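
skipKVsNewerThanReadpoint() replaces the matcher-level RWCC check deleted from ScanQueryMatcher: filtering now happens as the file scanner advances, before KVs ever reach the matcher. A toy model of the loop, with hypothetical types:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ReadPointSkipDemo {
  static class KV {
    final String key;
    final long memstoreTS;

    KV(String key, long memstoreTS) {
      this.key = key;
      this.memstoreTS = memstoreTS;
    }
  }

  // Mirrors the loop above: advance past KVs whose memstoreTS is newer than
  // the read point and return the first visible one, or null at the end.
  static KV firstVisible(Iterator<KV> it, long readPoint) {
    while (it.hasNext()) {
      KV kv = it.next();
      if (kv.memstoreTS <= readPoint) {
        return kv;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<KV> sorted = Arrays.asList(
        new KV("row1", 12), new KV("row1", 7), new KV("row2", 3));
    KV kv = firstVisible(sorted.iterator(), 10L);
    System.out.println(kv.key + "@" + kv.memstoreTS); // row1@7; TS 12 skipped
  }
}

The setMemstoreTS(0) at the end of the real method keeps comparison order consistent with KVs whose memstoreTS was already zeroed at flush time under the HBASE-4346 optimization, as the inline comment explains.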

StoreScanner.java

@ -95,7 +95,7 @@ class StoreScanner extends NonLazyKeyValueScanner
"Cannot specify any column for a raw scan");
}
matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
ScanType.USER_SCAN, HConstants.LATEST_TIMESTAMP);
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP);
// Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScanners(scan, columns);
@ -127,13 +127,15 @@ class StoreScanner extends NonLazyKeyValueScanner
* @param store who we scan
* @param scan the spec
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking versions
* @param retainDeletesInOutput should we retain deletes after compaction?
*/
StoreScanner(Store store, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs) throws IOException {
long smallestReadPoint, long earliestPutTs) throws IOException {
this(store, false, scan, null);
matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
earliestPutTs);
smallestReadPoint, earliestPutTs);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@ -150,7 +152,7 @@ class StoreScanner extends NonLazyKeyValueScanner
final List<KeyValueScanner> scanners) throws IOException {
this(null, scan.getCacheBlocks(), scan, columns);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
HConstants.LATEST_TIMESTAMP);
Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP);
// Seek all scanners to the initial key
for (KeyValueScanner scanner : scanners) {
@ -159,15 +161,6 @@ class StoreScanner extends NonLazyKeyValueScanner
heap = new KeyValueHeap(scanners, scanInfo.getComparator());
}
/**
* Advise the StoreScanner if it should enforce the RWCC mechanism
* for ignoring newer KVs or not.
* @param flag
*/
public void useRWCC(boolean flag) {
matcher.useRWCC(flag);
}
/*
* @return List of scanners ordered properly.
*/
@ -199,15 +192,17 @@ class StoreScanner extends NonLazyKeyValueScanner
// include only those scan files which pass all filters
for (KeyValueScanner kvs : allStoreScanners) {
if (kvs instanceof StoreFileScanner) {
if (memOnly == false && ((StoreFileScanner)kvs).shouldSeek(scan, columns))
if (memOnly == false && ((StoreFileScanner)kvs).shouldSeek(scan, columns)) {
scanners.add(kvs);
}
}
else {
// kvs is a MemStoreScanner
if (filesOnly == false && this.store.memstore.shouldSeek(scan))
if (filesOnly == false && this.store.memstore.shouldSeek(scan)) {
scanners.add(kvs);
}
}
}
return scanners;
}

TestExplicitColumnTracker.java

@ -57,7 +57,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
//"Match"
for(byte [] col : scannerColumns){
result.add(exp.checkColumn(col, 0, col.length, ++timestamp,
KeyValue.Type.Put.getCode()));
KeyValue.Type.Put.getCode(), false));
}
assertEquals(expected.size(), result.size());
@ -169,13 +169,15 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
Long.MAX_VALUE);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
false);
}
explicit.update();
for (int i = 1; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
false);
}
}

TestFSErrorsExposed.java

@ -121,7 +121,7 @@ public class TestFSErrorsExposed {
StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
cacheConf, BloomType.NONE);
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
Collections.singletonList(sf), false, true);
Collections.singletonList(sf), false, true, false);
KeyValueScanner scanner = scanners.get(0);
FaultyInputStream inStream = fs.inStreams.get(0).get();

TestScanWildcardColumnTracker.java

@ -56,7 +56,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
for(byte [] qualifier : qualifiers) {
ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0,
qualifier.length, 1, KeyValue.Type.Put.getCode());
qualifier.length, 1, KeyValue.Type.Put.getCode(), false);
actual.add(mc);
}
@ -89,7 +89,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
long timestamp = 0;
for(byte [] qualifier : qualifiers) {
MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length,
++timestamp, KeyValue.Type.Put.getCode());
++timestamp, KeyValue.Type.Put.getCode(), false);
actual.add(mc);
}
@ -113,7 +113,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
try {
for(byte [] qualifier : qualifiers) {
tracker.checkColumn(qualifier, 0, qualifier.length, 1,
KeyValue.Type.Put.getCode());
KeyValue.Type.Put.getCode(), false);
}
} catch (Exception e) {
ok = true;