diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 42597ffb6da..0c6dc168101 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -25,29 +25,30 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti import java.io.IOException; import java.io.InterruptedIOException; -import java.util.LinkedList; +import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.ExecutorService; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; -import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; -import org.apache.hadoop.hbase.exceptions.ScannerResetException; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.util.Bytes; /** * Implements the scanner interface for the HBase client. If there are multiple regions in a table, @@ -294,25 +295,30 @@ public abstract class ClientScanner extends AbstractClientScanner { } protected void initSyncCache() { - cache = new LinkedList<>(); + cache = new ArrayDeque<>(); } protected Result nextWithSyncCache() throws IOException { - // If the scanner is closed and there's nothing left in the cache, next is a no-op. - if (cache.isEmpty() && this.closed) { + Result result = cache.poll(); + if (result != null) { + return result; + } + // If there is nothing left in the cache and the scanner is closed, + // return a no-op + if (this.closed) { return null; } - if (cache.isEmpty()) { - loadCache(); - } - if (cache.size() > 0) { - return cache.poll(); - } + loadCache(); + + // try again to load from cache + result = cache.poll(); // if we exhausted this scanner before calling close, write out the scan metrics - writeScanMetrics(); - return null; + if (result == null) { + writeScanMetrics(); + } + return result; } @VisibleForTesting @@ -410,11 +416,9 @@ public abstract class ClientScanner extends AbstractClientScanner { long remainingResultSize = maxScannerResultSize; int countdown = this.caching; // This is possible if we just stopped at the boundary of a region in the previous call. - if (callable == null) { - if (!moveToNextRegion()) { - closed = true; - return; - } + if (callable == null && !moveToNextRegion()) { + closed = true; + return; } // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. @@ -463,15 +467,13 @@ public abstract class ClientScanner extends AbstractClientScanner { scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); int numberOfCompleteRows = scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; - if (resultsToAddToCache.length > 0) { - for (Result rs : resultsToAddToCache) { - cache.add(rs); - long estimatedHeapSizeOfResult = calcEstimatedSize(rs); - countdown--; - remainingResultSize -= estimatedHeapSizeOfResult; - addEstimatedSize(estimatedHeapSizeOfResult); - this.lastResult = rs; - } + for (Result rs : resultsToAddToCache) { + cache.add(rs); + long estimatedHeapSizeOfResult = calcEstimatedSize(rs); + countdown--; + remainingResultSize -= estimatedHeapSizeOfResult; + addEstimatedSize(estimatedHeapSizeOfResult); + this.lastResult = rs; } if (scan.getLimit() > 0) { @@ -491,10 +493,8 @@ public abstract class ClientScanner extends AbstractClientScanner { // processing of the scan is taking a long time server side. Rather than continue to // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing // unnecesary delays to the caller - if (LOG.isTraceEnabled()) { - LOG.trace("Heartbeat message received and cache contains Results." + - " Breaking out of scan loop"); - } + LOG.trace("Heartbeat message received and cache contains Results. " + + "Breaking out of scan loop"); // we know that the region has not been exhausted yet so just break without calling // closeScannerIfExhausted break; @@ -560,9 +560,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // We used to catch this error, interpret, and rethrow. However, we // have since decided that it's not nice for a scanner's close to // throw exceptions. Chances are it was just due to lease time out. - if (LOG.isDebugEnabled()) { - LOG.debug("scanner failed to close", e); - } + LOG.debug("scanner failed to close", e); } catch (IOException e) { /* An exception other than UnknownScanner is unexpected. */ LOG.warn("scanner failed to close.", e); @@ -574,19 +572,20 @@ public abstract class ClientScanner extends AbstractClientScanner { @Override public boolean renewLease() { - if (callable != null) { - // do not return any rows, do not advance the scanner - callable.setRenew(true); - try { - this.caller.callWithoutRetries(callable, this.scannerTimeout); - } catch (Exception e) { - return false; - } finally { - callable.setRenew(false); - } - return true; + if (callable == null) { + return false; + } + // do not return any rows, do not advance the scanner + callable.setRenew(true); + try { + this.caller.callWithoutRetries(callable, this.scannerTimeout); + return true; + } catch (Exception e) { + LOG.debug("scanner failed to renew lease", e); + return false; + } finally { + callable.setRenew(false); } - return false; } protected void initCache() {