HBASE-17489 ClientScanner may send a next request to a RegionScanner which has been exhausted

This commit is contained in:
zhangduo 2017-01-22 10:02:29 +08:00
parent 9a9e3df856
commit 3abd13dacb
4 changed files with 614 additions and 550 deletions

View File

@ -54,9 +54,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
* Implements the scanner interface for the HBase client. * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
* If there are multiple regions in a table, this scanner will iterate * this scanner will iterate through them all.
* through them all.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner { public abstract class ClientScanner extends AbstractClientScanner {
@ -229,15 +228,13 @@ public abstract class ClientScanner extends AbstractClientScanner {
return false; // unlikely. return false; // unlikely.
} }
private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException { protected final void closeScanner() throws IOException {
// If we have just switched replica, don't go to the next scanner yet. Rather, try if (this.callable != null) {
// the scanner operations on the new replica, from the right point in the scan this.callable.setClose();
// Note that when we switched to a different replica we left it at a point call(callable, caller, scannerTimeout);
// where we just did the "openScanner" with the appropriate startrow this.callable = null;
if (callable != null && callable.switchedToADifferentReplica()) return true; }
return nextScanner(nbRows, done);
} }
/* /*
* Gets a scanner for the next region. If this.currentRegion != null, then we will move to the * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the
* endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no
@ -248,11 +245,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
*/ */
protected boolean nextScanner(int nbRows, final boolean done) throws IOException { protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
// Close the previous scanner if it's open // Close the previous scanner if it's open
if (this.callable != null) { closeScanner();
this.callable.setClose();
call(callable, caller, scannerTimeout);
this.callable = null;
}
// Where to start the next scanner // Where to start the next scanner
byte[] localStartKey; byte[] localStartKey;
@ -371,6 +364,37 @@ public abstract class ClientScanner extends AbstractClientScanner {
return cache != null ? cache.size() : 0; return cache != null ? cache.size() : 0;
} }
private boolean regionExhausted(Result[] values) {
// This means the server tells us the whole scan operation is done. Usually decided by filter.
if (values == null) {
return true;
}
// Not a heartbeat message and we get nothing, this means the region is exhausted
if (values.length == 0 && !callable.isHeartbeatMessage()) {
return true;
}
// Server tells us that it has no more results for this region. Notice that this flag is get
// from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter
// one is false then we will get a null values and quit in the first condition of this method.
if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) {
return true;
}
return false;
}
private void closeScannerIfExhausted(boolean exhausted) throws IOException {
if (exhausted) {
if (!partialResults.isEmpty()) {
// XXX: continue if there are partial results. But in fact server should not set
// hasMoreResults to false if there are partial results.
LOG.warn("Server tells us there is no more results for this region but we still have"
+ " partialResults, this should not happen, retry on the current scanner anyway");
} else {
closeScanner();
}
}
}
/** /**
* Contact the servers to load more {@link Result}s in the cache. * Contact the servers to load more {@link Result}s in the cache.
*/ */
@ -380,17 +404,18 @@ public abstract class ClientScanner extends AbstractClientScanner {
Result[] values = null; Result[] values = null;
long remainingResultSize = maxScannerResultSize; long remainingResultSize = maxScannerResultSize;
int countdown = this.caching; int countdown = this.caching;
// This is possible if we just stopped at the boundary of a region in the previous call.
if (callable == null) {
if (!nextScanner(countdown, false)) {
return;
}
}
// We need to reset it if it's a new callable that was created with a countdown in nextScanner // We need to reset it if it's a new callable that was created with a countdown in nextScanner
callable.setCaching(this.caching); callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do // This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us. // this when we reset scanner because it split under us.
boolean retryAfterOutOfOrderException = true; boolean retryAfterOutOfOrderException = true;
// We don't expect that the server will have more results for us if for (;;) {
// it doesn't tell us otherwise. We rely on the size or count of results
boolean serverHasMoreResults = false;
boolean allResultsSkipped = false;
do {
allResultsSkipped = false;
try { try {
// Server returns a null values if scanning is to stop. Else, // Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just // returns an empty array if scanning is to go on and we've just
@ -436,7 +461,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Reset the startRow to the row we've seen last so that the new scanner starts at // Reset the startRow to the row we've seen last so that the new scanner starts at
// the correct row. Otherwise we may see previously returned rows again. // the correct row. Otherwise we may see previously returned rows again.
// (ScannerCallable by now has "relocated" the correct region) // (ScannerCallable by now has "relocated" the correct region)
if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) { if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
if (scan.isReversed()) { if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow())); scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else { } else {
@ -461,7 +486,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Set this to zero so we don't try and do an rpc and close on remote server when // Set this to zero so we don't try and do an rpc and close on remote server when
// the exception we got was UnknownScanner or the Server is going down. // the exception we got was UnknownScanner or the Server is going down.
callable = null; callable = null;
// This continue will take us to while at end of loop where we will set up new scanner. // reopen the scanner
if (!nextScanner(countdown, false)) {
break;
}
continue; continue;
} }
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
@ -487,61 +515,58 @@ public abstract class ClientScanner extends AbstractClientScanner {
remainingResultSize -= estimatedHeapSizeOfResult; remainingResultSize -= estimatedHeapSizeOfResult;
addEstimatedSize(estimatedHeapSizeOfResult); addEstimatedSize(estimatedHeapSizeOfResult);
this.lastResult = rs; this.lastResult = rs;
if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { if (this.lastResult.isPartial() || scan.getBatch() > 0) {
updateLastCellLoadedToCache(this.lastResult); updateLastCellLoadedToCache(this.lastResult);
} else { } else {
this.lastCellLoadedToCache = null; this.lastCellLoadedToCache = null;
} }
} }
if (cache.isEmpty()) {
// all result has been seen before, we need scan more.
allResultsSkipped = true;
continue;
}
} }
boolean exhausted = regionExhausted(values);
if (callable.isHeartbeatMessage()) { if (callable.isHeartbeatMessage()) {
if (cache.size() > 0) { if (!cache.isEmpty()) {
// Caller of this method just wants a Result. If we see a heartbeat message, it means // Caller of this method just wants a Result. If we see a heartbeat message, it means
// processing of the scan is taking a long time server side. Rather than continue to // processing of the scan is taking a long time server side. Rather than continue to
// loop until a limit (e.g. size or caching) is reached, break out early to avoid causing // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
// unnecesary delays to the caller // unnecesary delays to the caller
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Heartbeat message received and cache contains Results." LOG.trace("Heartbeat message received and cache contains Results."
+ " Breaking out of scan loop"); + " Breaking out of scan loop");
} }
// we know that the region has not been exhausted yet so just break without calling
// closeScannerIfExhausted
break; break;
} }
continue;
} }
if (countdown <= 0) {
// We expect that the server won't have more results for us when we exhaust // we have enough result.
// the size (bytes or count) of the results returned. If the server *does* inform us that closeScannerIfExhausted(exhausted);
// there are more results, we want to avoid possiblyNextScanner(...). Only when we actually break;
// get results is the moreResults context valid.
if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
// Only adhere to more server results when we don't have any partialResults
// as it keeps the outer loop logic the same.
serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
} }
// Values == null means server-side filter has determined we must STOP if (remainingResultSize <= 0) {
// !partialResults.isEmpty() means that we are still accumulating partial Results for a if (!cache.isEmpty()) {
// row. We should not change scanners before we receive all the partial Results for that closeScannerIfExhausted(exhausted);
// row. break;
} while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage()) } else {
|| (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) // we have reached the max result size but we still can not find anything to return to the
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); // user. Reset the maxResultSize and try again.
} remainingResultSize = maxScannerResultSize;
}
/** }
* @param remainingResultSize // we are done with the current region
* @param remainingRows if (exhausted) {
* @param regionHasMoreResults if (!partialResults.isEmpty()) {
* @return true when the current region has been exhausted. When the current region has been // XXX: continue if there are partial results. But in fact server should not set
* exhausted, the region must be changed before scanning can continue // hasMoreResults to false if there are partial results.
*/ LOG.warn("Server tells us there is no more results for this region but we still have"
private boolean doneWithRegion(long remainingResultSize, int remainingRows, + " partialResults, this should not happen, retry on the current scanner anyway");
boolean regionHasMoreResults) { continue;
return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults; }
if (!nextScanner(countdown, values == null)) {
break;
}
}
}
} }
protected void addEstimatedSize(long estimatedHeapSizeOfResult) { protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
@ -566,9 +591,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
* @return the list of results that should be added to the cache. * @return the list of results that should be added to the cache.
* @throws IOException * @throws IOException
*/ */
protected List<Result> protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage) boolean heartbeatMessage) throws IOException {
throws IOException {
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize); List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
@ -583,7 +607,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// the batch size even though it may not be the last group of cells for that row. // the batch size even though it may not be the last group of cells for that row.
if (allowPartials || isBatchSet) { if (allowPartials || isBatchSet) {
addResultsToList(resultsToAddToCache, resultsFromServer, 0, addResultsToList(resultsToAddToCache, resultsFromServer, 0,
(null == resultsFromServer ? 0 : resultsFromServer.length)); (null == resultsFromServer ? 0 : resultsFromServer.length));
return resultsToAddToCache; return resultsToAddToCache;
} }
@ -769,12 +793,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
} }
/** /**
* Compare two Cells considering reversed scanner. * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
* ReversedScanner only reverses rows, not columns. * columns.
*/ */
private int compare(Cell a, Cell b) { private int compare(Cell a, Cell b) {
CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ? CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion()
CellComparator.META_COMPARATOR : CellComparator.COMPARATOR; ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
int r = comparator.compareRows(a, b); int r = comparator.compareRows(a, b);
if (r != 0) { if (r != 0) {
return this.scan.isReversed() ? -r : r; return this.scan.isReversed() ? -r : r;

View File

@ -63,13 +63,7 @@ public class ReversedClientScanner extends ClientSimpleScanner {
protected boolean nextScanner(int nbRows, final boolean done) protected boolean nextScanner(int nbRows, final boolean done)
throws IOException { throws IOException {
// Close the previous scanner if it's open // Close the previous scanner if it's open
if (this.callable != null) { closeScanner();
this.callable.setClose();
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
this.caller.callWithoutRetries(callable, scannerTimeout);
this.callable = null;
}
// Where to start the next scanner // Where to start the next scanner
byte[] localStartKey; byte[] localStartKey;

View File

@ -158,7 +158,8 @@ public class TestClientScanner {
ScannerCallableWithReplicas.class); ScannerCallableWithReplicas.class);
switch (count) { switch (count) {
case 0: // initialize case 0: // initialize
case 2: // close case 2: // detect no more results
case 3: // close
count++; count++;
return null; return null;
case 1: case 1:
@ -184,8 +185,10 @@ public class TestClientScanner {
scanner.loadCache(); scanner.loadCache();
// One more call due to initializeScannerInConstruction() // One for initializeScannerInConstruction()
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( // One for fetching the results
// One for fetching null results and quit as we do not have moreResults hint.
inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt()); Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size()); assertEquals(1, scanner.cache.size());
@ -224,7 +227,8 @@ public class TestClientScanner {
case 1: case 1:
count++; count++;
callable.setHasMoreResultsContext(true); callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false); // if we set false here the implementation will trigger a close
callable.setServerHasMoreResults(true);
return results; return results;
default: default:
throw new RuntimeException("Expected only 2 invocations"); throw new RuntimeException("Expected only 2 invocations");
@ -291,7 +295,8 @@ public class TestClientScanner {
case 1: case 1:
count++; count++;
callable.setHasMoreResultsContext(true); callable.setHasMoreResultsContext(true);
callable.setServerHasMoreResults(false); // if we set false here the implementation will trigger a close
callable.setServerHasMoreResults(true);
return results; return results;
default: default:
throw new RuntimeException("Expected only 2 invocations"); throw new RuntimeException("Expected only 2 invocations");
@ -470,13 +475,14 @@ public class TestClientScanner {
Mockito.anyInt()); Mockito.anyInt());
InOrder inOrder = Mockito.inOrder(caller); InOrder inOrder = Mockito.inOrder(caller);
scanner.setRpcFinished(true);
scanner.loadCache(); scanner.loadCache();
inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt()); Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size()); assertEquals(2, scanner.cache.size());
Result r = scanner.cache.poll(); Result r = scanner.cache.poll();
assertNotNull(r); assertNotNull(r);
CellScanner cs = r.cellScanner(); CellScanner cs = r.cellScanner();
@ -484,15 +490,6 @@ public class TestClientScanner {
assertEquals(kv1, cs.current()); assertEquals(kv1, cs.current());
assertFalse(cs.advance()); assertFalse(cs.advance());
scanner.setRpcFinished(true);
inOrder = Mockito.inOrder(caller);
scanner.loadCache();
inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
r = scanner.cache.poll(); r = scanner.cache.poll();
assertNotNull(r); assertNotNull(r);
cs = r.cellScanner(); cs = r.cellScanner();