HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last written version

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1160978 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-08-24 05:15:37 +00:00
parent df360dccde
commit cca758c679
14 changed files with 657 additions and 102 deletions

View File

@ -453,6 +453,8 @@ Release 0.91.0 - Unreleased
HBASE-4176 Exposing HBase Filters to the Thrift API (Anirudh Todi) HBASE-4176 Exposing HBase Filters to the Thrift API (Anirudh Todi)
HBASE-4221 Changes necessary to build and run against Hadoop 0.23 HBASE-4221 Changes necessary to build and run against Hadoop 0.23
(todd) (todd)
HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last
written version (Lars Hofhansl)
Release 0.90.5 - Unreleased Release 0.90.5 - Unreleased

View File

@ -87,6 +87,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public static final String BLOOMFILTER = "BLOOMFILTER"; public static final String BLOOMFILTER = "BLOOMFILTER";
public static final String FOREVER = "FOREVER"; public static final String FOREVER = "FOREVER";
public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE"; public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
public static final String MIN_VERSIONS = "MIN_VERSIONS";
/** /**
* Default compression type. * Default compression type.
@ -99,6 +100,11 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
*/ */
public static final int DEFAULT_VERSIONS = 3; public static final int DEFAULT_VERSIONS = 3;
/**
* Default is not to keep a minimum of versions.
*/
public static final int DEFAULT_MIN_VERSIONS = 0;
/* /*
* Cache here the HCD value. * Cache here the HCD value.
* Question: its OK to cache since when we're reenable, we create a new HCD? * Question: its OK to cache since when we're reenable, we create a new HCD?
@ -259,6 +265,37 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
final String compression, final boolean inMemory, final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize, final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final String bloomFilter, final int scope) { final int timeToLive, final String bloomFilter, final int scope) {
this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, compression, inMemory,
blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope);
}
/**
* Constructor
* @param familyName Column family name. Must be 'printable' -- digit or
* letter -- and may not contain a <code>:<code>
* @param minVersions Minimum number of versions to keep
* @param maxVersions Maximum number of versions to keep
* @param compression Compression type
* @param inMemory If true, column data should be kept in an HRegionServer's
* cache
* @param blockCacheEnabled If true, MapFile blocks should be cached
* @param blocksize Block size to use when writing out storefiles. Use
* smaller blocksizes for faster random-access at expense of larger indices
* (more memory consumption). Default is usually 64k.
* @param timeToLive Time-to-live of cell contents, in seconds
* (use HConstants.FOREVER for unlimited TTL)
* @param bloomFilter Bloom filter type for this column
* @param scope The scope tag for this column
*
* @throws IllegalArgumentException if passed a family name that is made of
* other than 'word' characters: i.e. <code>[a-zA-Z_0-9]</code> or contains
* a <code>:</code>
* @throws IllegalArgumentException if the number of versions is &lt;= 0
*/
public HColumnDescriptor(final byte [] familyName, final int minVersions,
final int maxVersions, final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final String bloomFilter, final int scope) {
isLegalFamilyName(familyName); isLegalFamilyName(familyName);
this.name = familyName; this.name = familyName;
@ -267,7 +304,19 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
// Until there is support, consider 0 or < 0 -- a configuration error. // Until there is support, consider 0 or < 0 -- a configuration error.
throw new IllegalArgumentException("Maximum versions must be positive"); throw new IllegalArgumentException("Maximum versions must be positive");
} }
if (minVersions > 0) {
if (timeToLive == HConstants.FOREVER) {
throw new IllegalArgumentException("Minimum versions requires TTL.");
}
if (minVersions > maxVersions) {
throw new IllegalArgumentException("Minimum versions must be <= "+
"maximum versions.");
}
}
setMaxVersions(maxVersions); setMaxVersions(maxVersions);
setMinVersions(minVersions);
setInMemory(inMemory); setInMemory(inMemory);
setBlockCacheEnabled(blockCacheEnabled); setBlockCacheEnabled(blockCacheEnabled);
setTimeToLive(timeToLive); setTimeToLive(timeToLive);
@ -508,6 +557,22 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setValue(TTL, Integer.toString(timeToLive)); setValue(TTL, Integer.toString(timeToLive));
} }
/**
* @return The minimum number of versions to keep.
*/
public int getMinVersions() {
String value = getValue(MIN_VERSIONS);
return (value != null)? Integer.valueOf(value).intValue(): 0;
}
/**
* @param minVersions The minimum number of versions to keep.
* (used when timeToLive is set)
*/
public void setMinVersions(int minVersions) {
setValue(MIN_VERSIONS, Integer.toString(minVersions));
}
/** /**
* @return True if MapFile blocks should be cached. * @return True if MapFile blocks should be cached.
*/ */

View File

@ -19,10 +19,12 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
/** /**
* Implementing classes of this interface will be used for the tracking * Implementing classes of this interface will be used for the tracking
* and enforcement of columns and numbers of versions during the course of a * and enforcement of columns and numbers of versions and timeToLive during
* Get or Scan operation. * the course of a Get or Scan operation.
* <p> * <p>
* Currently there are two different types of Store/Family-level queries. * Currently there are two different types of Store/Family-level queries.
* <ul><li>{@link ExplicitColumnTracker} is used when the query specifies * <ul><li>{@link ExplicitColumnTracker} is used when the query specifies
@ -42,11 +44,11 @@ public interface ColumnTracker {
* @param bytes * @param bytes
* @param offset * @param offset
* @param length * @param length
* @param timestamp * @param ttl The timeToLive to enforce.
* @return The match code instance. * @return The match code instance.
*/ */
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, long timestamp); int length, long ttl);
/** /**
* Updates internal variables in between files * Updates internal variables in between files
@ -76,4 +78,19 @@ public interface ColumnTracker {
* @return null, or a ColumnCount that we should seek to * @return null, or a ColumnCount that we should seek to
*/ */
public ColumnCount getColumnHint(); public ColumnCount getColumnHint();
/**
* Retrieve the MatchCode for the next row or column
*/
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength);
/**
* Give the tracker a chance to declare it's done based on only the timestamp
* to allow an early out.
*
* @param timestamp
* @return <code>true</code> to early out based on timestamp.
*/
public boolean isDone(long timestamp);
} }

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
public class ExplicitColumnTracker implements ColumnTracker { public class ExplicitColumnTracker implements ColumnTracker {
private final int maxVersions; private final int maxVersions;
private final int minVersions;
private final List<ColumnCount> columns; private final List<ColumnCount> columns;
private final List<ColumnCount> columnsToReuse; private final List<ColumnCount> columnsToReuse;
private int index; private int index;
@ -55,18 +57,24 @@ public class ExplicitColumnTracker implements ColumnTracker {
/** Keeps track of the latest timestamp included for current column. /** Keeps track of the latest timestamp included for current column.
* Used to eliminate duplicates. */ * Used to eliminate duplicates. */
private long latestTSOfCurrentColumn; private long latestTSOfCurrentColumn;
private long oldestStamp;
/** /**
* Default constructor. * Default constructor.
* @param columns columns specified user in query * @param columns columns specified user in query
* @param minVersions minimum number of versions to keep
* @param maxVersions maximum versions to return per column * @param maxVersions maximum versions to return per column
* @param ttl The timeToLive to enforce
*/ */
public ExplicitColumnTracker(NavigableSet<byte[]> columns, int maxVersions) { public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
int maxVersions, long ttl) {
this.maxVersions = maxVersions; this.maxVersions = maxVersions;
this.minVersions = minVersions;
this.oldestStamp = System.currentTimeMillis() - ttl;
this.columns = new ArrayList<ColumnCount>(columns.size()); this.columns = new ArrayList<ColumnCount>(columns.size());
this.columnsToReuse = new ArrayList<ColumnCount>(columns.size()); this.columnsToReuse = new ArrayList<ColumnCount>(columns.size());
for(byte [] column : columns) { for(byte [] column : columns) {
this.columnsToReuse.add(new ColumnCount(column,maxVersions)); this.columnsToReuse.add(new ColumnCount(column));
} }
reset(); reset();
} }
@ -108,7 +116,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
column.getLength(), bytes, offset, length); column.getLength(), bytes, offset, length);
// Column Matches. If it is not a duplicate key, decrement versions left // Column Matches. If it is not a duplicate key, increment the version count
// and include. // and include.
if(ret == 0) { if(ret == 0) {
//If column matches, check if it is a duplicate timestamp //If column matches, check if it is a duplicate timestamp
@ -116,7 +124,8 @@ public class ExplicitColumnTracker implements ColumnTracker {
//If duplicate, skip this Key //If duplicate, skip this Key
return ScanQueryMatcher.MatchCode.SKIP; return ScanQueryMatcher.MatchCode.SKIP;
} }
if(this.column.decrement() == 0) { int count = this.column.increment();
if(count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
// Done with versions for this column // Done with versions for this column
this.columns.remove(this.index); this.columns.remove(this.index);
resetTS(); resetTS();
@ -185,11 +194,15 @@ public class ExplicitColumnTracker implements ColumnTracker {
return timestamp == latestTSOfCurrentColumn; return timestamp == latestTSOfCurrentColumn;
} }
private boolean isExpired(long timestamp) {
return timestamp < oldestStamp;
}
private void buildColumnList() { private void buildColumnList() {
this.columns.clear(); this.columns.clear();
this.columns.addAll(this.columnsToReuse); this.columns.addAll(this.columnsToReuse);
for(ColumnCount col : this.columns) { for(ColumnCount col : this.columns) {
col.setCount(this.maxVersions); col.setCount(0);
} }
} }
@ -227,5 +240,18 @@ public class ExplicitColumnTracker implements ColumnTracker {
} }
} }
} }
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength) {
doneWithColumn(bytes, offset,qualLength);
if (getColumnHint() == null) {
return MatchCode.SEEK_NEXT_ROW;
} else {
return MatchCode.SEEK_NEXT_COL;
}
}
public boolean isDone(long timestamp) {
return minVersions <=0 && isExpired(timestamp);
}
} }

View File

@ -1297,9 +1297,8 @@ public class HRegion implements HeapSize { // , Writable{
this.readRequestsCount.increment(); this.readRequestsCount.increment();
try { try {
Store store = getStore(family); Store store = getStore(family);
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
// get the closest key. (HStore.getRowKeyAtOrBefore can return null) // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
KeyValue key = store.getRowKeyAtOrBefore(kv); KeyValue key = store.getRowKeyAtOrBefore(row);
Result result = null; Result result = null;
if (key != null) { if (key != null) {
Get get = new Get(key.getRow()); Get get = new Get(key.getRow());

View File

@ -53,9 +53,6 @@ public class ScanQueryMatcher {
/** Key to seek to in memstore and StoreFiles */ /** Key to seek to in memstore and StoreFiles */
protected KeyValue startKey; protected KeyValue startKey;
/** Oldest allowed version stamp for TTL enforcement */
protected long oldestStamp;
/** Row comparator for the region this query is for */ /** Row comparator for the region this query is for */
KeyValue.KeyComparator rowComparator; KeyValue.KeyComparator rowComparator;
@ -72,10 +69,9 @@ public class ScanQueryMatcher {
*/ */
public ScanQueryMatcher(Scan scan, byte [] family, public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl, NavigableSet<byte[]> columns, long ttl,
KeyValue.KeyComparator rowComparator, int maxVersions, KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions,
boolean retainDeletesInOutput) { boolean retainDeletesInOutput) {
this.tr = scan.getTimeRange(); this.tr = scan.getTimeRange();
this.oldestStamp = System.currentTimeMillis() - ttl;
this.rowComparator = rowComparator; this.rowComparator = rowComparator;
this.deletes = new ScanDeleteTracker(); this.deletes = new ScanDeleteTracker();
this.stopRow = scan.getStopRow(); this.stopRow = scan.getStopRow();
@ -86,19 +82,26 @@ public class ScanQueryMatcher {
// Single branch to deal with two types of reads (columns vs all in family) // Single branch to deal with two types of reads (columns vs all in family)
if (columns == null || columns.size() == 0) { if (columns == null || columns.size() == 0) {
// use a specialized scan for wildcard column tracker. // use a specialized scan for wildcard column tracker.
this.columns = new ScanWildcardColumnTracker(maxVersions); this.columns = new ScanWildcardColumnTracker(minVersions, maxVersions, ttl);
} else { } else {
// We can share the ExplicitColumnTracker, diff is we reset // We can share the ExplicitColumnTracker, diff is we reset
// between rows, not between storefiles. // between rows, not between storefiles.
this.columns = new ExplicitColumnTracker(columns,maxVersions); this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions,
ttl);
} }
} }
public ScanQueryMatcher(Scan scan, byte [] family, public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl, NavigableSet<byte[]> columns, long ttl,
KeyValue.KeyComparator rowComparator, int maxVersions) { KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) {
/* By default we will not include deletes */ /* By default we will not include deletes */
/* deletes are included explicitly (for minor compaction) */ /* deletes are included explicitly (for minor compaction) */
this(scan, family, columns, ttl, rowComparator, maxVersions, false); this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions,
false);
}
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl,
KeyValue.KeyComparator rowComparator, int maxVersions) {
this(scan, family, columns, ttl, rowComparator, 0, maxVersions);
} }
/** /**
@ -158,9 +161,9 @@ public class ScanQueryMatcher {
(offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE; (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;
long timestamp = kv.getTimestamp(); long timestamp = kv.getTimestamp();
if (isExpired(timestamp)) { // check for early out based on timestamp alone
// done, the rest of this column will also be expired as well. if (columns.isDone(timestamp)) {
return getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
} }
byte type = kv.getType(); byte type = kv.getType();
@ -192,7 +195,7 @@ public class ScanQueryMatcher {
if (timestampComparison >= 1) { if (timestampComparison >= 1) {
return MatchCode.SKIP; return MatchCode.SKIP;
} else if (timestampComparison <= -1) { } else if (timestampComparison <= -1) {
return getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
} }
/** /**
@ -206,7 +209,7 @@ public class ScanQueryMatcher {
if (filterResponse == ReturnCode.SKIP) { if (filterResponse == ReturnCode.SKIP) {
return MatchCode.SKIP; return MatchCode.SKIP;
} else if (filterResponse == ReturnCode.NEXT_COL) { } else if (filterResponse == ReturnCode.NEXT_COL) {
return getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
} else if (filterResponse == ReturnCode.NEXT_ROW) { } else if (filterResponse == ReturnCode.NEXT_ROW) {
stickyNextRow = true; stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW; return MatchCode.SEEK_NEXT_ROW;
@ -228,23 +231,6 @@ public class ScanQueryMatcher {
} }
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength) {
if (columns instanceof ExplicitColumnTracker) {
//We only come here when we know that columns is an instance of
//ExplicitColumnTracker so we should never have a cast exception
((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset,
qualLength);
if (columns.getColumnHint() == null) {
return MatchCode.SEEK_NEXT_ROW;
} else {
return MatchCode.SEEK_NEXT_COL;
}
} else {
return MatchCode.SEEK_NEXT_COL;
}
}
public boolean moreRowsMayExistAfter(KeyValue kv) { public boolean moreRowsMayExistAfter(KeyValue kv) {
if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) && if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(), rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
@ -278,10 +264,6 @@ public class ScanQueryMatcher {
return (type != KeyValue.Type.Put.getCode()); return (type != KeyValue.Type.Put.getCode());
} }
protected boolean isExpired(long timestamp) {
return (timestamp < oldestStamp);
}
/** /**
* *
* @return the start key * @return the start key

View File

@ -37,16 +37,22 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
private int columnLength = 0; private int columnLength = 0;
private int currentCount = 0; private int currentCount = 0;
private int maxVersions; private int maxVersions;
private int minVersions;
/* Keeps track of the latest timestamp included for current column. /* Keeps track of the latest timestamp included for current column.
* Used to eliminate duplicates. */ * Used to eliminate duplicates. */
private long latestTSOfCurrentColumn; private long latestTSOfCurrentColumn;
private long oldestStamp;
/** /**
* Return maxVersions of every row. * Return maxVersions of every row.
* @param maxVersion * @param minVersion Minimum number of versions to keep
* @param maxVersion Maximum number of versions to return
* @param ttl TimeToLive to enforce
*/ */
public ScanWildcardColumnTracker(int maxVersion) { public ScanWildcardColumnTracker(int minVersion, int maxVersion, long ttl) {
this.maxVersions = maxVersion; this.maxVersions = maxVersion;
this.minVersions = minVersion;
this.oldestStamp = System.currentTimeMillis() - ttl;
} }
/** /**
@ -65,16 +71,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
long timestamp) { long timestamp) {
if (columnBuffer == null) { if (columnBuffer == null) {
// first iteration. // first iteration.
columnBuffer = bytes; resetBuffer(bytes, offset, length);
columnOffset = offset; return checkVersion(++currentCount, timestamp);
columnLength = length;
currentCount = 0;
if (++currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
}
setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE;
} }
int cmp = Bytes.compareTo(bytes, offset, length, int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength); columnBuffer, columnOffset, columnLength);
@ -83,11 +81,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
if (sameAsPreviousTS(timestamp)) { if (sameAsPreviousTS(timestamp)) {
return ScanQueryMatcher.MatchCode.SKIP; return ScanQueryMatcher.MatchCode.SKIP;
} }
if (++currentCount > maxVersions) { return checkVersion(++currentCount, timestamp);
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
}
setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE;
} }
resetTS(); resetTS();
@ -95,14 +89,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
// new col > old col // new col > old col
if (cmp > 0) { if (cmp > 0) {
// switched columns, lets do something.x // switched columns, lets do something.x
columnBuffer = bytes; resetBuffer(bytes, offset, length);
columnOffset = offset; return checkVersion(++currentCount, timestamp);
columnLength = length;
currentCount = 0;
if (++currentCount > maxVersions)
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE;
} }
// new col < oldcol // new col < oldcol
@ -114,15 +102,29 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
"into a column actually smaller than the previous column: " + "into a column actually smaller than the previous column: " +
Bytes.toStringBinary(bytes, offset, length)); Bytes.toStringBinary(bytes, offset, length));
// switched columns // switched columns
resetBuffer(bytes, offset, length);
return checkVersion(++currentCount, timestamp);
}
private void resetBuffer(byte[] bytes, int offset, int length) {
columnBuffer = bytes; columnBuffer = bytes;
columnOffset = offset; columnOffset = offset;
columnLength = length; columnLength = length;
currentCount = 0; currentCount = 0;
if (++currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
} }
private MatchCode checkVersion(int version, long timestamp) {
if (version > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
}
// keep the KV if required by minversions or it is not expired, yet
if (version <= minVersions || !isExpired(timestamp)) {
setTS(timestamp); setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE; return ScanQueryMatcher.MatchCode.INCLUDE;
} else {
return MatchCode.SEEK_NEXT_COL;
}
} }
@Override @Override
@ -150,6 +152,10 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
return timestamp == latestTSOfCurrentColumn; return timestamp == latestTSOfCurrentColumn;
} }
private boolean isExpired(long timestamp) {
return timestamp < oldestStamp;
}
/** /**
* Used by matcher and scan/get to get a hint of the next column * Used by matcher and scan/get to get a hint of the next column
* to seek to after checkColumn() returns SKIP. Returns the next interesting * to seek to after checkColumn() returns SKIP. Returns the next interesting
@ -170,4 +176,14 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
public boolean done() { public boolean done() {
return false; return false;
} }
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength) {
return MatchCode.SEEK_NEXT_COL;
}
public boolean isDone(long timestamp) {
return minVersions <=0 && isExpired(timestamp);
}
} }

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@ -52,8 +51,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -92,6 +89,7 @@ public class Store implements HeapSize {
final Configuration conf; final Configuration conf;
// ttl in milliseconds. // ttl in milliseconds.
protected long ttl; protected long ttl;
protected int minVersions;
long majorCompactionTime; long majorCompactionTime;
private final int minFilesToCompact; private final int minFilesToCompact;
private final int maxFilesToCompact; private final int maxFilesToCompact;
@ -179,6 +177,7 @@ public class Store implements HeapSize {
// second -> ms adjust for user data // second -> ms adjust for user data
this.ttl *= 1000; this.ttl *= 1000;
} }
this.minVersions = family.getMinVersions();
this.memstore = new MemStore(conf, this.comparator); this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = Bytes.toString(this.family.getName()); this.storeNameStr = Bytes.toString(this.family.getName());
@ -491,12 +490,14 @@ public class Store implements HeapSize {
// A. Write the map out to the disk // A. Write the map out to the disk
writer = createWriterInTmp(set.size()); writer = createWriterInTmp(set.size());
writer.setTimeRangeTracker(snapshotTimeRangeTracker); writer.setTimeRangeTracker(snapshotTimeRangeTracker);
int entries = 0;
try { try {
for (KeyValue kv: set) { for (KeyValue kv: set) {
if (!isExpired(kv, oldestTimestamp)) { // If minVersion > 0 we will wait until the next compaction to
// collect expired KVs. (following the logic for maxVersions).
// TODO: As Jonathan Gray points this can be optimized
// (see HBASE-4241)
if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
writer.append(kv); writer.append(kv);
entries++;
flushed += this.memstore.heapSizeChange(kv, true); flushed += this.memstore.heapSizeChange(kv, true);
} }
} }
@ -717,7 +718,7 @@ public class Store implements HeapSize {
// Ready to go. Have list of files to compact. // Ready to go. Have list of files to compact.
StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId); StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
// Move the compaction into place. // Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer); completeCompaction(filesToCompact, writer);
} finally { } finally {
synchronized (filesCompacting) { synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact); filesCompacting.removeAll(filesToCompact);
@ -1267,14 +1268,23 @@ public class Store implements HeapSize {
* current container: i.e. we'll see deletes before we come across cells we * current container: i.e. we'll see deletes before we come across cells we
* are to delete. Presumption is that the memstore#kvset is processed before * are to delete. Presumption is that the memstore#kvset is processed before
* memstore#snapshot and so on. * memstore#snapshot and so on.
* @param kv First possible item on targeted row; i.e. empty columns, latest * @param row The row key of the targeted row.
* timestamp and maximum type.
* @return Found keyvalue or null if none found. * @return Found keyvalue or null if none found.
* @throws IOException * @throws IOException
*/ */
KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException { KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
// If minVersions is set, we will not ignore expired KVs.
// As we're only looking for the latest matches, that should be OK.
// With minVersions > 0 we guarantee that any KV that has any version
// at all (expired or not) has at least one version that will not expire.
// Note that this method used to take a KeyValue as arguments. KeyValue
// can be back-dated, a row key cannot.
long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl;
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker( GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion()); this.comparator, kv, ttlToUse, this.region.getRegionInfo().isMetaRegion());
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
// First go to the memstore. Pick up deletes and candidates. // First go to the memstore. Pick up deletes and candidates.

View File

@ -64,7 +64,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
this.cacheBlocks = scan.getCacheBlocks(); this.cacheBlocks = scan.getCacheBlocks();
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
columns, store.ttl, store.comparator.getRawComparator(), columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()), store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
false); false);
this.isGet = scan.isGetScan(); this.isGet = scan.isGetScan();
@ -98,7 +98,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
this.cacheBlocks = false; this.cacheBlocks = false;
this.isGet = false; this.isGet = false;
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(), null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput); store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
// Seek all scanners to the initial key // Seek all scanners to the initial key
@ -120,7 +120,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
this.isGet = false; this.isGet = false;
this.cacheBlocks = scan.getCacheBlocks(); this.cacheBlocks = scan.getCacheBlocks();
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
comparator.getRawComparator(), scan.getMaxVersions(), false); comparator.getRawComparator(), 0, scan.getMaxVersions(), false);
// Seek all scanners to the initial key // Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) { for(KeyValueScanner scanner : scanners) {

View File

@ -408,6 +408,7 @@ module Hbase
family.setCompressionType(org.apache.hadoop.hbase.io.hfile.Compression::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION) family.setCompressionType(org.apache.hadoop.hbase.io.hfile.Compression::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE) family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS) family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase
unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype) unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype)

View File

@ -197,20 +197,33 @@ public abstract class HBaseTestCase extends TestCase {
*/ */
protected HTableDescriptor createTableDescriptor(final String name, protected HTableDescriptor createTableDescriptor(final String name,
final int versions) { final int versions) {
return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS,
versions, HConstants.FOREVER);
}
/**
* Create a table of name <code>name</code> with {@link COLUMNS} for
* families.
* @param name Name to give table.
* @param versions How many versions to allow per column.
* @return Column descriptor.
*/
protected HTableDescriptor createTableDescriptor(final String name,
final int minVersions, final int versions, final int ttl) {
HTableDescriptor htd = new HTableDescriptor(name); HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(fam1, versions, htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER, HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER, HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL)); HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam2, versions, htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER, HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER, HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL)); HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam3, versions, htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER, HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER, HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL)); HConstants.REPLICATION_SCOPE_LOCAL));
return htd; return htd;

View File

@ -44,7 +44,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
List<byte[]> scannerColumns, List<byte[]> scannerColumns,
List<MatchCode> expected) { List<MatchCode> expected) {
ColumnTracker exp = new ExplicitColumnTracker( ColumnTracker exp = new ExplicitColumnTracker(
trackColumns, maxVersions); trackColumns, 0, maxVersions, Long.MAX_VALUE);
//Initialize result //Initialize result
@ -161,7 +161,8 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
columns.add(Bytes.toBytes("col"+i)); columns.add(Bytes.toBytes("col"+i));
} }
ColumnTracker explicit = new ExplicitColumnTracker(columns, maxVersions); ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions,
Long.MAX_VALUE);
for (int i = 0; i < 100000; i+=2) { for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i); byte [] col = Bytes.toBytes("col"+i);
explicit.checkColumn(col, 0, col.length, 1); explicit.checkColumn(col, 0, col.length, 1);

View File

@ -0,0 +1,423 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Test Minimum Versions feature (HBASE-4071).
*/
public class TestMinVersions extends HBaseTestCase {
private final byte[] T0 = Bytes.toBytes("0");
private final byte[] T1 = Bytes.toBytes("1");
private final byte[] T2 = Bytes.toBytes("2");
private final byte[] T3 = Bytes.toBytes("3");
private final byte[] T4 = Bytes.toBytes("4");
private final byte[] T5 = Bytes.toBytes("5");
private final byte[] c0 = COLUMNS[0];
/**
* Verify behavior of getClosestBefore(...)
*/
public void testGetClosestBefore() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 1, 1000, 1);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
p = new Put(T1, ts+1);
p.add(c0, c0, T4);
region.put(p);
p = new Put(T3, ts);
p.add(c0, c0, T3);
region.put(p);
// now make sure that getClosestBefore(...) get can
// rows that would be expired without minVersion.
// also make sure it gets the latest version
Result r = region.getClosestRowBefore(T1, c0);
checkResult(r, c0, T4);
r = region.getClosestRowBefore(T2, c0);
checkResult(r, c0, T4);
// now flush/compact
region.flushcache();
region.compactStores(true);
r = region.getClosestRowBefore(T1, c0);
checkResult(r, c0, T4);
r = region.getClosestRowBefore(T2, c0);
checkResult(r, c0, T4);
}
/**
* Test mixed memstore and storefile scanning
* with minimum versions.
*/
public void testStoreMemStore() throws Exception {
// keep 3 versions minimum
HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
Put p = new Put(T1, ts-1);
p.add(c0, c0, T2);
region.put(p);
p = new Put(T1, ts-3);
p.add(c0, c0, T0);
region.put(p);
// now flush/compact
region.flushcache();
region.compactStores(true);
p = new Put(T1, ts);
p.add(c0, c0, T3);
region.put(p);
p = new Put(T1, ts-2);
p.add(c0, c0, T1);
region.put(p);
p = new Put(T1, ts-3);
p.add(c0, c0, T0);
region.put(p);
// newest version in the memstore
// the 2nd oldest in the store file
// and the 3rd, 4th oldest also in the memstore
Get g = new Get(T1);
g.setMaxVersions();
Result r = region.get(g, null); // this'll use ScanWildcardColumnTracker
checkResult(r, c0, T3,T2,T1);
g = new Get(T1);
g.setMaxVersions();
g.addColumn(c0, c0);
r = region.get(g, null); // this'll use ExplicitColumnTracker
checkResult(r, c0, T3,T2,T1);
}
/**
* Make sure the Deletes behave as expected with minimum versions
*/
public void testDelete() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
Put p = new Put(T1, ts-2);
p.add(c0, c0, T1);
region.put(p);
p = new Put(T1, ts-1);
p.add(c0, c0, T2);
region.put(p);
p = new Put(T1, ts);
p.add(c0, c0, T3);
region.put(p);
Delete d = new Delete(T1, ts-1, null);
region.delete(d, null, true);
Get g = new Get(T1);
g.setMaxVersions();
Result r = region.get(g, null); // this'll use ScanWildcardColumnTracker
checkResult(r, c0, T3);
g = new Get(T1);
g.setMaxVersions();
g.addColumn(c0, c0);
r = region.get(g, null); // this'll use ExplicitColumnTracker
checkResult(r, c0, T3);
// now flush/compact
region.flushcache();
region.compactStores(true);
// try again
g = new Get(T1);
g.setMaxVersions();
r = region.get(g, null); // this'll use ScanWildcardColumnTracker
checkResult(r, c0, T3);
g = new Get(T1);
g.setMaxVersions();
g.addColumn(c0, c0);
r = region.get(g, null); // this'll use ExplicitColumnTracker
checkResult(r, c0, T3);
}
/**
* Make sure the memstor behaves correctly with minimum versions
*/
public void testMemStore() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
// 2nd version
Put p = new Put(T1, ts-2);
p.add(c0, c0, T2);
region.put(p);
// 3rd version
p = new Put(T1, ts-1);
p.add(c0, c0, T3);
region.put(p);
// 4th version
p = new Put(T1, ts);
p.add(c0, c0, T4);
region.put(p);
// now flush/compact
region.flushcache();
region.compactStores(true);
// now put the first version (backdated)
p = new Put(T1, ts-3);
p.add(c0, c0, T1);
region.put(p);
// now the latest change is in the memstore,
// but it is not the latest version
Result r = region.get(new Get(T1), null);
checkResult(r, c0, T4);
Get g = new Get(T1);
g.setMaxVersions();
r = region.get(g, null); // this'll use ScanWildcardColumnTracker
checkResult(r, c0, T4,T3);
g = new Get(T1);
g.setMaxVersions();
g.addColumn(c0, c0);
r = region.get(g, null); // this'll use ExplicitColumnTracker
checkResult(r, c0, T4,T3);
p = new Put(T1, ts+1);
p.add(c0, c0, T5);
region.put(p);
// now the latest version is in the memstore
g = new Get(T1);
g.setMaxVersions();
r = region.get(g, null); // this'll use ScanWildcardColumnTracker
checkResult(r, c0, T5,T4);
g = new Get(T1);
g.setMaxVersions();
g.addColumn(c0, c0);
r = region.get(g, null); // this'll use ExplicitColumnTracker
checkResult(r, c0, T5,T4);
}
/**
* Verify basic minimum versions functionality
*/
public void testBaseCase() throws Exception {
// 1 version minimum, 1000 versions maximum, ttl = 1s
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
// 1st version
Put p = new Put(T1, ts-3);
p.add(c0, c0, T1);
region.put(p);
// 2nd version
p = new Put(T1, ts-2);
p.add(c0, c0, T2);
region.put(p);
// 3rd version
p = new Put(T1, ts-1);
p.add(c0, c0, T3);
region.put(p);
// 4th version
p = new Put(T1, ts);
p.add(c0, c0, T4);
region.put(p);
Result r = region.get(new Get(T1), null);
checkResult(r, c0, T4);
Get g = new Get(T1);
g.setTimeRange(0L, ts+1);
r = region.get(g, null);
checkResult(r, c0, T4);
// oldest version still exists
g.setTimeRange(0L, ts-2);
r = region.get(g, null);
checkResult(r, c0, T1);
// gets see only available versions
// even before compactions
g = new Get(T1);
g.setMaxVersions();
r = region.get(g, null); // this'll use ScanWildcardColumnTracker
checkResult(r, c0, T4,T3);
g = new Get(T1);
g.setMaxVersions();
g.addColumn(c0, c0);
r = region.get(g, null); // this'll use ExplicitColumnTracker
checkResult(r, c0, T4,T3);
// now flush
region.flushcache();
region.compactStores();
// oldest version still exists
// flushing/minor compactions can't get rid of these, anymore
g = new Get(T1);
g.setTimeRange(0L, ts-2);
r = region.get(g, null);
checkResult(r, c0, T1);
// major compaction
region.compactStores(true);
// after compaction the 4th version is still available
g = new Get(T1);
g.setTimeRange(0L, ts+1);
r = region.get(g, null);
checkResult(r, c0, T4);
// so is the 3rd
g.setTimeRange(0L, ts);
r = region.get(g, null);
checkResult(r, c0, T3);
// but the 2nd and earlier versions are gone
g.setTimeRange(0L, ts-1);
r = region.get(g, null);
assertTrue(r.isEmpty());
}
/**
* Verify that basic filters still behave correctly with
* minimum versions enabled.
*/
public void testFilters() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
HRegion region = createNewHRegion(htd, null, null);
final byte [] c1 = COLUMNS[1];
long ts = System.currentTimeMillis() - 2000; // 2s in the past
Put p = new Put(T1, ts-3);
p.add(c0, c0, T0);
p.add(c1, c1, T0);
region.put(p);
p = new Put(T1, ts-2);
p.add(c0, c0, T1);
p.add(c1, c1, T1);
region.put(p);
p = new Put(T1, ts-1);
p.add(c0, c0, T2);
p.add(c1, c1, T2);
region.put(p);
p = new Put(T1, ts);
p.add(c0, c0, T3);
p.add(c1, c1, T3);
region.put(p);
List<Long> tss = new ArrayList<Long>();
tss.add(ts-1);
tss.add(ts-2);
Get g = new Get(T1);
g.addColumn(c1,c1);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
Result r = region.get(g, null);
checkResult(r, c1, T2,T1);
g = new Get(T1);
g.addColumn(c0,c0);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
r = region.get(g, null);
checkResult(r, c0, T2,T1);
// now flush/compact
region.flushcache();
region.compactStores(true);
g = new Get(T1);
g.addColumn(c1,c1);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
r = region.get(g, null);
checkResult(r, c1, T2);
g = new Get(T1);
g.addColumn(c0,c0);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
r = region.get(g, null);
checkResult(r, c0, T2);
}
private void checkResult(Result r, byte[] col, byte[] ... vals) {
assertEquals(r.size(), vals.length);
List<KeyValue> kvs = r.getColumn(col, col);
assertEquals(kvs.size(), vals.length);
for (int i=0;i<vals.length;i++) {
assertEquals(kvs.get(i).getValue(), vals[i]);
}
}
}

View File

@ -33,7 +33,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
public void testCheckColumn_Ok() { public void testCheckColumn_Ok() {
ScanWildcardColumnTracker tracker = ScanWildcardColumnTracker tracker =
new ScanWildcardColumnTracker(VERSIONS); new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE);
//Create list of qualifiers //Create list of qualifiers
List<byte[]> qualifiers = new ArrayList<byte[]>(); List<byte[]> qualifiers = new ArrayList<byte[]>();
@ -65,7 +65,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
public void testCheckColumn_EnforceVersions() { public void testCheckColumn_EnforceVersions() {
ScanWildcardColumnTracker tracker = ScanWildcardColumnTracker tracker =
new ScanWildcardColumnTracker(VERSIONS); new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE);
//Create list of qualifiers //Create list of qualifiers
List<byte[]> qualifiers = new ArrayList<byte[]>(); List<byte[]> qualifiers = new ArrayList<byte[]>();
@ -98,7 +98,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
public void DisabledTestCheckColumn_WrongOrder() { public void DisabledTestCheckColumn_WrongOrder() {
ScanWildcardColumnTracker tracker = ScanWildcardColumnTracker tracker =
new ScanWildcardColumnTracker(VERSIONS); new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE);
//Create list of qualifiers //Create list of qualifiers
List<byte[]> qualifiers = new ArrayList<byte[]>(); List<byte[]> qualifiers = new ArrayList<byte[]>();