diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 922658283c4..c861192fc68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.DeleteTracker; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -501,4 +502,11 @@ public abstract class BaseRegionObserver implements RegionObserver { public void postCloseRegionOperation(final ObserverContext ctx, Operation op) throws IOException { } + + @Override + public DeleteTracker postInstantiateDeleteTracker( + final ObserverContext ctx, DeleteTracker delTracker) + throws IOException { + return delTracker; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index ef403d25e79..96cc3bdfbd8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.DeleteTracker; import 
org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -1109,4 +1110,18 @@ public interface RegionObserver extends Coprocessor { */ Cell postMutationBeforeWAL(ObserverContext ctx, MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException; + + /** + * Called after the ScanQueryMatcher creates ScanDeleteTracker. Implement + * this hook to create a customised DeleteTracker and return it in place of + * the default tracker. + * + * @param ctx the environment provided by the region server + * @param delTracker the deleteTracker that is created by the QueryMatcher + * @return the DeleteTracker to be used + * @throws IOException if an error occurs while creating the tracker + */ + DeleteTracker postInstantiateDeleteTracker( + final ObserverContext ctx, DeleteTracker delTracker) + throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 2e7c287cb8e..177f153c60a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -2152,4 +2152,27 @@ public class RegionCoprocessorHost } } + public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + tracker = ((RegionObserver) env.getInstance()).postInstantiateDeleteTracker(ctx, + tracker); + } catch (Throwable e) { + 
handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return tracker; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index 1123e143fb9..205c7f854bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -112,6 +112,8 @@ public class ScanQueryMatcher { * first column. * */ private boolean hasNullColumn = true; + + private RegionCoprocessorHost regionCoprocessorHost= null; // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete // marker is always removed during a major compaction. If set to non-zero @@ -145,13 +147,16 @@ public class ScanQueryMatcher { * @param earliestPutTs Earliest put seen in any of the store files. 
* @param oldestUnexpiredTS the oldest timestamp we are interested in, * based on TTL + * @param regionCoprocessorHost + * @throws IOException */ - public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, - NavigableSet columns, ScanType scanType, - long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) { + public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet columns, + ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, + RegionCoprocessorHost regionCoprocessorHost) throws IOException { this.tr = scan.getTimeRange(); this.rowComparator = scanInfo.getComparator(); - this.deletes = new ScanDeleteTracker(); + this.regionCoprocessorHost = regionCoprocessorHost; + this.deletes = instantiateDeleteTracker(); this.stopRow = scan.getStopRow(); this.startKey = KeyValueUtil.createFirstDeleteFamilyOnRow(scan.getStartRow(), scanInfo.getFamily()); @@ -194,6 +199,14 @@ public class ScanQueryMatcher { this.isReversed = scan.isReversed(); } + private DeleteTracker instantiateDeleteTracker() throws IOException { + DeleteTracker tracker = new ScanDeleteTracker(); + if (regionCoprocessorHost != null) { + tracker = regionCoprocessorHost.postInstantiateDeleteTracker(tracker); + } + return tracker; + } + /** * Construct a QueryMatcher for a scan that drop deletes from a limited range of rows. * @param scan @@ -204,12 +217,14 @@ public class ScanQueryMatcher { * based on TTL * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 
+ * @param regionCoprocessorHost + * @throws IOException */ public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet columns, - long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, - byte[] dropDeletesFromRow, byte[] dropDeletesToRow) { + long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow, + byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException { this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs, - oldestUnexpiredTS); + oldestUnexpiredTS, regionCoprocessorHost); Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null)); this.dropDeletesFromRow = dropDeletesFromRow; this.dropDeletesToRow = dropDeletesToRow; @@ -219,10 +234,10 @@ public class ScanQueryMatcher { * Constructor for tests */ ScanQueryMatcher(Scan scan, ScanInfo scanInfo, - NavigableSet columns, long oldestUnexpiredTS) { + NavigableSet columns, long oldestUnexpiredTS) throws IOException { this(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, /* max Readpoint to track versions */ - HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS); + HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e80249c9714..90b61b8b40d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -162,7 +162,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } matcher = new ScanQueryMatcher(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, - oldestUnexpiredTS); + oldestUnexpiredTS, store.getCoprocessorHost()); this.store.addChangedReaderObserver(this); @@ -226,11 +226,11 @@ public 
class StoreScanner extends NonReversedNonLazyKeyValueScanner this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(), ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); if (dropDeletesFromRow == null) { - matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, - smallestReadPoint, earliestPutTs, oldestUnexpiredTS); + matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, + earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost()); } else { - matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow); + matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs, + oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); } // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -270,7 +270,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), scanInfo.getMinVersions(), readPt); this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, - Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); + Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null); // In unit tests, the store could be null if (this.store != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java index 37031a4d3f8..4945ad1d782 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java @@ -19,13 +19,14 @@ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.INCLUDE; +import static 
org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.SKIP; + import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; -import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.*; - import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; @@ -341,7 +342,7 @@ public class TestQueryMatcher extends HBaseTestCase { NavigableSet cols = get.getFamilyMap().get(fam2); ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE, - HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to); + HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null); List actual = new ArrayList(rows.length); byte[] prevRow = null;