HBASE-8056 allow StoreScanner to drop deletes from some part of the compaction range

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1456082 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2013-03-13 18:38:31 +00:00
parent a6ab559d3d
commit 88c3d9da6d
3 changed files with 146 additions and 4 deletions

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.base.Preconditions;
/** /**
* A query matcher that is specifically designed for the scan case. * A query matcher that is specifically designed for the scan case.
*/ */
@ -64,7 +66,8 @@ public class ScanQueryMatcher {
* marker to reach deleted rows. * marker to reach deleted rows.
*/ */
/** whether to retain delete markers */ /** whether to retain delete markers */
private final boolean retainDeletesInOutput; private boolean retainDeletesInOutput;
/** whether to return deleted rows */ /** whether to return deleted rows */
private final boolean keepDeletedCells; private final boolean keepDeletedCells;
/** whether time range queries can see rows "behind" a delete */ /** whether time range queries can see rows "behind" a delete */
@ -97,6 +100,8 @@ public class ScanQueryMatcher {
/** readPoint over which the KVs are unconditionally included */ /** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions; protected long maxReadPointToTrackVersions;
private byte[] dropDeletesFromRow = null, dropDeletesToRow = null;
/** /**
* This variable shows whether there is an null column in the query. There * This variable shows whether there is an null column in the query. There
* always exists a null column in the wildcard column query. * always exists a null column in the wildcard column query.
@ -179,6 +184,27 @@ public class ScanQueryMatcher {
} }
} }
/**
 * Constructs a matcher for a compaction scan that drops delete markers, but only for rows
 * inside the range [dropDeletesFromRow, dropDeletesToRow). Outside that range the matcher
 * behaves exactly like a ScanType.COMPACT_RETAIN_DELETES matcher.
 * @param scan the compaction pseudo-scan
 * @param scanInfo The store's immutable scan info
 * @param columns columns of interest, or null for all columns
 * @param readPointToUse readPoint over which the KVs are unconditionally included
 * @param earliestPutTs Earliest put seen in any of the store files.
 * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
 */
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
// Delegate to the retain-deletes constructor; the range bounds recorded below make
// checkPartialDropDeleteRange() toggle retainDeletesInOutput as setRow() crosses them.
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
oldestUnexpiredTS);
// Both bounds must be supplied; open bounds are expressed via the EMPTY_*_ROW constants.
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
}
/* /*
* Constructor for tests * Constructor for tests
*/ */
@ -372,6 +398,33 @@ public class ScanQueryMatcher {
} }
/** Handle partial-drop-deletes. As we match keys in order, when we have a range from which
 * we can drop deletes, we can set retainDeletesInOutput to false for the duration of this
 * range only, and maintain consistency. Called from setRow() for each new row; relies on
 * rows arriving in sorted order so each bound needs to be compared at most until crossed. */
private void checkPartialDropDeleteRange(byte [] row, int offset, short length) {
// If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow
// are both set, and the matcher is set to retain deletes. We assume ordered keys. When
// dropDeletesFromRow is leq current kv, we start dropping deletes and reset
// dropDeletesFromRow; thus the 2nd "if" starts to apply.
// NOTE(review): the == against HConstants.EMPTY_START_ROW is a reference comparison; it is
// only a shortcut here, since Bytes.compareTo against a zero-length bound is >= 0 anyway.
if ((dropDeletesFromRow != null)
&& ((dropDeletesFromRow == HConstants.EMPTY_START_ROW)
|| (Bytes.compareTo(row, offset, length,
dropDeletesFromRow, 0, dropDeletesFromRow.length) >= 0))) {
retainDeletesInOutput = false;
dropDeletesFromRow = null;
}
// If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial-
// drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes,
// and reset dropDeletesToRow so that we don't do any more compares.
// NOTE(review): unlike the start bound, the != EMPTY_END_ROW reference check is load-bearing:
// every row compares >= a zero-length bound, so without it an open right bound would re-enable
// delete retention on the very first row. Assumes callers pass the constant itself (not a
// fresh empty array) for an open bound — TODO confirm at call sites.
if ((dropDeletesFromRow == null)
&& (dropDeletesToRow != null) && (dropDeletesToRow != HConstants.EMPTY_END_ROW)
&& (Bytes.compareTo(row, offset, length,
dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
retainDeletesInOutput = true;
dropDeletesToRow = null;
}
}
public boolean moreRowsMayExistAfter(KeyValue kv) { public boolean moreRowsMayExistAfter(KeyValue kv) {
if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) && if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(), rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
@ -389,6 +442,7 @@ public class ScanQueryMatcher {
* @param row * @param row
*/ */
public void setRow(byte [] row, int offset, short length) { public void setRow(byte [] row, int offset, short length) {
checkPartialDropDeleteRange(row, offset, length);
this.row = row; this.row = row;
this.rowOffset = offset; this.rowOffset = offset;
this.rowLength = length; this.rowLength = length;

View File

@ -171,7 +171,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
} }
/** /**
* Used for major compactions.<p> * Used for compactions.<p>
* *
* Opens a scanner across specified StoreFiles. * Opens a scanner across specified StoreFiles.
* @param store who we scan * @param store who we scan
@ -183,10 +183,39 @@ public class StoreScanner extends NonLazyKeyValueScanner
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType, List<? extends KeyValueScanner> scanners, ScanType scanType,
long smallestReadPoint, long earliestPutTs) throws IOException { long smallestReadPoint, long earliestPutTs) throws IOException {
this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
}
/**
 * Used for compactions that drop deletes from a limited range of rows.<p>
 *
 * Opens a scanner across specified StoreFiles.
 * @param store who we scan
 * @param scanInfo The store's immutable scan info
 * @param scan the spec
 * @param scanners ancillary scanners
 * @param smallestReadPoint the readPoint that we should use for tracking versions
 * @param earliestPutTs Earliest put seen in any of the store files.
 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
// Deletes are retained everywhere outside the drop range, hence COMPACT_RETAIN_DELETES;
// the matcher flips retention off only while scanning inside [from, to).
this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, false, scan, null, scanInfo.getTtl(), this(store, false, scan, null, scanInfo.getTtl(),
scanInfo.getMinVersions()); scanInfo.getMinVersions());
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
smallestReadPoint, earliestPutTs, oldestUnexpiredTS); smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint,
earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc. // Filter the list of scanners using Bloom filters, time range, TTL, etc.
scanners = selectScannersFrom(scanners); scanners = selectScannersFrom(scanners);
@ -380,6 +409,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
"Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
prevKV = kv; prevKV = kv;
ScanQueryMatcher.MatchCode qcode = matcher.match(kv); ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
switch(qcode) { switch(qcode) {
case INCLUDE: case INCLUDE:

View File

@ -22,10 +22,15 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.NavigableSet;
import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.*;
import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -40,6 +45,7 @@ public class TestQueryMatcher extends HBaseTestCase {
private byte[] row1; private byte[] row1;
private byte[] row2; private byte[] row2;
private byte[] row3;
private byte[] fam1; private byte[] fam1;
private byte[] fam2; private byte[] fam2;
private byte[] col1; private byte[] col1;
@ -60,6 +66,7 @@ public class TestQueryMatcher extends HBaseTestCase {
super.setUp(); super.setUp();
row1 = Bytes.toBytes("row1"); row1 = Bytes.toBytes("row1");
row2 = Bytes.toBytes("row2"); row2 = Bytes.toBytes("row2");
row3 = Bytes.toBytes("row3");
fam1 = Bytes.toBytes("fam1"); fam1 = Bytes.toBytes("fam1");
fam2 = Bytes.toBytes("fam2"); fam2 = Bytes.toBytes("fam2");
col1 = Bytes.toBytes("col1"); col1 = Bytes.toBytes("col1");
@ -283,5 +290,56 @@ public class TestQueryMatcher extends HBaseTestCase {
} }
} }
/**
 * Verifies partial-range delete dropping: delete KVs for the given rows are fed through a
 * matcher configured to drop deletes in [from, to), and each KV must produce the expected
 * match code — SKIP (dropped) inside the range, INCLUDE (retained) outside it.
 * Fix: removed unused locals (now/scanInfo/cols) that duplicated setup already done
 * inside testDropDeletes and were never read here.
 */
public void testMatch_PartialRangeDropDeletes() throws Exception {
  // Some ranges.
  testDropDeletes(
      row2, row3, new byte[][] { row1, row2, row2, row3 }, INCLUDE, SKIP, SKIP, INCLUDE);
  testDropDeletes(row2, row3, new byte[][] { row1, row1, row2 }, INCLUDE, INCLUDE, SKIP);
  testDropDeletes(row2, row3, new byte[][] { row2, row3, row3 }, SKIP, INCLUDE, INCLUDE);
  testDropDeletes(row1, row3, new byte[][] { row1, row2, row3 }, SKIP, SKIP, INCLUDE);
  // Open ranges (EMPTY_START_ROW / EMPTY_END_ROW mean unbounded on that side).
  testDropDeletes(HConstants.EMPTY_START_ROW, row3,
      new byte[][] { row1, row2, row3 }, SKIP, SKIP, INCLUDE);
  testDropDeletes(row2, HConstants.EMPTY_END_ROW,
      new byte[][] { row1, row2, row3 }, INCLUDE, SKIP, SKIP);
  testDropDeletes(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      new byte[][] { row1, row2, row3, row3 }, SKIP, SKIP, SKIP, SKIP);
  // No KVs in range.
  testDropDeletes(row2, row3, new byte[][] { row1, row1, row3 }, INCLUDE, INCLUDE, INCLUDE);
  testDropDeletes(row2, row3, new byte[][] { row3, row3 }, INCLUDE, INCLUDE);
  testDropDeletes(row2, row3, new byte[][] { row1, row1 }, INCLUDE, INCLUDE);
}
/**
 * Drives a partial-drop-deletes ScanQueryMatcher over delete KVs for the given rows and
 * asserts that each KV yields the corresponding expected match code.
 * @param from inclusive left bound of the drop-deletes range (EMPTY_START_ROW for open)
 * @param to exclusive right bound of the drop-deletes range (EMPTY_END_ROW for open)
 * @param rows rows, in sorted order, for which a delete KV is matched
 * @param expected one expected MatchCode per entry in {@code rows}
 */
private void testDropDeletes(
    byte[] from, byte[] to, byte[][] rows, MatchCode... expected) throws IOException {
  long timestamp = EnvironmentEdgeManager.currentTimeMillis();
  // Set time to purge deletes to negative value to avoid it ever happening.
  ScanInfo scanInfo = new ScanInfo(fam2, 0, 1, ttl, false, -1L, rowComparator);
  NavigableSet<byte[]> columns = get.getFamilyMap().get(fam2);
  ScanQueryMatcher matcher = new ScanQueryMatcher(scan, scanInfo, columns, Long.MAX_VALUE,
      HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to);

  List<ScanQueryMatcher.MatchCode> actual =
      new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
  byte[] previousRow = null;
  for (byte[] row : rows) {
    // Notify the matcher of row transitions only, mirroring how StoreScanner drives it.
    boolean isNewRow = (previousRow == null) || !Bytes.equals(previousRow, row);
    if (isNewRow) {
      matcher.setRow(row, 0, (short) row.length);
      previousRow = row;
    }
    actual.add(matcher.match(new KeyValue(row, fam2, null, timestamp, Type.Delete)));
  }

  assertEquals(expected.length, actual.size());
  for (int i = 0; i < expected.length; i++) {
    if (PRINT) {
      System.out.println("expected " + expected[i] + ", actual " + actual.get(i));
    }
    assertEquals(expected[i], actual.get(i));
  }
}
} }