HBASE-15378 Scanner cannot handle heartbeat message with no results (Phil Yang)

This commit is contained in:
tedyu 2016-03-09 09:28:54 -08:00
parent 9e967e5c1d
commit ad9b91a904
2 changed files with 91 additions and 34 deletions

View File

@ -381,9 +381,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
Result[] values = null; Result[] values = null;
long remainingResultSize = maxScannerResultSize; long remainingResultSize = maxScannerResultSize;
int countdown = this.caching; int countdown = this.caching;
// We need to reset it if it's a new callable that was created with a countdown in nextScanner
// We need to reset it if it's a new callable that was created
// with a countdown in nextScanner
callable.setCaching(this.caching); callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do // This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us. // this when we reset scanner because it split under us.
@ -397,12 +395,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
// returns an empty array if scanning is to go on and we've just // returns an empty array if scanning is to go on and we've just
// exhausted current region. // exhausted current region.
values = call(callable, caller, scannerTimeout); values = call(callable, caller, scannerTimeout);
// When the replica switch happens, we need to do certain operations // When the replica switch happens, we need to do certain operations again.
// again. The callable will openScanner with the right startkey // The callable will openScanner with the right startkey but we need to pick up
// but we need to pick up from there. Bypass the rest of the loop // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
// and let the catch-up happen in the beginning of the loop as it // of the loop as it happens for the cases where we see exceptions.
// happens for the cases where we see exceptions. Since only openScanner // Since only openScanner would have happened, values would be null
// would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) { if (values == null && callable.switchedToADifferentReplica()) {
// Any accumulated partial results are no longer valid since the callable will // Any accumulated partial results are no longer valid since the callable will
// openScanner with the correct startkey and we must pick up from there // openScanner with the correct startkey and we must pick up from there
@ -415,7 +412,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
// An exception was thrown which makes any partial results that we were collecting // An exception was thrown which makes any partial results that we were collecting
// invalid. The scanner will need to be reset to the beginning of a row. // invalid. The scanner will need to be reset to the beginning of a row.
clearPartialResults(); clearPartialResults();
// DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
// to reset the scanner and come back in again. // to reset the scanner and come back in again.
if (e instanceof UnknownScannerException) { if (e instanceof UnknownScannerException) {
@ -424,8 +420,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
// a ScannerTimeoutException. Else, it's because the region moved and we used the old // a ScannerTimeoutException. Else, it's because the region moved and we used the old
// id against the new region server; reset the scanner. // id against the new region server; reset the scanner.
if (timeout < System.currentTimeMillis()) { if (timeout < System.currentTimeMillis()) {
LOG.info("For hints related to the following exception, please try taking a look at: " + LOG.info("For hints related to the following exception, please try taking a look at: "
"https://hbase.apache.org/book.html#trouble.client.scantimeout"); + "https://hbase.apache.org/book.html#trouble.client.scantimeout");
long elapsed = System.currentTimeMillis() - lastNext; long elapsed = System.currentTimeMillis() - lastNext;
ScannerTimeoutException ex = ScannerTimeoutException ex =
new ScannerTimeoutException(elapsed + "ms passed since the last invocation, " new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
@ -440,8 +436,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
if ((cause != null && cause instanceof NotServingRegionException) || if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) || (cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException) { e instanceof OutOfOrderScannerNextException) {
// Pass // Pass. It is easier writing the if loop test as list of what is allowed rather than
// It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw. // as a list of what is not allowed... so if in here, it means we do not throw.
} else { } else {
throw e; throw e;
@ -449,11 +444,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
} }
// Else, its signal from depths of ScannerCallable that we need to reset the scanner. // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
if (this.lastResult != null) { if (this.lastResult != null) {
// The region has moved. We need to open a brand new scanner at // The region has moved. We need to open a brand new scanner at the new location.
// the new location. // Reset the startRow to the row we've seen last so that the new scanner starts at
// Reset the startRow to the row we've seen last so that the new // the correct row. Otherwise we may see previously returned rows again.
// scanner starts at the correct row. Otherwise we may see previously
// returned rows again.
// (ScannerCallable by now has "relocated" the correct region) // (ScannerCallable by now has "relocated" the correct region)
if (scan.isReversed()) { if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow())); scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
@ -475,7 +468,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Set this to zero so we don't try and do an rpc and close on remote server when // Set this to zero so we don't try and do an rpc and close on remote server when
// the exception we got was UnknownScanner or the Server is going down. // the exception we got was UnknownScanner or the Server is going down.
callable = null; callable = null;
// This continue will take us to while at end of loop where we will set up new scanner. // This continue will take us to while at end of loop where we will set up new scanner.
continue; continue;
} }
@ -499,18 +491,20 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.lastResult = rs; this.lastResult = rs;
} }
} }
if (callable.isHeartbeatMessage()) {
if (cache.size() > 0) {
// Caller of this method just wants a Result. If we see a heartbeat message, it means // Caller of this method just wants a Result. If we see a heartbeat message, it means
// processing of the scan is taking a long time server side. Rather than continue to // processing of the scan is taking a long time server side. Rather than continue to
// loop until a limit (e.g. size or caching) is reached, break out early to avoid causing // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
// unnecesary delays to the caller // unnecesary delays to the caller
if (callable.isHeartbeatMessage() && cache.size() > 0) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Heartbeat message received and cache contains Results." LOG.trace("Heartbeat message received and cache contains Results."
+ " Breaking out of scan loop"); + " Breaking out of scan loop");
} }
break; break;
} }
continue;
}
// We expect that the server won't have more results for us when we exhaust // We expect that the server won't have more results for us when we exhaust
// the size (bytes or count) of the results returned. If the server *does* inform us that // the size (bytes or count) of the results returned. If the server *does* inform us that
@ -519,14 +513,15 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (null != values && values.length > 0 && callable.hasMoreResultsContext()) { if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
// Only adhere to more server results when we don't have any partialResults // Only adhere to more server results when we don't have any partialResults
// as it keeps the outer loop logic the same. // as it keeps the outer loop logic the same.
serverHasMoreResults = callable.getServerHasMoreResults() & partialResults.isEmpty(); serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
} }
// Values == null means server-side filter has determined we must STOP // Values == null means server-side filter has determined we must STOP
// !partialResults.isEmpty() means that we are still accumulating partial Results for a // !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that // row. We should not change scanners before we receive all the partial Results for that
// row. // row.
} while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) } while ((callable != null && callable.isHeartbeatMessage())
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
} }
/** /**

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -33,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@ -86,7 +90,7 @@ public class TestScannerHeartbeatMessages {
*/ */
private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
private static int NUM_ROWS = 10; private static int NUM_ROWS = 5;
private static byte[] ROW = Bytes.toBytes("testRow"); private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
@ -197,6 +201,7 @@ public class TestScannerHeartbeatMessages {
public void testScannerHeartbeatMessages() throws Exception { public void testScannerHeartbeatMessages() throws Exception {
testImportanceOfHeartbeats(testHeartbeatBetweenRows()); testImportanceOfHeartbeats(testHeartbeatBetweenRows());
testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies()); testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
testImportanceOfHeartbeats(testHeartbeatWithSparseFilter());
} }
/** /**
@ -275,6 +280,63 @@ public class TestScannerHeartbeatMessages {
}; };
} }
public static class SparseFilter extends FilterBase{
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
try {
Thread.sleep(SERVER_TIME_LIMIT + 10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ?
ReturnCode.INCLUDE :
ReturnCode.SKIP;
}
public static Filter parseFrom(final byte [] pbBytes){
return new SparseFilter();
}
}
/**
* Test the case that there is a filter which filters most of cells
* @throws Exception
*/
public Callable<Void> testHeartbeatWithSparseFilter() throws Exception {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(1, num);
scanner.close();
scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseFilter());
scan.setAllowPartialResults(true);
scanner = TABLE.getScanner(scan);
num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
scanner.close();
return null;
}
};
}
/** /**
* Test the equivalence of a scan versus the same scan executed when heartbeat messages are * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
* necessary * necessary