HBASE-16962: Add readPoint to preCompactScannerOpen() and preFlushScannerOpen() API

Signed-off-by: anoopsamjohn <anoopsamjohn@gmail.com>
This commit is contained in:
thiruvel 2016-11-09 11:02:41 -08:00 committed by anoopsamjohn
parent 18b31fdd32
commit 44ab659b93
5 changed files with 85 additions and 15 deletions

View File

@ -102,6 +102,13 @@ public class BaseRegionObserver implements RegionObserver {
return s; return s;
} }
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s,
final long readPoint) throws IOException {
return preFlushScannerOpen(c, store, memstoreScanner, s);
}
@Override @Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException { public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
} }
@ -211,6 +218,13 @@ public class BaseRegionObserver implements RegionObserver {
return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
} }
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request);
}
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
final StoreFile resultFile) throws IOException { final StoreFile resultFile) throws IOException {

View File

@ -112,11 +112,34 @@ public interface RegionObserver extends Coprocessor {
* @return the scanner to use during the flush. {@code null} if the default implementation * @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used. * is to be used.
* @throws IOException if an error occurred on the coprocessor * @throws IOException if an error occurred on the coprocessor
* @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner,
* InternalScanner, long)}
*/ */
@Deprecated
InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
throws IOException; throws IOException;
/**
* Called before a memstore is flushed to disk and prior to creating the scanner to read from
* the memstore. To override or modify how a memstore is flushed,
* implementing classes can return a new scanner to provide the KeyValues to be
* stored into the new {@code StoreFile} or null to perform the default processing.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param memstoreScanner the scanner for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param readPoint the readpoint to create scanner
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s,
final long readPoint) throws IOException;
/** /**
* Called before the memstore is flushed to disk. * Called before the memstore is flushed to disk.
* @param c the environment provided by the region server * @param c the environment provided by the region server
@ -283,7 +306,10 @@ public interface RegionObserver extends Coprocessor {
* @return the scanner to use during compaction. {@code null} if the default implementation is to * @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used. * be used.
* @throws IOException if an error occurred on the coprocessor * @throws IOException if an error occurred on the coprocessor
* @deprecated Use {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionRequest, long)} instead.
*/ */
@Deprecated
InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType, final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s, CompactionRequest request) final long earliestPutTs, final InternalScanner s, CompactionRequest request)
@ -304,12 +330,38 @@ public interface RegionObserver extends Coprocessor {
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved store * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
* files * files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param request compaction request
* @param readPoint the readpoint to create scanner
* @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s, final CompactionRequest request,
final long readPoint) throws IOException;
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile} and prior to creating the scanner used to read the input files. To override
* or modify the compaction process, implementing classes can return a new scanner to provide the
* KeyValues to be stored into the new {@code StoreFile} or null to perform the default
* processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanners the list {@link org.apache.hadoop.hbase.regionserver.StoreFileScanner}s
* to be read from
* @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
* files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @return the scanner to use during compaction. {@code null} if the default implementation is to * @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used. * be used.
* @throws IOException if an error occurred on the coprocessor * @throws IOException if an error occurred on the coprocessor
* @deprecated Use * @deprecated Use
* {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, * {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionRequest)} instead. * InternalScanner, CompactionRequest, long)} instead.
*/ */
@Deprecated @Deprecated
InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
@ -983,9 +1035,9 @@ public interface RegionObserver extends Coprocessor {
* Called before a store opens a new scanner. * Called before a store opens a new scanner.
* This hook is called when a "user" scanner is opened. * This hook is called when a "user" scanner is opened.
* <p> * <p>
* See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)} * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner,
* and {@link #preCompactScannerOpen(ObserverContext, * long)} and {@link #preCompactScannerOpen(ObserverContext,
* Store, List, ScanType, long, InternalScanner)} * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
* to override scanners created for flushes or compactions, resp. * to override scanners created for flushes or compactions, resp.
* <p> * <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained * Call CoprocessorEnvironment#complete to skip any subsequent chained

View File

@ -520,18 +520,19 @@ public class RegionCoprocessorHost
/** /**
* See * See
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionRequest, long)}
*/ */
public InternalScanner preCompactScannerOpen(final Store store, public InternalScanner preCompactScannerOpen(final Store store,
final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
final CompactionRequest request) throws IOException { final CompactionRequest request, final long readPoint) throws IOException {
return execOperationWithResult(null, return execOperationWithResult(null,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
@Override @Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException { throws IOException {
setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
earliestPutTs, getResult(), request)); earliestPutTs, getResult(), request, readPoint));
} }
}); });
} }
@ -649,16 +650,16 @@ public class RegionCoprocessorHost
/** /**
* See * See
* {@link RegionObserver#preFlushScannerOpen(ObserverContext, * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
* Store, KeyValueScanner, InternalScanner)} * Store, KeyValueScanner, InternalScanner, long)}
*/ */
public InternalScanner preFlushScannerOpen(final Store store, public InternalScanner preFlushScannerOpen(final Store store,
final KeyValueScanner memstoreScanner) throws IOException { final KeyValueScanner memstoreScanner, final long readPoint) throws IOException {
return execOperationWithResult(null, return execOperationWithResult(null,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
@Override @Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException { throws IOException {
setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult())); setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint));
} }
}); });
} }

View File

@ -83,7 +83,8 @@ abstract class StoreFlusher {
long smallestReadPoint) throws IOException { long smallestReadPoint) throws IOException {
InternalScanner scanner = null; InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) { if (store.getCoprocessorHost() != null) {
scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner); scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner,
smallestReadPoint);
} }
if (scanner == null) { if (scanner == null) {
Scan scan = new Scan(); Scan scan = new Scan();

View File

@ -286,7 +286,8 @@ public abstract class Compactor<T extends CellSink> {
try { try {
/* Include deletes, unless we are doing a major compaction */ /* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request); ScanType scanType = scannerFactory.getScanType(request);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user); scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user,
smallestReadPoint);
if (scanner == null) { if (scanner == null) {
scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint); scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
} }
@ -337,24 +338,25 @@ public abstract class Compactor<T extends CellSink> {
* @param earliestPutTs Earliest put ts. * @param earliestPutTs Earliest put ts.
* @param scanners File scanners for compaction files. * @param scanners File scanners for compaction files.
* @param user the User * @param user the User
* @param readPoint the read point to help create scanner by Coprocessor if required.
* @return Scanner override by coprocessor; null if not overriding. * @return Scanner override by coprocessor; null if not overriding.
*/ */
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
User user) throws IOException { User user, final long readPoint) throws IOException {
if (store.getCoprocessorHost() == null) { if (store.getCoprocessorHost() == null) {
return null; return null;
} }
if (user == null) { if (user == null) {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request); earliestPutTs, request, readPoint);
} else { } else {
try { try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() { return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override @Override
public InternalScanner run() throws Exception { public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
scanType, earliestPutTs, request); scanType, earliestPutTs, request, readPoint);
} }
}); });
} catch (InterruptedException ie) { } catch (InterruptedException ie) {