From cca758c679c131327310c1a2b374ff9922beedd6 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 24 Aug 2011 05:15:37 +0000 Subject: [PATCH] HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last written version git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1160978 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../hadoop/hbase/HColumnDescriptor.java | 65 +++ .../hbase/regionserver/ColumnTracker.java | 25 +- .../regionserver/ExplicitColumnTracker.java | 38 +- .../hadoop/hbase/regionserver/HRegion.java | 3 +- .../hbase/regionserver/ScanQueryMatcher.java | 52 +-- .../ScanWildcardColumnTracker.java | 76 ++-- .../hadoop/hbase/regionserver/Store.java | 32 +- .../hbase/regionserver/StoreScanner.java | 6 +- src/main/ruby/hbase/admin.rb | 1 + .../apache/hadoop/hbase/HBaseTestCase.java | 25 +- .../TestExplicitColumnTracker.java | 5 +- .../hbase/regionserver/TestMinVersions.java | 423 ++++++++++++++++++ .../TestScanWildcardColumnTracker.java | 6 +- 14 files changed, 657 insertions(+), 102 deletions(-) create mode 100644 src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java diff --git a/CHANGES.txt b/CHANGES.txt index c36ea5075d7..e3f5665d592 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -453,6 +453,8 @@ Release 0.91.0 - Unreleased HBASE-4176 Exposing HBase Filters to the Thrift API (Anirudh Todi) HBASE-4221 Changes necessary to build and run against Hadoop 0.23 (todd) + HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last + written version (Lars Hofhansl) Release 0.90.5 - Unreleased diff --git a/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 33476dbaeac..298237af6b3 100644 --- a/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -87,6 +87,7 @@ public class HColumnDescriptor implements WritableComparable public static final String BLOOMFILTER = "BLOOMFILTER"; public static final String FOREVER = "FOREVER"; public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE"; + public static final String MIN_VERSIONS = "MIN_VERSIONS"; /** * Default compression type. @@ -99,6 +100,11 @@ public class HColumnDescriptor implements WritableComparable */ public static final int DEFAULT_VERSIONS = 3; + /** + * Default is not to keep a minimum of versions. + */ + public static final int DEFAULT_MIN_VERSIONS = 0; + /* * Cache here the HCD value. * Question: its OK to cache since when we're reenable, we create a new HCD? @@ -259,6 +265,37 @@ public class HColumnDescriptor implements WritableComparable final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, final int timeToLive, final String bloomFilter, final int scope) { + this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, compression, inMemory, + blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope); + } + + /** + * Constructor + * @param familyName Column family name. Must be 'printable' -- digit or + * letter -- and may not contain a : + * @param minVersions Minimum number of versions to keep + * @param maxVersions Maximum number of versions to keep + * @param compression Compression type + * @param inMemory If true, column data should be kept in an HRegionServer's + * cache + * @param blockCacheEnabled If true, MapFile blocks should be cached + * @param blocksize Block size to use when writing out storefiles. 
Use + * smaller blocksizes for faster random-access at expense of larger indices + * (more memory consumption). Default is usually 64k. + * @param timeToLive Time-to-live of cell contents, in seconds + * (use HConstants.FOREVER for unlimited TTL) + * @param bloomFilter Bloom filter type for this column + * @param scope The scope tag for this column + * + * @throws IllegalArgumentException if passed a family name that is made of + * other than 'word' characters: i.e. [a-zA-Z_0-9] or contains + * a : + * @throws IllegalArgumentException if the number of versions is <= 0 + */ + public HColumnDescriptor(final byte [] familyName, final int minVersions, + final int maxVersions, final String compression, final boolean inMemory, + final boolean blockCacheEnabled, final int blocksize, + final int timeToLive, final String bloomFilter, final int scope) { isLegalFamilyName(familyName); this.name = familyName; @@ -267,7 +304,19 @@ public class HColumnDescriptor implements WritableComparable // Until there is support, consider 0 or < 0 -- a configuration error. throw new IllegalArgumentException("Maximum versions must be positive"); } + + if (minVersions > 0) { + if (timeToLive == HConstants.FOREVER) { + throw new IllegalArgumentException("Minimum versions requires TTL."); + } + if (minVersions > maxVersions) { + throw new IllegalArgumentException("Minimum versions must be <= "+ + "maximum versions."); + } + } + setMaxVersions(maxVersions); + setMinVersions(minVersions); setInMemory(inMemory); setBlockCacheEnabled(blockCacheEnabled); setTimeToLive(timeToLive); @@ -508,6 +557,22 @@ public class HColumnDescriptor implements WritableComparable setValue(TTL, Integer.toString(timeToLive)); } + /** + * @return The minimum number of versions to keep. + */ + public int getMinVersions() { + String value = getValue(MIN_VERSIONS); + return (value != null)? Integer.valueOf(value).intValue(): 0; + } + + /** + * @param minVersions The minimum number of versions to keep. + * (used when timeToLive is set) + */ + public void setMinVersions(int minVersions) { + setValue(MIN_VERSIONS, Integer.toString(minVersions)); + } + /** * @return True if MapFile blocks should be cached. */ diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java index 78f946c512f..c4d246a6838 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java @@ -19,10 +19,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; + /** * Implementing classes of this interface will be used for the tracking - * and enforcement of columns and numbers of versions during the course of a - * Get or Scan operation. + * and enforcement of columns and numbers of versions and timeToLive during + * the course of a Get or Scan operation. *

 * Currently there are two different types of Store/Family-level queries.
 * <ul><li>{@link ExplicitColumnTracker} is used when the query specifies
@@ -42,11 +44,11 @@ public interface ColumnTracker {
    * @param bytes
    * @param offset
    * @param length
-   * @param timestamp
+   * @param ttl The timeToLive to enforce.
    * @return The match code instance.
    */
   public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
-      int length, long timestamp);
+      int length, long ttl);

   /**
    * Updates internal variables in between files
@@ -76,4 +78,19 @@ public interface ColumnTracker {
    * @return null, or a ColumnCount that we should seek to
    */
   public ColumnCount getColumnHint();
+
+  /**
+   * Retrieve the MatchCode for the next row or column
+   */
+  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+      int qualLength);
+
+  /**
+   * Give the tracker a chance to declare it's done based only on the
+   * timestamp to allow an early out.
+   *
+   * @param timestamp
+   * @return true to early out based on timestamp.
+   */
+  public boolean isDone(long timestamp);
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
index f48343269c8..520f7784145 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.NavigableSet;

 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;

 /**
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class ExplicitColumnTracker implements ColumnTracker {

   private final int maxVersions;
+  private final int minVersions;
   private final List<ColumnCount> columns;
   private final List<ColumnCount> columnsToReuse;
   private int index;
@@ -55,23 +57,29 @@ public class ExplicitColumnTracker implements ColumnTracker {
   /** Keeps track of the latest timestamp included for current column.
    * Used to eliminate duplicates. */
   private long latestTSOfCurrentColumn;
+  private long oldestStamp;

   /**
    * Default constructor.
    * @param columns columns specified by user in query
+   * @param minVersions minimum number of versions to keep
    * @param maxVersions maximum versions to return per column
+   * @param ttl The timeToLive to enforce
    */
-  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int maxVersions) {
+  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
+      int maxVersions, long ttl) {
     this.maxVersions = maxVersions;
+    this.minVersions = minVersions;
+    this.oldestStamp = System.currentTimeMillis() - ttl;
     this.columns = new ArrayList<ColumnCount>(columns.size());
     this.columnsToReuse = new ArrayList<ColumnCount>(columns.size());
     for(byte [] column : columns) {
-      this.columnsToReuse.add(new ColumnCount(column,maxVersions));
+      this.columnsToReuse.add(new ColumnCount(column));
     }
     reset();
   }

-  /**
+  /**
    * Done when there are no more columns to match against.
    */
   public boolean done() {
@@ -108,7 +116,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
     int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
         column.getLength(), bytes, offset, length);

-    // Column Matches. If it is not a duplicate key, decrement versions left
+    // Column Matches. If it is not a duplicate key, increment the version count
     // and include.
if(ret == 0) { //If column matches, check if it is a duplicate timestamp @@ -116,7 +124,8 @@ public class ExplicitColumnTracker implements ColumnTracker { //If duplicate, skip this Key return ScanQueryMatcher.MatchCode.SKIP; } - if(this.column.decrement() == 0) { + int count = this.column.increment(); + if(count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { // Done with versions for this column this.columns.remove(this.index); resetTS(); @@ -185,11 +194,15 @@ public class ExplicitColumnTracker implements ColumnTracker { return timestamp == latestTSOfCurrentColumn; } + private boolean isExpired(long timestamp) { + return timestamp < oldestStamp; + } + private void buildColumnList() { this.columns.clear(); this.columns.addAll(this.columnsToReuse); for(ColumnCount col : this.columns) { - col.setCount(this.maxVersions); + col.setCount(0); } } @@ -227,5 +240,18 @@ public class ExplicitColumnTracker implements ColumnTracker { } } } + public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, + int qualLength) { + doneWithColumn(bytes, offset,qualLength); + if (getColumnHint() == null) { + return MatchCode.SEEK_NEXT_ROW; + } else { + return MatchCode.SEEK_NEXT_COL; + } + } + + public boolean isDone(long timestamp) { + return minVersions <=0 && isExpired(timestamp); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2390718a99a..30c9d69a040 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1297,9 +1297,8 @@ public class HRegion implements HeapSize { // , Writable{ this.readRequestsCount.increment(); try { Store store = getStore(family); - KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP); // get the closest key. (HStore.getRowKeyAtOrBefore can return null) - KeyValue key = store.getRowKeyAtOrBefore(kv); + KeyValue key = store.getRowKeyAtOrBefore(row); Result result = null; if (key != null) { Get get = new Get(key.getRow()); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index 48dd8e970e0..a461e2143f0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -53,9 +53,6 @@ public class ScanQueryMatcher { /** Key to seek to in memstore and StoreFiles */ protected KeyValue startKey; - /** Oldest allowed version stamp for TTL enforcement */ - protected long oldestStamp; - /** Row comparator for the region this query is for */ KeyValue.KeyComparator rowComparator; @@ -72,10 +69,9 @@ public class ScanQueryMatcher { */ public ScanQueryMatcher(Scan scan, byte [] family, NavigableSet columns, long ttl, - KeyValue.KeyComparator rowComparator, int maxVersions, + KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions, boolean retainDeletesInOutput) { this.tr = scan.getTimeRange(); - this.oldestStamp = System.currentTimeMillis() - ttl; this.rowComparator = rowComparator; this.deletes = new ScanDeleteTracker(); this.stopRow = scan.getStopRow(); @@ -86,19 +82,26 @@ public class ScanQueryMatcher { // Single branch to deal with two types of reads (columns vs all in family) if (columns == null || columns.size() == 0) { // use a specialized scan for wildcard column tracker. 
- this.columns = new ScanWildcardColumnTracker(maxVersions); + this.columns = new ScanWildcardColumnTracker(minVersions, maxVersions, ttl); } else { // We can share the ExplicitColumnTracker, diff is we reset // between rows, not between storefiles. - this.columns = new ExplicitColumnTracker(columns,maxVersions); + this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions, + ttl); } } public ScanQueryMatcher(Scan scan, byte [] family, NavigableSet columns, long ttl, - KeyValue.KeyComparator rowComparator, int maxVersions) { + KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) { /* By default we will not include deletes */ /* deletes are included explicitly (for minor compaction) */ - this(scan, family, columns, ttl, rowComparator, maxVersions, false); + this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions, + false); + } + public ScanQueryMatcher(Scan scan, byte [] family, + NavigableSet columns, long ttl, + KeyValue.KeyComparator rowComparator, int maxVersions) { + this(scan, family, columns, ttl, rowComparator, 0, maxVersions); } /** @@ -158,9 +161,9 @@ public class ScanQueryMatcher { (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE; long timestamp = kv.getTimestamp(); - if (isExpired(timestamp)) { - // done, the rest of this column will also be expired as well. - return getNextRowOrNextColumn(bytes, offset, qualLength); + // check for early out based on timestamp alone + if (columns.isDone(timestamp)) { + return columns.getNextRowOrNextColumn(bytes, offset, qualLength); } byte type = kv.getType(); @@ -192,7 +195,7 @@ public class ScanQueryMatcher { if (timestampComparison >= 1) { return MatchCode.SKIP; } else if (timestampComparison <= -1) { - return getNextRowOrNextColumn(bytes, offset, qualLength); + return columns.getNextRowOrNextColumn(bytes, offset, qualLength); } /** @@ -206,7 +209,7 @@ public class ScanQueryMatcher { if (filterResponse == ReturnCode.SKIP) { return MatchCode.SKIP; } else if (filterResponse == ReturnCode.NEXT_COL) { - return getNextRowOrNextColumn(bytes, offset, qualLength); + return columns.getNextRowOrNextColumn(bytes, offset, qualLength); } else if (filterResponse == ReturnCode.NEXT_ROW) { stickyNextRow = true; return MatchCode.SEEK_NEXT_ROW; @@ -228,23 +231,6 @@ public class ScanQueryMatcher { } - public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, - int qualLength) { - if (columns instanceof ExplicitColumnTracker) { - //We only come here when we know that columns is an instance of - //ExplicitColumnTracker so we should never have a cast exception - ((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset, - qualLength); - if (columns.getColumnHint() == null) { - return MatchCode.SEEK_NEXT_ROW; - } else { - return MatchCode.SEEK_NEXT_COL; - } - } else { - return MatchCode.SEEK_NEXT_COL; - } - } - public boolean moreRowsMayExistAfter(KeyValue kv) { if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) && rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(), @@ -278,10 +264,6 @@ public class ScanQueryMatcher { return (type != KeyValue.Type.Put.getCode()); } - protected boolean isExpired(long timestamp) { - return (timestamp < oldestStamp); - } - /** * * @return the start key diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index 6a027d6e115..ce2bebfd39a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java 
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -37,16 +37,22 @@ public class ScanWildcardColumnTracker implements ColumnTracker { private int columnLength = 0; private int currentCount = 0; private int maxVersions; + private int minVersions; /* Keeps track of the latest timestamp included for current column. * Used to eliminate duplicates. */ private long latestTSOfCurrentColumn; + private long oldestStamp; /** * Return maxVersions of every row. - * @param maxVersion + * @param minVersion Minimum number of versions to keep + * @param maxVersion Maximum number of versions to return + * @param ttl TimeToLive to enforce */ - public ScanWildcardColumnTracker(int maxVersion) { + public ScanWildcardColumnTracker(int minVersion, int maxVersion, long ttl) { this.maxVersions = maxVersion; + this.minVersions = minVersion; + this.oldestStamp = System.currentTimeMillis() - ttl; } /** @@ -65,16 +71,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker { long timestamp) { if (columnBuffer == null) { // first iteration. - columnBuffer = bytes; - columnOffset = offset; - columnLength = length; - currentCount = 0; - - if (++currentCount > maxVersions) { - return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; - } - setTS(timestamp); - return ScanQueryMatcher.MatchCode.INCLUDE; + resetBuffer(bytes, offset, length); + return checkVersion(++currentCount, timestamp); } int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer, columnOffset, columnLength); @@ -83,11 +81,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker { if (sameAsPreviousTS(timestamp)) { return ScanQueryMatcher.MatchCode.SKIP; } - if (++currentCount > maxVersions) { - return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col - } - setTS(timestamp); - return ScanQueryMatcher.MatchCode.INCLUDE; + return checkVersion(++currentCount, timestamp); } resetTS(); @@ -95,14 +89,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker { // new col > old col if (cmp > 0) { // switched columns, lets do something.x - columnBuffer = bytes; - columnOffset = offset; - columnLength = length; - currentCount = 0; - if (++currentCount > maxVersions) - return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; - setTS(timestamp); - return ScanQueryMatcher.MatchCode.INCLUDE; + resetBuffer(bytes, offset, length); + return checkVersion(++currentCount, timestamp); } // new col < oldcol @@ -111,18 +99,32 @@ public class ScanWildcardColumnTracker implements ColumnTracker { // was incorrectly stored into the store for this one. Continue, but // complain. 
LOG.error("ScanWildcardColumnTracker.checkColumn ran " + - "into a column actually smaller than the previous column: " + + "into a column actually smaller than the previous column: " + Bytes.toStringBinary(bytes, offset, length)); // switched columns + resetBuffer(bytes, offset, length); + return checkVersion(++currentCount, timestamp); + } + + private void resetBuffer(byte[] bytes, int offset, int length) { columnBuffer = bytes; columnOffset = offset; columnLength = length; currentCount = 0; - if (++currentCount > maxVersions) { - return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; + } + + private MatchCode checkVersion(int version, long timestamp) { + if (version > maxVersions) { + return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col } - setTS(timestamp); - return ScanQueryMatcher.MatchCode.INCLUDE; + // keep the KV if required by minversions or it is not expired, yet + if (version <= minVersions || !isExpired(timestamp)) { + setTS(timestamp); + return ScanQueryMatcher.MatchCode.INCLUDE; + } else { + return MatchCode.SEEK_NEXT_COL; + } + } @Override @@ -150,6 +152,10 @@ public class ScanWildcardColumnTracker implements ColumnTracker { return timestamp == latestTSOfCurrentColumn; } + private boolean isExpired(long timestamp) { + return timestamp < oldestStamp; + } + /** * Used by matcher and scan/get to get a hint of the next column * to seek to after checkColumn() returns SKIP. Returns the next interesting @@ -170,4 +176,14 @@ public class ScanWildcardColumnTracker implements ColumnTracker { public boolean done() { return false; } + + public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, + int qualLength) { + return MatchCode.SEEK_NEXT_COL; + } + + public boolean isDone(long timestamp) { + return minVersions <=0 && isExpired(timestamp); + } + } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 5aedc47ea05..960bb3f3044 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; @@ -52,8 +51,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -92,6 +89,7 @@ public class Store implements HeapSize { final Configuration conf; // ttl in milliseconds. protected long ttl; + protected int minVersions; long majorCompactionTime; private final int minFilesToCompact; private final int maxFilesToCompact; @@ -179,6 +177,7 @@ public class Store implements HeapSize { // second -> ms adjust for user data this.ttl *= 1000; } + this.minVersions = family.getMinVersions(); this.memstore = new MemStore(conf, this.comparator); this.storeNameStr = Bytes.toString(this.family.getName()); @@ -491,12 +490,14 @@ public class Store implements HeapSize { // A. 
Write the map out to the disk
       writer = createWriterInTmp(set.size());
       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-      int entries = 0;
       try {
         for (KeyValue kv: set) {
-          if (!isExpired(kv, oldestTimestamp)) {
+          // If minVersion > 0 we will wait until the next compaction to
+          // collect expired KVs (following the logic for maxVersions).
+          // TODO: As Jonathan Gray points out, this can be optimized
+          // (see HBASE-4241)
+          if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
             writer.append(kv);
-            entries++;
             flushed += this.memstore.heapSizeChange(kv, true);
           }
         }
@@ -717,7 +718,7 @@
       // Ready to go.  Have list of files to compact.
       StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
       // Move the compaction into place.
-      StoreFile sf = completeCompaction(filesToCompact, writer);
+      completeCompaction(filesToCompact, writer);
     } finally {
       synchronized (filesCompacting) {
         filesCompacting.removeAll(filesToCompact);
       }
@@ -1267,14 +1268,23 @@
    * current container: i.e. we'll see deletes before we come across cells we
    * are to delete. Presumption is that the memstore#kvset is processed before
    * memstore#snapshot and so on.
-   * @param kv First possible item on targeted row; i.e. empty columns, latest
-   * timestamp and maximum type.
+   * @param row The row key of the targeted row.
    * @return Found keyvalue or null if none found.
    * @throws IOException
    */
-  KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException {
+  KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
+    // If minVersions is set, we will not ignore expired KVs.
+    // As we're only looking for the latest matches, that should be OK.
+    // With minVersions > 0 we guarantee that any KV that has any version
+    // at all (expired or not) has at least one version that will not expire.
+    // Note that this method used to take a KeyValue as an argument. A KeyValue
+    // can be back-dated, a row key cannot.
+    long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl;
+
+    KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
+
     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
-      this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion());
+      this.comparator, kv, ttlToUse, this.region.getRegionInfo().isMetaRegion());
     this.lock.readLock().lock();
     try {
       // First go to the memstore.  Pick up deletes and candidates.
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 4b624c07fe5..a456f5287c7 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -64,7 +64,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
     this.cacheBlocks = scan.getCacheBlocks();
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         columns, store.ttl, store.comparator.getRawComparator(),
-        store.versionsToReturn(scan.getMaxVersions()),
+        store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
         false);

     this.isGet = scan.isGetScan();
@@ -98,7 +98,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
     this.cacheBlocks = false;
     this.isGet = false;
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
-        null, store.ttl, store.comparator.getRawComparator(),
+        null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
         store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);

     // Seek all scanners to the initial key
@@ -120,7 +120,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
     this.isGet = false;
     this.cacheBlocks = scan.getCacheBlocks();
     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
-        comparator.getRawComparator(), scan.getMaxVersions(), false);
+        comparator.getRawComparator(), 0, scan.getMaxVersions(), false);

     // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {
diff --git a/src/main/ruby/hbase/admin.rb b/src/main/ruby/hbase/admin.rb
index 4460d6e18ec..acabaa81c6c 100644
--- a/src/main/ruby/hbase/admin.rb
+++ b/src/main/ruby/hbase/admin.rb
@@ -408,6 +408,7 @@ module Hbase
       family.setCompressionType(org.apache.hadoop.hbase.io.hfile.Compression::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
       family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
       family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
+      family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
         bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase
         unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype)
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
index bf8004b1764..3f2f41e7e26 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
@@ -197,20 +197,33 @@ public abstract class HBaseTestCase extends TestCase {
    */
   protected HTableDescriptor createTableDescriptor(final String name,
       final int versions) {
+    return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+        versions, HConstants.FOREVER);
+  }
+
+  /**
+   * Create a table of name <code>name</code> with {@link COLUMNS} for
+   * families.
+   * @param name Name to give table.
+ * @param versions How many versions to allow per column. + * @return Column descriptor. + */ + protected HTableDescriptor createTableDescriptor(final String name, + final int minVersions, final int versions, final int ttl) { HTableDescriptor htd = new HTableDescriptor(name); - htd.addFamily(new HColumnDescriptor(fam1, versions, + htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, HColumnDescriptor.DEFAULT_BLOOMFILTER, HConstants.REPLICATION_SCOPE_LOCAL)); - htd.addFamily(new HColumnDescriptor(fam2, versions, + htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, HColumnDescriptor.DEFAULT_BLOOMFILTER, HConstants.REPLICATION_SCOPE_LOCAL)); - htd.addFamily(new HColumnDescriptor(fam3, versions, + htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl, HColumnDescriptor.DEFAULT_BLOOMFILTER, HConstants.REPLICATION_SCOPE_LOCAL)); return htd; diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java index 10b7f962b19..49ab86dd74b 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java @@ -44,7 +44,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase { List scannerColumns, List expected) { ColumnTracker exp = new ExplicitColumnTracker( - trackColumns, maxVersions); + trackColumns, 0, maxVersions, Long.MAX_VALUE); //Initialize result @@ -161,7 +161,8 @@ public class TestExplicitColumnTracker extends HBaseTestCase { columns.add(Bytes.toBytes("col"+i)); } - ColumnTracker explicit = new ExplicitColumnTracker(columns, maxVersions); + ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, + Long.MAX_VALUE); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); explicit.checkColumn(col, 0, col.length, 1); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java new file mode 100644 index 00000000000..cf2fcf48627 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java @@ -0,0 +1,423 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test Minimum Versions feature (HBASE-4071).
+ */
+public class TestMinVersions extends HBaseTestCase {
+  private final byte[] T0 = Bytes.toBytes("0");
+  private final byte[] T1 = Bytes.toBytes("1");
+  private final byte[] T2 = Bytes.toBytes("2");
+  private final byte[] T3 = Bytes.toBytes("3");
+  private final byte[] T4 = Bytes.toBytes("4");
+  private final byte[] T5 = Bytes.toBytes("5");
+
+  private final byte[] c0 = COLUMNS[0];
+
+  /**
+   * Verify behavior of getClosestRowBefore(...)
+   */
+  public void testGetClosestBefore() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 1, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    p = new Put(T1, ts+1);
+    p.add(c0, c0, T4);
+    region.put(p);
+
+    p = new Put(T3, ts);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    // now make sure that getClosestRowBefore(...) can get
+    // rows that would be expired without minVersions.
+    // also make sure it gets the latest version
+    Result r = region.getClosestRowBefore(T1, c0);
+    checkResult(r, c0, T4);
+
+    r = region.getClosestRowBefore(T2, c0);
+    checkResult(r, c0, T4);
+
+    // now flush/compact
+    region.flushcache();
+    region.compactStores(true);
+
+    r = region.getClosestRowBefore(T1, c0);
+    checkResult(r, c0, T4);
+
+    r = region.getClosestRowBefore(T2, c0);
+    checkResult(r, c0, T4);
+  }
+
+  /**
+   * Test mixed memstore and storefile scanning
+   * with minimum versions.
+ */ + public void testStoreMemStore() throws Exception { + // keep 3 versions minimum + HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis() - 2000; // 2s in the past + + Put p = new Put(T1, ts-1); + p.add(c0, c0, T2); + region.put(p); + + p = new Put(T1, ts-3); + p.add(c0, c0, T0); + region.put(p); + + // now flush/compact + region.flushcache(); + region.compactStores(true); + + p = new Put(T1, ts); + p.add(c0, c0, T3); + region.put(p); + + p = new Put(T1, ts-2); + p.add(c0, c0, T1); + region.put(p); + + p = new Put(T1, ts-3); + p.add(c0, c0, T0); + region.put(p); + + // newest version in the memstore + // the 2nd oldest in the store file + // and the 3rd, 4th oldest also in the memstore + + Get g = new Get(T1); + g.setMaxVersions(); + Result r = region.get(g, null); // this'll use ScanWildcardColumnTracker + checkResult(r, c0, T3,T2,T1); + + g = new Get(T1); + g.setMaxVersions(); + g.addColumn(c0, c0); + r = region.get(g, null); // this'll use ExplicitColumnTracker + checkResult(r, c0, T3,T2,T1); + } + + /** + * Make sure the Deletes behave as expected with minimum versions + */ + public void testDelete() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis() - 2000; // 2s in the past + + Put p = new Put(T1, ts-2); + p.add(c0, c0, T1); + region.put(p); + + p = new Put(T1, ts-1); + p.add(c0, c0, T2); + region.put(p); + + p = new Put(T1, ts); + p.add(c0, c0, T3); + region.put(p); + + Delete d = new Delete(T1, ts-1, null); + region.delete(d, null, true); + + Get g = new Get(T1); + g.setMaxVersions(); + Result r = region.get(g, null); // this'll use ScanWildcardColumnTracker + checkResult(r, c0, T3); + + g = new Get(T1); + g.setMaxVersions(); + g.addColumn(c0, c0); + r = region.get(g, null); // this'll use ExplicitColumnTracker + checkResult(r, c0, T3); + + // now flush/compact + region.flushcache(); + region.compactStores(true); + + // try again + g = new Get(T1); + g.setMaxVersions(); + r = region.get(g, null); // this'll use ScanWildcardColumnTracker + checkResult(r, c0, T3); + + g = new Get(T1); + g.setMaxVersions(); + g.addColumn(c0, c0); + r = region.get(g, null); // this'll use ExplicitColumnTracker + checkResult(r, c0, T3); + } + + /** + * Make sure the memstor behaves correctly with minimum versions + */ + public void testMemStore() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1); + HRegion region = createNewHRegion(htd, null, null); + + long ts = System.currentTimeMillis() - 2000; // 2s in the past + + // 2nd version + Put p = new Put(T1, ts-2); + p.add(c0, c0, T2); + region.put(p); + + // 3rd version + p = new Put(T1, ts-1); + p.add(c0, c0, T3); + region.put(p); + + // 4th version + p = new Put(T1, ts); + p.add(c0, c0, T4); + region.put(p); + + // now flush/compact + region.flushcache(); + region.compactStores(true); + + // now put the first version (backdated) + p = new Put(T1, ts-3); + p.add(c0, c0, T1); + region.put(p); + + // now the latest change is in the memstore, + // but it is not the latest version + + Result r = region.get(new Get(T1), null); + checkResult(r, c0, T4); + + Get g = new Get(T1); + g.setMaxVersions(); + r = region.get(g, null); // this'll use ScanWildcardColumnTracker + checkResult(r, c0, T4,T3); + + g = new Get(T1); + g.setMaxVersions(); + g.addColumn(c0, c0); + r = 
+    checkResult(r, c0, T4,T3);
+
+    p = new Put(T1, ts+1);
+    p.add(c0, c0, T5);
+    region.put(p);
+
+    // now the latest version is in the memstore
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    r = region.get(g, null); // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T5,T4);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null); // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T5,T4);
+  }
+
+  /**
+   * Verify basic minimum versions functionality
+   */
+  public void testBaseCase() throws Exception {
+    // 2 versions minimum, 1000 versions maximum, ttl = 1s
+    HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    // 1st version
+    Put p = new Put(T1, ts-3);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    // 2nd version
+    p = new Put(T1, ts-2);
+    p.add(c0, c0, T2);
+    region.put(p);
+
+    // 3rd version
+    p = new Put(T1, ts-1);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    // 4th version
+    p = new Put(T1, ts);
+    p.add(c0, c0, T4);
+    region.put(p);
+
+    Result r = region.get(new Get(T1), null);
+    checkResult(r, c0, T4);
+
+    Get g = new Get(T1);
+    g.setTimeRange(0L, ts+1);
+    r = region.get(g, null);
+    checkResult(r, c0, T4);
+
+    // oldest version still exists
+    g.setTimeRange(0L, ts-2);
+    r = region.get(g, null);
+    checkResult(r, c0, T1);
+
+    // Gets see only available versions,
+    // even before compactions
+    g = new Get(T1);
+    g.setMaxVersions();
+    r = region.get(g, null); // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T4,T3);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null); // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T4,T3);
+
+    // now flush
+    region.flushcache();
+    region.compactStores();
+
+    // oldest version still exists
+    // flushing/minor compactions can't get rid of these anymore
+    g = new Get(T1);
+    g.setTimeRange(0L, ts-2);
+    r = region.get(g, null);
+    checkResult(r, c0, T1);
+
+    // major compaction
+    region.compactStores(true);
+
+    // after compaction the 4th version is still available
+    g = new Get(T1);
+    g.setTimeRange(0L, ts+1);
+    r = region.get(g, null);
+    checkResult(r, c0, T4);
+
+    // so is the 3rd
+    g.setTimeRange(0L, ts);
+    r = region.get(g, null);
+    checkResult(r, c0, T3);
+
+    // but the 2nd and earlier versions are gone
+    g.setTimeRange(0L, ts-1);
+    r = region.get(g, null);
+    assertTrue(r.isEmpty());
+  }
+
+  /**
+   * Verify that basic filters still behave correctly with
+   * minimum versions enabled.
+ */ + public void testFilters() throws Exception { + HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1); + HRegion region = createNewHRegion(htd, null, null); + final byte [] c1 = COLUMNS[1]; + + long ts = System.currentTimeMillis() - 2000; // 2s in the past + + Put p = new Put(T1, ts-3); + p.add(c0, c0, T0); + p.add(c1, c1, T0); + region.put(p); + + p = new Put(T1, ts-2); + p.add(c0, c0, T1); + p.add(c1, c1, T1); + region.put(p); + + p = new Put(T1, ts-1); + p.add(c0, c0, T2); + p.add(c1, c1, T2); + region.put(p); + + p = new Put(T1, ts); + p.add(c0, c0, T3); + p.add(c1, c1, T3); + region.put(p); + + List tss = new ArrayList(); + tss.add(ts-1); + tss.add(ts-2); + + Get g = new Get(T1); + g.addColumn(c1,c1); + g.setFilter(new TimestampsFilter(tss)); + g.setMaxVersions(); + Result r = region.get(g, null); + checkResult(r, c1, T2,T1); + + g = new Get(T1); + g.addColumn(c0,c0); + g.setFilter(new TimestampsFilter(tss)); + g.setMaxVersions(); + r = region.get(g, null); + checkResult(r, c0, T2,T1); + + // now flush/compact + region.flushcache(); + region.compactStores(true); + + g = new Get(T1); + g.addColumn(c1,c1); + g.setFilter(new TimestampsFilter(tss)); + g.setMaxVersions(); + r = region.get(g, null); + checkResult(r, c1, T2); + + g = new Get(T1); + g.addColumn(c0,c0); + g.setFilter(new TimestampsFilter(tss)); + g.setMaxVersions(); + r = region.get(g, null); + checkResult(r, c0, T2); +} + + private void checkResult(Result r, byte[] col, byte[] ... vals) { + assertEquals(r.size(), vals.length); + List kvs = r.getColumn(col, col); + assertEquals(kvs.size(), vals.length); + for (int i=0;i qualifiers = new ArrayList(); @@ -65,7 +65,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { public void testCheckColumn_EnforceVersions() { ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(VERSIONS); + new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE); //Create list of qualifiers List qualifiers = new ArrayList(); @@ -98,7 +98,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { public void DisabledTestCheckColumn_WrongOrder() { ScanWildcardColumnTracker tracker = - new ScanWildcardColumnTracker(VERSIONS); + new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE); //Create list of qualifiers List qualifiers = new ArrayList();