HBASE-11054-Create new hook in StoreScanner to help user creating his own delete tracker (Ram)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1590220 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ramkrishna 2014-04-26 08:24:56 +00:00
parent 833fa82041
commit 6d291a55b4
6 changed files with 80 additions and 18 deletions

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.DeleteTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -501,4 +502,11 @@ public abstract class BaseRegionObserver implements RegionObserver {
public void postCloseRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx, public void postCloseRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx,
Operation op) throws IOException { Operation op) throws IOException {
} }
@Override
public DeleteTracker postInstantiateDeleteTracker(
final ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
throws IOException {
return delTracker;
}
} }

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.DeleteTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -1109,4 +1110,18 @@ public interface RegionObserver extends Coprocessor {
*/ */
Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException; MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException;
/**
* Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing
* this hook would help in creating customised DeleteTracker and returning
* the newly created DeleteTracker
*
* @param ctx the environment provided by the region server
* @param delTracker the deleteTracker that is created by the QueryMatcher
* @return the Delete Tracker
* @throws IOException
*/
DeleteTracker postInstantiateDeleteTracker(
final ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
throws IOException;
} }

View File

@ -2152,4 +2152,27 @@ public class RegionCoprocessorHost
} }
} }
public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
tracker = ((RegionObserver) env.getInstance()).postInstantiateDeleteTracker(ctx,
tracker);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
if (ctx.shouldComplete()) {
break;
}
}
}
return tracker;
}
} }

View File

@ -113,6 +113,8 @@ public class ScanQueryMatcher {
* */ * */
private boolean hasNullColumn = true; private boolean hasNullColumn = true;
private RegionCoprocessorHost regionCoprocessorHost= null;
// By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete
// marker is always removed during a major compaction. If set to non-zero // marker is always removed during a major compaction. If set to non-zero
// value then major compaction will try to keep a delete marker around for // value then major compaction will try to keep a delete marker around for
@ -145,13 +147,16 @@ public class ScanQueryMatcher {
* @param earliestPutTs Earliest put seen in any of the store files. * @param earliestPutTs Earliest put seen in any of the store files.
* @param oldestUnexpiredTS the oldest timestamp we are interested in, * @param oldestUnexpiredTS the oldest timestamp we are interested in,
* based on TTL * based on TTL
* @param regionCoprocessorHost
* @throws IOException
*/ */
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
NavigableSet<byte[]> columns, ScanType scanType, ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) { RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange(); this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator(); this.rowComparator = scanInfo.getComparator();
this.deletes = new ScanDeleteTracker(); this.regionCoprocessorHost = regionCoprocessorHost;
this.deletes = instantiateDeleteTracker();
this.stopRow = scan.getStopRow(); this.stopRow = scan.getStopRow();
this.startKey = KeyValueUtil.createFirstDeleteFamilyOnRow(scan.getStartRow(), this.startKey = KeyValueUtil.createFirstDeleteFamilyOnRow(scan.getStartRow(),
scanInfo.getFamily()); scanInfo.getFamily());
@ -194,6 +199,14 @@ public class ScanQueryMatcher {
this.isReversed = scan.isReversed(); this.isReversed = scan.isReversed();
} }
private DeleteTracker instantiateDeleteTracker() throws IOException {
DeleteTracker tracker = new ScanDeleteTracker();
if (regionCoprocessorHost != null) {
tracker = regionCoprocessorHost.postInstantiateDeleteTracker(tracker);
}
return tracker;
}
/** /**
* Construct a QueryMatcher for a scan that drop deletes from a limited range of rows. * Construct a QueryMatcher for a scan that drop deletes from a limited range of rows.
* @param scan * @param scan
@ -204,12 +217,14 @@ public class ScanQueryMatcher {
* based on TTL * based on TTL
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/ */
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) { byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs, this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
oldestUnexpiredTS); oldestUnexpiredTS, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null)); Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow; this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow; this.dropDeletesToRow = dropDeletesToRow;
@ -219,10 +234,10 @@ public class ScanQueryMatcher {
* Constructor for tests * Constructor for tests
*/ */
ScanQueryMatcher(Scan scan, ScanInfo scanInfo, ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> columns, long oldestUnexpiredTS) { NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN, this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */ Long.MAX_VALUE, /* max Readpoint to track versions */
HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS); HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
} }
/** /**

View File

@ -162,7 +162,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
} }
matcher = new ScanQueryMatcher(scan, scanInfo, columns, matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS); oldestUnexpiredTS, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this); this.store.addChangedReaderObserver(this);
@ -226,11 +226,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(), this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) { if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
smallestReadPoint, earliestPutTs, oldestUnexpiredTS); earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
} else { } else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow); oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
} }
// Filter the list of scanners using Bloom filters, time range, TTL, etc. // Filter the list of scanners using Bloom filters, time range, TTL, etc.
@ -270,7 +270,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt); scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
// In unit tests, the store could be null // In unit tests, the store could be null
if (this.store != null) { if (this.store != null) {

View File

@ -19,13 +19,14 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.INCLUDE;
import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.SKIP;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.*;
import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
@ -341,7 +342,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2); NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE, ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to); HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
List<ScanQueryMatcher.MatchCode> actual = List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length); new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null; byte[] prevRow = null;