HBASE-17489 ClientScanner may send a next request to a RegionScanner which has been exhausted

This commit is contained in:
zhangduo 2017-01-22 10:02:29 +08:00
parent c64236584b
commit 57409371a0
4 changed files with 594 additions and 531 deletions

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@ -27,8 +29,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue.MetaComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@ -37,9 +37,11 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue.MetaComparator;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -48,12 +50,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
* through them all.
* Implements the scanner interface for the HBase client. If there are multiple regions in a table,
* this scanner will iterate through them all.
*/
@InterfaceAudience.Private
public class ClientScanner extends AbstractClientScanner {
@ -236,15 +235,13 @@ public class ClientScanner extends AbstractClientScanner {
return false; // unlikely.
}
private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
// If we have just switched replica, don't go to the next scanner yet. Rather, try
// the scanner operations on the new replica, from the right point in the scan
// Note that when we switched to a different replica we left it at a point
// where we just did the "openScanner" with the appropriate startrow
if (callable != null && callable.switchedToADifferentReplica()) return true;
return nextScanner(nbRows, done);
protected final void closeScanner() throws IOException {
if (this.callable != null) {
this.callable.setClose();
call(callable, caller, scannerTimeout);
this.callable = null;
}
}
/*
* Gets a scanner for the next region. If this.currentRegion != null, then we will move to the
* endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no
@ -255,11 +252,7 @@ public class ClientScanner extends AbstractClientScanner {
*/
protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
call(callable, caller, scannerTimeout);
this.callable = null;
}
closeScanner();
// Where to start the next scanner
byte[] localStartKey;
@ -376,6 +369,37 @@ public class ClientScanner extends AbstractClientScanner {
return cache != null ? cache.size() : 0;
}
private boolean regionExhausted(Result[] values) {
// This means the server tells us the whole scan operation is done. Usually decided by filter.
if (values == null) {
return true;
}
// Not a heartbeat message and we get nothing, this means the region is exhausted
if (values.length == 0 && !callable.isHeartbeatMessage()) {
return true;
}
// Server tells us that it has no more results for this region. Notice that this flag is get
// from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter
// one is false then we will get a null values and quit in the first condition of this method.
if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) {
return true;
}
return false;
}
private void closeScannerIfExhausted(boolean exhausted) throws IOException {
if (exhausted) {
if (!partialResults.isEmpty()) {
// XXX: continue if there are partial results. But in fact server should not set
// hasMoreResults to false if there are partial results.
LOG.warn("Server tells us there is no more results for this region but we still have"
+ " partialResults, this should not happen, retry on the current scanner anyway");
} else {
closeScanner();
}
}
}
/**
* Contact the servers to load more {@link Result}s in the cache.
*/
@ -383,17 +407,18 @@ public class ClientScanner extends AbstractClientScanner {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
// This is possible if we just stopped at the boundary of a region in the previous call.
if (callable == null) {
if (!nextScanner(countdown, false)) {
return;
}
}
// We need to reset it if it's a new callable that was created with a countdown in nextScanner
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean retryAfterOutOfOrderException = true;
// We don't expect that the server will have more results for us if
// it doesn't tell us otherwise. We rely on the size or count of results
boolean serverHasMoreResults = false;
boolean allResultsSkipped = false;
do {
allResultsSkipped = false;
for (;;) {
try {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
@ -439,7 +464,7 @@ public class ClientScanner extends AbstractClientScanner {
// Reset the startRow to the row we've seen last so that the new scanner starts at
// the correct row. Otherwise we may see previously returned rows again.
// (ScannerCallable by now has "relocated" the correct region)
if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
@ -464,7 +489,10 @@ public class ClientScanner extends AbstractClientScanner {
// Set this to zero so we don't try and do an rpc and close on remote server when
// the exception we got was UnknownScanner or the Server is going down.
callable = null;
// This continue will take us to while at end of loop where we will set up new scanner.
// reopen the scanner
if (!nextScanner(countdown, false)) {
break;
}
continue;
}
long currentTime = System.currentTimeMillis();
@ -489,20 +517,16 @@ public class ClientScanner extends AbstractClientScanner {
}
countdown--;
this.lastResult = rs;
if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
if (this.lastResult.isPartial() || scan.getBatch() > 0) {
updateLastCellLoadedToCache(this.lastResult);
} else {
this.lastCellLoadedToCache = null;
}
}
if (cache.isEmpty()) {
// all result has been seen before, we need scan more.
allResultsSkipped = true;
continue;
}
}
boolean exhausted = regionExhausted(values);
if (callable.isHeartbeatMessage()) {
if (cache.size() > 0) {
if (!cache.isEmpty()) {
// Caller of this method just wants a Result. If we see a heartbeat message, it means
// processing of the scan is taking a long time server side. Rather than continue to
// loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
@ -511,39 +535,40 @@ public class ClientScanner extends AbstractClientScanner {
LOG.trace("Heartbeat message received and cache contains Results."
+ " Breaking out of scan loop");
}
// we know that the region has not been exhausted yet so just break without calling
// closeScannerIfExhausted
break;
}
}
if (countdown <= 0) {
// we have enough result.
closeScannerIfExhausted(exhausted);
break;
}
if (remainingResultSize <= 0) {
if (!cache.isEmpty()) {
closeScannerIfExhausted(exhausted);
break;
} else {
// we have reached the max result size but we still can not find anything to return to the
// user. Reset the maxResultSize and try again.
remainingResultSize = maxScannerResultSize;
}
}
// we are done with the current region
if (exhausted) {
if (!partialResults.isEmpty()) {
// XXX: continue if there are partial results. But in fact server should not set
// hasMoreResults to false if there are partial results.
LOG.warn("Server tells us there is no more results for this region but we still have"
+ " partialResults, this should not happen, retry on the current scanner anyway");
continue;
}
// We expect that the server won't have more results for us when we exhaust
// the size (bytes or count) of the results returned. If the server *does* inform us that
// there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
// get results is the moreResults context valid.
if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
// Only adhere to more server results when we don't have any partialResults
// as it keeps the outer loop logic the same.
serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
if (!nextScanner(countdown, values == null)) {
break;
}
}
// Values == null means server-side filter has determined we must STOP
// !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that
// row.
} while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
|| (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
}
/**
* @param remainingResultSize
* @param remainingRows
* @param regionHasMoreResults
* @return true when the current region has been exhausted. When the current region has been
* exhausted, the region must be changed before scanning can continue
*/
private boolean doneWithRegion(long remainingResultSize, int remainingRows,
boolean regionHasMoreResults) {
return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
}
/**
@ -559,9 +584,8 @@ public class ClientScanner extends AbstractClientScanner {
* @return the list of results that should be added to the cache.
* @throws IOException
*/
protected List<Result>
getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
throws IOException {
protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
boolean heartbeatMessage) throws IOException {
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
@ -781,8 +805,8 @@ public class ClientScanner extends AbstractClientScanner {
}
/**
* Compare two Cells considering reversed scanner.
* ReversedScanner only reverses rows, not columns.
* Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
* columns.
*/
private int compare(Cell a, Cell b) {
int r = 0;

View File

@ -61,13 +61,7 @@ public class ReversedClientScanner extends ClientScanner {
protected boolean nextScanner(int nbRows, final boolean done)
throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
this.caller.callWithoutRetries(callable, scannerTimeout);
this.callable = null;
}
closeScanner();
// Where to start the next scanner
byte[] localStartKey;

View File

@ -150,7 +150,8 @@ public class TestClientScanner {
ScannerCallableWithReplicas.class);
switch (count) {
case 0: // initialize
case 2: // close
case 2: // detect no more results
case 3: // close
count++;
return null;
case 1:
@ -176,8 +177,10 @@ public class TestClientScanner {
scanner.loadCache();
// One more call due to initializeScannerInConstruction()
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
// One for initializeScannerInConstruction()
// One for fetching the results
// One for fetching null results and quit as we do not have moreResults hint.
inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size());
@ -216,7 +219,8 @@ public class TestClientScanner {
case 1:
count++;
callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false);
// if we set false here the implementation will trigger a close
callable.setServerHasMoreResults(true);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
@ -283,7 +287,8 @@ public class TestClientScanner {
case 1:
count++;
callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false);
// if we set false here the implementation will trigger a close
callable.setServerHasMoreResults(true);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
@ -462,13 +467,14 @@ public class TestClientScanner {
Mockito.anyInt());
InOrder inOrder = Mockito.inOrder(caller);
scanner.setRpcFinished(true);
scanner.loadCache();
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size());
assertEquals(2, scanner.cache.size());
Result r = scanner.cache.poll();
assertNotNull(r);
CellScanner cs = r.cellScanner();
@ -476,15 +482,6 @@ public class TestClientScanner {
assertEquals(kv1, cs.current());
assertFalse(cs.advance());
scanner.setRpcFinished(true);
inOrder = Mockito.inOrder(caller);
scanner.loadCache();
inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
r = scanner.cache.poll();
assertNotNull(r);
cs = r.cellScanner();

View File

@ -18,6 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -35,8 +42,10 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -91,7 +100,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -155,8 +163,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@ -174,7 +182,6 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@ -184,18 +191,12 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
/**
* Implements the regionserver RPC services.
*/
@ -248,8 +249,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final PriorityFunction priority;
private final AtomicLong scannerIdGen = new AtomicLong(0L);
private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
new ConcurrentHashMap<String, RegionScannerHolder>();
private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
/**
* The lease timeout period for client scanners (milliseconds).
@ -267,28 +267,28 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final long minimumScanTimeLimitDelta;
/**
* Holder class which holds the RegionScanner and nextCallSeq together.
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
private static class RegionScannerHolder {
private AtomicLong nextCallSeq = new AtomicLong(0);
private RegionScanner s;
private Region r;
private static final class RegionScannerHolder {
public RegionScannerHolder(RegionScanner s, Region r) {
private final AtomicLong nextCallSeq = new AtomicLong(0);
private final String scannerName;
private final RegionScanner s;
private final Region r;
public RegionScannerHolder(String scannerName, RegionScanner s, Region r) {
this.scannerName = scannerName;
this.s = s;
this.r = r;
}
private long getNextCallSeq() {
public long getNextCallSeq() {
return nextCallSeq.get();
}
private void incNextCallSeq() {
nextCallSeq.incrementAndGet();
}
private void rollbackNextCallSeq() {
nextCallSeq.decrementAndGet();
public boolean incNextCallSeq(long currentSeq) {
// Use CAS to prevent multiple scan request running on the same scanner.
return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
}
}
@ -405,7 +405,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private void addResults(final ScanResponse.Builder builder, final List<Result> results,
final RpcController controller, boolean isDefaultRegion) {
final PayloadCarryingRpcController controller, boolean isDefaultRegion) {
builder.setStale(!isDefaultRegion);
if (results == null || results.isEmpty()) return;
if (isClientCellBlockSupport()) {
@ -413,10 +413,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.addCellsPerResult(res.size());
builder.addPartialFlagPerResult(res.isPartial());
}
((PayloadCarryingRpcController)controller).
setCellScanner(CellUtil.createCellScanner(results));
controller.setCellScanner(CellUtil.createCellScanner(results));
} else {
for (Result res: results) {
for (Result res : results) {
ClientProtos.Result pbr = ProtobufUtil.toResult(res);
builder.addResults(pbr);
}
@ -1054,6 +1053,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
public
RegionScanner getScanner(long scannerId) {
String scannerIdString = Long.toString(scannerId);
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
@ -1087,19 +1087,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return 0L;
}
long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
long scannerId = this.scannerIdGen.incrementAndGet();
String scannerName = String.valueOf(scannerId);
RegionScannerHolder existing =
scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
return scannerId;
}
/**
* Method to account for the size of retained cells and retained data blocks.
* @return an object that represents the last referenced block from this response.
@ -1124,6 +1111,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return lastBlock;
}
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
throws LeaseStillHeldException {
regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
RegionScannerHolder rsh = new RegionScannerHolder(scannerName, s, r);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
return rsh;
}
/**
* Find the HRegion based on a region specifier
@ -2410,74 +2406,28 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
/**
* Scan data in a table.
*
* @param controller the RPC controller
* @param request the scan request
* @throws ServiceException
*/
// This is used to keep compatible with the old client implementation. Consider remove it if we
// decide to drop the support of the client that still sends close request to a region scanner
// which has already been exhausted.
@Deprecated
private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {
private static final long serialVersionUID = -4305297078988180130L;
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
OperationQuota quota = null;
Leases.Lease lease = null;
String scannerName = null;
try {
if (!request.hasScannerId() && !request.hasScan()) {
throw new DoNotRetryIOException(
"Missing required input: scannerId or scan");
public Throwable fillInStackTrace() {
return this;
}
long scannerId = -1;
if (request.hasScannerId()) {
scannerId = request.getScannerId();
scannerName = String.valueOf(scannerId);
}
try {
checkOpen();
} catch (IOException e) {
// If checkOpen failed, server not running or filesystem gone,
// cancel this lease; filesystem is gone or we're closing or something.
if (scannerName != null) {
LOG.debug("Server shutting down and client tried to access missing scanner "
+ scannerName);
if (regionServer.leases != null) {
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
if (LOG.isTraceEnabled()) {
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
}
}
}
}
throw e;
}
requestCount.increment();
rpcScanRequestCount.increment();
};
int ttl = 0;
Region region = null;
RegionScanner scanner = null;
RegionScannerHolder rsh = null;
boolean moreResults = true;
boolean closeScanner = false;
boolean isSmallScan = false;
RpcCallContext context = RpcServer.getCurrentCall();
Object lastBlock = null;
ScanResponse.Builder builder = ScanResponse.newBuilder();
if (request.hasCloseScanner()) {
closeScanner = request.getCloseScanner();
}
int rows = closeScanner ? 0 : 1;
if (request.hasNumberOfRows()) {
rows = request.getNumberOfRows();
}
if (request.hasScannerId()) {
rsh = scanners.get(scannerName);
private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
String scannerName = Long.toString(request.getScannerId());
RegionScannerHolder rsh = scanners.get(scannerName);
if (rsh == null) {
// just ignore the close request if scanner does not exists.
if (request.hasCloseScanner() && request.getCloseScanner()) {
throw SCANNER_ALREADY_CLOSED;
} else {
LOG.warn("Client tried to access missing scanner " + scannerName);
throw new UnknownScannerException(
"Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
@ -2487,15 +2437,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
+ "possible fix would be increasing the value of"
+ "'hbase.client.scanner.timeout.period' configuration.");
}
scanner = rsh.s;
HRegionInfo hri = scanner.getRegionInfo();
region = regionServer.getRegion(hri.getRegionName());
if (region != rsh.r) { // Yes, should be the same instance
throw new NotServingRegionException("Region was re-opened after the scanner"
+ scannerName + " was created: " + hri.getRegionNameAsString());
}
} else {
region = getRegion(request.getRegion());
HRegionInfo hri = rsh.s.getRegionInfo();
// Yes, should be the same instance
if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
String msg = "Region was re-opened after the scanner" + scannerName + " was created: "
+ hri.getRegionNameAsString();
LOG.warn(msg + ", closing...");
scanners.remove(scannerName);
try {
rsh.s.close();
} catch (IOException e) {
LOG.warn("Getting exception closing " + scannerName, e);
} finally {
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException e) {
LOG.warn("Getting exception closing " + scannerName, e);
}
}
throw new NotServingRegionException(msg);
}
return rsh;
}
private Pair<RegionScannerHolder, Boolean> newRegionScanner(ScanRequest request,
ScanResponse.Builder builder) throws IOException {
Region region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Scan scan = ProtobufUtil.toScan(protoScan);
@ -2503,15 +2471,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!isLoadingCfsOnDemandSet) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}
isSmallScan = scan.isSmall();
if (!scan.hasFamilies()) {
// Adding all families to scanner
for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
for (byte[] family : region.getTableDesc().getFamiliesKeys()) {
scan.addFamily(family);
}
}
RegionScanner scanner = null;
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().preScannerOpen(scan);
}
@ -2521,67 +2487,77 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
}
scannerId = addScanner(scanner, region);
scannerName = String.valueOf(scannerId);
ttl = this.scannerLeaseTimeoutPeriod;
long scannerId = this.scannerIdGen.incrementAndGet();
builder.setScannerId(scannerId);
builder.setMvccReadPoint(scanner.getMvccReadPoint());
}
if (request.hasRenew() && request.getRenew()) {
rsh = scanners.get(scannerName);
lease = regionServer.leases.removeLease(scannerName);
if (lease != null && rsh != null) {
regionServer.leases.addLease(lease);
// Increment the nextCallSeq value which is the next expected from client.
rsh.incNextCallSeq();
}
return builder.build();
builder.setTtl(scannerLeaseTimeoutPeriod);
String scannerName = String.valueOf(scannerId);
return Pair.newPair(addScanner(scannerName, scanner, region), scan.isSmall());
}
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
if (rows > 0) {
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
throws OutOfOrderScannerNextException {
// if nextCallSeq does not match throw Exception straight away. This needs to be
// performed even before checking of Lease.
// See HBASE-5974
if (request.hasNextCallSeq()) {
if (rsh == null) {
rsh = scanners.get(scannerName);
}
if (rsh != null) {
if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
throw new OutOfOrderScannerNextException(
"Expected nextCallSeq: " + rsh.getNextCallSeq()
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() +
"; request=" + TextFormat.shortDebugString(request));
}
// Increment the nextCallSeq value which is the next expected from client.
rsh.incNextCallSeq();
long callSeq = request.getNextCallSeq();
if (!rsh.incNextCallSeq(callSeq)) {
throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
+ TextFormat.shortDebugString(request));
}
}
}
private void addScannerLeaseBack(Leases.Lease lease) {
try {
// Remove lease while its being processed in server; protects against case
// where processing of request takes > lease expiration time.
lease = regionServer.leases.removeLease(scannerName);
List<Result> results = new ArrayList<Result>();
boolean done = false;
// Call coprocessor. Get region info from scanner.
if (region != null && region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(
scanner, results, rows);
if (!results.isEmpty()) {
for (Result r : results) {
lastBlock = addSize(context, r, lastBlock);
}
}
if (bypass != null && bypass.booleanValue()) {
done = true;
regionServer.leases.addLease(lease);
} catch (LeaseStillHeldException e) {
// should not happen as the scanner id is unique.
throw new AssertionError(e);
}
}
if (!done) {
long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
if (maxResultSize <= 0) {
private long getTimeLimit(PayloadCarryingRpcController controller,
boolean allowHeartbeatMessages) {
// Set the time limit to be half of the more restrictive timeout value (one of the
// timeout values must be positive). In the event that both values are positive, the
// more restrictive of the two is used to calculate the limit.
if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
long timeLimitDelta;
if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
} else {
timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
if (controller != null && controller.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediatate timeout before scanning any data).
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
// XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
// ManualEnvironmentEdge. Consider using System.nanoTime instead.
return System.currentTimeMillis() + timeLimitDelta;
}
// Default value of timeLimit is negative to indicate no timeLimit should be
// enforced.
return -1L;
}
// return whether we have more results in region.
private boolean scan(PayloadCarryingRpcController controller, ScanRequest request,
RegionScannerHolder rsh, boolean isSmallScan, long maxQuotaResultSize, int rows,
List<Result> results, ScanResponse.Builder builder, MutableObject lastBlock,
RpcCallContext context) throws IOException {
Region region = rsh.r;
RegionScanner scanner = rsh.s;
long maxResultSize;
if (scanner.getMaxResultSize() > 0) {
maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
} else {
maxResultSize = maxQuotaResultSize;
}
// This is cells inside a row. Default size is 10 so if many versions or many cfs,
@ -2592,7 +2568,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
int i = 0;
long before = EnvironmentEdgeManager.currentTime();
synchronized(scanner) {
synchronized (scanner) {
boolean stale = (region.getRegionInfo().getReplicaId() != 0);
boolean clientHandlesPartials =
request.hasClientHandlesPartials() && request.getClientHandlesPartials();
@ -2618,43 +2594,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// heartbeats AND partials
boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
// Default value of timeLimit is negative to indicate no timeLimit should be
// enforced.
long timeLimit = -1;
// Set the time limit to be half of the more restrictive timeout value (one of the
// timeout values must be positive). In the event that both values are positive, the
// more restrictive of the two is used to calculate the limit.
if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
long timeLimitDelta;
if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
} else {
timeLimitDelta =
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
if (controller instanceof TimeLimitedRpcController) {
TimeLimitedRpcController timeLimitedRpcController =
(TimeLimitedRpcController)controller;
if (timeLimitedRpcController.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta,
timeLimitedRpcController.getCallTimeout());
}
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediatate timeout before scanning any data).
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
timeLimit = System.currentTimeMillis() + timeLimitDelta;
}
long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
final LimitScope sizeScope =
allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
final LimitScope timeScope =
allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
boolean trackMetrics =
request.hasTrackScanMetrics() && request.getTrackScanMetrics();
boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
// Configure with limits for this RPC. Set keep progress true since size progress
// towards size limit should be kept between calls to nextRaw
@ -2664,7 +2611,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false;
while (i < rows) {
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
@ -2680,7 +2626,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!values.isEmpty()) {
final boolean partial = scannerContext.partialResultFormed();
Result r = Result.create(values, null, stale, partial);
lastBlock = addSize(context, r, lastBlock);
lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
results.add(r);
i++;
}
@ -2692,8 +2638,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (limitReached || !moreRows) {
if (LOG.isTraceEnabled()) {
LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
+ moreRows + " scannerContext: " + scannerContext);
LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows
+ " scannerContext: " + scannerContext);
}
// We only want to mark a ScanResponse as a heartbeat message in the event that
// there are more values to be read server side. If there aren't more values,
@ -2707,7 +2653,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
values.clear();
}
if (limitReached || moreRows) {
// We stopped prematurely
builder.setMoreResultsInRegion(true);
@ -2743,36 +2688,180 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} finally {
region.closeRegionOperation();
}
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
}
return builder.getMoreResultsInRegion();
}
/**
* Scan data in a table.
*
* @param controller the RPC controller
* @param request the scan request
* @throws ServiceException
*/
@Override
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
if (controller != null && !(controller instanceof PayloadCarryingRpcController)) {
throw new UnsupportedOperationException(
"We only do PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller);
}
if (!request.hasScannerId() && !request.hasScan()) {
throw new ServiceException(
new DoNotRetryIOException("Missing required input: scannerId or scan"));
}
try {
checkOpen();
} catch (IOException e) {
if (request.hasScannerId()) {
String scannerName = Long.toString(request.getScannerId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Server shutting down and client tried to access missing scanner " + scannerName);
}
if (regionServer.leases != null) {
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
if (LOG.isTraceEnabled()) {
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
}
}
}
}
throw new ServiceException(e);
}
requestCount.increment();
rpcScanRequestCount.increment();
RegionScannerHolder rsh;
ScanResponse.Builder builder = ScanResponse.newBuilder();
boolean isSmallScan;
try {
if (request.hasScannerId()) {
rsh = getRegionScanner(request);
isSmallScan = false;
} else {
Pair<RegionScannerHolder, Boolean> pair = newRegionScanner(request, builder);
rsh = pair.getFirst();
isSmallScan = pair.getSecond().booleanValue();
}
} catch (IOException e) {
if (e == SCANNER_ALREADY_CLOSED) {
// Now we will close scanner automatically if there are no more results for this region but
// the old client will still send a close request to us. Just ignore it and return.
return builder.build();
}
throw new ServiceException(e);
}
Region region = rsh.r;
String scannerName = rsh.scannerName;
Leases.Lease lease;
try {
// Remove lease while its being processed in server; protects against case
// where processing of request takes > lease expiration time.
lease = regionServer.leases.removeLease(scannerName);
} catch (LeaseException e) {
throw new ServiceException(e);
}
if (request.hasRenew() && request.getRenew()) {
// add back and return
addScannerLeaseBack(lease);
try {
checkScanNextCallSeq(request, rsh);
} catch (OutOfOrderScannerNextException e) {
throw new ServiceException(e);
}
return builder.build();
}
OperationQuota quota;
try {
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
} catch (IOException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
};
try {
checkScanNextCallSeq(request, rsh);
} catch (OutOfOrderScannerNextException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
}
// Now we have increased the next call sequence. If we give client an error, the retry will
// never success. So we'd better close the scanner and return a DoNotRetryIOException to client
// and then client will try to open a new scanner.
boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
int rows; // this is scan.getCaching
if (request.hasNumberOfRows()) {
rows = request.getNumberOfRows();
} else {
rows = closeScanner ? 0 : 1;
}
RpcCallContext context = RpcServer.getCurrentCall();
// now let's do the real scan.
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
RegionScanner scanner = rsh.s;
boolean moreResults = true;
boolean moreResultsInRegion = true;
MutableObject lastBlock = new MutableObject();
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>();
if (rows > 0) {
boolean done = false;
// Call coprocessor. Get region info from scanner.
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
for (Result r : results) {
lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
}
}
if (bypass != null && bypass.booleanValue()) {
done = true;
}
}
if (!done) {
moreResultsInRegion = scan((PayloadCarryingRpcController) controller, request, rsh,
isSmallScan, maxQuotaResultSize, rows, results, builder, lastBlock, context);
}
}
quota.addScanResult(results);
// If the scanner's filter - if any - is done with the scan
// and wants to tell the client to stop the scan. This is done by passing
// a null result, and setting moreResults to false.
if (scanner.isFilterDone() && results.isEmpty()) {
// If the scanner's filter - if any - is done with the scan
// only set moreResults to false if the results is empty. This is used to keep compatible
// with the old scan implementation where we just ignore the returned results if moreResults
// is false. Can remove the isEmpty check after we get rid of the old implementation.
moreResults = false;
results = null;
} else {
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
} catch (IOException e) {
addResults(builder, results, (PayloadCarryingRpcController) controller,
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
if (!moreResults || !moreResultsInRegion || closeScanner) {
scannerClosed = true;
closeScanner(region, scanner, scannerName, context);
}
builder.setMoreResults(moreResults);
return builder.build();
} catch (Exception e) {
try {
// scanner is closed here
scannerClosed = true;
// The scanner state might be left in a dirty state, so we will tell the Client to
// fail this RPC and close the scanner while opening up another one from the start of
// row that the client has last seen.
closeScanner(region, scanner, scannerName);
closeScanner(region, scanner, scannerName, context);
// If it is a CorruptHFileException or a FileNotFoundException, throw the
// DoNotRetryIOException. This can avoid the retry in ClientScanner.
if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
throw new DoNotRetryIOException(e);
}
// We closed the scanner already. Instead of throwing the IOException, and client
// retrying with the same scannerId only to get USE on the next RPC, we directly throw
// a special exception to save an RPC.
@ -2784,74 +2873,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+ " scanner state for clients older than 1.3.", e);
}
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
} finally {
// We're done. On way out re-add the above removed lease.
if (!scannerClosed) {
// Adding resets expiration time on lease.
if (scanners.containsKey(scannerName)) {
if (lease != null) regionServer.leases.addLease(lease);
ttl = this.scannerLeaseTimeoutPeriod;
addScannerLeaseBack(lease);
}
}
}
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
closeScanner(region, scanner, scannerName);
}
if (ttl > 0) {
builder.setTtl(ttl);
}
builder.setScannerId(scannerId);
builder.setMoreResults(moreResults);
return builder.build();
} catch (IOException ie) {
if (scannerName != null && ie instanceof NotServingRegionException) {
RegionScannerHolder rsh = scanners.remove(scannerName);
if (rsh != null) {
try {
RegionScanner scanner = rsh.s;
LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
scanner.close();
regionServer.leases.cancelLease(scannerName);
} catch (IOException e) {
LOG.warn("Getting exception closing " + scannerName, e);
}
}
}
throw new ServiceException(ie);
} finally {
if (quota != null) {
quota.close();
}
}
}
private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
throws IOException {
if (region != null && region.getCoprocessorHost() != null) {
private void closeScanner(Region region, RegionScanner scanner, String scannerName,
RpcCallContext context) throws IOException {
if (region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
return true; // bypass
// bypass the actual close.
return;
}
}
RegionScannerHolder rsh = scanners.remove(scannerName);
if (rsh != null) {
scanner = rsh.s;
scanner.close();
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
if (LOG.isTraceEnabled()) {
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
}
}
if (region != null && region.getCoprocessorHost() != null) {
rsh.s.close();
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
return false;
}
@Override