HBASE-7712 Pass ScanType into preCompact() (Ted Yu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1440251 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fee7668b6c
commit
bd154b16c4
|
@ -140,7 +140,8 @@ public abstract class BaseRegionObserver implements RegionObserver {
|
|||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final HStore store, final InternalScanner scanner) throws IOException {
|
||||
final HStore store, final InternalScanner scanner, final ScanType scanType)
|
||||
throws IOException {
|
||||
return scanner;
|
||||
}
|
||||
|
||||
|
|
|
@ -167,12 +167,14 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param store the store being compacted
|
||||
* @param scanner the scanner over existing data used in the store file
|
||||
* rewriting
|
||||
* @param scanType type of Scan
|
||||
* @return the scanner to use during compaction. Should not be {@code null}
|
||||
* unless the implementation is writing new store files on its own.
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
final HStore store, final InternalScanner scanner) throws IOException;
|
||||
final HStore store, final InternalScanner scanner,
|
||||
final ScanType scanType) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to writing the {@link StoreFile}s selected for compaction into
|
||||
|
|
|
@ -133,17 +133,17 @@ class Compactor extends Configured {
|
|||
.preCompactScannerOpen(store, scanners,
|
||||
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
|
||||
}
|
||||
ScanType scanType = majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT;
|
||||
if (scanner == null) {
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(store.getFamily().getMaxVersions());
|
||||
/* Include deletes, unless we are doing a major compaction */
|
||||
scanner = new StoreScanner(store, store.scanInfo, scan, scanners,
|
||||
majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
|
||||
smallestReadPoint, earliestPutTs);
|
||||
scanType, smallestReadPoint, earliestPutTs);
|
||||
}
|
||||
if (store.getHRegion().getCoprocessorHost() != null) {
|
||||
InternalScanner cpScanner =
|
||||
store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
|
||||
store.getHRegion().getCoprocessorHost().preCompact(store, scanner, scanType);
|
||||
// NULL scanner returned from coprocessor hooks means skip normal processing
|
||||
if (cpScanner == null) {
|
||||
return null;
|
||||
|
|
|
@ -425,9 +425,11 @@ public class RegionCoprocessorHost
|
|||
* Called prior to rewriting the store files selected for compaction
|
||||
* @param store the store being compacted
|
||||
* @param scanner the scanner used to read store data during compaction
|
||||
* @param scanType type of Scan
|
||||
* @throws IOException
|
||||
*/
|
||||
public InternalScanner preCompact(HStore store, InternalScanner scanner) throws IOException {
|
||||
public InternalScanner preCompact(HStore store, InternalScanner scanner,
|
||||
ScanType scanType) throws IOException {
|
||||
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
|
||||
boolean bypass = false;
|
||||
for (RegionEnvironment env: coprocessors) {
|
||||
|
@ -435,7 +437,7 @@ public class RegionCoprocessorHost
|
|||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
scanner = ((RegionObserver)env.getInstance()).preCompact(
|
||||
ctx, store, scanner);
|
||||
ctx, store, scanner, scanType);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env,e);
|
||||
}
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
|
@ -765,7 +766,8 @@ public class AccessController extends BaseRegionObserver
|
|||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final HStore store, final InternalScanner scanner) throws IOException {
|
||||
final HStore store, final InternalScanner scanner, final ScanType scanType)
|
||||
throws IOException {
|
||||
requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
|
||||
return scanner;
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
|
|||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
HStore store, InternalScanner scanner) {
|
||||
HStore store, InternalScanner scanner, ScanType scanType) {
|
||||
hadPreCompact = true;
|
||||
return scanner;
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
|
|||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -186,7 +187,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
}
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
HStore store, InternalScanner scanner) {
|
||||
HStore store, InternalScanner scanner, ScanType scanType) {
|
||||
preCompactCalled = true;
|
||||
return scanner;
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -315,7 +316,7 @@ public class TestRegionObserverInterface {
|
|||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
HStore store, final InternalScanner scanner) {
|
||||
HStore store, final InternalScanner scanner, final ScanType scanType) {
|
||||
return new InternalScanner() {
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results) throws IOException {
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermi
|
|||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.access.Permission.Action;
|
||||
|
@ -567,7 +568,8 @@ public class TestAccessController {
|
|||
public void testCompact() throws Exception {
|
||||
PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preCompact(ObserverContext.createAndPrepare(RCP_ENV, null), null, null);
|
||||
ACCESS_CONTROLLER.preCompact(ObserverContext.createAndPrepare(RCP_ENV, null), null, null,
|
||||
ScanType.MINOR_COMPACT);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue