HBASE-4721 Retain Delete Markers after Major Compaction
Summary: major compactions *try* to retain delete markers for the configured time interval. Test Plan: added a new test, testDeleteMarkerLongevity(), in TestStoreScanner; the following tests passed: TestMemStore, TestStoreScanner, TestQueryMatcher, TestCompaction. Reviewers: jgray, dhruba, lhofhansl, Karthik, nspiegelberg. Reviewed By: nspiegelberg. CC: HBase Diffs Facebook Group, lhofhansl, khemani, nspiegelberg. Differential Revision: 321. git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1208055 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a10fb0ccad
commit
99263fecfc
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
|
|||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
|
||||
|
||||
|
@ -103,6 +104,27 @@ public class ScanQueryMatcher {
|
|||
* */
|
||||
private boolean hasNullColumn = true;
|
||||
|
||||
// By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete
|
||||
// marker is always removed during a major compaction. If set to non-zero
|
||||
// value then major compaction will try to keep a delete marker around for
|
||||
// the given number of milliseconds. We want to keep the delete markers
|
||||
// around a bit longer because old puts might appear out-of-order. For
|
||||
// example, during log replication between two clusters.
|
||||
//
|
||||
// If the delete marker has lived longer than its column-family's TTL then
|
||||
// the delete marker will be removed even if time.to.purge.deletes has not
|
||||
// passed. This is because all the Puts that this delete marker can influence
|
||||
// would have also expired. (Removing of delete markers on col family TTL will
|
||||
// not happen if min-versions is set to non-zero)
|
||||
//
|
||||
// But, if time.to.purge.deletes has not expired then a delete
|
||||
// marker will not be removed just because there are no Puts that it is
|
||||
// currently influencing. This is because Puts, that this delete can
|
||||
// influence, may appear out of order.
|
||||
private final long timeToPurgeDeletes;
|
||||
|
||||
private final boolean isUserScan;
|
||||
|
||||
/**
|
||||
* Construct a QueryMatcher for a scan
|
||||
* @param scan
|
||||
|
@ -113,8 +135,7 @@ public class ScanQueryMatcher {
|
|||
*/
|
||||
public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
|
||||
NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
|
||||
long readPointToUse,
|
||||
long earliestPutTs) {
|
||||
long readPointToUse, long earliestPutTs) {
|
||||
this.tr = scan.getTimeRange();
|
||||
this.rowComparator = scanInfo.getComparator().getRawComparator();
|
||||
this.deletes = new ScanDeleteTracker();
|
||||
|
@ -124,14 +145,16 @@ public class ScanQueryMatcher {
|
|||
this.filter = scan.getFilter();
|
||||
this.earliestPutTs = earliestPutTs;
|
||||
this.maxReadPointToTrackVersions = readPointToUse;
|
||||
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
|
||||
|
||||
/* how to deal with deletes */
|
||||
this.isUserScan = scanType == ScanType.USER_SCAN;
|
||||
// keep deleted cells: if compaction or raw scan
|
||||
this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && scanType != ScanType.USER_SCAN) || scan.isRaw();
|
||||
this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && !isUserScan) || scan.isRaw();
|
||||
// retain deletes: if minor compaction or raw scan
|
||||
this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
|
||||
// seePastDeleteMarker: user initiated scans
|
||||
this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && scanType == ScanType.USER_SCAN;
|
||||
this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && isUserScan;
|
||||
|
||||
int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
|
||||
// Single branch to deal with two types of reads (columns vs all in family)
|
||||
|
@ -261,8 +284,12 @@ public class ScanQueryMatcher {
|
|||
}
|
||||
// Can't early out now, because DelFam come before any other keys
|
||||
}
|
||||
if (retainDeletesInOutput) {
|
||||
// always include
|
||||
if (retainDeletesInOutput ||
|
||||
(!isUserScan &&
|
||||
(EnvironmentEdgeManager.currentTimeMillis() - timestamp) <=
|
||||
timeToPurgeDeletes)) {
|
||||
// always include or it is not time yet to check whether it is OK
|
||||
// to purge deletes or not
|
||||
return MatchCode.INCLUDE;
|
||||
} else if (keepDeletedCells) {
|
||||
if (timestamp < earliestPutTs) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* Keeps track of the columns for a scan if they are not explicitly specified
|
||||
|
@ -53,7 +54,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
|
|||
public ScanWildcardColumnTracker(int minVersion, int maxVersion, long ttl) {
|
||||
this.maxVersions = maxVersion;
|
||||
this.minVersions = minVersion;
|
||||
this.oldestStamp = System.currentTimeMillis() - ttl;
|
||||
this.oldestStamp = EnvironmentEdgeManager.currentTimeMillis() - ttl;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -193,9 +193,14 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
// second -> ms adjust for user data
|
||||
this.ttl *= 1000;
|
||||
}
|
||||
// used by ScanQueryMatcher
|
||||
long timeToPurgeDeletes =
|
||||
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
|
||||
LOG.info("time to purge deletes set to " + timeToPurgeDeletes +
|
||||
"ms in store " + this);
|
||||
scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
|
||||
family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
|
||||
this.comparator);
|
||||
timeToPurgeDeletes, this.comparator);
|
||||
this.memstore = new MemStore(conf, this.comparator);
|
||||
this.storeNameStr = getColumnFamilyName();
|
||||
|
||||
|
@ -2039,6 +2044,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
private int maxVersions;
|
||||
private long ttl;
|
||||
private boolean keepDeletedCells;
|
||||
private long timeToPurgeDeletes;
|
||||
private KVComparator comparator;
|
||||
|
||||
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
|
||||
|
@ -2050,17 +2056,21 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
* @param minVersions Store's MIN_VERSIONS setting
|
||||
* @param maxVersions Store's VERSIONS setting
|
||||
* @param ttl Store's TTL (in ms)
|
||||
* @param timeToPurgeDeletes duration in ms after which a delete marker can
|
||||
* be purged during a major compaction.
|
||||
* @param keepDeletedCells Store's keepDeletedCells setting
|
||||
* @param comparator The store's comparator
|
||||
*/
|
||||
public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
|
||||
boolean keepDeletedCells, KVComparator comparator) {
|
||||
boolean keepDeletedCells, long timeToPurgeDeletes,
|
||||
KVComparator comparator) {
|
||||
|
||||
this.family = family;
|
||||
this.minVersions = minVersions;
|
||||
this.maxVersions = maxVersions;
|
||||
this.ttl = ttl;
|
||||
this.keepDeletedCells = keepDeletedCells;
|
||||
this.timeToPurgeDeletes = timeToPurgeDeletes;
|
||||
this.comparator = comparator;
|
||||
}
|
||||
|
||||
|
@ -2084,6 +2094,10 @@ public class Store extends SchemaConfigured implements HeapSize {
|
|||
return keepDeletedCells;
|
||||
}
|
||||
|
||||
public long getTimeToPurgeDeletes() {
|
||||
return timeToPurgeDeletes;
|
||||
}
|
||||
|
||||
public KVComparator getComparator() {
|
||||
return comparator;
|
||||
}
|
||||
|
|
|
@ -155,10 +155,19 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
|
||||
StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
|
||||
final List<KeyValueScanner> scanners) throws IOException {
|
||||
this(scan, scanInfo, scanType, columns, scanners,
|
||||
HConstants.LATEST_TIMESTAMP);
|
||||
}
|
||||
|
||||
// Constructor for testing.
|
||||
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
|
||||
StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
|
||||
final List<KeyValueScanner> scanners, long earliestPutTs)
|
||||
throws IOException {
|
||||
this(null, scan.getCacheBlocks(), scan, columns);
|
||||
this.initializeMetricNames();
|
||||
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
|
||||
Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP);
|
||||
Long.MAX_VALUE, earliestPutTs);
|
||||
|
||||
// Seek all scanners to the initial key
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
|
|
|
@ -234,7 +234,7 @@ public class TestCompaction extends HBaseTestCase {
|
|||
Store.ScanInfo old = store.scanInfo;
|
||||
Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
|
||||
old.getMinVersions(), old.getMaxVersions(), ttl,
|
||||
old.getKeepDeletedCells(), old.getComparator());
|
||||
old.getKeepDeletedCells(), 0, old.getComparator());
|
||||
store.scanInfo = si;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
|
@ -445,7 +445,7 @@ public class TestCompaction extends HBaseTestCase {
|
|||
Store.ScanInfo old = store.scanInfo;
|
||||
Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
|
||||
old.getMinVersions(), old.getMaxVersions(), ttl,
|
||||
old.getKeepDeletedCells(), old.getComparator());
|
||||
old.getKeepDeletedCells(), 0, old.getComparator());
|
||||
store.scanInfo = si;
|
||||
}
|
||||
Thread.sleep(ttl);
|
||||
|
|
|
@ -90,7 +90,7 @@ public class TestMemStore extends TestCase {
|
|||
List<KeyValue> result = new ArrayList<KeyValue>();
|
||||
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
|
||||
ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false,
|
||||
this.memstore.comparator);
|
||||
0, this.memstore.comparator);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
|
||||
int count = 0;
|
||||
|
@ -554,7 +554,7 @@ public class TestMemStore extends TestCase {
|
|||
//starting from each row, validate results should contain the starting row
|
||||
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
|
||||
ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, false,
|
||||
this.memstore.comparator);
|
||||
0, this.memstore.comparator);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
InternalScanner scanner = new StoreScanner(new Scan(
|
||||
Bytes.toBytes(startRowId)), scanInfo, scanType, null,
|
||||
|
|
|
@ -98,7 +98,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
// 2,4,5
|
||||
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
|
||||
0, 1, ttl, false, rowComparator), get.getFamilyMap().get(fam2));
|
||||
0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2));
|
||||
|
||||
List<KeyValue> memstore = new ArrayList<KeyValue>();
|
||||
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
|
||||
|
@ -142,7 +142,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
expected.add(ScanQueryMatcher.MatchCode.DONE);
|
||||
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
|
||||
0, 1, ttl, false, rowComparator), null);
|
||||
0, 1, ttl, false, 0, rowComparator), null);
|
||||
|
||||
List<KeyValue> memstore = new ArrayList<KeyValue>();
|
||||
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
|
||||
|
@ -193,7 +193,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
};
|
||||
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
|
||||
0, 1, testTTL, false, rowComparator), get.getFamilyMap().get(fam2));
|
||||
0, 1, testTTL, false, 0, rowComparator), get.getFamilyMap().get(fam2));
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
KeyValue [] kvs = new KeyValue[] {
|
||||
|
@ -245,7 +245,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
|||
};
|
||||
|
||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new Store.ScanInfo(fam2,
|
||||
0, 1, testTTL, false, rowComparator), null);
|
||||
0, 1, testTTL, false, 0, rowComparator), null);
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
KeyValue [] kvs = new KeyValue[] {
|
||||
|
|
|
@ -22,12 +22,16 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -41,8 +45,8 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixtu
|
|||
public class TestStoreScanner extends TestCase {
|
||||
private static final String CF_STR = "cf";
|
||||
final byte [] CF = Bytes.toBytes(CF_STR);
|
||||
private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, false,
|
||||
KeyValue.COMPARATOR);
|
||||
private ScanInfo scanInfo = new ScanInfo(CF, 0, Integer.MAX_VALUE,
|
||||
Long.MAX_VALUE, false, 0, KeyValue.COMPARATOR);
|
||||
private ScanType scanType = ScanType.USER_SCAN;
|
||||
|
||||
public void setUp() throws Exception {
|
||||
|
@ -411,7 +415,7 @@ public class TestStoreScanner extends TestCase {
|
|||
List<KeyValueScanner> scanners = scanFixture(kvs);
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(1);
|
||||
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false,
|
||||
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, 0,
|
||||
KeyValue.COMPARATOR);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
StoreScanner scanner =
|
||||
|
@ -482,7 +486,8 @@ public class TestStoreScanner extends TestCase {
|
|||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(1);
|
||||
// scanner with ttl equal to 500
|
||||
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, KeyValue.COMPARATOR);
|
||||
ScanInfo scanInfo = new ScanInfo(CF, 0, 1, 500, false, 0,
|
||||
KeyValue.COMPARATOR);
|
||||
ScanType scanType = ScanType.USER_SCAN;
|
||||
StoreScanner scanner =
|
||||
new StoreScanner(scan, scanInfo, scanType, null, scanners);
|
||||
|
@ -495,4 +500,71 @@ public class TestStoreScanner extends TestCase {
|
|||
|
||||
assertEquals(false, scanner.next(results));
|
||||
}
|
||||
|
||||
public void testDeleteMarkerLongevity() throws Exception {
|
||||
final long now = System.currentTimeMillis();
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(new EnvironmentEdge() {
|
||||
public long currentTimeMillis() {
|
||||
return now;
|
||||
}
|
||||
});
|
||||
KeyValue [] kvs = new KeyValue[] {
|
||||
/*0*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null,
|
||||
now-100, KeyValue.Type.DeleteFamily), // live
|
||||
/*1*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null,
|
||||
now-1000, KeyValue.Type.DeleteFamily), // expired
|
||||
/*2*/ KeyValueTestUtil.create("R1", "cf", "a", now-50,
|
||||
KeyValue.Type.Put, "v3"), // live
|
||||
/*3*/ KeyValueTestUtil.create("R1", "cf", "a", now-55,
|
||||
KeyValue.Type.Delete, "dontcare"), // live
|
||||
/*4*/ KeyValueTestUtil.create("R1", "cf", "a", now-55,
|
||||
KeyValue.Type.Put, "deleted-version v2"), // deleted
|
||||
/*5*/ KeyValueTestUtil.create("R1", "cf", "a", now-60,
|
||||
KeyValue.Type.Put, "v1"), // live
|
||||
/*6*/ KeyValueTestUtil.create("R1", "cf", "a", now-65,
|
||||
KeyValue.Type.Put, "v0"), // max-version reached
|
||||
/*7*/ KeyValueTestUtil.create("R1", "cf", "a",
|
||||
now-100, KeyValue.Type.DeleteColumn, "dont-care"), // max-version
|
||||
/*8*/ KeyValueTestUtil.create("R1", "cf", "b", now-600,
|
||||
KeyValue.Type.DeleteColumn, "dont-care"), //expired
|
||||
/*9*/ KeyValueTestUtil.create("R1", "cf", "b", now-70,
|
||||
KeyValue.Type.Put, "v2"), //live
|
||||
/*10*/ KeyValueTestUtil.create("R1", "cf", "b", now-750,
|
||||
KeyValue.Type.Put, "v1"), //expired
|
||||
/*11*/ KeyValueTestUtil.create("R1", "cf", "c", now-500,
|
||||
KeyValue.Type.Delete, "dontcare"), //expired
|
||||
/*12*/ KeyValueTestUtil.create("R1", "cf", "c", now-600,
|
||||
KeyValue.Type.Put, "v1"), //expired
|
||||
/*13*/ KeyValueTestUtil.create("R1", "cf", "c", now-1000,
|
||||
KeyValue.Type.Delete, "dontcare"), //expired
|
||||
/*14*/ KeyValueTestUtil.create("R1", "cf", "d", now-60,
|
||||
KeyValue.Type.Put, "expired put"), //live
|
||||
/*15*/ KeyValueTestUtil.create("R1", "cf", "d", now-100,
|
||||
KeyValue.Type.Delete, "not-expired delete"), //live
|
||||
};
|
||||
List<KeyValueScanner> scanners = scanFixture(kvs);
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(2);
|
||||
Store.ScanInfo scanInfo = new Store.ScanInfo(Bytes.toBytes("cf"),
|
||||
0 /* minVersions */,
|
||||
2 /* maxVersions */, 500 /* ttl */,
|
||||
false /* keepDeletedCells */,
|
||||
200, /* timeToPurgeDeletes */
|
||||
KeyValue.COMPARATOR);
|
||||
StoreScanner scanner =
|
||||
new StoreScanner(scan, scanInfo,
|
||||
StoreScanner.ScanType.MAJOR_COMPACT, null, scanners,
|
||||
HConstants.OLDEST_TIMESTAMP);
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
results = new ArrayList<KeyValue>();
|
||||
assertEquals(true, scanner.next(results));
|
||||
assertEquals(kvs[0], results.get(0));
|
||||
assertEquals(kvs[2], results.get(1));
|
||||
assertEquals(kvs[3], results.get(2));
|
||||
assertEquals(kvs[5], results.get(3));
|
||||
assertEquals(kvs[9], results.get(4));
|
||||
assertEquals(kvs[14], results.get(5));
|
||||
assertEquals(kvs[15], results.get(6));
|
||||
assertEquals(7, results.size());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue