diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 1658e5bf9d2..22a56e355e2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -381,9 +381,7 @@ public abstract class ClientScanner extends AbstractClientScanner { Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; - - // We need to reset it if it's a new callable that was created - // with a countdown in nextScanner + // We need to reset it if it's a new callable that was created with a countdown in nextScanner callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. @@ -397,12 +395,11 @@ public abstract class ClientScanner extends AbstractClientScanner { // returns an empty array if scanning is to go on and we've just // exhausted current region. values = call(callable, caller, scannerTimeout); - // When the replica switch happens, we need to do certain operations - // again. The callable will openScanner with the right startkey - // but we need to pick up from there. Bypass the rest of the loop - // and let the catch-up happen in the beginning of the loop as it - // happens for the cases where we see exceptions. Since only openScanner - // would have happened, values would be null + // When the replica switch happens, we need to do certain operations again. + // The callable will openScanner with the right startkey but we need to pick up + // from there. Bypass the rest of the loop and let the catch-up happen in the beginning + // of the loop as it happens for the cases where we see exceptions. + // Since only openScanner would have happened, values would be null if (values == null && callable.switchedToADifferentReplica()) { // Any accumulated partial results are no longer valid since the callable will // openScanner with the correct startkey and we must pick up from there @@ -415,7 +412,6 @@ public abstract class ClientScanner extends AbstractClientScanner { // An exception was thrown which makes any partial results that we were collecting // invalid. The scanner will need to be reset to the beginning of a row. clearPartialResults(); - // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us // to reset the scanner and come back in again. if (e instanceof UnknownScannerException) { @@ -424,8 +420,8 @@ public abstract class ClientScanner extends AbstractClientScanner { // a ScannerTimeoutException. Else, it's because the region moved and we used the old // id against the new region server; reset the scanner. if (timeout < System.currentTimeMillis()) { - LOG.info("For hints related to the following exception, please try taking a look at: " + - "https://hbase.apache.org/book.html#trouble.client.scantimeout"); + LOG.info("For hints related to the following exception, please try taking a look at: " + + "https://hbase.apache.org/book.html#trouble.client.scantimeout"); long elapsed = System.currentTimeMillis() - lastNext; ScannerTimeoutException ex = new ScannerTimeoutException(elapsed + "ms passed since the last invocation, " @@ -440,8 +436,7 @@ public abstract class ClientScanner extends AbstractClientScanner { if ((cause != null && cause instanceof NotServingRegionException) || (cause != null && cause instanceof RegionServerStoppedException) || e instanceof OutOfOrderScannerNextException) { - // Pass - // It is easier writing the if loop test as list of what is allowed rather than + // Pass. It is easier writing the if loop test as list of what is allowed rather than // as a list of what is not allowed... so if in here, it means we do not throw. } else { throw e; @@ -449,11 +444,9 @@ public abstract class ClientScanner extends AbstractClientScanner { } // Else, its signal from depths of ScannerCallable that we need to reset the scanner. if (this.lastResult != null) { - // The region has moved. We need to open a brand new scanner at - // the new location. - // Reset the startRow to the row we've seen last so that the new - // scanner starts at the correct row. Otherwise we may see previously - // returned rows again. + // The region has moved. We need to open a brand new scanner at the new location. + // Reset the startRow to the row we've seen last so that the new scanner starts at + // the correct row. Otherwise we may see previously returned rows again. // (ScannerCallable by now has "relocated" the correct region) if (scan.isReversed()) { scan.setStartRow(createClosestRowBefore(lastResult.getRow())); @@ -475,7 +468,6 @@ public abstract class ClientScanner extends AbstractClientScanner { // Set this to zero so we don't try and do an rpc and close on remote server when // the exception we got was UnknownScanner or the Server is going down. callable = null; - // This continue will take us to while at end of loop where we will set up new scanner. continue; } @@ -499,17 +491,19 @@ public abstract class ClientScanner extends AbstractClientScanner { this.lastResult = rs; } } - - // Caller of this method just wants a Result. If we see a heartbeat message, it means - // processing of the scan is taking a long time server side. Rather than continue to - // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing - // unnecesary delays to the caller - if (callable.isHeartbeatMessage() && cache.size() > 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("Heartbeat message received and cache contains Results." - + " Breaking out of scan loop"); + if (callable.isHeartbeatMessage()) { + if (cache.size() > 0) { + // Caller of this method just wants a Result. If we see a heartbeat message, it means + // processing of the scan is taking a long time server side. Rather than continue to + // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing + // unnecesary delays to the caller + if (LOG.isTraceEnabled()) { + LOG.trace("Heartbeat message received and cache contains Results." + + " Breaking out of scan loop"); + } + break; } - break; + continue; } // We expect that the server won't have more results for us when we exhaust @@ -519,14 +513,15 @@ public abstract class ClientScanner extends AbstractClientScanner { if (null != values && values.length > 0 && callable.hasMoreResultsContext()) { // Only adhere to more server results when we don't have any partialResults // as it keeps the outer loop logic the same. - serverHasMoreResults = callable.getServerHasMoreResults() & partialResults.isEmpty(); + serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty(); } // Values == null means server-side filter has determined we must STOP // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); + } while ((callable != null && callable.isHeartbeatMessage()) + || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) + && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 14c71d2dbb2..1935c0aefb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; @@ -86,7 +90,7 @@ public class TestScannerHeartbeatMessages { */ private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); - private static int NUM_ROWS = 10; + private static int NUM_ROWS = 5; private static byte[] ROW = Bytes.toBytes("testRow"); private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); @@ -197,6 +201,7 @@ public class TestScannerHeartbeatMessages { public void testScannerHeartbeatMessages() throws Exception { testImportanceOfHeartbeats(testHeartbeatBetweenRows()); testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies()); + testImportanceOfHeartbeats(testHeartbeatWithSparseFilter()); } /** @@ -275,6 +280,63 @@ public class TestScannerHeartbeatMessages { }; } + public static class SparseFilter extends FilterBase{ + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + try { + Thread.sleep(SERVER_TIME_LIMIT + 10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? + ReturnCode.INCLUDE : + ReturnCode.SKIP; + } + + public static Filter parseFrom(final byte [] pbBytes){ + return new SparseFilter(); + } + } + + /** + * Test the case that there is a filter which filters most of cells + * @throws Exception + */ + public Callable testHeartbeatWithSparseFilter() throws Exception { + return new Callable() { + @Override + public Void call() throws Exception { + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + scan.setFilter(new SparseFilter()); + ResultScanner scanner = TABLE.getScanner(scan); + int num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(1, num); + scanner.close(); + + scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + scan.setFilter(new SparseFilter()); + scan.setAllowPartialResults(true); + scanner = TABLE.getScanner(scan); + num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); + scanner.close(); + + return null; + } + }; + } + /** * Test the equivalence of a scan versus the same scan executed when heartbeat messages are * necessary