HBASE-4536 Allow CF to retain deleted rows

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1187531 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2011-10-21 19:57:07 +00:00
parent 53c69f2e9f
commit f9f3a148d9
24 changed files with 1310 additions and 278 deletions

View File

@ -2,6 +2,7 @@ HBase Change Log
Release 0.93.0 - Unreleased
NEW FEATURE
HBASE-4460 Support running an embedded ThriftServer within a RegionServer (jgray)
HBASE-4536 Allow CF to retain deleted rows (Lars H)
IMPROVEMENT
HBASE-4132 Extend the WALActionsListener API to accomodate log archival

View File

@ -670,14 +670,14 @@ admin.enableTable(table);
<title>
Minimum Number of Versions
</title>
<para>Like number of max row versions, the minimum number of row versions to keep is configured per column
<para>Like maximum number of row versions, the minimum number of row versions to keep is configured per column
family via <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link>.
The default for min versions is 0, which means the feature is disabled.
The minimum number of row versions parameter is used together with the time-to-live parameter and can be combined with the
number of row versions parameter to allow configurations such as
"keep the last T minutes worth of data, at most N versions, <emphasis>but keep at least M versions around</emphasis>"
(where M is the value for minimum number of row versions, M&lt;=N).
This parameter should only be set when time-to-live is enabled for a column family and must be less or equal to the
(where M is the value for minimum number of row versions, M&lt;N).
This parameter should only be set when time-to-live is enabled for a column family and must be less than the
number of row versions.
</para>
</section>
@ -723,6 +723,23 @@ admin.enableTable(table);
<para>See <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link> for more information.
</para>
</section>
<section xml:id="cf.keep.deleted">
<title>
Keeping Deleted Cells
</title>
<para>ColumnFamilies can optionally keep deleted cells. That means deleted cells can still be retrieved with
<link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Get.html">Get</link> or
<link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html">Scan</link> operations,
as long these operations have a time range specified that ends before the timestamp of any delete that would affect the cells.
This allows for point in time queries even in the presence of deletes.
</para>
<para>
Deleted cells are still subject to TTL and there will never be more than "maximum number of versions" deleted cells.
A new "raw" scan options returns all deleted rows and the delete markers.
</para>
<para>See <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link> for more information.
</para>
</section>
<section xml:id="schema.bloom">
<title>Bloom Filters</title>
<para>Bloom Filters can be enabled per-ColumnFamily.

View File

@ -88,6 +88,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public static final String FOREVER = "FOREVER";
public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
public static final String MIN_VERSIONS = "MIN_VERSIONS";
public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS";
/**
* Default compression type.
@ -116,6 +117,11 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
*/
public static final boolean DEFAULT_IN_MEMORY = false;
/**
* Default setting for preventing deleted from being collected immediately.
*/
public static final boolean DEFAULT_KEEP_DELETED = false;
/**
* Default setting for whether to use a block cache or not.
*/
@ -151,6 +157,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE));
DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY));
DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED));
}
// Column family name
@ -265,8 +272,9 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final String bloomFilter, final int scope) {
this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, compression, inMemory,
blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope);
this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, DEFAULT_KEEP_DELETED,
compression, inMemory, blockCacheEnabled, blocksize, timeToLive,
bloomFilter, scope);
}
/**
@ -275,6 +283,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
* letter -- and may not contain a <code>:<code>
* @param minVersions Minimum number of versions to keep
* @param maxVersions Maximum number of versions to keep
* @param keepDeletedCells Whether to retain deleted cells until they expire
* up to maxVersions versions.
* @param compression Compression type
* @param inMemory If true, column data should be kept in an HRegionServer's
* cache
@ -292,8 +302,9 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
* a <code>:</code>
* @throws IllegalArgumentException if the number of versions is &lt;= 0
*/
public HColumnDescriptor(final byte [] familyName, final int minVersions,
final int maxVersions, final String compression, final boolean inMemory,
public HColumnDescriptor(final byte[] familyName, final int minVersions,
final int maxVersions, final boolean keepDeletedCells,
final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final String bloomFilter, final int scope) {
isLegalFamilyName(familyName);
@ -309,14 +320,15 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
if (timeToLive == HConstants.FOREVER) {
throw new IllegalArgumentException("Minimum versions requires TTL.");
}
if (minVersions > maxVersions) {
throw new IllegalArgumentException("Minimum versions must be <= "+
"maximum versions.");
if (minVersions >= maxVersions) {
throw new IllegalArgumentException("Minimum versions must be < "
+ "maximum versions.");
}
}
setMaxVersions(maxVersions);
setMinVersions(minVersions);
setKeepDeletedCells(keepDeletedCells);
setInMemory(inMemory);
setBlockCacheEnabled(blockCacheEnabled);
setTimeToLive(timeToLive);
@ -542,6 +554,22 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setValue(HConstants.IN_MEMORY, Boolean.toString(inMemory));
}
public boolean getKeepDeletedCells() {
String value = getValue(KEEP_DELETED_CELLS);
if (value != null) {
return Boolean.valueOf(value).booleanValue();
}
return DEFAULT_KEEP_DELETED;
}
/**
* @param keepDeletedRows True if deleted rows should not be collected
* immediately.
*/
public void setKeepDeletedCells(boolean keepDeletedCells) {
setValue(KEEP_DELETED_CELLS, Boolean.toString(keepDeletedCells));
}
/**
* @return Time-to-live of cell contents, in seconds.
*/

View File

@ -207,6 +207,14 @@ public class KeyValue implements Writable, HeapSize {
// the row cached
private volatile byte [] rowCache = null;
/**
* @return True if a delete type, a {@link KeyValue.Type#Delete} or
* a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
* KeyValue type.
*/
public static boolean isDelete(byte t) {
return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
}
/** Here be dragons **/
@ -1038,8 +1046,7 @@ public class KeyValue implements Writable, HeapSize {
* KeyValue type.
*/
public boolean isDelete() {
int t = getType();
return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
return KeyValue.isDelete(getType());
}
/**

View File

@ -26,6 +26,7 @@ public interface Attributes {
/**
* Sets an attribute.
* In case value = null attribute is removed from the attributes map.
* Attribute names starting with _ indicate system attributes.
* @param name attribute name
* @param value attribute value
*/

View File

@ -82,12 +82,13 @@ import java.util.TreeSet;
* execute {@link #setCacheBlocks(boolean)}.
*/
public class Scan extends OperationWithAttributes implements Writable {
private static final String RAW_ATTR = "_raw_";
private static final byte SCAN_VERSION = (byte)2;
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
private int batch = -1;
// If application wants to collect scan metrics, it needs to
// call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
static public String SCAN_ATTRIBUTES_METRICS_ENABLE =
@ -695,4 +696,26 @@ public class Scan extends OperationWithAttributes implements Writable {
}
return cols.toString();
}
/**
* Enable/disable "raw" mode for this scan.
* If "raw" is enabled the scan will return all
* delete marker and deleted rows that have not
* been collected, yet.
* This is mostly useful for Scan on column families
* that have KEEP_DELETED_ROWS enabled.
* It is an error to specify any column when "raw" is set.
* @param raw True/False to enable/disable "raw" mode.
*/
public void setRaw(boolean raw) {
setAttribute(RAW_ATTR, Bytes.toBytes(raw));
}
/**
* @return True if this Scan is in "raw" mode.
*/
public boolean isRaw() {
byte[] attr = getAttribute(RAW_ATTR);
return attr == null ? false : Bytes.toBoolean(attr);
}
}

View File

@ -47,12 +47,14 @@ public interface ColumnTracker {
* @param offset
* @param length
* @param ttl The timeToLive to enforce.
* @param type The type of the KeyValue
* @return The match code instance.
* @throws IOException in case there is an internal consistency problem
* caused by a data corruption.
*/
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, long ttl) throws IOException;
public ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset,
int length, long ttl, byte type)
throws IOException;
/**
* Updates internal variables in between files

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@ -97,16 +98,14 @@ public class ExplicitColumnTracker implements ColumnTracker {
}
/**
* Checks against the parameters of the query and the columns which have
* already been processed by this query.
* @param bytes KeyValue buffer
* @param offset offset to the start of the qualifier
* @param length length of the qualifier
* @param timestamp timestamp of the key being checked
* @return MatchCode telling ScanQueryMatcher what action to take
* {@inheritDoc}
*/
@Override
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, long timestamp) {
int length, long timestamp, byte type) {
// delete markers should never be passed to an
// *Explicit*ColumnTracker
assert !KeyValue.isDelete(type);
do {
// No more columns left, we are done with this query
if(this.columns.size() == 0) {
@ -143,12 +142,12 @@ public class ExplicitColumnTracker implements ColumnTracker {
if (this.columns.size() == this.index) {
// We have served all the requested columns.
this.column = null;
return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
} else {
// We are done with current column; advance to next column
// of interest.
// We are done with current column; advance to next column
// of interest.
this.column = this.columns.get(this.index);
return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
}
} else {
setTS(timestamp);

View File

@ -20,6 +20,9 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@ -29,8 +32,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
/**
* A query matcher that is specifically designed for the scan case.
@ -39,72 +41,104 @@ public class ScanQueryMatcher {
// Optimization so we can skip lots of compares when we decide to skip
// to the next row.
private boolean stickyNextRow;
private byte[] stopRow;
private final byte[] stopRow;
protected TimeRange tr;
private final TimeRange tr;
protected Filter filter;
private final Filter filter;
/** Keeps track of deletes */
protected DeleteTracker deletes;
protected boolean retainDeletesInOutput;
private final DeleteTracker deletes;
/*
* The following three booleans define how we deal with deletes.
* There are three different aspects:
* 1. Whether to keep delete markers. This is used in compactions.
* Minor compactions always keep delete markers.
* 2. Whether to keep deleted rows. This is also used in compactions,
* if the store is set to keep deleted rows. This implies keeping
* the delete markers as well.
* In this case deleted rows are subject to the normal max version
* and TTL/min version rules just like "normal" rows.
* 3. Whether a scan can do time travel queries even before deleted
* marker to reach deleted rows.
*/
/** whether to retain delete markers */
private final boolean retainDeletesInOutput;
/** whether to return deleted rows */
private final boolean keepDeletedCells;
/** whether time range queries can see rows "behind" a delete */
private final boolean seePastDeleteMarkers;
/** Keeps track of columns and versions */
protected ColumnTracker columns;
private final ColumnTracker columns;
/** Key to seek to in memstore and StoreFiles */
protected KeyValue startKey;
private final KeyValue startKey;
/** Row comparator for the region this query is for */
KeyValue.KeyComparator rowComparator;
private final KeyValue.KeyComparator rowComparator;
/* row is not private for tests */
/** Row the query is on */
protected byte [] row;
byte [] row;
/**
* Constructs a ScanQueryMatcher for a Scan.
* @param scan
* @param family
* @param columns
* @param ttl
* @param rowComparator
* Oldest put in any of the involved store files
* Used to decide whether it is ok to delete
* family delete marker of this store keeps
* deleted KVs.
*/
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl,
KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions,
boolean retainDeletesInOutput) {
private final long earliestPutTs;
/**
* Construct a QueryMatcher for a scan
* @param scan
* @param scanInfo The store's immutable scan info
* @param columns
* @param scanType Type of the scan
* @param earliestPutTs Earliest put seen in any of the store files.
*/
public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
long earliestPutTs) {
this.tr = scan.getTimeRange();
this.rowComparator = rowComparator;
this.rowComparator = scanInfo.getComparator().getRawComparator();
this.deletes = new ScanDeleteTracker();
this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), family, null);
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), scanInfo.getFamily(), null);
this.filter = scan.getFilter();
this.retainDeletesInOutput = retainDeletesInOutput;
this.earliestPutTs = earliestPutTs;
/* how to deal with deletes */
// keep deleted cells: if compaction or raw scan
this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && scanType != ScanType.USER_SCAN) || scan.isRaw();
// retain deletes: if minor compaction or raw scan
this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
// seePastDeleteMarker: user initiated scans
this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && scanType == ScanType.USER_SCAN;
int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
// Single branch to deal with two types of reads (columns vs all in family)
if (columns == null || columns.size() == 0) {
// use a specialized scan for wildcard column tracker.
this.columns = new ScanWildcardColumnTracker(minVersions, maxVersions, ttl);
this.columns = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions, scanInfo.getTtl());
} else {
// We can share the ExplicitColumnTracker, diff is we reset
// between rows, not between storefiles.
this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions,
ttl);
this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
scanInfo.getTtl());
}
}
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl,
KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) {
/* By default we will not include deletes */
/* deletes are included explicitly (for minor compaction) */
this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions,
false);
}
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl,
KeyValue.KeyComparator rowComparator, int maxVersions) {
this(scan, family, columns, ttl, rowComparator, 0, maxVersions);
/*
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
NavigableSet<byte[]> columns) {
this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN,
HConstants.LATEST_TIMESTAMP);
}
/**
@ -171,21 +205,50 @@ public class ScanQueryMatcher {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
/*
* The delete logic is pretty complicated now.
* This is corroborated by the following:
* 1. The store might be instructed to keep deleted rows around.
* 2. A scan can optionally see past a delete marker now.
* 3. If deleted rows are kept, we have to find out when we can
* remove the delete markers.
* 4. Family delete markers are always first (regardless of their TS)
* 5. Delete markers should not be counted as version
* 6. Delete markers affect puts of the *same* TS
* 7. Delete marker need to be version counted together with puts
* they affect
*/
byte type = kv.getType();
if (isDelete(type)) {
if (tr.withinOrAfterTimeRange(timestamp)) {
this.deletes.add(bytes, offset, qualLength, timestamp, type);
if (kv.isDelete()) {
if (!keepDeletedCells) {
// first ignore delete markers if the scanner can do so, and the
// range does not include the marker
boolean includeDeleteMarker = seePastDeleteMarkers ?
// +1, to allow a range between a delete and put of same TS
tr.withinTimeRange(timestamp+1) :
tr.withinOrAfterTimeRange(timestamp);
if (includeDeleteMarker) {
this.deletes.add(bytes, offset, qualLength, timestamp, type);
}
// Can't early out now, because DelFam come before any other keys
}
if (retainDeletesInOutput) {
// always include
return MatchCode.INCLUDE;
}
else {
} else if (keepDeletedCells) {
if (timestamp < earliestPutTs) {
// keeping delete rows, but there are no puts older than
// this delete in the store files.
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
// else: fall through and do version counting on the
// delete markers
} else {
return MatchCode.SKIP;
}
}
if (!this.deletes.isEmpty()) {
// note the following next else if...
// delete marker are not subject to other delete markers
} else if (!this.deletes.isEmpty()) {
DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
timestamp);
switch (deleteResult) {
@ -228,7 +291,8 @@ public class ScanQueryMatcher {
}
}
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp);
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength,
timestamp, type);
/*
* According to current implementation, colChecker can only be
* SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return
@ -269,11 +333,6 @@ public class ScanQueryMatcher {
stickyNextRow = false;
}
// should be in KeyValue.
protected boolean isDelete(byte type) {
return (type != KeyValue.Type.Put.getCode());
}
/**
*
* @return the start key

View File

@ -22,9 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@ -32,17 +31,17 @@ import org.apache.hadoop.hbase.util.Bytes;
* Keeps track of the columns for a scan if they are not explicitly specified
*/
public class ScanWildcardColumnTracker implements ColumnTracker {
private static final Log LOG =
LogFactory.getLog(ScanWildcardColumnTracker.class);
private byte [] columnBuffer = null;
private int columnOffset = 0;
private int columnLength = 0;
private int currentCount = 0;
private int maxVersions;
private int minVersions;
/* Keeps track of the latest timestamp included for current column.
/* Keeps track of the latest timestamp and type included for current column.
* Used to eliminate duplicates. */
private long latestTSOfCurrentColumn;
private byte latestTypeOfCurrentColumn;
private long oldestStamp;
/**
@ -58,41 +57,38 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
}
/**
* Can only return INCLUDE or SKIP, since returning "NEXT" or
* "DONE" would imply we have finished with this row, when
* this class can't figure that out.
*
* @param bytes
* @param offset
* @param length
* @param timestamp
* @return The match code instance.
* {@inheritDoc}
* This receives puts *and* deletes.
* Deletes do not count as a version, but rather take the version
* of the previous put (so eventually all but the last can be reclaimed).
*/
@Override
public MatchCode checkColumn(byte[] bytes, int offset, int length,
long timestamp) throws IOException {
long timestamp, byte type) throws IOException {
if (columnBuffer == null) {
// first iteration.
resetBuffer(bytes, offset, length);
return checkVersion(++currentCount, timestamp);
// do not count a delete marker as another version
return checkVersion(type, timestamp);
}
int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength);
if (cmp == 0) {
//If column matches, check if it is a duplicate timestamp
if (sameAsPreviousTS(timestamp)) {
if (sameAsPreviousTSAndType(timestamp, type)) {
return ScanQueryMatcher.MatchCode.SKIP;
}
return checkVersion(++currentCount, timestamp);
return checkVersion(type, timestamp);
}
resetTS();
resetTSAndType();
// new col > old col
if (cmp > 0) {
// switched columns, lets do something.x
resetBuffer(bytes, offset, length);
return checkVersion(++currentCount, timestamp);
return checkVersion(type, timestamp);
}
// new col < oldcol
@ -112,13 +108,25 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
currentCount = 0;
}
private MatchCode checkVersion(int version, long timestamp) {
if (version > maxVersions) {
/**
* Check whether this version should be retained.
* There are 4 variables considered:
* If this version is past max versions -> skip it
* If this kv has expired or was deleted, check min versions
* to decide whther to skip it or not.
*
* Increase the version counter unless this is a delete
*/
private MatchCode checkVersion(byte type, long timestamp) {
if (!KeyValue.isDelete(type)) {
currentCount++;
}
if (currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
}
// keep the KV if required by minversions or it is not expired, yet
if (version <= minVersions || !isExpired(timestamp)) {
setTS(timestamp);
if (currentCount <= minVersions || !isExpired(timestamp)) {
setTSAndType(timestamp, type);
return ScanQueryMatcher.MatchCode.INCLUDE;
} else {
return MatchCode.SEEK_NEXT_COL;
@ -136,19 +144,21 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
@Override
public void reset() {
columnBuffer = null;
resetTS();
resetTSAndType();
}
private void resetTS() {
private void resetTSAndType() {
latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
latestTypeOfCurrentColumn = 0;
}
private void setTS(long timestamp) {
private void setTSAndType(long timestamp, byte type) {
latestTSOfCurrentColumn = timestamp;
latestTypeOfCurrentColumn = type;
}
private boolean sameAsPreviousTS(long timestamp) {
return timestamp == latestTSOfCurrentColumn;
private boolean sameAsPreviousTSAndType(long timestamp, byte type) {
return timestamp == latestTSOfCurrentColumn && type == latestTypeOfCurrentColumn;
}
private boolean isExpired(long timestamp) {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
@ -95,9 +97,7 @@ public class Store implements HeapSize {
final Configuration conf;
final CacheConfig cacheConf;
// ttl in milliseconds.
protected long ttl;
protected int minVersions;
protected int maxVersions;
private long ttl;
long majorCompactionTime;
private final int minFilesToCompact;
private final int maxFilesToCompact;
@ -119,6 +119,8 @@ public class Store implements HeapSize {
private CompactionProgress progress;
private final int compactionKVMax;
// not private for testing
/* package */ScanInfo scanInfo;
/*
* List of store files inside this store. This is an immutable list that
* is atomically replaced when its contents change.
@ -183,8 +185,9 @@ public class Store implements HeapSize {
// second -> ms adjust for user data
this.ttl *= 1000;
}
this.minVersions = family.getMinVersions();
this.maxVersions = family.getMaxVersions();
scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
this.comparator);
this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = Bytes.toString(this.family.getName());
@ -477,13 +480,13 @@ public class Store implements HeapSize {
return null;
}
Scan scan = new Scan();
scan.setMaxVersions(maxVersions);
scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// pass true as the StoreScanner's retainDeletesInOutput argument.
InternalScanner scanner = new StoreScanner(this, scan,
Collections.singletonList(new CollectionBackedScanner(set,
this.comparator)), true);
// treat this as a minor compaction.
InternalScanner scanner = new StoreScanner(this, scan, Collections
.singletonList(new CollectionBackedScanner(set, this.comparator)),
ScanType.MINOR_COMPACT, HConstants.OLDEST_TIMESTAMP);
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
@ -1108,6 +1111,7 @@ public class Store implements HeapSize {
throws IOException {
// calculate maximum key count after compaction (for blooms)
int maxKeyCount = 0;
long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file : filesToCompact) {
StoreFile.Reader r = file.getReader();
if (r != null) {
@ -1123,6 +1127,19 @@ public class Store implements HeapSize {
", size=" + StringUtils.humanReadableInt(r.length()) );
}
}
// For major compactions calculate the earliest put timestamp
// of all involved storefiles. This is used to remove
// family delete marker during the compaction.
if (majorCompaction) {
byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
if (tmp == null) {
// there's a file with no information, must be an old one
// assume we have very old puts
earliestPutTs = HConstants.OLDEST_TIMESTAMP;
} else {
earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
}
}
}
// keep track of compaction progress
@ -1141,7 +1158,9 @@ public class Store implements HeapSize {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
/* include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
scanner = new StoreScanner(this, scan, scanners,
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
earliestPutTs);
if (region.getCoprocessorHost() != null) {
InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
this, scanner);
@ -1374,7 +1393,7 @@ public class Store implements HeapSize {
// at all (expired or not) has at least one version that will not expire.
// Note that this method used to take a KeyValue as arguments. KeyValue
// can be back-dated, a row key cannot.
long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl;
long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.ttl;
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
@ -1842,15 +1861,16 @@ public class Store implements HeapSize {
return this.cacheConf;
}
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (17 * ClassSize.REFERENCE) +
(7 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
(7 * Bytes.SIZEOF_INT) + (1 * Bytes.SIZEOF_BOOLEAN));
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+ (1 * Bytes.SIZEOF_DOUBLE) + (5 * Bytes.SIZEOF_INT)
+ Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
ClassSize.CONCURRENT_SKIPLISTMAP +
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
+ ClassSize.CONCURRENT_SKIPLISTMAP
+ ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
+ ScanInfo.FIXED_OVERHEAD);
@Override
public long heapSize() {
@ -1861,4 +1881,62 @@ public class Store implements HeapSize {
return comparator;
}
/**
* Immutable information for scans over a store.
*/
public static class ScanInfo {
private byte[] family;
private int minVersions;
private int maxVersions;
private long ttl;
private boolean keepDeletedCells;
private KVComparator comparator;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
+ Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
/**
* @param family Name of this store's column family
* @param minVersions Store's MIN_VERSIONS setting
* @param maxVersions Store's VERSIONS setting
* @param ttl Store's TTL (in ms)
* @param keepDeletedCells Store's keepDeletedCells setting
* @param comparator The store's comparator
*/
public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
boolean keepDeletedCells, KVComparator comparator) {
this.family = family;
this.minVersions = minVersions;
this.maxVersions = maxVersions;
this.ttl = ttl;
this.keepDeletedCells = keepDeletedCells;
this.comparator = comparator;
}
public byte[] getFamily() {
return family;
}
public int getMinVersions() {
return minVersions;
}
public int getMaxVersions() {
return maxVersions;
}
public long getTtl() {
return ttl;
}
public boolean getKeepDeletedCells() {
return keepDeletedCells;
}
public KVComparator getComparator() {
return comparator;
}
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
@ -116,6 +117,9 @@ public class StoreFile {
/** Key for Timerange information in metadata*/
public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
/** Key for timestamp of earliest-put in metadata*/
public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
// Need to make it 8k for testing.
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@ -737,6 +741,7 @@ public class StoreFile {
private int lastBloomKeyOffset, lastBloomKeyLen;
private KVComparator kvComparator;
private KeyValue lastKv = null;
private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track if the timeRange has already been set
@ -796,14 +801,15 @@ public class StoreFile {
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
writer.appendFileInfo(MAJOR_COMPACTION_KEY,
Bytes.toBytes(majorCompaction));
appendTimeRangeMetadata();
appendTrackedTimestampsToMetadata();
}
/**
* Add TimestampRange to Metadata
* Add TimestampRange and earliest put timestamp to Metadata
*/
public void appendTimeRangeMetadata() throws IOException {
public void appendTrackedTimestampsToMetadata() throws IOException {
appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
}
/**
@ -816,29 +822,22 @@ public class StoreFile {
}
/**
* Record the earlest Put timestamp.
*
* If the timeRangeTracker is not set,
* update TimeRangeTracker to include the timestamp of this key
* @param kv
* @throws IOException
*/
public void includeInTimeRangeTracker(final KeyValue kv) {
public void trackTimestamps(final KeyValue kv) {
if (KeyValue.Type.Put.getCode() == kv.getType()) {
earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
}
if (!isTimeRangeTrackerSet) {
timeRangeTracker.includeTimestamp(kv);
}
}
/**
* If the timeRangeTracker is not set,
* update TimeRangeTracker to include the timestamp of this key
* @param key
* @throws IOException
*/
public void includeInTimeRangeTracker(final byte [] key) {
if (!isTimeRangeTrackerSet) {
timeRangeTracker.includeTimestamp(key);
}
}
public void append(final KeyValue kv) throws IOException {
if (this.bloomFilterWriter != null) {
// only add to the bloom filter on a new, unique key
@ -908,7 +907,7 @@ public class StoreFile {
}
}
writer.append(kv);
includeInTimeRangeTracker(kv);
trackTimestamps(kv);
}
public Path getPath() {

View File

@ -20,9 +20,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@ -86,12 +87,14 @@ class StoreScanner extends NonLazyKeyValueScanner
* @throws IOException
*/
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
throws IOException {
throws IOException {
this(store, scan.getCacheBlocks(), scan, columns);
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
columns, store.ttl, store.comparator.getRawComparator(),
store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
false);
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException(
"Cannot specify any column for a raw scan");
}
matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
ScanType.USER_SCAN, HConstants.LATEST_TIMESTAMP);
// Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScanners(scan, columns);
@ -124,12 +127,12 @@ class StoreScanner extends NonLazyKeyValueScanner
* @param scan the spec
* @param scanners ancilliary scanners
*/
StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
boolean retainDeletesInOutput) throws IOException {
StoreScanner(Store store, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs) throws IOException {
this(store, false, scan, null);
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
earliestPutTs);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@ -141,20 +144,18 @@ class StoreScanner extends NonLazyKeyValueScanner
}
// Constructor for testing.
StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
final KeyValue.KVComparator comparator,
final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners)
throws IOException {
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners) throws IOException {
this(null, scan.getCacheBlocks(), scan, columns);
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
comparator.getRawComparator(), 0, scan.getMaxVersions(), false);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
HConstants.LATEST_TIMESTAMP);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
for (KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
heap = new KeyValueHeap(scanners, comparator);
heap = new KeyValueHeap(scanners, scanInfo.getComparator());
}
/*
@ -476,5 +477,13 @@ class StoreScanner extends NonLazyKeyValueScanner
lazySeekEnabledGlobally = enable;
}
/**
* Enum to distinguish general scan types.
*/
public static enum ScanType {
MAJOR_COMPACT,
MINOR_COMPACT,
USER_SCAN
}
}

View File

@ -463,6 +463,7 @@ module Hbase
family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
family.setKeepDeletedRows(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase
unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype)

View File

@ -198,7 +198,7 @@ public abstract class HBaseTestCase extends TestCase {
protected HTableDescriptor createTableDescriptor(final String name,
final int versions) {
return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS,
versions, HConstants.FOREVER);
versions, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
}
/**
@ -209,21 +209,21 @@ public abstract class HBaseTestCase extends TestCase {
* @return Column descriptor.
*/
protected HTableDescriptor createTableDescriptor(final String name,
final int minVersions, final int versions, final int ttl) {
final int minVersions, final int versions, final int ttl, boolean keepDeleted) {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
return htd;

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@ -71,10 +72,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.junit.After;
import org.junit.AfterClass;
@ -129,6 +128,82 @@ public class TestFromClientSide {
}
/**
* Basic client side validation of HBASE-4536
*/
@Test
public void testKeepDeletedCells() throws Exception {
final byte[] TABLENAME = Bytes.toBytes("testKeepDeletesCells");
final byte[] FAMILY = Bytes.toBytes("family");
final byte[] C0 = Bytes.toBytes("c0");
final byte[] T1 = Bytes.toBytes("T1");
final byte[] T2 = Bytes.toBytes("T2");
final byte[] T3 = Bytes.toBytes("T3");
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY,
HColumnDescriptor.DEFAULT_MIN_VERSIONS,
HColumnDescriptor.DEFAULT_VERSIONS,
true,
HColumnDescriptor.DEFAULT_COMPRESSION,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_BLOCKSIZE,
HColumnDescriptor.DEFAULT_TTL,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
HTableDescriptor desc = new HTableDescriptor(TABLENAME);
desc.addFamily(hcd);
TEST_UTIL.getHBaseAdmin().createTable(desc);
Configuration c = TEST_UTIL.getConfiguration();
HTable h = new HTable(c, TABLENAME);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(FAMILY, C0, T1);
h.put(p);
p = new Put(T1, ts+2);
p.add(FAMILY, C0, T2);
h.put(p);
p = new Put(T1, ts+4);
p.add(FAMILY, C0, T3);
h.put(p);
Delete d = new Delete(T1, ts+2, null);
h.delete(d);
d = new Delete(T1, ts+3, null);
d.deleteColumns(FAMILY, C0, ts+3);
h.delete(d);
Get g = new Get(T1);
// does *not* include the delete
g.setTimeRange(0, ts+3);
Result r = h.get(g);
assertArrayEquals(T2, r.getValue(FAMILY, C0));
Scan s = new Scan(T1);
s.setTimeRange(0, ts+3);
s.setMaxVersions();
ResultScanner scanner = h.getScanner(s);
KeyValue[] kvs = scanner.next().raw();
assertArrayEquals(T2, kvs[0].getValue());
assertArrayEquals(T1, kvs[1].getValue());
scanner.close();
s = new Scan(T1);
s.setRaw(true);
s.setMaxVersions();
scanner = h.getScanner(s);
kvs = scanner.next().raw();
assertTrue(kvs[0].isDeleteFamily());
assertArrayEquals(T3, kvs[1].getValue());
assertTrue(kvs[2].isDelete());
assertArrayEquals(T2, kvs[3].getValue());
assertArrayEquals(T1, kvs[4].getValue());
scanner.close();
}
/**
* HBASE-2468 use case 1 and 2: region info de/serialization
*/
@Test

View File

@ -24,13 +24,8 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -240,11 +235,15 @@ public class TestCompaction extends HBaseTestCase {
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttlInSeconds = 1;
final int ttl = 1000;
for (Store store: this.r.stores.values()) {
store.ttl = ttlInSeconds * 1000;
Store.ScanInfo old = store.scanInfo;
Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), old.getComparator());
store.scanInfo = si;
}
Thread.sleep(ttlInSeconds * 1000);
Thread.sleep(1000);
r.compactStores(true);
int count = count();
@ -446,11 +445,15 @@ public class TestCompaction extends HBaseTestCase {
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttlInSeconds = 1;
final int ttl = 1000;
for (Store store: this.r.stores.values()) {
store.ttl = ttlInSeconds * 1000;
Store.ScanInfo old = store.scanInfo;
Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), old.getComparator());
store.scanInfo = si;
}
Thread.sleep(ttlInSeconds * 1000);
Thread.sleep(ttl);
r.compactStores(true);
assertEquals(0, count());

View File

@ -27,6 +27,7 @@ import java.util.TreeSet;
import java.util.Arrays;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@ -54,7 +55,8 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
long timestamp = 0;
//"Match"
for(byte [] col : scannerColumns){
result.add(exp.checkColumn(col, 0, col.length, ++timestamp));
result.add(exp.checkColumn(col, 0, col.length, ++timestamp,
KeyValue.Type.Put.getCode()));
}
assertEquals(expected.size(), result.size());
@ -166,13 +168,13 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
Long.MAX_VALUE);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
explicit.checkColumn(col, 0, col.length, 1);
explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
}
explicit.update();
for (int i = 1; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
explicit.checkColumn(col, 0, col.length, 1);
explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
}
}

View File

@ -0,0 +1,725 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class TestKeepDeletes extends HBaseTestCase {
private final byte[] T0 = Bytes.toBytes("0");
private final byte[] T1 = Bytes.toBytes("1");
private final byte[] T2 = Bytes.toBytes("2");
private final byte[] T3 = Bytes.toBytes("3");
private final byte[] T4 = Bytes.toBytes("4");
private final byte[] T5 = Bytes.toBytes("5");
private final byte[] T6 = Bytes.toBytes("6");
private final byte[] c0 = COLUMNS[0];
private final byte[] c1 = COLUMNS[1];
/**
* Make sure that deleted rows are retained.
* Family delete markers are deleted.
* Column Delete markers are versioned
* Time range scan of deleted rows are possible
*/
public void testBasicScenario() throws Exception {
// keep 3 versions, rows do not expire
HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
p = new Put(T1, ts+1);
p.add(c0, c0, T2);
region.put(p);
p = new Put(T1, ts+2);
p.add(c0, c0, T3);
region.put(p);
p = new Put(T1, ts+4);
p.add(c0, c0, T4);
region.put(p);
// now place a delete marker at ts+2
Delete d = new Delete(T1, ts+2, null);
region.delete(d, null, true);
// a raw scan can see the delete markers
// (one for each column family)
assertEquals(3, countDeleteMarkers(region));
// get something *before* the delete marker
Get g = new Get(T1);
g.setMaxVersions();
g.setTimeRange(0L, ts+2);
Result r = region.get(g, null);
checkResult(r, c0, c0, T2,T1);
// flush
region.flushcache();
// yep, T2 still there, T1 gone
r = region.get(g, null);
checkResult(r, c0, c0, T2);
// major compact
region.compactStores(true);
region.compactStores(true);
// one delete marker left (the others did not
// have older puts)
assertEquals(1, countDeleteMarkers(region));
// still there (even after multiple compactions)
r = region.get(g, null);
checkResult(r, c0, c0, T2);
// a timerange that includes the delete marker won't see past rows
g.setTimeRange(0L, ts+4);
r = region.get(g, null);
assertTrue(r.isEmpty());
// two more puts, this will expire the older puts.
p = new Put(T1, ts+5);
p.add(c0, c0, T5);
region.put(p);
p = new Put(T1, ts+6);
p.add(c0, c0, T6);
region.put(p);
// also add an old put again
// (which is past the max versions)
p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
r = region.get(g, null);
assertTrue(r.isEmpty());
region.flushcache();
region.compactStores(true);
region.compactStores(true);
// verify that the delete marker itself was collected
region.put(p);
r = region.get(g, null);
checkResult(r, c0, c0, T1);
assertEquals(0, countDeleteMarkers(region));
}
/**
* Even when the store does not keep deletes a "raw" scan will
* return everything it can find (unless discarding cells is guaranteed
* to have no effect).
* Assuming this the desired behavior. Could also disallow "raw" scanning
* if the store does not have KEEP_DELETED_CELLS enabled.
* (can be changed easily)
*/
public void testRawScanWithoutKeepingDeletes() throws Exception {
// KEEP_DELETED_CELLS is NOT enabled
HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
HConstants.FOREVER, false);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
Delete d = new Delete(T1, ts, null);
d.deleteColumn(c0, c0, ts);
region.delete(d, null, true);
// scan still returns delete markers and deletes rows
Scan s = new Scan();
s.setRaw(true);
s.setMaxVersions();
InternalScanner scan = region.getScanner(s);
List<KeyValue> kvs = new ArrayList<KeyValue>();
scan.next(kvs);
assertEquals(2, kvs.size());
region.flushcache();
region.compactStores(true);
// after compaction they are gone
// (note that this a test with a Store without
// KEEP_DELETED_CELLS)
s = new Scan();
s.setRaw(true);
s.setMaxVersions();
scan = region.getScanner(s);
kvs = new ArrayList<KeyValue>();
scan.next(kvs);
assertTrue(kvs.isEmpty());
}
/**
* basic verification of existing behavior
*/
public void testWithoutKeepingDeletes() throws Exception {
// KEEP_DELETED_CELLS is NOT enabled
HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
HConstants.FOREVER, false);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
Delete d = new Delete(T1, ts+2, null);
d.deleteColumn(c0, c0, ts);
region.delete(d, null, true);
// "past" get does not see rows behind delete marker
Get g = new Get(T1);
g.setMaxVersions();
g.setTimeRange(0L, ts+1);
Result r = region.get(g, null);
assertTrue(r.isEmpty());
// "past" scan does not see rows behind delete marker
Scan s = new Scan();
s.setMaxVersions();
s.setTimeRange(0L, ts+1);
InternalScanner scanner = region.getScanner(s);
List<KeyValue> kvs = new ArrayList<KeyValue>();
while(scanner.next(kvs));
assertTrue(kvs.isEmpty());
// flushing and minor compaction keep delete markers
region.flushcache();
region.compactStores();
assertEquals(1, countDeleteMarkers(region));
region.compactStores(true);
// major compaction deleted it
assertEquals(0, countDeleteMarkers(region));
}
/**
* The ExplicitColumnTracker does not support "raw" scanning.
*/
public void testRawScanWithColumns() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
Scan s = new Scan();
s.setRaw(true);
s.setMaxVersions();
s.addColumn(c0, c0);
try {
InternalScanner scan = region.getScanner(s);
fail("raw scanner with columns should have failed");
} catch (DoNotRetryIOException dnre) {
// ok!
}
}
/**
* Verify that "raw" scanning mode return delete markers and deletes rows.
*/
public void testRawScan() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
p = new Put(T1, ts+2);
p.add(c0, c0, T2);
region.put(p);
p = new Put(T1, ts+4);
p.add(c0, c0, T3);
region.put(p);
Delete d = new Delete(T1, ts+1, null);
region.delete(d, null, true);
d = new Delete(T1, ts+2, null);
d.deleteColumn(c0, c0, ts+2);
region.delete(d, null, true);
d = new Delete(T1, ts+3, null);
d.deleteColumns(c0, c0, ts+3);
region.delete(d, null, true);
Scan s = new Scan();
s.setRaw(true);
s.setMaxVersions();
InternalScanner scan = region.getScanner(s);
List<KeyValue> kvs = new ArrayList<KeyValue>();
scan.next(kvs);
assertTrue(kvs.get(0).isDeleteFamily());
assertEquals(kvs.get(1).getValue(), T3);
assertTrue(kvs.get(2).isDelete());
assertTrue(kvs.get(3).isDeleteType());
assertEquals(kvs.get(4).getValue(), T2);
assertEquals(kvs.get(5).getValue(), T1);
}
/**
* Verify that delete markers are removed from an otherwise empty store.
*/
public void testDeleteMarkerExpirationEmptyStore() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Delete d = new Delete(T1, ts, null);
d.deleteColumns(c0, c0, ts);
region.delete(d, null, true);
d = new Delete(T1, ts, null);
d.deleteFamily(c0);
region.delete(d, null, true);
d = new Delete(T1, ts, null);
d.deleteColumn(c0, c0, ts+1);
region.delete(d, null, true);
d = new Delete(T1, ts, null);
d.deleteColumn(c0, c0, ts+2);
region.delete(d, null, true);
// 1 family marker, 1 column marker, 2 version markers
assertEquals(4, countDeleteMarkers(region));
// neither flush nor minor compaction removes any marker
region.flushcache();
assertEquals(4, countDeleteMarkers(region));
region.compactStores(false);
assertEquals(4, countDeleteMarkers(region));
// major compaction removes all, since there are no puts they affect
region.compactStores(true);
assertEquals(0, countDeleteMarkers(region));
}
/**
* Test delete marker removal from store files.
*/
public void testDeleteMarkerExpiration() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
// a put into another store (CF) should have no effect
p = new Put(T1, ts-10);
p.add(c1, c0, T1);
region.put(p);
// all the following deletes affect the put
Delete d = new Delete(T1, ts, null);
d.deleteColumns(c0, c0, ts);
region.delete(d, null, true);
d = new Delete(T1, ts, null);
d.deleteFamily(c0, ts);
region.delete(d, null, true);
d = new Delete(T1, ts, null);
d.deleteColumn(c0, c0, ts+1);
region.delete(d, null, true);
d = new Delete(T1, ts, null);
d.deleteColumn(c0, c0, ts+2);
region.delete(d, null, true);
// 1 family marker, 1 column marker, 2 version markers
assertEquals(4, countDeleteMarkers(region));
region.flushcache();
assertEquals(4, countDeleteMarkers(region));
region.compactStores(false);
assertEquals(4, countDeleteMarkers(region));
// another put will push out the earlier put...
p = new Put(T1, ts+3);
p.add(c0, c0, T1);
region.put(p);
region.flushcache();
// no markers are collected, since there is an affected put
region.compactStores(true);
assertEquals(4, countDeleteMarkers(region));
// the last collections collected the earlier put
// so after this collection all markers
region.compactStores(true);
assertEquals(0, countDeleteMarkers(region));
}
/**
* Verify correct range demarcation
*/
public void testRanges() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
p.add(c0, c1, T1);
p.add(c1, c0, T1);
p.add(c1, c1, T1);
region.put(p);
p = new Put(T2, ts);
p.add(c0, c0, T1);
p.add(c0, c1, T1);
p.add(c1, c0, T1);
p.add(c1, c1, T1);
region.put(p);
p = new Put(T1, ts+1);
p.add(c0, c0, T2);
p.add(c0, c1, T2);
p.add(c1, c0, T2);
p.add(c1, c1, T2);
region.put(p);
p = new Put(T2, ts+1);
p.add(c0, c0, T2);
p.add(c0, c1, T2);
p.add(c1, c0, T2);
p.add(c1, c1, T2);
region.put(p);
Delete d = new Delete(T1, ts+1, null);
d.deleteColumns(c0, c0, ts+1);
region.delete(d, null, true);
d = new Delete(T1, ts+1, null);
d.deleteFamily(c1, ts+1);
region.delete(d, null, true);
d = new Delete(T2, ts+1, null);
d.deleteFamily(c0, ts+1);
region.delete(d, null, true);
// add an older delete, to make sure it is filtered
d = new Delete(T1, ts-10, null);
d.deleteFamily(c1, ts-10);
region.delete(d, null, true);
// ts + 2 does NOT include the delete at ts+1
checkGet(region, T1, c0, c0, ts+2, T2, T1);
checkGet(region, T1, c0, c1, ts+2, T2, T1);
checkGet(region, T1, c1, c0, ts+2, T2, T1);
checkGet(region, T1, c1, c1, ts+2, T2, T1);
checkGet(region, T2, c0, c0, ts+2, T2, T1);
checkGet(region, T2, c0, c1, ts+2, T2, T1);
checkGet(region, T2, c1, c0, ts+2, T2, T1);
checkGet(region, T2, c1, c1, ts+2, T2, T1);
// ts + 3 does
checkGet(region, T1, c0, c0, ts+3);
checkGet(region, T1, c0, c1, ts+3, T2, T1);
checkGet(region, T1, c1, c0, ts+3);
checkGet(region, T1, c1, c1, ts+3);
checkGet(region, T2, c0, c0, ts+3);
checkGet(region, T2, c0, c1, ts+3);
checkGet(region, T2, c1, c0, ts+3, T2, T1);
checkGet(region, T2, c1, c1, ts+3, T2, T1);
}
/**
* Verify that column/version delete makers are sorted
* with their respective puts and removed correctly by
* versioning (i.e. not relying on the store earliestPutTS).
*/
public void testDeleteMarkerVersioning() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
// this prevents marker collection based on earliestPut
// (cannot keep earliest put per column in the store file)
p = new Put(T1, ts-10);
p.add(c0, c1, T1);
region.put(p);
Delete d = new Delete(T1, ts, null);
// test corner case (Put and Delete have same TS)
d.deleteColumns(c0, c0, ts);
region.delete(d, null, true);
d = new Delete(T1, ts+1, null);
d.deleteColumn(c0, c0, ts+1);
region.delete(d, null, true);
d = new Delete(T1, ts+3, null);
d.deleteColumn(c0, c0, ts+3);
region.delete(d, null, true);
region.flushcache();
region.compactStores(true);
region.compactStores(true);
assertEquals(3, countDeleteMarkers(region));
// add two more puts, since max version is 1
// the 2nd put (and all delete markers following)
// will be removed.
p = new Put(T1, ts+2);
p.add(c0, c0, T2);
region.put(p);
// delete, put, delete, delete, put
assertEquals(3, countDeleteMarkers(region));
p = new Put(T1, ts+3);
p.add(c0, c0, T3);
region.put(p);
// This is potentially questionable behavior.
// This could be changed by not letting the ScanQueryMatcher
// return SEEK_NEXT_COL if a put is past VERSIONS, but instead
// return SKIP if the store has KEEP_DELETED_CELLS set.
//
// As it stands, the 1 here is correct here.
// There are two puts, VERSIONS is one, so after the 1st put the scanner
// knows that there can be no more KVs (put or delete) that have any effect.
//
// delete, put, put | delete, delete
assertEquals(1, countDeleteMarkers(region));
// flush cache only sees what is in the memstore
region.flushcache();
// Here we have the three markers again, because the flush above
// removed the 2nd put before the file is written.
// So there's only one put, and hence the deletes already in the store
// files cannot be removed safely.
// delete, put, delete, delete
assertEquals(3, countDeleteMarkers(region));
region.compactStores(true);
assertEquals(3, countDeleteMarkers(region));
// add one more put
p = new Put(T1, ts+4);
p.add(c0, c0, T4);
region.put(p);
region.flushcache();
// one trailing delete marker remains (but only one)
// because delete markers do not increase the version count
assertEquals(1, countDeleteMarkers(region));
region.compactStores(true);
region.compactStores(true);
assertEquals(1, countDeleteMarkers(region));
}
/**
* Verify scenarios with multiple CFs and columns
*/
public void testWithMixedCFs() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
HConstants.FOREVER, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis();
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
p.add(c0, c1, T1);
p.add(c1, c0, T1);
p.add(c1, c1, T1);
region.put(p);
p = new Put(T2, ts+1);
p.add(c0, c0, T2);
p.add(c0, c1, T2);
p.add(c1, c0, T2);
p.add(c1, c1, T2);
region.put(p);
// family markers are each family
Delete d = new Delete(T1, ts, null);
region.delete(d, null, true);
d = new Delete(T2, ts+1, null);
region.delete(d, null, true);
Scan s = new Scan(T1);
s.setTimeRange(0, ts+1);
InternalScanner scanner = region.getScanner(s);
List<KeyValue> kvs = new ArrayList<KeyValue>();
scanner.next(kvs);
assertEquals(4, kvs.size());
scanner.close();
s = new Scan(T2);
s.setTimeRange(0, ts+2);
scanner = region.getScanner(s);
kvs = new ArrayList<KeyValue>();
scanner.next(kvs);
assertEquals(4, kvs.size());
scanner.close();
}
/**
* Test keeping deleted rows together with min versions set
* @throws Exception
*/
public void testWithMinVersions() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, true);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
Put p = new Put(T1, ts);
p.add(c0, c0, T3);
region.put(p);
p = new Put(T1, ts-1);
p.add(c0, c0, T2);
region.put(p);
p = new Put(T1, ts-3);
p.add(c0, c0, T1);
region.put(p);
p = new Put(T1, ts-4);
p.add(c0, c0, T0);
region.put(p);
// all puts now are just retained because of min versions = 3
// place a family delete marker
Delete d = new Delete(T1, ts-1, null);
region.delete(d, null, true);
// and a column delete marker
d = new Delete(T1, ts-2, null);
d.deleteColumns(c0, c0, ts-1);
region.delete(d, null, true);
Get g = new Get(T1);
g.setMaxVersions();
g.setTimeRange(0L, ts-2);
Result r = region.get(g, null);
checkResult(r, c0, c0, T1,T0);
// 3 families, one column delete marker
assertEquals(4, countDeleteMarkers(region));
region.flushcache();
// no delete marker removes by the flush
assertEquals(4, countDeleteMarkers(region));
r = region.get(g, null);
checkResult(r, c0, c0, T1);
p = new Put(T1, ts+1);
p.add(c0, c0, T4);
region.put(p);
region.flushcache();
assertEquals(4, countDeleteMarkers(region));
r = region.get(g, null);
checkResult(r, c0, c0, T1);
// this will push out the last put before
// family delete marker
p = new Put(T1, ts+2);
p.add(c0, c0, T5);
region.put(p);
region.flushcache();
region.compactStores(true);
// the two family markers without puts are gone
assertEquals(2, countDeleteMarkers(region));
// the last compactStores updated the earliestPutTs,
// so after the next compaction the last family delete marker is also gone
region.compactStores(true);
assertEquals(0, countDeleteMarkers(region));
}
private void checkGet(HRegion region, byte[] row, byte[] fam, byte[] col,
long time, byte[]... vals) throws IOException {
Get g = new Get(row);
g.addColumn(fam, col);
g.setMaxVersions();
g.setTimeRange(0L, time);
Result r = region.get(g, null);
checkResult(r, fam, col, vals);
}
private int countDeleteMarkers(HRegion region) throws IOException {
Scan s = new Scan();
s.setRaw(true);
s.setMaxVersions();
InternalScanner scan = region.getScanner(s);
List<KeyValue> kvs = new ArrayList<KeyValue>();
int res = 0;
boolean hasMore;
do {
hasMore = scan.next(kvs);
for (KeyValue kv : kvs) {
if(kv.isDelete()) res++;
}
kvs.clear();
} while (hasMore);
scan.close();
return res;
}
private void checkResult(Result r, byte[] fam, byte[] col, byte[] ... vals) {
assertEquals(r.size(), vals.length);
List<KeyValue> kvs = r.getColumn(fam, col);
assertEquals(kvs.size(), vals.length);
for (int i=0;i<vals.length;i++) {
assertEquals(kvs.get(i).getValue(), vals[i]);
}
}
}

View File

@ -26,8 +26,6 @@ import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@ -39,8 +37,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Joiner;
@ -89,8 +88,10 @@ public class TestMemStore extends TestCase {
Scan scan = new Scan();
List<KeyValue> result = new ArrayList<KeyValue>();
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false,
this.memstore.comparator);
ScanType scanType = ScanType.USER_SCAN;
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
int count = 0;
try {
while (s.next(result)) {
@ -111,8 +112,7 @@ public class TestMemStore extends TestCase {
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
try {
while (s.next(result)) {
@ -138,8 +138,7 @@ public class TestMemStore extends TestCase {
memstorescanners = this.memstore.getScanners();
// Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis();
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
int snapshotIndex = 5;
try {
@ -553,10 +552,12 @@ public class TestMemStore extends TestCase {
}
//starting from each row, validate results should contain the starting row
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
InternalScanner scanner =
new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
Integer.MAX_VALUE, this.memstore.comparator, null,
memstore.getScanners());
ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, false,
this.memstore.comparator);
ScanType scanType = ScanType.USER_SCAN;
InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,
memstore.getScanners());
List<KeyValue> results = new ArrayList<KeyValue>();
for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i;

View File

@ -49,7 +49,7 @@ public class TestMinVersions extends HBaseTestCase {
* Verify behavior of getClosestBefore(...)
*/
public void testGetClosestBefore() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 1, 1000, 1);
HTableDescriptor htd = createTableDescriptor(getName(), 1, 1000, 1, false);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
@ -92,7 +92,7 @@ public class TestMinVersions extends HBaseTestCase {
*/
public void testStoreMemStore() throws Exception {
// keep 3 versions minimum
HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1);
HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, false);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
@ -141,7 +141,7 @@ public class TestMinVersions extends HBaseTestCase {
* Make sure the Deletes behave as expected with minimum versions
*/
public void testDelete() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1);
HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, false);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
@ -193,7 +193,7 @@ public class TestMinVersions extends HBaseTestCase {
* Make sure the memstor behaves correctly with minimum versions
*/
public void testMemStore() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1, false);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
@ -262,7 +262,7 @@ public class TestMinVersions extends HBaseTestCase {
*/
public void testBaseCase() throws Exception {
// 1 version minimum, 1000 versions maximum, ttl = 1s
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1, false);
HRegion region = createNewHRegion(htd, null, null);
long ts = System.currentTimeMillis() - 2000; // 2s in the past
@ -347,7 +347,7 @@ public class TestMinVersions extends HBaseTestCase {
* minimum versions enabled.
*/
public void testFilters() throws Exception {
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1, false);
HRegion region = createNewHRegion(htd, null, null);
final byte [] c1 = COLUMNS[1];
@ -408,7 +408,7 @@ public class TestMinVersions extends HBaseTestCase {
g.setMaxVersions();
r = region.get(g, null);
checkResult(r, c0, T2);
}
}
private void checkResult(Result r, byte[] col, byte[] ... vals) {
assertEquals(r.size(), vals.length);

View File

@ -26,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
@ -51,7 +52,7 @@ public class TestQueryMatcher extends HBaseTestCase {
private Get get;
long ttl = Long.MAX_VALUE;
KeyComparator rowComparator;
KVComparator rowComparator;
private Scan scan;
public void setUp() throws Exception {
@ -76,7 +77,7 @@ public class TestQueryMatcher extends HBaseTestCase {
get.addColumn(fam2, col5);
this.scan = new Scan(get);
rowComparator = KeyValue.KEY_COMPARATOR;
rowComparator = KeyValue.COMPARATOR;
}
@ -95,8 +96,9 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.DONE);
// 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2,
get.getFamilyMap().get(fam2), ttl, rowComparator, 1);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
0, 1, ttl, false, rowComparator), get.getFamilyMap().get(fam2));
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@ -139,7 +141,8 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2, null, ttl, rowComparator, 1);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
0, 1, ttl, false, rowComparator), null);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@ -189,8 +192,8 @@ public class TestQueryMatcher extends HBaseTestCase {
ScanQueryMatcher.MatchCode.DONE
};
ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2,
get.getFamilyMap().get(fam2), testTTL, rowComparator, 1);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
0, 1, testTTL, false, rowComparator), get.getFamilyMap().get(fam2));
long now = System.currentTimeMillis();
KeyValue [] kvs = new KeyValue[] {
@ -241,8 +244,8 @@ public class TestQueryMatcher extends HBaseTestCase {
ScanQueryMatcher.MatchCode.DONE
};
ScanQueryMatcher qm = new ScanQueryMatcher(scan, fam2,
null, testTTL, rowComparator, 1);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
0, 1, testTTL, false, rowComparator), null);
long now = System.currentTimeMillis();
KeyValue [] kvs = new KeyValue[] {

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@ -54,7 +55,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
for(byte [] qualifier : qualifiers) {
ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0,
qualifier.length, 1);
qualifier.length, 1, KeyValue.Type.Put.getCode());
actual.add(mc);
}
@ -87,7 +88,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
long timestamp = 0;
for(byte [] qualifier : qualifiers) {
MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length,
++timestamp);
++timestamp, KeyValue.Type.Put.getCode());
actual.add(mc);
}
@ -110,7 +111,8 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
try {
for(byte [] qualifier : qualifiers) {
tracker.checkColumn(qualifier, 0, qualifier.length, 1);
tracker.checkColumn(qualifier, 0, qualifier.length, 1,
KeyValue.Type.Put.getCode());
}
} catch (Exception e) {
ok = true;

View File

@ -24,16 +24,13 @@ import junit.framework.TestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
@ -42,6 +39,9 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixtu
public class TestStoreScanner extends TestCase {
private static final String CF_STR = "cf";
final byte [] CF = Bytes.toBytes(CF_STR);
private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, false,
KeyValue.COMPARATOR);
private ScanType scanType = ScanType.USER_SCAN;
/*
* Test utility for building a NavigableSet for scanners.
@ -74,9 +74,8 @@ public class TestStoreScanner extends TestCase {
Scan scanSpec = new Scan(Bytes.toBytes(r1));
scanSpec.setTimeRange(0, 6);
scanSpec.setMaxVersions();
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"), scanners);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType,
getCols("a"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(5, results.size());
@ -85,8 +84,8 @@ public class TestStoreScanner extends TestCase {
scanSpec = new Scan(Bytes.toBytes(r1));
scanSpec.setTimeRange(1, 3);
scanSpec.setMaxVersions();
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"), scanners);
scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a"),
scanners);
results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(2, results.size());
@ -94,8 +93,8 @@ public class TestStoreScanner extends TestCase {
scanSpec = new Scan(Bytes.toBytes(r1));
scanSpec.setTimeRange(5, 10);
scanSpec.setMaxVersions();
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"), scanners);
scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a"),
scanners);
results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(1, results.size());
@ -104,8 +103,8 @@ public class TestStoreScanner extends TestCase {
scanSpec = new Scan(Bytes.toBytes(r1));
scanSpec.setTimeRange(0, 10);
scanSpec.setMaxVersions(3);
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"), scanners);
scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a"),
scanners);
results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(3, results.size());
@ -124,10 +123,8 @@ public class TestStoreScanner extends TestCase {
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
// this only uses maxVersions (default=1) and TimeRange (default=all)
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"),
scanners);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType,
getCols("a"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
@ -153,10 +150,8 @@ public class TestStoreScanner extends TestCase {
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
// this only uses maxVersions (default=1) and TimeRange (default=all)
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"),
scanners);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType,
getCols("a"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
scan.next(results);
@ -183,9 +178,8 @@ public class TestStoreScanner extends TestCase {
};
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a"), scanners);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType,
getCols("a"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertFalse(scan.next(results));
@ -204,9 +198,8 @@ public class TestStoreScanner extends TestCase {
};
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a"), scanners);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType,
getCols("a"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
@ -232,9 +225,8 @@ public class TestStoreScanner extends TestCase {
};
List<KeyValueScanner> scanners = scanFixture(kvs1, kvs2);
StoreScanner scan =
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a"), scanners);
StoreScanner scan = new StoreScanner(new Scan(Bytes.toBytes("R1")),
scanInfo, scanType, getCols("a"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
// the two put at ts=now will be masked by the 1 delete, and
// since the scan default returns 1 version we'll return the newest
@ -258,9 +250,8 @@ public class TestStoreScanner extends TestCase {
List<KeyValueScanner> scanners = scanFixture(kvs1, kvs2);
Scan scanSpec = new Scan(Bytes.toBytes("R1")).setMaxVersions(2);
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a"), scanners);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType,
getCols("a"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(2, results.size());
@ -275,9 +266,8 @@ public class TestStoreScanner extends TestCase {
KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.DeleteColumn, "dont-care"),
};
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
null, scanners);
StoreScanner scan = new StoreScanner(new Scan(Bytes.toBytes("R1")),
scanInfo, scanType, null, scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(2, results.size());
@ -305,9 +295,8 @@ public class TestStoreScanner extends TestCase {
};
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
null, scanners);
StoreScanner scan = new StoreScanner(new Scan().setMaxVersions(2),
scanInfo, scanType, null, scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(5, results.size());
@ -334,9 +323,9 @@ public class TestStoreScanner extends TestCase {
KeyValueTestUtil.create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
};
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan().setMaxVersions(Integer.MAX_VALUE), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
null, scanners);
StoreScanner scan = new StoreScanner(
new Scan().setMaxVersions(Integer.MAX_VALUE), scanInfo, scanType, null,
scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(0, results.size());
@ -355,9 +344,8 @@ public class TestStoreScanner extends TestCase {
KeyValueTestUtil.create("R1", "cf", "b", 5, KeyValue.Type.Put, "dont-care")
};
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
null, scanners);
StoreScanner scan = new StoreScanner(new Scan(), scanInfo, scanType, null,
scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(1, results.size());
@ -379,9 +367,8 @@ public class TestStoreScanner extends TestCase {
public void testSkipColumn() throws IOException {
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a", "d"), scanners);
StoreScanner scan = new StoreScanner(new Scan(), scanInfo, scanType,
getCols("a", "d"), scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
@ -417,8 +404,11 @@ public class TestStoreScanner extends TestCase {
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scan = new Scan();
scan.setMaxVersions(1);
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false,
KeyValue.COMPARATOR);
ScanType scanType = ScanType.USER_SCAN;
StoreScanner scanner =
new StoreScanner(scan, CF, 500, KeyValue.COMPARATOR,
new StoreScanner(scan, scanInfo, scanType,
null, scanners);
List<KeyValue> results = new ArrayList<KeyValue>();
@ -440,10 +430,8 @@ public class TestStoreScanner extends TestCase {
public void testScannerReseekDoesntNPE() throws Exception {
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a", "d"), scanners);
StoreScanner scan = new StoreScanner(new Scan(), scanInfo, scanType,
getCols("a", "d"), scanners);
// Previously a updateReaders twice in a row would cause an NPE. In test this would also
// normally cause an NPE because scan.store is null. So as long as we get through these
@ -467,9 +455,8 @@ public class TestStoreScanner extends TestCase {
};
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
getCols("a"), scanners);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType,
getCols("a"), scanners);
assertNull(scan.peek());
}
}