HBASE-13193 RegionScannerImpl filters should not be reset if a partial Result is returned (Jonathan Lawlor)

This commit is contained in:
stack 2015-03-16 13:26:34 -07:00
parent c5aca1919d
commit a75a2ace4f
4 changed files with 133 additions and 53 deletions

View File

@ -71,6 +71,12 @@ public class ClientScanner extends AbstractClientScanner {
* result.
*/
protected final LinkedList<Result> partialResults = new LinkedList<Result>();
/**
* The row for which we are accumulating partial Results (i.e. the row of the Results stored
* inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
* via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
*/
protected byte[] partialResultsRow = null;
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
@ -378,7 +384,7 @@ public class ClientScanner extends AbstractClientScanner {
} catch (DoNotRetryIOException e) {
// An exception was thrown which makes any partial results that we were collecting
// invalid. The scanner will need to be reset to the beginning of a row.
partialResults.clear();
clearPartialResults();
// DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
// to reset the scanner and come back in again.
@ -515,7 +521,7 @@ public class ClientScanner extends AbstractClientScanner {
if (resultsFromServer == null || resultsFromServer.length == 0) {
if (!partialResults.isEmpty()) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
partialResults.clear();
clearPartialResults();
}
return resultsToAddToCache;
@ -534,67 +540,65 @@ public class ClientScanner extends AbstractClientScanner {
LOG.trace(sb.toString());
}
// There are four possibilities cases that can occur while handling partial results
// There are three possibilities cases that can occur while handling partial results
//
// 1. (partial != null && partialResults.isEmpty())
// This is the first partial result that we have received. It should be added to
// the list of partialResults and await the next RPC request at which point another
// portion of the complete result will be received
//
// 2. (partial != null && !partialResults.isEmpty())
// a. values.length == 1
// Since partialResults contains some elements, it means that we are expecting to receive
// the remainder of the complete result within this RPC response. The fact that a partial result
// was returned and it's the ONLY result returned indicates that we are still receiving
// fragments of the complete result. The Result can be completely formed only when we have
// received all of the fragments and thus in this case we simply add the partial result to
// our list.
// 2. !partialResults.isEmpty()
// Since our partialResults list is not empty it means that we have been accumulating partial
// Results for a particular row. We cannot form the complete/whole Result for that row until
// all partials for the row have been received. Thus we loop through all of the Results
// returned from the server and determine whether or not all partial Results for the row have
// been received. We know that we have received all of the partial Results for the row when:
// i) We notice a row change in the Results
// ii) We see a Result for the partial row that is NOT marked as a partial Result
//
// b. values.length > 1
// More than one result has been returned from the server. The fact that we are accumulating
// partials in partialList and we just received more than one result back from the server
// indicates that the FIRST result we received from the server must be the final fragment that
// can be used to complete our result. What this means is that the partial that we received is
// a partial result for a different row, and at this point we should combine the existing
// partials into a complete result, clear the partialList, and begin accumulating partials for
// a new row
//
// 3. (partial == null && !partialResults.isEmpty())
// No partial was received but we are accumulating partials in our list. That means the final
// fragment of the complete result will be the first Result in values[]. We use it to create the
// complete Result, clear the list, and add it to the list of Results that must be added to the
// cache. All other Results in values[] are added after the complete result to maintain proper
// ordering
//
// 4. (partial == null && partialResults.isEmpty())
// 3. (partial == null && partialResults.isEmpty())
// Business as usual. We are not accumulating partial results and there wasn't a partial result
// in the RPC response. This means that all of the results we received from the server are
// complete and can be added directly to the cache
if (partial != null && partialResults.isEmpty()) {
partialResults.add(partial);
addToPartialResults(partial);
// Exclude the last result, it's a partial
addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
} else if (partial != null && !partialResults.isEmpty()) {
if (resultsFromServer.length > 1) {
Result finalResult = resultsFromServer[0];
partialResults.add(finalResult);
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
partialResults.clear();
} else if (!partialResults.isEmpty()) {
for (int i = 0; i < resultsFromServer.length; i++) {
Result result = resultsFromServer[i];
// Exclude first result, it was used to form our complete result
// Exclude last result, it's a partial result
addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length - 1);
// This result is from the same row as the partial Results. Add it to the list of partials
// and check if it was the last partial Result for that row
if (Bytes.equals(partialResultsRow, result.getRow())) {
addToPartialResults(result);
// If the result is not a partial, it is a signal to us that it is the last Result we
// need to form the complete Result client-side
if (!result.isPartial()) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
} else {
// The row of this result differs from the row of the partial results we have received so
// far. If our list of partials isn't empty, this is a signal to form the complete Result
// since the row has now changed
if (!partialResults.isEmpty()) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
// It's possible that in one response from the server we receive the final partial for
// one row and receive a partial for a different row. Thus, make sure that all Results
// are added to the proper list
if (result.isPartial()) {
addToPartialResults(result);
} else {
resultsToAddToCache.add(result);
}
}
}
partialResults.add(partial);
} else if (partial == null && !partialResults.isEmpty()) {
Result finalResult = resultsFromServer[0];
partialResults.add(finalResult);
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
partialResults.clear();
// Exclude the first result, it was used to form our complete result
addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length);
} else { // partial == null && partialResults.isEmpty() -- business as usual
addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
}
@ -602,6 +606,31 @@ public class ClientScanner extends AbstractClientScanner {
return resultsToAddToCache;
}
/**
* A convenience method for adding a Result to our list of partials. This method ensure that only
* Results that belong to the same row as the other partials can be added to the list.
* @param result The result that we want to add to our list of partial Results
* @throws IOException
*/
private void addToPartialResults(final Result result) throws IOException {
final byte[] row = result.getRow();
if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
throw new IOException("Partial result row does not match. All partial results must come "
+ "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
+ Bytes.toString(row));
}
partialResultsRow = row;
partialResults.add(result);
}
/**
* Convenience method for clearing the list of partials and resetting the partialResultsRow.
*/
private void clearPartialResults() {
partialResults.clear();
partialResultsRow = null;
}
/**
* Helper method for adding results between the indices [start, end) to the outputList
* @param outputList the list that results will be added to

View File

@ -5538,13 +5538,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
state = nextInternal(tmpList, batchLimit, remainingResultSize);
outResults.addAll(tmpList);
}
// State should never be null, this is a precautionary measure
if (state == null) {
if (LOG.isTraceEnabled()) LOG.trace("State was null. Defaulting to no more values state");
state = NextState.makeState(NextState.State.NO_MORE_VALUES);
// Invalid states should never be returned. Receiving an invalid state means that we have
// no clue how to proceed. Throw an exception.
if (!NextState.isValidState(state)) {
throw new IOException("Invalid state returned from nextInternal. state:" + state);
}
resetFilters();
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
if (!state.sizeLimitReached()) resetFilters();
if (isFilterDoneInternal()) {
state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
}

View File

@ -201,6 +201,11 @@ public interface InternalScanner extends Closeable {
return resultSize >= 0;
}
@Override
public String toString() {
return "State: " + state + " resultSize: " + resultSize;
}
/**
* Helper method to centralize all checks as to whether or not the state is valid.
* @param state

View File

@ -24,7 +24,9 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,6 +37,11 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -501,7 +508,7 @@ public class TestPartialResultsFromClientSide {
* because the entire row needs to be read for the include/exclude decision to be made
*/
@Test
public void testNoPartialResultsWhenFilterPresent() throws Exception {
public void testNoPartialResultsWhenRowFilterPresent() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(1);
scan.setAllowPartialResults(true);
@ -784,4 +791,39 @@ public class TestPartialResultsFromClientSide {
scanner.close();
return numCells;
}
/**
* Test partial Result re-assembly in the presence of different filters. The Results from the
* partial scanner should match the Results returned from a scanner that receives all of the
* results in one RPC to the server. The partial scanner is tested with a variety of different
* result sizes (all of which are less than the size necessary to fetch an entire row)
* @throws Exception
*/
@Test
public void testPartialResultsWithColumnFilter() throws Exception {
testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter());
testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5")));
testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true,
Bytes.toBytes("testQualifier7"), true));
Set<byte[]> qualifiers = new LinkedHashSet<>();
qualifiers.add(Bytes.toBytes("testQualifier5"));
testPartialResultsWithColumnFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
}
public void testPartialResultsWithColumnFilter(Filter filter) throws Exception {
assertTrue(!filter.hasFilterRow());
Scan partialScan = new Scan();
partialScan.setFilter(filter);
Scan oneshotScan = new Scan();
oneshotScan.setFilter(filter);
oneshotScan.setMaxResultSize(Long.MAX_VALUE);
for (int i = 1; i <= NUM_COLS; i++) {
partialScan.setMaxResultSize(getResultSizeForNumberOfCells(i));
testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
}
}
}