HBASE-19047 CP exposed Scanner types should not extend Shipper.
This commit is contained in:
parent
281bbc40c5
commit
afcaa8747f
|
@ -389,10 +389,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
if (region.getCoprocessorHost() == null) {
|
||||
scanner = region.getScanner(scan);
|
||||
} else {
|
||||
scanner = region.getCoprocessorHost().preScannerOpen(scan);
|
||||
if (scanner == null) {
|
||||
scanner = region.getScanner(scan);
|
||||
}
|
||||
region.getCoprocessorHost().preScannerOpen(scan);
|
||||
scanner = region.getScanner(scan);
|
||||
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
|
||||
}
|
||||
if (scanner == null) {
|
||||
|
|
|
@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -170,12 +169,11 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan, final RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan) throws IOException {
|
||||
if (countOfOpen.incrementAndGet() == 2) { //slowdown openScanner randomly
|
||||
slowdownCode(e);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -733,8 +733,6 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the client opens a new scanner.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#complete to skip any subsequent chained
|
||||
* coprocessors
|
||||
* <p>
|
||||
|
@ -742,13 +740,9 @@ public interface RegionObserver {
|
|||
* invocation. If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param scan the Scan specification
|
||||
* @param s if not null, the base scanner
|
||||
* @return an RegionScanner instance to use instead of the base scanner if
|
||||
* overriding default behavior, null otherwise
|
||||
*/
|
||||
default RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
|
||||
RegionScanner s) throws IOException {
|
||||
return s;
|
||||
default void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -2836,17 +2836,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public RegionScanner getScanner(Scan scan) throws IOException {
|
||||
public RegionScannerImpl getScanner(Scan scan) throws IOException {
|
||||
return getScanner(scan, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
|
||||
public RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
|
||||
throws IOException {
|
||||
return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
private RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
|
||||
private RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
|
||||
long nonceGroup, long nonce) throws IOException {
|
||||
startRegionOperation(Operation.SCAN);
|
||||
try {
|
||||
|
@ -2873,7 +2873,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
protected RegionScanner instantiateRegionScanner(Scan scan,
|
||||
protected RegionScannerImpl instantiateRegionScanner(Scan scan,
|
||||
List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
|
||||
if (scan.isReversed()) {
|
||||
if (scan.getFilter() != null) {
|
||||
|
@ -5866,7 +5866,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
/**
|
||||
* RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
|
||||
*/
|
||||
class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {
|
||||
class RegionScannerImpl
|
||||
implements RegionScanner, Shipper, org.apache.hadoop.hbase.ipc.RpcCallback {
|
||||
// Package local for testability
|
||||
KeyValueHeap storeHeap = null;
|
||||
/** Heap of key-values that are not essential for the provided filters and are thus read
|
||||
|
|
|
@ -348,18 +348,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
private class RegionScannerShippedCallBack implements RpcCallback {
|
||||
|
||||
private final String scannerName;
|
||||
private final RegionScanner scanner;
|
||||
private final Shipper shipper;
|
||||
private final Lease lease;
|
||||
|
||||
public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) {
|
||||
public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
|
||||
this.scannerName = scannerName;
|
||||
this.scanner = scanner;
|
||||
this.shipper = shipper;
|
||||
this.lease = lease;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
this.scanner.shipped();
|
||||
this.shipper.shipped();
|
||||
// We're done. On way out re-add the above removed lease. The lease was temp removed for this
|
||||
// Rpc call and we are at end of the call now. Time to add it back.
|
||||
if (scanners.containsKey(scannerName)) {
|
||||
|
@ -1332,11 +1332,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
return lastBlock;
|
||||
}
|
||||
|
||||
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, HRegion r,
|
||||
boolean needCursor) throws LeaseStillHeldException {
|
||||
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
|
||||
HRegion r, boolean needCursor) throws LeaseStillHeldException {
|
||||
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
|
||||
new ScannerListener(scannerName));
|
||||
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
|
||||
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
|
||||
RpcCallback closeCallback;
|
||||
if (s instanceof RpcCallback) {
|
||||
closeCallback = (RpcCallback) s;
|
||||
|
@ -2470,7 +2470,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
|
||||
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
|
||||
}
|
||||
RegionScanner scanner = null;
|
||||
RegionScannerImpl scanner = null;
|
||||
try {
|
||||
scanner = region.getScanner(scan);
|
||||
scanner.next(results);
|
||||
|
@ -2481,8 +2481,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// RpcCallContext. The rpc callback will take care of closing the
|
||||
// scanner, for eg in case
|
||||
// of get()
|
||||
assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback;
|
||||
context.setCallBack((RegionScannerImpl) scanner);
|
||||
context.setCallBack(scanner);
|
||||
} else {
|
||||
// The call is from multi() where the results from the get() are
|
||||
// aggregated and then send out to the
|
||||
|
@ -2898,13 +2897,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
scan.addFamily(family);
|
||||
}
|
||||
}
|
||||
RegionScanner scanner = null;
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
scanner = region.getCoprocessorHost().preScannerOpen(scan);
|
||||
}
|
||||
if (scanner == null) {
|
||||
scanner = region.getScanner(scan);
|
||||
// preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
|
||||
// wrapper for the core created RegionScanner
|
||||
region.getCoprocessorHost().preScannerOpen(scan);
|
||||
}
|
||||
RegionScannerImpl coreScanner = region.getScanner(scan);
|
||||
Shipper shipper = coreScanner;
|
||||
RegionScanner scanner = coreScanner;
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
|
||||
}
|
||||
|
@ -2913,7 +2913,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
builder.setMvccReadPoint(scanner.getMvccReadPoint());
|
||||
builder.setTtl(scannerLeaseTimeoutPeriod);
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
return addScanner(scannerName, scanner, region, scan.isNeedCursorResult());
|
||||
return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult());
|
||||
}
|
||||
|
||||
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
|
||||
|
|
|
@ -1150,18 +1150,15 @@ public class RegionCoprocessorHost
|
|||
|
||||
/**
|
||||
* @param scan the Scan specification
|
||||
* @return scanner id to return to client if default operation should be
|
||||
* bypassed, null otherwise
|
||||
* @exception IOException Exception
|
||||
*/
|
||||
public RegionScanner preScannerOpen(final Scan scan) throws IOException {
|
||||
return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
|
||||
new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter) {
|
||||
@Override
|
||||
public RegionScanner call(RegionObserver observer) throws IOException {
|
||||
return observer.preScannerOpen(this, scan, getResult());
|
||||
}
|
||||
});
|
||||
public void preScannerOpen(final Scan scan) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
|
||||
@Override
|
||||
public void call(RegionObserver observer) throws IOException {
|
||||
observer.preScannerOpen(this, scan);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||
@InterfaceStability.Evolving
|
||||
public interface RegionScanner extends InternalScanner, Shipper {
|
||||
public interface RegionScanner extends InternalScanner {
|
||||
/**
|
||||
* @return The RegionInfo for this scanner.
|
||||
*/
|
||||
|
@ -115,13 +115,4 @@ public interface RegionScanner extends InternalScanner, Shipper {
|
|||
*/
|
||||
boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Empty implementation to provide compatibility for user migrating from 1.X
|
||||
* @see <a href="https://issues.apache.org/jira/browse/HBASE-16626">HBASE-16626</a>
|
||||
*/
|
||||
@Override
|
||||
default void shipped() throws IOException {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2104,10 +2104,9 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
final Scan scan, final RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan)
|
||||
throws IOException {
|
||||
internalPreRead(c, scan, OpType.SCAN);
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -534,14 +534,14 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
|
||||
RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
|
||||
throws IOException {
|
||||
if (!initialized) {
|
||||
throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized!");
|
||||
}
|
||||
// Nothing to do if authorization is not enabled
|
||||
if (!authorizationEnabled) {
|
||||
return s;
|
||||
return;
|
||||
}
|
||||
Region region = e.getEnvironment().getRegion();
|
||||
Authorizations authorizations = null;
|
||||
|
@ -556,7 +556,7 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
|
|||
// filtering. Checking visibility labels for META and NAMESPACE table is not needed.
|
||||
TableName table = region.getRegionInfo().getTable();
|
||||
if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
|
||||
return s;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -570,7 +570,6 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
|
|||
scan.setFilter(visibilityLabelFilter);
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -83,8 +82,8 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
|
||||
RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
|
||||
throws IOException {
|
||||
if (e.getEnvironment().getRegionInfo().isMetaRegion()) {
|
||||
int concurrency = CONCURRENCY.incrementAndGet();
|
||||
for (;;) {
|
||||
|
@ -98,7 +97,6 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
}
|
||||
Threads.sleepWithoutInterrupt(10);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -74,12 +73,11 @@ public class TestAsyncRegionLocatorTimeout {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
|
||||
RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
|
||||
throws IOException {
|
||||
if (SLEEP_MS > 0) {
|
||||
Threads.sleepWithoutInterrupt(SLEEP_MS);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1547,11 +1547,6 @@ public class TestBlockEvictionFromClient {
|
|||
public int getBatch() {
|
||||
return delegate.getBatch();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shipped() throws IOException {
|
||||
this.delegate.shipped();
|
||||
}
|
||||
}
|
||||
|
||||
public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
|
||||
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
|
||||
|
@ -156,11 +155,9 @@ public class TestReplicaWithCluster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan, final RegionScanner s) throws IOException {
|
||||
|
||||
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan) throws IOException {
|
||||
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
|
||||
|
||||
// Fail for the primary replica and replica 1
|
||||
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
|
||||
LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
|
||||
|
@ -169,8 +166,6 @@ public class TestReplicaWithCluster {
|
|||
} else {
|
||||
LOG.info("We're replica region " + replicaId);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -208,8 +203,8 @@ public class TestReplicaWithCluster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan, final RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan) throws IOException {
|
||||
|
||||
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
|
||||
|
||||
|
@ -238,8 +233,6 @@ public class TestReplicaWithCluster {
|
|||
} else {
|
||||
LOG.info("Scan, We're replica region " + replicaId);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
|
||||
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
|
@ -125,10 +124,9 @@ public class TestReplicasClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan, final RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Scan scan) throws IOException {
|
||||
slowdownCode(e);
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.io.Reference;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
|
@ -68,8 +67,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
|
||||
|
||||
/**
|
||||
* A sample region observer that tests the RegionObserver interface.
|
||||
* It works with TestRegionObserverInterface to provide the test case.
|
||||
|
@ -223,11 +220,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
final Scan scan,
|
||||
final RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan)
|
||||
throws IOException {
|
||||
ctPreScannerOpen.incrementAndGet();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -146,11 +146,6 @@ public class TestCoprocessorInterface {
|
|||
public int getBatch() {
|
||||
return delegate.getBatch();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shipped() throws IOException {
|
||||
this.delegate.shipped();
|
||||
}
|
||||
}
|
||||
|
||||
public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {
|
||||
|
|
|
@ -24,7 +24,6 @@ import static org.junit.Assert.assertNull;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
|
@ -55,11 +54,9 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
|
@ -131,9 +128,9 @@ public class TestRegionObserverScannerOpenHook {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
|
||||
RegionScanner s) throws IOException {
|
||||
return c.getEnvironment().getRegion().getScanner(scan.setFilter(new NoDataFilter()));
|
||||
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
|
||||
throws IOException {
|
||||
scan.setFilter(new NoDataFilter());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
|
@ -56,9 +55,5 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver
|
|||
return new DelegatingInternalScanner(scanner);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
|
||||
RegionScanner s) throws IOException {
|
||||
return c.getEnvironment().getRegion().getScanner(scan);
|
||||
}
|
||||
// TODO add for postScannerOpen
|
||||
}
|
||||
|
|
|
@ -2789,12 +2789,12 @@ public class TestHRegion {
|
|||
scan = new Scan();
|
||||
scan.addFamily(fam2);
|
||||
scan.addFamily(fam4);
|
||||
is = (RegionScannerImpl) region.getScanner(scan);
|
||||
assertEquals(1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
|
||||
is = region.getScanner(scan);
|
||||
assertEquals(1, is.storeHeap.getHeap().size());
|
||||
|
||||
scan = new Scan();
|
||||
is = (RegionScannerImpl) region.getScanner(scan);
|
||||
assertEquals(families.length - 1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
|
||||
is = region.getScanner(scan);
|
||||
assertEquals(families.length - 1, is.storeHeap.getHeap().size());
|
||||
} finally {
|
||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||
this.region = null;
|
||||
|
@ -5688,7 +5688,7 @@ public class TestHRegion {
|
|||
// create a reverse scan
|
||||
Scan scan = new Scan(Bytes.toBytes("19996"));
|
||||
scan.setReversed(true);
|
||||
RegionScanner scanner = region.getScanner(scan);
|
||||
RegionScannerImpl scanner = region.getScanner(scan);
|
||||
|
||||
// flush the cache. This will reset the store scanner
|
||||
region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
|
||||
|
@ -5709,7 +5709,7 @@ public class TestHRegion {
|
|||
// added here
|
||||
if (!assertDone) {
|
||||
StoreScanner current =
|
||||
(StoreScanner) (((RegionScannerImpl) scanner).storeHeap).getCurrentForTesting();
|
||||
(StoreScanner) (scanner.storeHeap).getCurrentForTesting();
|
||||
List<KeyValueScanner> scanners = current.getAllScannersForTesting();
|
||||
assertEquals("There should be only one scanner the store file scanner", 1,
|
||||
scanners.size());
|
||||
|
|
|
@ -121,7 +121,7 @@ public class TestScanWithBloomError {
|
|||
LOG.info("Scanning column set: " + Arrays.toString(colSet));
|
||||
Scan scan = new Scan(ROW_BYTES, ROW_BYTES);
|
||||
addColumnSetToScan(scan, colSet);
|
||||
RegionScannerImpl scanner = (RegionScannerImpl) region.getScanner(scan);
|
||||
RegionScannerImpl scanner = region.getScanner(scan);
|
||||
KeyValueHeap storeHeap = scanner.getStoreHeapForTesting();
|
||||
assertEquals(0, storeHeap.getHeap().size());
|
||||
StoreScanner storeScanner =
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
|
@ -465,7 +464,7 @@ public class TestScannerHeartbeatMessages {
|
|||
|
||||
// Instantiate the custom heartbeat region scanners
|
||||
@Override
|
||||
protected RegionScanner instantiateRegionScanner(Scan scan,
|
||||
protected RegionScannerImpl instantiateRegionScanner(Scan scan,
|
||||
List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
|
||||
if (scan.isReversed()) {
|
||||
if (scan.getFilter() != null) {
|
||||
|
|
|
@ -91,8 +91,8 @@ public class TestSwitchToStreamRead {
|
|||
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
try (RegionScanner scanner = REGION.getScanner(new Scan())) {
|
||||
StoreScanner storeScanner = (StoreScanner) ((RegionScannerImpl) scanner)
|
||||
try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
|
||||
StoreScanner storeScanner = (StoreScanner) (scanner)
|
||||
.getStoreHeapForTesting().getCurrentForTesting();
|
||||
for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
|
||||
if (kvs instanceof StoreFileScanner) {
|
||||
|
|
|
@ -919,7 +919,7 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
|
|||
@Override
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preScannerOpen(ObserverContextImpl.createAndPrepare(RCP_ENV),
|
||||
new Scan(), mock(RegionScanner.class));
|
||||
new Scan());
|
||||
return null;
|
||||
}
|
||||
}, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);
|
||||
|
|
|
@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
|
|||
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
|
@ -338,8 +337,8 @@ public class TestCoprocessorScanPolicy {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
|
||||
RegionScanner s) throws IOException {
|
||||
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
|
||||
throws IOException {
|
||||
Region region = c.getEnvironment().getRegion();
|
||||
TableName tableName = region.getTableDescriptor().getTableName();
|
||||
Long ttl = this.ttls.get(tableName);
|
||||
|
@ -350,7 +349,6 @@ public class TestCoprocessorScanPolicy {
|
|||
if (version != null) {
|
||||
scan.readVersions(version);
|
||||
}
|
||||
return region.getScanner(scan);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue