diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index f8a7b285376..8992a56056f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; @@ -103,6 +104,27 @@ public class ScanQueryMatcher { * */ private boolean hasNullColumn = true; + // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete + // marker is always removed during a major compaction. If set to non-zero + // value then major compaction will try to keep a delete marker around for + // the given number of milliseconds. We want to keep the delete markers + // around a bit longer because old puts might appear out-of-order. For + // example, during log replication between two clusters. + // + // If the delete marker has lived longer than its column-family's TTL then + // the delete marker will be removed even if time.to.purge.deletes has not + // passed. This is because all the Puts that this delete marker can influence + // would have also expired. (Removing of delete markers on col family TTL will + // not happen if min-versions is set to non-zero) + // + // But, if time.to.purge.deletes has not expired then a delete + // marker will not be removed just because there are no Puts that it is + // currently influencing. This is because Puts, that this delete can + // influence. may appear out of order. + private final long timeToPurgeDeletes; + + private final boolean isUserScan; + /** * Construct a QueryMatcher for a scan * @param scan @@ -113,8 +135,7 @@ public class ScanQueryMatcher { */ public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo, NavigableSet columns, StoreScanner.ScanType scanType, - long readPointToUse, - long earliestPutTs) { + long readPointToUse, long earliestPutTs) { this.tr = scan.getTimeRange(); this.rowComparator = scanInfo.getComparator().getRawComparator(); this.deletes = new ScanDeleteTracker(); @@ -124,14 +145,16 @@ public class ScanQueryMatcher { this.filter = scan.getFilter(); this.earliestPutTs = earliestPutTs; this.maxReadPointToTrackVersions = readPointToUse; + this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes(); /* how to deal with deletes */ + this.isUserScan = scanType == ScanType.USER_SCAN; // keep deleted cells: if compaction or raw scan - this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && scanType != ScanType.USER_SCAN) || scan.isRaw(); + this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && !isUserScan) || scan.isRaw(); // retain deletes: if minor compaction or raw scan this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw(); // seePastDeleteMarker: user initiated scans - this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && scanType == ScanType.USER_SCAN; + this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && isUserScan; int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions()); // Single branch to deal with two types of reads (columns vs all in family) @@ -261,8 +284,12 @@ public class ScanQueryMatcher { } // Can't early out now, because DelFam come before any other keys } - if (retainDeletesInOutput) { - // always include + if (retainDeletesInOutput || + (!isUserScan && + (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= + timeToPurgeDeletes)) { + // always include or it is not time yet to check whether it is OK + // to purge deltes or not return MatchCode.INCLUDE; } else if (keepDeletedCells) { if (timestamp < earliestPutTs) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index 1f1115dd6f2..f3655125f89 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Keeps track of the columns for a scan if they are not explicitly specified @@ -53,7 +54,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker { public ScanWildcardColumnTracker(int minVersion, int maxVersion, long ttl) { this.maxVersions = maxVersion; this.minVersions = minVersion; - this.oldestStamp = System.currentTimeMillis() - ttl; + this.oldestStamp = EnvironmentEdgeManager.currentTimeMillis() - ttl; } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 52056f276ad..12639f2cba2 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -193,9 +193,14 @@ public class Store extends SchemaConfigured implements HeapSize { // second -> ms adjust for user data this.ttl *= 1000; } + // used by ScanQueryMatcher + long timeToPurgeDeletes = + Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); + LOG.info("time to purge deletes set to " + timeToPurgeDeletes + + "ms in store " + this); scanInfo = new ScanInfo(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family.getKeepDeletedCells(), - this.comparator); + timeToPurgeDeletes, this.comparator); this.memstore = new MemStore(conf, this.comparator); this.storeNameStr = getColumnFamilyName(); @@ -2039,6 +2044,7 @@ public class Store extends SchemaConfigured implements HeapSize { private int maxVersions; private long ttl; private boolean keepDeletedCells; + private long timeToPurgeDeletes; private KVComparator comparator; public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT @@ -2050,17 +2056,21 @@ public class Store extends SchemaConfigured implements HeapSize { * @param minVersions Store's MIN_VERSIONS setting * @param maxVersions Store's VERSIONS setting * @param ttl Store's TTL (in ms) + * @param timeToPurgeDeletes duration in ms after which a delete marker can + * be purged during a major compaction. * @param keepDeletedCells Store's keepDeletedCells setting * @param comparator The store's comparator */ public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl, - boolean keepDeletedCells, KVComparator comparator) { + boolean keepDeletedCells, long timeToPurgeDeletes, + KVComparator comparator) { this.family = family; this.minVersions = minVersions; this.maxVersions = maxVersions; this.ttl = ttl; this.keepDeletedCells = keepDeletedCells; + this.timeToPurgeDeletes = timeToPurgeDeletes; this.comparator = comparator; } @@ -2083,6 +2093,10 @@ public class Store extends SchemaConfigured implements HeapSize { public boolean getKeepDeletedCells() { return keepDeletedCells; } + + public long getTimeToPurgeDeletes() { + return timeToPurgeDeletes; + } public KVComparator getComparator() { return comparator; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e5e8789f14e..3a4d2e9df4d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -155,10 +155,19 @@ class StoreScanner extends NonLazyKeyValueScanner StoreScanner(final Scan scan, Store.ScanInfo scanInfo, StoreScanner.ScanType scanType, final NavigableSet columns, final List scanners) throws IOException { + this(scan, scanInfo, scanType, columns, scanners, + HConstants.LATEST_TIMESTAMP); + } + + // Constructor for testing. + StoreScanner(final Scan scan, Store.ScanInfo scanInfo, + StoreScanner.ScanType scanType, final NavigableSet columns, + final List scanners, long earliestPutTs) + throws IOException { this(null, scan.getCacheBlocks(), scan, columns); this.initializeMetricNames(); this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, - Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP); + Long.MAX_VALUE, earliestPutTs); // Seek all scanners to the initial key for (KeyValueScanner scanner : scanners) { diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index fc51b3d7ba5..ab26f54b1d8 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -234,7 +234,7 @@ public class TestCompaction extends HBaseTestCase { Store.ScanInfo old = store.scanInfo; Store.ScanInfo si = new Store.ScanInfo(old.getFamily(), old.getMinVersions(), old.getMaxVersions(), ttl, - old.getKeepDeletedCells(), old.getComparator()); + old.getKeepDeletedCells(), 0, old.getComparator()); store.scanInfo = si; } Thread.sleep(1000); @@ -445,7 +445,7 @@ public class TestCompaction extends HBaseTestCase { Store.ScanInfo old = store.scanInfo; Store.ScanInfo si = new Store.ScanInfo(old.getFamily(), old.getMinVersions(), old.getMaxVersions(), ttl, - old.getKeepDeletedCells(), old.getComparator()); + old.getKeepDeletedCells(), 0, old.getComparator()); store.scanInfo = si; } Thread.sleep(ttl); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 0ffdc556e73..c1336c38c50 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -90,7 +90,7 @@ public class TestMemStore extends TestCase { List result = new ArrayList(); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false, - this.memstore.comparator); + 0, this.memstore.comparator); ScanType scanType = ScanType.USER_SCAN; StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); int count = 0; @@ -554,7 +554,7 @@ public class TestMemStore extends TestCase { //starting from each row, validate results should contain the starting row for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, false, - this.memstore.comparator); + 0, this.memstore.comparator); ScanType scanType = ScanType.USER_SCAN; InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java index 374130c908b..7d4b748e142 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java @@ -98,7 +98,7 @@ public class TestQueryMatcher extends HBaseTestCase { // 2,4,5 ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, - 0, 1, ttl, false, rowComparator), get.getFamilyMap().get(fam2)); + 0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2)); List memstore = new ArrayList(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -142,7 +142,7 @@ public class TestQueryMatcher extends HBaseTestCase { expected.add(ScanQueryMatcher.MatchCode.DONE); ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, - 0, 1, ttl, false, rowComparator), null); + 0, 1, ttl, false, 0, rowComparator), null); List memstore = new ArrayList(); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -193,7 +193,7 @@ public class TestQueryMatcher extends HBaseTestCase { }; ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, - 0, 1, testTTL, false, rowComparator), get.getFamilyMap().get(fam2)); + 0, 1, testTTL, false, 0, rowComparator), get.getFamilyMap().get(fam2)); long now = System.currentTimeMillis(); KeyValue [] kvs = new KeyValue[] { @@ -245,7 +245,7 @@ public class TestQueryMatcher extends HBaseTestCase { }; ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2, - 0, 1, testTTL, false, rowComparator), null); + 0, 1, testTTL, false, 0, rowComparator), null); long now = System.currentTimeMillis(); KeyValue [] kvs = new KeyValue[] { diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index cbd9e081b0b..d0cda62d3f7 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -22,12 +22,16 @@ package org.apache.hadoop.hbase.regionserver; import junit.framework.TestCase; import org.apache.hadoop.hbase.*; + import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import java.io.IOException; import java.util.ArrayList; @@ -41,8 +45,8 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixtu public class TestStoreScanner extends TestCase { private static final String CF_STR = "cf"; final byte [] CF = Bytes.toBytes(CF_STR); - private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, false, - KeyValue.COMPARATOR); + private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE, + Long.MAX_VALUE, false, 0, KeyValue.COMPARATOR); private ScanType scanType = ScanType.USER_SCAN; public void setUp() throws Exception { @@ -411,7 +415,7 @@ public class TestStoreScanner extends TestCase { List scanners = scanFixture(kvs); Scan scan = new Scan(); scan.setMaxVersions(1); - ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, + ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, 0, KeyValue.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; StoreScanner scanner = @@ -482,7 +486,8 @@ public class TestStoreScanner extends TestCase { Scan scan = new Scan(); scan.setMaxVersions(1); // scanner with ttl equal to 500 - ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, KeyValue.COMPARATOR); + ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, 0, + KeyValue.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners); @@ -495,4 +500,71 @@ public class TestStoreScanner extends TestCase { assertEquals(false, scanner.next(results)); } + + public void testDeleteMarkerLongevity() throws Exception { + final long now = System.currentTimeMillis(); + EnvironmentEdgeManagerTestHelper.injectEdge(new EnvironmentEdge() { + public long currentTimeMillis() { + return now; + } + }); + KeyValue [] kvs = new KeyValue[] { + /*0*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, + now-100, KeyValue.Type.DeleteFamily), // live + /*1*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, + now-1000, KeyValue.Type.DeleteFamily), // expired + /*2*/ KeyValueTestUtil.create("R1", "cf", "a", now-50, + KeyValue.Type.Put, "v3"), // live + /*3*/ KeyValueTestUtil.create("R1", "cf", "a", now-55, + KeyValue.Type.Delete, "dontcare"), // live + /*4*/ KeyValueTestUtil.create("R1", "cf", "a", now-55, + KeyValue.Type.Put, "deleted-version v2"), // deleted + /*5*/ KeyValueTestUtil.create("R1", "cf", "a", now-60, + KeyValue.Type.Put, "v1"), // live + /*6*/ KeyValueTestUtil.create("R1", "cf", "a", now-65, + KeyValue.Type.Put, "v0"), // max-version reached + /*7*/ KeyValueTestUtil.create("R1", "cf", "a", + now-100, KeyValue.Type.DeleteColumn, "dont-care"), // max-version + /*8*/ KeyValueTestUtil.create("R1", "cf", "b", now-600, + KeyValue.Type.DeleteColumn, "dont-care"), //expired + /*9*/ KeyValueTestUtil.create("R1", "cf", "b", now-70, + KeyValue.Type.Put, "v2"), //live + /*10*/ KeyValueTestUtil.create("R1", "cf", "b", now-750, + KeyValue.Type.Put, "v1"), //expired + /*11*/ KeyValueTestUtil.create("R1", "cf", "c", now-500, + KeyValue.Type.Delete, "dontcare"), //expired + /*12*/ KeyValueTestUtil.create("R1", "cf", "c", now-600, + KeyValue.Type.Put, "v1"), //expired + /*13*/ KeyValueTestUtil.create("R1", "cf", "c", now-1000, + KeyValue.Type.Delete, "dontcare"), //expired + /*14*/ KeyValueTestUtil.create("R1", "cf", "d", now-60, + KeyValue.Type.Put, "expired put"), //live + /*15*/ KeyValueTestUtil.create("R1", "cf", "d", now-100, + KeyValue.Type.Delete, "not-expired delete"), //live + }; + List scanners = scanFixture(kvs); + Scan scan = new Scan(); + scan.setMaxVersions(2); + Store.ScanInfo scanInfo = new Store.ScanInfo(Bytes.toBytes("cf"), + 0 /* minVersions */, + 2 /* maxVersions */, 500 /* ttl */, + false /* keepDeletedCells */, + 200, /* timeToPurgeDeletes */ + KeyValue.COMPARATOR); + StoreScanner scanner = + new StoreScanner(scan, scanInfo, + StoreScanner.ScanType.MAJOR_COMPACT, null, scanners, + HConstants.OLDEST_TIMESTAMP); + List results = new ArrayList(); + results = new ArrayList(); + assertEquals(true, scanner.next(results)); + assertEquals(kvs[0], results.get(0)); + assertEquals(kvs[2], results.get(1)); + assertEquals(kvs[3], results.get(2)); + assertEquals(kvs[5], results.get(3)); + assertEquals(kvs[9], results.get(4)); + assertEquals(kvs[14], results.get(5)); + assertEquals(kvs[15], results.get(6)); + assertEquals(7, results.size()); + } }