From 57409371a063ed4d93292ca6e0b7d5b8866e1ffc Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 22 Jan 2017 10:02:29 +0800 Subject: [PATCH] HBASE-17489 ClientScanner may send a next request to a RegionScanner which has been exhausted --- .../hadoop/hbase/client/ClientScanner.java | 166 ++-- .../hbase/client/ReversedClientScanner.java | 8 +- .../hbase/client/TestClientScanner.java | 29 +- .../hbase/regionserver/RSRpcServices.java | 922 +++++++++--------- 4 files changed, 594 insertions(+), 531 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 0898385bfb3..fb2bc4bcd1c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -27,8 +29,6 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.KeyValue.MetaComparator; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -37,9 +37,11 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue.MetaComparator; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -48,12 +50,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; - /** - * Implements the scanner interface for the HBase client. - * If there are multiple regions in a table, this scanner will iterate - * through them all. + * Implements the scanner interface for the HBase client. If there are multiple regions in a table, + * this scanner will iterate through them all. */ @InterfaceAudience.Private public class ClientScanner extends AbstractClientScanner { @@ -236,15 +235,13 @@ public class ClientScanner extends AbstractClientScanner { return false; // unlikely. } - private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException { - // If we have just switched replica, don't go to the next scanner yet. Rather, try - // the scanner operations on the new replica, from the right point in the scan - // Note that when we switched to a different replica we left it at a point - // where we just did the "openScanner" with the appropriate startrow - if (callable != null && callable.switchedToADifferentReplica()) return true; - return nextScanner(nbRows, done); + protected final void closeScanner() throws IOException { + if (this.callable != null) { + this.callable.setClose(); + call(callable, caller, scannerTimeout); + this.callable = null; + } } - /* * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no @@ -255,11 +252,7 @@ public class ClientScanner extends AbstractClientScanner { */ protected boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open - if (this.callable != null) { - this.callable.setClose(); - call(callable, caller, scannerTimeout); - this.callable = null; - } + closeScanner(); // Where to start the next scanner byte[] localStartKey; @@ -376,6 +369,37 @@ public class ClientScanner extends AbstractClientScanner { return cache != null ? cache.size() : 0; } + private boolean regionExhausted(Result[] values) { + // This means the server tells us the whole scan operation is done. Usually decided by filter. + if (values == null) { + return true; + } + // Not a heartbeat message and we get nothing, this means the region is exhausted + if (values.length == 0 && !callable.isHeartbeatMessage()) { + return true; + } + // Server tells us that it has no more results for this region. Notice that this flag is get + // from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter + // one is false then we will get a null values and quit in the first condition of this method. + if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) { + return true; + } + return false; + } + + private void closeScannerIfExhausted(boolean exhausted) throws IOException { + if (exhausted) { + if (!partialResults.isEmpty()) { + // XXX: continue if there are partial results. But in fact server should not set + // hasMoreResults to false if there are partial results. + LOG.warn("Server tells us there is no more results for this region but we still have" + + " partialResults, this should not happen, retry on the current scanner anyway"); + } else { + closeScanner(); + } + } + } + /** * Contact the servers to load more {@link Result}s in the cache. */ @@ -383,17 +407,18 @@ public class ClientScanner extends AbstractClientScanner { Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; + // This is possible if we just stopped at the boundary of a region in the previous call. + if (callable == null) { + if (!nextScanner(countdown, false)) { + return; + } + } // We need to reset it if it's a new callable that was created with a countdown in nextScanner callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. boolean retryAfterOutOfOrderException = true; - // We don't expect that the server will have more results for us if - // it doesn't tell us otherwise. We rely on the size or count of results - boolean serverHasMoreResults = false; - boolean allResultsSkipped = false; - do { - allResultsSkipped = false; + for (;;) { try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -439,7 +464,7 @@ public class ClientScanner extends AbstractClientScanner { // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. // (ScannerCallable by now has "relocated" the correct region) - if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) { + if (!this.lastResult.isPartial() && scan.getBatch() < 0) { if (scan.isReversed()) { scan.setStartRow(createClosestRowBefore(lastResult.getRow())); } else { @@ -464,7 +489,10 @@ public class ClientScanner extends AbstractClientScanner { // Set this to zero so we don't try and do an rpc and close on remote server when // the exception we got was UnknownScanner or the Server is going down. callable = null; - // This continue will take us to while at end of loop where we will set up new scanner. + // reopen the scanner + if (!nextScanner(countdown, false)) { + break; + } continue; } long currentTime = System.currentTimeMillis(); @@ -489,61 +517,58 @@ public class ClientScanner extends AbstractClientScanner { } countdown--; this.lastResult = rs; - if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { + if (this.lastResult.isPartial() || scan.getBatch() > 0) { updateLastCellLoadedToCache(this.lastResult); } else { this.lastCellLoadedToCache = null; } } - if (cache.isEmpty()) { - // all result has been seen before, we need scan more. - allResultsSkipped = true; - continue; - } } + boolean exhausted = regionExhausted(values); if (callable.isHeartbeatMessage()) { - if (cache.size() > 0) { + if (!cache.isEmpty()) { // Caller of this method just wants a Result. If we see a heartbeat message, it means // processing of the scan is taking a long time server side. Rather than continue to // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing // unnecesary delays to the caller if (LOG.isTraceEnabled()) { LOG.trace("Heartbeat message received and cache contains Results." - + " Breaking out of scan loop"); + + " Breaking out of scan loop"); } + // we know that the region has not been exhausted yet so just break without calling + // closeScannerIfExhausted break; } - continue; } - - // We expect that the server won't have more results for us when we exhaust - // the size (bytes or count) of the results returned. If the server *does* inform us that - // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually - // get results is the moreResults context valid. - if (null != values && values.length > 0 && callable.hasMoreResultsContext()) { - // Only adhere to more server results when we don't have any partialResults - // as it keeps the outer loop logic the same. - serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty(); + if (countdown <= 0) { + // we have enough result. + closeScannerIfExhausted(exhausted); + break; } - // Values == null means server-side filter has determined we must STOP - // !partialResults.isEmpty() means that we are still accumulating partial Results for a - // row. We should not change scanners before we receive all the partial Results for that - // row. - } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage()) - || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); - } - - /** - * @param remainingResultSize - * @param remainingRows - * @param regionHasMoreResults - * @return true when the current region has been exhausted. When the current region has been - * exhausted, the region must be changed before scanning can continue - */ - private boolean doneWithRegion(long remainingResultSize, int remainingRows, - boolean regionHasMoreResults) { - return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults; + if (remainingResultSize <= 0) { + if (!cache.isEmpty()) { + closeScannerIfExhausted(exhausted); + break; + } else { + // we have reached the max result size but we still can not find anything to return to the + // user. Reset the maxResultSize and try again. + remainingResultSize = maxScannerResultSize; + } + } + // we are done with the current region + if (exhausted) { + if (!partialResults.isEmpty()) { + // XXX: continue if there are partial results. But in fact server should not set + // hasMoreResults to false if there are partial results. + LOG.warn("Server tells us there is no more results for this region but we still have" + + " partialResults, this should not happen, retry on the current scanner anyway"); + continue; + } + if (!nextScanner(countdown, values == null)) { + break; + } + } + } } /** @@ -559,9 +584,8 @@ public class ClientScanner extends AbstractClientScanner { * @return the list of results that should be added to the cache. * @throws IOException */ - protected List - getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage) - throws IOException { + protected List getResultsToAddToCache(Result[] resultsFromServer, + boolean heartbeatMessage) throws IOException { int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; List resultsToAddToCache = new ArrayList(resultSize); @@ -576,7 +600,7 @@ public class ClientScanner extends AbstractClientScanner { // the batch size even though it may not be the last group of cells for that row. if (allowPartials || isBatchSet) { addResultsToList(resultsToAddToCache, resultsFromServer, 0, - (null == resultsFromServer ? 0 : resultsFromServer.length)); + (null == resultsFromServer ? 0 : resultsFromServer.length)); return resultsToAddToCache; } @@ -781,8 +805,8 @@ public class ClientScanner extends AbstractClientScanner { } /** - * Compare two Cells considering reversed scanner. - * ReversedScanner only reverses rows, not columns. + * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not + * columns. */ private int compare(Cell a, Cell b) { int r = 0; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 13b164d695f..ca998ae26b5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -61,13 +61,7 @@ public class ReversedClientScanner extends ClientScanner { protected boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open - if (this.callable != null) { - this.callable.setClose(); - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - this.caller.callWithoutRetries(callable, scannerTimeout); - this.callable = null; - } + closeScanner(); // Where to start the next scanner byte[] localStartKey; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 7fb18d74f6d..828d435500e 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -150,7 +150,8 @@ public class TestClientScanner { ScannerCallableWithReplicas.class); switch (count) { case 0: // initialize - case 2: // close + case 2: // detect no more results + case 3: // close count++; return null; case 1: @@ -176,8 +177,10 @@ public class TestClientScanner { scanner.loadCache(); - // One more call due to initializeScannerInConstruction() - inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( + // One for initializeScannerInConstruction() + // One for fetching the results + // One for fetching null results and quit as we do not have moreResults hint. + inOrder.verify(caller, Mockito.times(3)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); assertEquals(1, scanner.cache.size()); @@ -216,7 +219,8 @@ public class TestClientScanner { case 1: count++; callable.setHasMoreResultsContext(true); - callable.setServerHasMoreResults(false); + // if we set false here the implementation will trigger a close + callable.setServerHasMoreResults(true); return results; default: throw new RuntimeException("Expected only 2 invocations"); @@ -283,7 +287,8 @@ public class TestClientScanner { case 1: count++; callable.setHasMoreResultsContext(true); - callable.setServerHasMoreResults(false); + // if we set false here the implementation will trigger a close + callable.setServerHasMoreResults(true); return results; default: throw new RuntimeException("Expected only 2 invocations"); @@ -462,13 +467,14 @@ public class TestClientScanner { Mockito.anyInt()); InOrder inOrder = Mockito.inOrder(caller); + scanner.setRpcFinished(true); scanner.loadCache(); - inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( + inOrder.verify(caller, Mockito.times(3)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); - assertEquals(1, scanner.cache.size()); + assertEquals(2, scanner.cache.size()); Result r = scanner.cache.poll(); assertNotNull(r); CellScanner cs = r.cellScanner(); @@ -476,15 +482,6 @@ public class TestClientScanner { assertEquals(kv1, cs.current()); assertFalse(cs.advance()); - scanner.setRpcFinished(true); - - inOrder = Mockito.inOrder(caller); - - scanner.loadCache(); - - inOrder.verify(caller, Mockito.times(3)).callWithoutRetries( - Mockito.any(RetryingCallable.class), Mockito.anyInt()); - r = scanner.cache.poll(); assertNotNull(r); cs = r.cellScanner(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 2e9303bd1a6..3310bfda15c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -18,6 +18,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -35,8 +42,10 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.mutable.MutableObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -91,7 +100,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -155,8 +163,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; @@ -174,7 +182,6 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -184,18 +191,12 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; - /** * Implements the regionserver RPC services. */ @@ -248,8 +249,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final PriorityFunction priority; private final AtomicLong scannerIdGen = new AtomicLong(0L); - private final ConcurrentHashMap scanners = - new ConcurrentHashMap(); + private final ConcurrentMap scanners = new ConcurrentHashMap<>(); /** * The lease timeout period for client scanners (milliseconds). @@ -267,28 +267,28 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final long minimumScanTimeLimitDelta; /** - * Holder class which holds the RegionScanner and nextCallSeq together. + * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. */ - private static class RegionScannerHolder { - private AtomicLong nextCallSeq = new AtomicLong(0); - private RegionScanner s; - private Region r; + private static final class RegionScannerHolder { - public RegionScannerHolder(RegionScanner s, Region r) { + private final AtomicLong nextCallSeq = new AtomicLong(0); + private final String scannerName; + private final RegionScanner s; + private final Region r; + + public RegionScannerHolder(String scannerName, RegionScanner s, Region r) { + this.scannerName = scannerName; this.s = s; this.r = r; } - private long getNextCallSeq() { + public long getNextCallSeq() { return nextCallSeq.get(); } - private void incNextCallSeq() { - nextCallSeq.incrementAndGet(); - } - - private void rollbackNextCallSeq() { - nextCallSeq.decrementAndGet(); + public boolean incNextCallSeq(long currentSeq) { + // Use CAS to prevent multiple scan request running on the same scanner. + return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); } } @@ -405,7 +405,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private void addResults(final ScanResponse.Builder builder, final List results, - final RpcController controller, boolean isDefaultRegion) { + final PayloadCarryingRpcController controller, boolean isDefaultRegion) { builder.setStale(!isDefaultRegion); if (results == null || results.isEmpty()) return; if (isClientCellBlockSupport()) { @@ -413,10 +413,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addCellsPerResult(res.size()); builder.addPartialFlagPerResult(res.isPartial()); } - ((PayloadCarryingRpcController)controller). - setCellScanner(CellUtil.createCellScanner(results)); + controller.setCellScanner(CellUtil.createCellScanner(results)); } else { - for (Result res: results) { + for (Result res : results) { ClientProtos.Result pbr = ProtobufUtil.toResult(res); builder.addResults(pbr); } @@ -1054,6 +1053,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + public RegionScanner getScanner(long scannerId) { String scannerIdString = Long.toString(scannerId); RegionScannerHolder scannerHolder = scanners.get(scannerIdString); @@ -1087,19 +1087,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return 0L; } - long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException { - long scannerId = this.scannerIdGen.incrementAndGet(); - String scannerName = String.valueOf(scannerId); - - RegionScannerHolder existing = - scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r)); - assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; - - regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, - new ScannerListener(scannerName)); - return scannerId; - } - /** * Method to account for the size of retained cells and retained data blocks. * @return an object that represents the last referenced block from this response. @@ -1124,6 +1111,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return lastBlock; } + private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) + throws LeaseStillHeldException { + regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, + new ScannerListener(scannerName)); + RegionScannerHolder rsh = new RegionScannerHolder(scannerName, s, r); + RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); + assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; + return rsh; + } /** * Find the HRegion based on a region specifier @@ -2410,6 +2406,295 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + // This is used to keep compatible with the old client implementation. Consider remove it if we + // decide to drop the support of the client that still sends close request to a region scanner + // which has already been exhausted. + @Deprecated + private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { + + private static final long serialVersionUID = -4305297078988180130L; + + @Override + public Throwable fillInStackTrace() { + return this; + } + }; + + private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { + String scannerName = Long.toString(request.getScannerId()); + RegionScannerHolder rsh = scanners.get(scannerName); + if (rsh == null) { + // just ignore the close request if scanner does not exists. + if (request.hasCloseScanner() && request.getCloseScanner()) { + throw SCANNER_ALREADY_CLOSED; + } else { + LOG.warn("Client tried to access missing scanner " + scannerName); + throw new UnknownScannerException( + "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " + + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + + "long wait between consecutive client checkins, c) Server may be closing down, " + + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + + "possible fix would be increasing the value of" + + "'hbase.client.scanner.timeout.period' configuration."); + } + } + HRegionInfo hri = rsh.s.getRegionInfo(); + // Yes, should be the same instance + if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { + String msg = "Region was re-opened after the scanner" + scannerName + " was created: " + + hri.getRegionNameAsString(); + LOG.warn(msg + ", closing..."); + scanners.remove(scannerName); + try { + rsh.s.close(); + } catch (IOException e) { + LOG.warn("Getting exception closing " + scannerName, e); + } finally { + try { + regionServer.leases.cancelLease(scannerName); + } catch (LeaseException e) { + LOG.warn("Getting exception closing " + scannerName, e); + } + } + throw new NotServingRegionException(msg); + } + return rsh; + } + + private Pair newRegionScanner(ScanRequest request, + ScanResponse.Builder builder) throws IOException { + Region region = getRegion(request.getRegion()); + ClientProtos.Scan protoScan = request.getScan(); + boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); + Scan scan = ProtobufUtil.toScan(protoScan); + // if the request doesn't set this, get the default region setting. + if (!isLoadingCfsOnDemandSet) { + scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); + } + if (!scan.hasFamilies()) { + // Adding all families to scanner + for (byte[] family : region.getTableDesc().getFamiliesKeys()) { + scan.addFamily(family); + } + } + RegionScanner scanner = null; + if (region.getCoprocessorHost() != null) { + scanner = region.getCoprocessorHost().preScannerOpen(scan); + } + if (scanner == null) { + scanner = region.getScanner(scan); + } + if (region.getCoprocessorHost() != null) { + scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); + } + long scannerId = this.scannerIdGen.incrementAndGet(); + builder.setScannerId(scannerId); + builder.setMvccReadPoint(scanner.getMvccReadPoint()); + builder.setTtl(scannerLeaseTimeoutPeriod); + String scannerName = String.valueOf(scannerId); + return Pair.newPair(addScanner(scannerName, scanner, region), scan.isSmall()); + } + + private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) + throws OutOfOrderScannerNextException { + // if nextCallSeq does not match throw Exception straight away. This needs to be + // performed even before checking of Lease. + // See HBASE-5974 + if (request.hasNextCallSeq()) { + long callSeq = request.getNextCallSeq(); + if (!rsh.incNextCallSeq(callSeq)) { + throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq() + + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request=" + + TextFormat.shortDebugString(request)); + } + } + } + + private void addScannerLeaseBack(Leases.Lease lease) { + try { + regionServer.leases.addLease(lease); + } catch (LeaseStillHeldException e) { + // should not happen as the scanner id is unique. + throw new AssertionError(e); + } + } + + private long getTimeLimit(PayloadCarryingRpcController controller, + boolean allowHeartbeatMessages) { + // Set the time limit to be half of the more restrictive timeout value (one of the + // timeout values must be positive). In the event that both values are positive, the + // more restrictive of the two is used to calculate the limit. + if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { + long timeLimitDelta; + if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { + timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); + } else { + timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; + } + if (controller != null && controller.getCallTimeout() > 0) { + timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout()); + } + // Use half of whichever timeout value was more restrictive... But don't allow + // the time limit to be less than the allowable minimum (could cause an + // immediatate timeout before scanning any data). + timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); + // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a + // ManualEnvironmentEdge. Consider using System.nanoTime instead. + return System.currentTimeMillis() + timeLimitDelta; + } + // Default value of timeLimit is negative to indicate no timeLimit should be + // enforced. + return -1L; + } + + // return whether we have more results in region. + private boolean scan(PayloadCarryingRpcController controller, ScanRequest request, + RegionScannerHolder rsh, boolean isSmallScan, long maxQuotaResultSize, int rows, + List results, ScanResponse.Builder builder, MutableObject lastBlock, + RpcCallContext context) throws IOException { + Region region = rsh.r; + RegionScanner scanner = rsh.s; + long maxResultSize; + if (scanner.getMaxResultSize() > 0) { + maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); + } else { + maxResultSize = maxQuotaResultSize; + } + // This is cells inside a row. Default size is 10 so if many versions or many cfs, + // then we'll resize. Resizings show in profiler. Set it higher than 10. For now + // arbitrary 32. TODO: keep record of general size of results being returned. + List values = new ArrayList(32); + region.startRegionOperation(Operation.SCAN); + try { + int i = 0; + long before = EnvironmentEdgeManager.currentTime(); + synchronized (scanner) { + boolean stale = (region.getRegionInfo().getReplicaId() != 0); + boolean clientHandlesPartials = + request.hasClientHandlesPartials() && request.getClientHandlesPartials(); + boolean clientHandlesHeartbeats = + request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); + + // On the server side we must ensure that the correct ordering of partial results is + // returned to the client to allow them to properly reconstruct the partial results. + // If the coprocessor host is adding to the result list, we cannot guarantee the + // correct ordering of partial results and so we prevent partial results from being + // formed. + boolean serverGuaranteesOrderOfPartials = results.isEmpty(); + boolean allowPartialResults = + clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; + boolean moreRows = false; + + // Heartbeat messages occur when the processing of the ScanRequest is exceeds a + // certain time threshold on the server. When the time threshold is exceeded, the + // server stops the scan and sends back whatever Results it has accumulated within + // that time period (may be empty). Since heartbeat messages have the potential to + // create partial Results (in the event that the timeout occurs in the middle of a + // row), we must only generate heartbeat messages when the client can handle both + // heartbeats AND partials + boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; + + long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); + + final LimitScope sizeScope = + allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + final LimitScope timeScope = + allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + + boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); + + // Configure with limits for this RPC. Set keep progress true since size progress + // towards size limit should be kept between calls to nextRaw + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); + contextBuilder.setSizeLimit(sizeScope, maxResultSize); + contextBuilder.setBatchLimit(scanner.getBatch()); + contextBuilder.setTimeLimit(timeScope, timeLimit); + contextBuilder.setTrackMetrics(trackMetrics); + ScannerContext scannerContext = contextBuilder.build(); + boolean limitReached = false; + while (i < rows) { + // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The + // batch limit is a limit on the number of cells per Result. Thus, if progress is + // being tracked (i.e. scannerContext.keepProgress() is true) then we need to + // reset the batch progress between nextRaw invocations since we don't want the + // batch progress from previous calls to affect future calls + scannerContext.setBatchProgress(0); + + // Collect values to be returned here + moreRows = scanner.nextRaw(values, scannerContext); + + if (!values.isEmpty()) { + final boolean partial = scannerContext.partialResultFormed(); + Result r = Result.create(values, null, stale, partial); + lastBlock.setValue(addSize(context, r, lastBlock.getValue())); + results.add(r); + i++; + } + + boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); + boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); + boolean rowLimitReached = i >= rows; + limitReached = sizeLimitReached || timeLimitReached || rowLimitReached; + + if (limitReached || !moreRows) { + if (LOG.isTraceEnabled()) { + LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows + + " scannerContext: " + scannerContext); + } + // We only want to mark a ScanResponse as a heartbeat message in the event that + // there are more values to be read server side. If there aren't more values, + // marking it as a heartbeat is wasteful because the client will need to issue + // another ScanRequest only to realize that they already have all the values + if (moreRows) { + // Heartbeat messages occur when the time limit has been reached. + builder.setHeartbeatMessage(timeLimitReached); + } + break; + } + values.clear(); + } + if (limitReached || moreRows) { + // We stopped prematurely + builder.setMoreResultsInRegion(true); + } else { + // We didn't get a single batch + builder.setMoreResultsInRegion(false); + } + + // Check to see if the client requested that we track metrics server side. If the + // client requested metrics, retrieve the metrics from the scanner context. + if (trackMetrics) { + Map metrics = scannerContext.getMetrics().getMetricsMap(); + ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); + NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); + + for (Entry entry : metrics.entrySet()) { + pairBuilder.setName(entry.getKey()); + pairBuilder.setValue(entry.getValue()); + metricBuilder.addMetrics(pairBuilder.build()); + } + + builder.setScanMetrics(metricBuilder.build()); + } + } + region.updateReadRequestsCount(i); + long end = EnvironmentEdgeManager.currentTime(); + long responseCellSize = context != null ? context.getResponseCellSize() : 0; + region.getMetrics().updateScanTime(end - before); + if (regionServer.metricsRegionServer != null) { + regionServer.metricsRegionServer.updateScanSize(responseCellSize); + regionServer.metricsRegionServer.updateScanTime(end - before); + } + } finally { + region.closeRegionOperation(); + } + // coprocessor postNext hook + if (region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); + } + return builder.getMoreResultsInRegion(); + } + /** * Scan data in a table. * @@ -2419,439 +2704,202 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ @Override public ScanResponse scan(final RpcController controller, final ScanRequest request) - throws ServiceException { - OperationQuota quota = null; - Leases.Lease lease = null; - String scannerName = null; + throws ServiceException { + if (controller != null && !(controller instanceof PayloadCarryingRpcController)) { + throw new UnsupportedOperationException( + "We only do PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller); + } + if (!request.hasScannerId() && !request.hasScan()) { + throw new ServiceException( + new DoNotRetryIOException("Missing required input: scannerId or scan")); + } try { - if (!request.hasScannerId() && !request.hasScan()) { - throw new DoNotRetryIOException( - "Missing required input: scannerId or scan"); - } - long scannerId = -1; + checkOpen(); + } catch (IOException e) { if (request.hasScannerId()) { - scannerId = request.getScannerId(); - scannerName = String.valueOf(scannerId); - } - try { - checkOpen(); - } catch (IOException e) { - // If checkOpen failed, server not running or filesystem gone, - // cancel this lease; filesystem is gone or we're closing or something. - if (scannerName != null) { - LOG.debug("Server shutting down and client tried to access missing scanner " - + scannerName); - if (regionServer.leases != null) { - try { - regionServer.leases.cancelLease(scannerName); - } catch (LeaseException le) { - // No problem, ignore - if (LOG.isTraceEnabled()) { - LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); - } - } + String scannerName = Long.toString(request.getScannerId()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Server shutting down and client tried to access missing scanner " + scannerName); + } + if (regionServer.leases != null) { + try { + regionServer.leases.cancelLease(scannerName); + } catch (LeaseException le) { + // No problem, ignore + if (LOG.isTraceEnabled()) { + LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); + } } } - throw e; - } - requestCount.increment(); - rpcScanRequestCount.increment(); - - int ttl = 0; - Region region = null; - RegionScanner scanner = null; - RegionScannerHolder rsh = null; - boolean moreResults = true; - boolean closeScanner = false; - boolean isSmallScan = false; - RpcCallContext context = RpcServer.getCurrentCall(); - Object lastBlock = null; - - ScanResponse.Builder builder = ScanResponse.newBuilder(); - if (request.hasCloseScanner()) { - closeScanner = request.getCloseScanner(); - } - int rows = closeScanner ? 0 : 1; - if (request.hasNumberOfRows()) { - rows = request.getNumberOfRows(); } + throw new ServiceException(e); + } + requestCount.increment(); + rpcScanRequestCount.increment(); + RegionScannerHolder rsh; + ScanResponse.Builder builder = ScanResponse.newBuilder(); + boolean isSmallScan; + try { if (request.hasScannerId()) { - rsh = scanners.get(scannerName); - if (rsh == null) { - LOG.warn("Client tried to access missing scanner " + scannerName); - throw new UnknownScannerException( - "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " - + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " - + "long wait between consecutive client checkins, c) Server may be closing down, " - + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " - + "possible fix would be increasing the value of" - + "'hbase.client.scanner.timeout.period' configuration."); - } - scanner = rsh.s; - HRegionInfo hri = scanner.getRegionInfo(); - region = regionServer.getRegion(hri.getRegionName()); - if (region != rsh.r) { // Yes, should be the same instance - throw new NotServingRegionException("Region was re-opened after the scanner" - + scannerName + " was created: " + hri.getRegionNameAsString()); - } + rsh = getRegionScanner(request); + isSmallScan = false; } else { - region = getRegion(request.getRegion()); - ClientProtos.Scan protoScan = request.getScan(); - boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); - Scan scan = ProtobufUtil.toScan(protoScan); - // if the request doesn't set this, get the default region setting. - if (!isLoadingCfsOnDemandSet) { - scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); - } - - isSmallScan = scan.isSmall(); - if (!scan.hasFamilies()) { - // Adding all families to scanner - for (byte[] family: region.getTableDesc().getFamiliesKeys()) { - scan.addFamily(family); - } - } - - if (region.getCoprocessorHost() != null) { - scanner = region.getCoprocessorHost().preScannerOpen(scan); - } - if (scanner == null) { - scanner = region.getScanner(scan); - } - if (region.getCoprocessorHost() != null) { - scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); - } - scannerId = addScanner(scanner, region); - scannerName = String.valueOf(scannerId); - ttl = this.scannerLeaseTimeoutPeriod; - builder.setMvccReadPoint(scanner.getMvccReadPoint()); + Pair pair = newRegionScanner(request, builder); + rsh = pair.getFirst(); + isSmallScan = pair.getSecond().booleanValue(); } - if (request.hasRenew() && request.getRenew()) { - rsh = scanners.get(scannerName); - lease = regionServer.leases.removeLease(scannerName); - if (lease != null && rsh != null) { - regionServer.leases.addLease(lease); - // Increment the nextCallSeq value which is the next expected from client. - rsh.incNextCallSeq(); - } + } catch (IOException e) { + if (e == SCANNER_ALREADY_CLOSED) { + // Now we will close scanner automatically if there are no more results for this region but + // the old client will still send a close request to us. Just ignore it and return. return builder.build(); } - + throw new ServiceException(e); + } + Region region = rsh.r; + String scannerName = rsh.scannerName; + Leases.Lease lease; + try { + // Remove lease while its being processed in server; protects against case + // where processing of request takes > lease expiration time. + lease = regionServer.leases.removeLease(scannerName); + } catch (LeaseException e) { + throw new ServiceException(e); + } + if (request.hasRenew() && request.getRenew()) { + // add back and return + addScannerLeaseBack(lease); + try { + checkScanNextCallSeq(request, rsh); + } catch (OutOfOrderScannerNextException e) { + throw new ServiceException(e); + } + return builder.build(); + } + OperationQuota quota; + try { quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); - long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + } catch (IOException e) { + addScannerLeaseBack(lease); + throw new ServiceException(e); + }; + try { + checkScanNextCallSeq(request, rsh); + } catch (OutOfOrderScannerNextException e) { + addScannerLeaseBack(lease); + throw new ServiceException(e); + } + // Now we have increased the next call sequence. If we give client an error, the retry will + // never success. So we'd better close the scanner and return a DoNotRetryIOException to client + // and then client will try to open a new scanner. + boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; + int rows; // this is scan.getCaching + if (request.hasNumberOfRows()) { + rows = request.getNumberOfRows(); + } else { + rows = closeScanner ? 0 : 1; + } + RpcCallContext context = RpcServer.getCurrentCall(); + // now let's do the real scan. + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + RegionScanner scanner = rsh.s; + boolean moreResults = true; + boolean moreResultsInRegion = true; + MutableObject lastBlock = new MutableObject(); + boolean scannerClosed = false; + try { + List results = new ArrayList<>(); if (rows > 0) { - // if nextCallSeq does not match throw Exception straight away. This needs to be - // performed even before checking of Lease. - // See HBASE-5974 - if (request.hasNextCallSeq()) { - if (rsh == null) { - rsh = scanners.get(scannerName); - } - if (rsh != null) { - if (request.getNextCallSeq() != rsh.getNextCallSeq()) { - throw new OutOfOrderScannerNextException( - "Expected nextCallSeq: " + rsh.getNextCallSeq() - + " But the nextCallSeq got from client: " + request.getNextCallSeq() + - "; request=" + TextFormat.shortDebugString(request)); + boolean done = false; + // Call coprocessor. Get region info from scanner. + if (region.getCoprocessorHost() != null) { + Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); + if (!results.isEmpty()) { + for (Result r : results) { + lastBlock.setValue(addSize(context, r, lastBlock.getValue())); } - // Increment the nextCallSeq value which is the next expected from client. - rsh.incNextCallSeq(); + } + if (bypass != null && bypass.booleanValue()) { + done = true; } } - try { - // Remove lease while its being processed in server; protects against case - // where processing of request takes > lease expiration time. - lease = regionServer.leases.removeLease(scannerName); - List results = new ArrayList(); - - boolean done = false; - // Call coprocessor. Get region info from scanner. - if (region != null && region.getCoprocessorHost() != null) { - Boolean bypass = region.getCoprocessorHost().preScannerNext( - scanner, results, rows); - if (!results.isEmpty()) { - for (Result r : results) { - lastBlock = addSize(context, r, lastBlock); - } - } - if (bypass != null && bypass.booleanValue()) { - done = true; - } - } - - if (!done) { - long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); - if (maxResultSize <= 0) { - maxResultSize = maxQuotaResultSize; - } - // This is cells inside a row. Default size is 10 so if many versions or many cfs, - // then we'll resize. Resizings show in profiler. Set it higher than 10. For now - // arbitrary 32. TODO: keep record of general size of results being returned. - List values = new ArrayList(32); - region.startRegionOperation(Operation.SCAN); - try { - int i = 0; - long before = EnvironmentEdgeManager.currentTime(); - synchronized(scanner) { - boolean stale = (region.getRegionInfo().getReplicaId() != 0); - boolean clientHandlesPartials = - request.hasClientHandlesPartials() && request.getClientHandlesPartials(); - boolean clientHandlesHeartbeats = - request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); - - // On the server side we must ensure that the correct ordering of partial results is - // returned to the client to allow them to properly reconstruct the partial results. - // If the coprocessor host is adding to the result list, we cannot guarantee the - // correct ordering of partial results and so we prevent partial results from being - // formed. - boolean serverGuaranteesOrderOfPartials = results.isEmpty(); - boolean allowPartialResults = - clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; - boolean moreRows = false; - - // Heartbeat messages occur when the processing of the ScanRequest is exceeds a - // certain time threshold on the server. When the time threshold is exceeded, the - // server stops the scan and sends back whatever Results it has accumulated within - // that time period (may be empty). Since heartbeat messages have the potential to - // create partial Results (in the event that the timeout occurs in the middle of a - // row), we must only generate heartbeat messages when the client can handle both - // heartbeats AND partials - boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; - - // Default value of timeLimit is negative to indicate no timeLimit should be - // enforced. - long timeLimit = -1; - - // Set the time limit to be half of the more restrictive timeout value (one of the - // timeout values must be positive). In the event that both values are positive, the - // more restrictive of the two is used to calculate the limit. - if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { - long timeLimitDelta; - if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { - timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); - } else { - timeLimitDelta = - scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; - } - if (controller instanceof TimeLimitedRpcController) { - TimeLimitedRpcController timeLimitedRpcController = - (TimeLimitedRpcController)controller; - if (timeLimitedRpcController.getCallTimeout() > 0) { - timeLimitDelta = Math.min(timeLimitDelta, - timeLimitedRpcController.getCallTimeout()); - } - } - // Use half of whichever timeout value was more restrictive... But don't allow - // the time limit to be less than the allowable minimum (could cause an - // immediatate timeout before scanning any data). - timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); - timeLimit = System.currentTimeMillis() + timeLimitDelta; - } - - final LimitScope sizeScope = - allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; - final LimitScope timeScope = - allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; - - boolean trackMetrics = - request.hasTrackScanMetrics() && request.getTrackScanMetrics(); - - // Configure with limits for this RPC. Set keep progress true since size progress - // towards size limit should be kept between calls to nextRaw - ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); - contextBuilder.setSizeLimit(sizeScope, maxResultSize); - contextBuilder.setBatchLimit(scanner.getBatch()); - contextBuilder.setTimeLimit(timeScope, timeLimit); - contextBuilder.setTrackMetrics(trackMetrics); - ScannerContext scannerContext = contextBuilder.build(); - - boolean limitReached = false; - while (i < rows) { - // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The - // batch limit is a limit on the number of cells per Result. Thus, if progress is - // being tracked (i.e. scannerContext.keepProgress() is true) then we need to - // reset the batch progress between nextRaw invocations since we don't want the - // batch progress from previous calls to affect future calls - scannerContext.setBatchProgress(0); - - // Collect values to be returned here - moreRows = scanner.nextRaw(values, scannerContext); - - if (!values.isEmpty()) { - final boolean partial = scannerContext.partialResultFormed(); - Result r = Result.create(values, null, stale, partial); - lastBlock = addSize(context, r, lastBlock); - results.add(r); - i++; - } - - boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); - boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); - boolean rowLimitReached = i >= rows; - limitReached = sizeLimitReached || timeLimitReached || rowLimitReached; - - if (limitReached || !moreRows) { - if (LOG.isTraceEnabled()) { - LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " - + moreRows + " scannerContext: " + scannerContext); - } - // We only want to mark a ScanResponse as a heartbeat message in the event that - // there are more values to be read server side. If there aren't more values, - // marking it as a heartbeat is wasteful because the client will need to issue - // another ScanRequest only to realize that they already have all the values - if (moreRows) { - // Heartbeat messages occur when the time limit has been reached. - builder.setHeartbeatMessage(timeLimitReached); - } - break; - } - values.clear(); - } - - if (limitReached || moreRows) { - // We stopped prematurely - builder.setMoreResultsInRegion(true); - } else { - // We didn't get a single batch - builder.setMoreResultsInRegion(false); - } - - // Check to see if the client requested that we track metrics server side. If the - // client requested metrics, retrieve the metrics from the scanner context. - if (trackMetrics) { - Map metrics = scannerContext.getMetrics().getMetricsMap(); - ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); - NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); - - for (Entry entry : metrics.entrySet()) { - pairBuilder.setName(entry.getKey()); - pairBuilder.setValue(entry.getValue()); - metricBuilder.addMetrics(pairBuilder.build()); - } - - builder.setScanMetrics(metricBuilder.build()); - } - } - region.updateReadRequestsCount(i); - long end = EnvironmentEdgeManager.currentTime(); - long responseCellSize = context != null ? context.getResponseCellSize() : 0; - region.getMetrics().updateScanTime(end - before); - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScanSize(responseCellSize); - regionServer.metricsRegionServer.updateScanTime(end - before); - } - } finally { - region.closeRegionOperation(); - } - - // coprocessor postNext hook - if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); - } - } - - quota.addScanResult(results); - - // If the scanner's filter - if any - is done with the scan - // and wants to tell the client to stop the scan. This is done by passing - // a null result, and setting moreResults to false. - if (scanner.isFilterDone() && results.isEmpty()) { - moreResults = false; - results = null; - } else { - addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); - } - } catch (IOException e) { - // The scanner state might be left in a dirty state, so we will tell the Client to - // fail this RPC and close the scanner while opening up another one from the start of - // row that the client has last seen. - closeScanner(region, scanner, scannerName); - - // If it is a CorruptHFileException or a FileNotFoundException, throw the - // DoNotRetryIOException. This can avoid the retry in ClientScanner. - if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) { - throw new DoNotRetryIOException(e); - } - - // We closed the scanner already. Instead of throwing the IOException, and client - // retrying with the same scannerId only to get USE on the next RPC, we directly throw - // a special exception to save an RPC. - if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { - // 1.4.0+ clients know how to handle - throw new ScannerResetException("Scanner is closed on the server-side", e); - } else { - // older clients do not know about SRE. Just throw USE, which they will handle - throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" - + " scanner state for clients older than 1.3.", e); - } - } finally { - // We're done. On way out re-add the above removed lease. - // Adding resets expiration time on lease. - if (scanners.containsKey(scannerName)) { - if (lease != null) regionServer.leases.addLease(lease); - ttl = this.scannerLeaseTimeoutPeriod; - } + if (!done) { + moreResultsInRegion = scan((PayloadCarryingRpcController) controller, request, rsh, + isSmallScan, maxQuotaResultSize, rows, results, builder, lastBlock, context); } } - if (!moreResults || closeScanner) { - ttl = 0; + quota.addScanResult(results); + + if (scanner.isFilterDone() && results.isEmpty()) { + // If the scanner's filter - if any - is done with the scan + // only set moreResults to false if the results is empty. This is used to keep compatible + // with the old scan implementation where we just ignore the returned results if moreResults + // is false. Can remove the isEmpty check after we get rid of the old implementation. moreResults = false; - closeScanner(region, scanner, scannerName); } - - if (ttl > 0) { - builder.setTtl(ttl); + addResults(builder, results, (PayloadCarryingRpcController) controller, + RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); + if (!moreResults || !moreResultsInRegion || closeScanner) { + scannerClosed = true; + closeScanner(region, scanner, scannerName, context); } - builder.setScannerId(scannerId); builder.setMoreResults(moreResults); return builder.build(); - } catch (IOException ie) { - if (scannerName != null && ie instanceof NotServingRegionException) { - RegionScannerHolder rsh = scanners.remove(scannerName); - if (rsh != null) { - try { - RegionScanner scanner = rsh.s; - LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ..."); - scanner.close(); - regionServer.leases.cancelLease(scannerName); - } catch (IOException e) { - LOG.warn("Getting exception closing " + scannerName, e); - } + } catch (Exception e) { + try { + // scanner is closed here + scannerClosed = true; + // The scanner state might be left in a dirty state, so we will tell the Client to + // fail this RPC and close the scanner while opening up another one from the start of + // row that the client has last seen. + closeScanner(region, scanner, scannerName, context); + + + // If it is a CorruptHFileException or a FileNotFoundException, throw the + // DoNotRetryIOException. This can avoid the retry in ClientScanner. + if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) { + throw new DoNotRetryIOException(e); } + // We closed the scanner already. Instead of throwing the IOException, and client + // retrying with the same scannerId only to get USE on the next RPC, we directly throw + // a special exception to save an RPC. + if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { + // 1.4.0+ clients know how to handle + throw new ScannerResetException("Scanner is closed on the server-side", e); + } else { + // older clients do not know about SRE. Just throw USE, which they will handle + throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" + + " scanner state for clients older than 1.3.", e); + } + } catch (IOException ioe) { + throw new ServiceException(ioe); } - throw new ServiceException(ie); } finally { - if (quota != null) { - quota.close(); + if (!scannerClosed) { + // Adding resets expiration time on lease. + addScannerLeaseBack(lease); } + quota.close(); } } - private boolean closeScanner(Region region, RegionScanner scanner, String scannerName) - throws IOException { - if (region != null && region.getCoprocessorHost() != null) { + private void closeScanner(Region region, RegionScanner scanner, String scannerName, + RpcCallContext context) throws IOException { + if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost().preScannerClose(scanner)) { - return true; // bypass + // bypass the actual close. + return; } } RegionScannerHolder rsh = scanners.remove(scannerName); if (rsh != null) { - scanner = rsh.s; - scanner.close(); - try { - regionServer.leases.cancelLease(scannerName); - } catch (LeaseException le) { - // No problem, ignore - if (LOG.isTraceEnabled()) { - LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); - } - } - if (region != null && region.getCoprocessorHost() != null) { + rsh.s.close(); + if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(scanner); } } - return false; } @Override