HBASE-13090 Progress heartbeats for long running scanners

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Jonathan Lawlor 2015-03-10 14:24:07 -07:00 committed by stack
parent 91e09bc0ab
commit a4f77d49a5
18 changed files with 1234 additions and 104 deletions

View File

@ -394,7 +394,6 @@ public class ClientScanner extends AbstractClientScanner {
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = call(callable, caller, scannerTimeout);
// When the replica switch happens, we need to do certain operations
// again. The callable will openScanner with the right startkey
// but we need to pick up from there. Bypass the rest of the loop
@ -483,7 +482,8 @@ public class ClientScanner extends AbstractClientScanner {
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
List<Result> resultsToAddToCache = getResultsToAddToCache(values);
List<Result> resultsToAddToCache =
getResultsToAddToCache(values, callable.isHeartbeatMessage());
if (!resultsToAddToCache.isEmpty()) {
for (Result rs : resultsToAddToCache) {
cache.add(rs);
@ -495,6 +495,19 @@ public class ClientScanner extends AbstractClientScanner {
this.lastResult = rs;
}
}
// Caller of this method just wants a Result. If we see a heartbeat message, it means
// processing of the scan is taking a long time server side. Rather than continue to
// loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
// unnecesary delays to the caller
if (callable.isHeartbeatMessage() && cache.size() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Heartbeat message received and cache contains Results."
+ " Breaking out of scan loop");
}
break;
}
// We expect that the server won't have more results for us when we exhaust
// the size (bytes or count) of the results returned. If the server *does* inform us that
// there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
@ -508,20 +521,38 @@ public class ClientScanner extends AbstractClientScanner {
// !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that
// row.
} while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
} while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
}
/**
* @param remainingResultSize
* @param remainingRows
* @param regionHasMoreResults
* @return true when the current region has been exhausted. When the current region has been
* exhausted, the region must be changed before scanning can continue
*/
private boolean doneWithRegion(long remainingResultSize, int remainingRows,
boolean regionHasMoreResults) {
return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
}
/**
* This method ensures all of our book keeping regarding partial results is kept up to date. This
* method should be called once we know that the results we received back from the RPC request do
* not contain errors. We return a list of results that should be added to the cache. In general,
* this list will contain all NON-partial results from the input array (unless the client has
* specified that they are okay with receiving partial results)
* @param resultsFromServer The array of {@link Result}s returned from the server
* @param heartbeatMessage Flag indicating whether or not the response received from the server
* represented a complete response, or a heartbeat message that was sent to keep the
* client-server connection alive
* @return the list of results that should be added to the cache.
* @throws IOException
*/
protected List<Result> getResultsToAddToCache(Result[] resultsFromServer) throws IOException {
protected List<Result>
getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
throws IOException {
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
@ -539,10 +570,14 @@ public class ClientScanner extends AbstractClientScanner {
return resultsToAddToCache;
}
// If no results were returned it indicates that we have the all the partial results necessary
// to construct the complete result.
// If no results were returned it indicates that either we have the all the partial results
// necessary to construct the complete result or the server had to send a heartbeat message
// to the client to keep the client-server connection alive
if (resultsFromServer == null || resultsFromServer.length == 0) {
if (!partialResults.isEmpty()) {
// If this response was an empty heartbeat message, then we have not exhausted the region
// and thus there may be more partials server side that still need to be added to the partial
// list before we form the complete Result
if (!partialResults.isEmpty() && !heartbeatMessage) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}

View File

@ -79,6 +79,12 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected final int id;
protected boolean serverHasMoreResultsContext;
protected boolean serverHasMoreResults;
/**
* Saves whether or not the most recent response from the server was a heartbeat message.
* Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
*/
protected boolean heartbeatMessage = false;
static {
try {
myAddress = DNS.getDefaultHost("default", "default");
@ -194,6 +200,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} else {
Result [] rrs = null;
ScanRequest request = null;
// Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
setHeartbeatMessage(false);
try {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
@ -214,6 +222,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
// See HBASE-5974
nextCallSeq++;
long timestamp = System.currentTimeMillis();
setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
// Results are returned via controller
CellScanner cellScanner = controller.cellScanner();
rrs = ResponseConverter.getResults(cellScanner, response);
@ -294,6 +303,20 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
return null;
}
/**
* @return true when the most recent RPC response indicated that the response was a heartbeat
* message. Heartbeat messages are sent back from the server when the processing of the
* scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
* timeouts during long running scan operations.
*/
protected boolean isHeartbeatMessage() {
return heartbeatMessage;
}
protected void setHeartbeatMessage(boolean heartbeatMessage) {
this.heartbeatMessage = heartbeatMessage;
}
private void incRPCcallsMetrics() {
if (this.scanMetrics == null) {
return;

View File

@ -273,6 +273,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return replicaSwitched.get();
}
/**
* @return true when the most recent RPC response indicated that the response was a heartbeat
* message. Heartbeat messages are sent back from the server when the processing of the
* scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
* timeouts during long running scan operations.
*/
public boolean isHeartbeatMessage() {
return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
}
private int addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);

View File

@ -491,6 +491,7 @@ public final class RequestConverter {
builder.setRegion(region);
builder.setScan(ProtobufUtil.toScan(scan));
builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true);
return builder.build();
}
@ -509,6 +510,7 @@ public final class RequestConverter {
builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId);
builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true);
return builder.build();
}
@ -529,6 +531,7 @@ public final class RequestConverter {
builder.setScannerId(scannerId);
builder.setNextCallSeq(nextCallSeq);
builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true);
return builder.build();
}

View File

@ -800,6 +800,18 @@ possible configurations would overwhelm and obscure the important.
take for a remote call to time out. It uses pings to check connections
but will eventually throw a TimeoutException.</description>
</property>
<property>
<name>hbase.cells.scanned.per.heartbeat.check</name>
<value>10000</value>
<description>The number of cells scanned in between heartbeat checks. Heartbeat
checks occur during the processing of scans to determine whether or not the
server should stop scanning in order to send back a heartbeat message to the
client. Heartbeat messages are used to keep the client-server connection alive
during long running scans. Small values mean that the heartbeat checks will
occur more often and thus will provide a tighter bound on the execution time of
the scan. Larger values mean that the heartbeat checks occur less frequently
</description>
</property>
<property>
<name>hbase.rpc.shortoperation.timeout</name>
<value>10000</value>

View File

@ -16433,6 +16433,16 @@ public final class ClientProtos {
* <code>optional bool client_handles_partials = 7;</code>
*/
boolean getClientHandlesPartials();
// optional bool client_handles_heartbeats = 8;
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
boolean hasClientHandlesHeartbeats();
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
boolean getClientHandlesHeartbeats();
}
/**
* Protobuf type {@code ScanRequest}
@ -16549,6 +16559,11 @@ public final class ClientProtos {
clientHandlesPartials_ = input.readBool();
break;
}
case 64: {
bitField0_ |= 0x00000080;
clientHandlesHeartbeats_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -16713,6 +16728,22 @@ public final class ClientProtos {
return clientHandlesPartials_;
}
// optional bool client_handles_heartbeats = 8;
public static final int CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER = 8;
private boolean clientHandlesHeartbeats_;
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
public boolean hasClientHandlesHeartbeats() {
return ((bitField0_ & 0x00000080) == 0x00000080);
}
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
public boolean getClientHandlesHeartbeats() {
return clientHandlesHeartbeats_;
}
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@ -16721,6 +16752,7 @@ public final class ClientProtos {
closeScanner_ = false;
nextCallSeq_ = 0L;
clientHandlesPartials_ = false;
clientHandlesHeartbeats_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -16767,6 +16799,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeBool(7, clientHandlesPartials_);
}
if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeBool(8, clientHandlesHeartbeats_);
}
getUnknownFields().writeTo(output);
}
@ -16804,6 +16839,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(7, clientHandlesPartials_);
}
if (((bitField0_ & 0x00000080) == 0x00000080)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(8, clientHandlesHeartbeats_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -16862,6 +16901,11 @@ public final class ClientProtos {
result = result && (getClientHandlesPartials()
== other.getClientHandlesPartials());
}
result = result && (hasClientHandlesHeartbeats() == other.hasClientHandlesHeartbeats());
if (hasClientHandlesHeartbeats()) {
result = result && (getClientHandlesHeartbeats()
== other.getClientHandlesHeartbeats());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -16903,6 +16947,10 @@ public final class ClientProtos {
hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getClientHandlesPartials());
}
if (hasClientHandlesHeartbeats()) {
hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -17049,6 +17097,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000020);
clientHandlesPartials_ = false;
bitField0_ = (bitField0_ & ~0x00000040);
clientHandlesHeartbeats_ = false;
bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@ -17113,6 +17163,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000040;
}
result.clientHandlesPartials_ = clientHandlesPartials_;
if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
to_bitField0_ |= 0x00000080;
}
result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -17150,6 +17204,9 @@ public final class ClientProtos {
if (other.hasClientHandlesPartials()) {
setClientHandlesPartials(other.getClientHandlesPartials());
}
if (other.hasClientHandlesHeartbeats()) {
setClientHandlesHeartbeats(other.getClientHandlesHeartbeats());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -17588,6 +17645,39 @@ public final class ClientProtos {
return this;
}
// optional bool client_handles_heartbeats = 8;
private boolean clientHandlesHeartbeats_ ;
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
public boolean hasClientHandlesHeartbeats() {
return ((bitField0_ & 0x00000080) == 0x00000080);
}
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
public boolean getClientHandlesHeartbeats() {
return clientHandlesHeartbeats_;
}
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
public Builder setClientHandlesHeartbeats(boolean value) {
bitField0_ |= 0x00000080;
clientHandlesHeartbeats_ = value;
onChanged();
return this;
}
/**
* <code>optional bool client_handles_heartbeats = 8;</code>
*/
public Builder clearClientHandlesHeartbeats() {
bitField0_ = (bitField0_ & ~0x00000080);
clientHandlesHeartbeats_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ScanRequest)
}
@ -17806,6 +17896,30 @@ public final class ClientProtos {
* </pre>
*/
boolean getMoreResultsInRegion();
// optional bool heartbeat_message = 9;
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
boolean hasHeartbeatMessage();
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
boolean getHeartbeatMessage();
}
/**
* Protobuf type {@code ScanResponse}
@ -17939,6 +18053,11 @@ public final class ClientProtos {
moreResultsInRegion_ = input.readBool();
break;
}
case 72: {
bitField0_ |= 0x00000020;
heartbeatMessage_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -18252,6 +18371,36 @@ public final class ClientProtos {
return moreResultsInRegion_;
}
// optional bool heartbeat_message = 9;
public static final int HEARTBEAT_MESSAGE_FIELD_NUMBER = 9;
private boolean heartbeatMessage_;
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
public boolean hasHeartbeatMessage() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
public boolean getHeartbeatMessage() {
return heartbeatMessage_;
}
private void initFields() {
cellsPerResult_ = java.util.Collections.emptyList();
scannerId_ = 0L;
@ -18261,6 +18410,7 @@ public final class ClientProtos {
stale_ = false;
partialFlagPerResult_ = java.util.Collections.emptyList();
moreResultsInRegion_ = false;
heartbeatMessage_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -18298,6 +18448,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBool(8, moreResultsInRegion_);
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBool(9, heartbeatMessage_);
}
getUnknownFields().writeTo(output);
}
@ -18346,6 +18499,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(8, moreResultsInRegion_);
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(9, heartbeatMessage_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -18400,6 +18557,11 @@ public final class ClientProtos {
result = result && (getMoreResultsInRegion()
== other.getMoreResultsInRegion());
}
result = result && (hasHeartbeatMessage() == other.hasHeartbeatMessage());
if (hasHeartbeatMessage()) {
result = result && (getHeartbeatMessage()
== other.getHeartbeatMessage());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -18445,6 +18607,10 @@ public final class ClientProtos {
hash = (37 * hash) + MORE_RESULTS_IN_REGION_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getMoreResultsInRegion());
}
if (hasHeartbeatMessage()) {
hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getHeartbeatMessage());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -18581,6 +18747,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000040);
moreResultsInRegion_ = false;
bitField0_ = (bitField0_ & ~0x00000080);
heartbeatMessage_ = false;
bitField0_ = (bitField0_ & ~0x00000100);
return this;
}
@ -18648,6 +18816,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000010;
}
result.moreResultsInRegion_ = moreResultsInRegion_;
if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
to_bitField0_ |= 0x00000020;
}
result.heartbeatMessage_ = heartbeatMessage_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -18725,6 +18897,9 @@ public final class ClientProtos {
if (other.hasMoreResultsInRegion()) {
setMoreResultsInRegion(other.getMoreResultsInRegion());
}
if (other.hasHeartbeatMessage()) {
setHeartbeatMessage(other.getHeartbeatMessage());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -19561,6 +19736,67 @@ public final class ClientProtos {
return this;
}
// optional bool heartbeat_message = 9;
private boolean heartbeatMessage_ ;
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
public boolean hasHeartbeatMessage() {
return ((bitField0_ & 0x00000100) == 0x00000100);
}
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
public boolean getHeartbeatMessage() {
return heartbeatMessage_;
}
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
public Builder setHeartbeatMessage(boolean value) {
bitField0_ |= 0x00000100;
heartbeatMessage_ = value;
onChanged();
return this;
}
/**
* <code>optional bool heartbeat_message = 9;</code>
*
* <pre>
* This field is filled in if the server is sending back a heartbeat message.
* Heartbeat messages are sent back to the client to prevent the scanner from
* timing out. Seeing a heartbeat message communicates to the Client that the
* server would have continued to scan had the time limit not been reached.
* </pre>
*/
public Builder clearHeartbeatMessage() {
bitField0_ = (bitField0_ & ~0x00000100);
heartbeatMessage_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ScanResponse)
}
@ -32690,63 +32926,64 @@ public final class ClientProtos {
"lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
"eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
"\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
"\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
"\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
"nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
"er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
"lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
"\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" +
"canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
"\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
"\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
"\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" +
"\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" +
"\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132",
"\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " +
".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" +
"gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" +
"\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" +
"ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" +
"iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" +
"(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" +
"\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " +
"\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" +
"eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi",
"er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" +
"l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" +
"n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" +
"2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" +
"\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" +
"et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" +
"oprocessorServiceCall\"Y\n\014RegionAction\022 \n" +
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" +
"c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" +
"onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r",
"heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" +
"tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" +
"sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" +
"1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" +
"viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" +
"adStats\"f\n\022RegionActionResult\022-\n\021resultO" +
"rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" +
"exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" +
"Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" +
"tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ",
"\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" +
"onActionResult\030\001 \003(\0132\023.RegionActionResul" +
"t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" +
"TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" +
" \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" +
"ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" +
"Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" +
"kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" +
"LoadHFileResponse\022F\n\013ExecService\022\032.Copro" +
"cessorServiceRequest\032\033.CoprocessorServic",
"eResponse\022R\n\027ExecRegionServerService\022\032.C" +
"oprocessorServiceRequest\032\033.CoprocessorSe" +
"rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." +
"MultiResponseBB\n*org.apache.hadoop.hbase" +
".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" +
"\001"
"\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" +
"ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" +
"sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" +
"ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" +
"l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" +
"le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
"\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea",
"rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" +
"quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
"5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" +
"est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" +
"\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" +
"(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " +
"\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" +
"(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" +
"\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" +
"viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai",
"r\"d\n\031CoprocessorServiceRequest\022 \n\006region" +
"\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" +
".CoprocessorServiceCall\"]\n\032CoprocessorSe" +
"rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" +
"ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" +
"\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" +
"\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" +
"service_call\030\004 \001(\0132\027.CoprocessorServiceC" +
"all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" +
"gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030",
"\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" +
"storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" +
"\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" +
"\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" +
"\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" +
"\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" +
"Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" +
"ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" +
".ResultOrException\022!\n\texception\030\002 \001(\0132\016." +
"NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA",
"ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" +
"\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" +
"\rMultiResponse\022/\n\022regionActionResult\030\001 \003" +
"(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" +
"(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" +
"E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" +
"t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" +
"t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" +
"\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" +
"oadHFileRequest\032\026.BulkLoadHFileResponse\022",
"F\n\013ExecService\022\032.CoprocessorServiceReque" +
"st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" +
"egionServerService\022\032.CoprocessorServiceR" +
"equest\032\033.CoprocessorServiceResponse\022&\n\005M" +
"ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
"rg.apache.hadoop.hbase.protobuf.generate" +
"dB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -32842,13 +33079,13 @@ public final class ClientProtos {
internal_static_ScanRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanRequest_descriptor,
new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", });
new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", });
internal_static_ScanResponse_descriptor =
getDescriptor().getMessageTypes().get(13);
internal_static_ScanResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanResponse_descriptor,
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", });
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", });
internal_static_BulkLoadHFileRequest_descriptor =
getDescriptor().getMessageTypes().get(14);
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new

View File

@ -275,6 +275,7 @@ message ScanRequest {
optional bool close_scanner = 5;
optional uint64 next_call_seq = 6;
optional bool client_handles_partials = 7;
optional bool client_handles_heartbeats = 8;
}
/**
@ -313,6 +314,12 @@ message ScanResponse {
// reasons such as the size in bytes or quantity of results accumulated. This field
// will true when more results exist in the current region.
optional bool more_results_in_region = 8;
// This field is filled in if the server is sending back a heartbeat message.
// Heartbeat messages are sent back to the client to prevent the scanner from
// timing out. Seeing a heartbeat message communicates to the Client that the
// server would have continued to scan had the time limit not been reached.
optional bool heartbeat_message = 9;
}
/**

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.mortbay.log.Log;
@ -72,8 +71,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
@Override
public Result next() throws IOException {
values.clear();
scanner.nextRaw(values, NoLimitScannerContext.getInstance());
scanner.nextRaw(values);
if (values.isEmpty()) {
//we are done
return null;

View File

@ -5288,8 +5288,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
throws IOException {
public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
@ -5327,7 +5326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
@ -5395,6 +5394,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
} else if (scannerContext.checkTimeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
}
} while (moreCellsInRow);
@ -5443,6 +5446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// progress.
int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getSizeProgress();
long initialTimeProgress = scannerContext.getTimeProgress();
// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
@ -5454,7 +5458,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// progress should be kept.
if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
scannerContext
.setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
} else {
scannerContext.clearProgress();
}
@ -5502,6 +5507,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ " formed. Changing scope of limits that may create partials");
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
}
// Check if we were getting data from the joinedHeap and hit the limit.
@ -5537,6 +5543,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
return true;
}
Cell nextKv = this.storeHeap.peek();
stopRow = nextKv == null ||
isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
@ -5550,12 +5557,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ret = filter.filterRowCellsWithRet(results);
// We don't know how the results have changed after being filtered. Must set progress
// according to contents of results now.
// according to contents of results now. However, a change in the results should not
// affect the time progress. Thus preserve whatever time progress has been made
long timeProgress = scannerContext.getTimeProgress();
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialTimeProgress);
} else {
scannerContext.clearProgress();
}
scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
@ -5580,14 +5591,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// These values are not needed for filter to work, so we postpone their
// fetch to (possibly) reduce amount of data loads from disk.
if (this.joinedHeap != null) {
Cell nextJoinedKv = joinedHeap.peek();
// If joinedHeap is pointing to some other row, try to seek to a correct one.
boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
currentRow, offset, length))
|| (this.joinedHeap.requestSeek(
KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
&& joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
currentRow, offset, length));
boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, scannerContext);
@ -5630,6 +5634,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
/**
* @param currentRow
* @param offset
* @param length
* @return true when the joined heap may have data for the current row
* @throws IOException
*/
private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
throws IOException {
Cell nextJoinedKv = joinedHeap.peek();
boolean matchCurrentRow =
nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
boolean matchAfterSeek = false;
// If the next value in the joined heap does not match the current row, try to seek to the
// correct row
if (!matchCurrentRow) {
Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
matchAfterSeek =
seekSuccessful && joinedHeap.peek() != null
&& CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
}
return matchCurrentRow || matchAfterSeek;
}
/**
* This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
* both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may

View File

@ -144,6 +144,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
InternalScanner currentAsInternal = (InternalScanner)this.current;
boolean moreCells = currentAsInternal.next(result, scannerContext);
Cell pee = this.current.peek();
/*
* By definition, any InternalScanner must return false only when it has no
* further rows to be fetched. So, we can close a scanner if it returns
@ -151,6 +152,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
* more efficient to close scanners which are not needed than keep them in
* the heap. This is also required for certain optimizations.
*/
if (pee == null || !moreCells) {
this.current.close();
} else {

View File

@ -68,7 +68,22 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
void setProgress(int batchProgress, long sizeProgress) {
void setTimeProgress(long timeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void updateTimeProgress() {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void clearProgress() {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@ -77,6 +92,11 @@ public class NoLimitScannerContext extends ScannerContext {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
void setTimeLimitScope(LimitScope scope) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@Override
NextState setScannerState(NextState state) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
@ -95,6 +115,12 @@ public class NoLimitScannerContext extends ScannerContext {
return false;
}
@Override
boolean checkTimeLimit(LimitScope checkerScope) {
// No limits can be specified, thus return false to indicate no limit has been reached.
return false;
}
@Override
boolean checkAnyLimitReached(LimitScope checkerScope) {
return false;

View File

@ -194,6 +194,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.region.server.rpc.scheduler.factory.class";
/**
* Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
* configuration exists to prevent the scenario where a time limit is specified to be so
* restrictive that the time limit is reached immediately (before any cells are scanned).
*/
private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
"hbase.region.server.rpc.minimum.scan.time.limit.delta";
/**
* Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
// Request counter. (Includes requests that are not serviced by regions.)
final Counter requestCount = new Counter();
// Server to handle client requests.
@ -215,6 +227,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private final int scannerLeaseTimeoutPeriod;
/**
* The RPC timeout period (milliseconds)
*/
private final int rpcTimeout;
/**
* The minimum allowable delta to use for the scan limit
*/
private final long minimumScanTimeLimitDelta;
/**
* Holder class which holds the RegionScanner and nextCallSeq together.
*/
@ -831,6 +853,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
maxScannerResultSize = rs.conf.getLong(
HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
rpcTimeout = rs.conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
minimumScanTimeLimitDelta = rs.conf.getLong(
REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
// Set our address, however we need the final port that was given to rpcServer
isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
@ -2264,6 +2292,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean stale = (region.getRegionInfo().getReplicaId() != 0);
boolean clientHandlesPartials =
request.hasClientHandlesPartials() && request.getClientHandlesPartials();
boolean clientHandlesHeartbeats =
request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
// On the server side we must ensure that the correct ordering of partial results is
// returned to the client to allow them to properly reconstruct the partial results.
@ -2275,23 +2305,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
boolean moreRows = false;
// Heartbeat messages occur when the processing of the ScanRequest is exceeds a
// certain time threshold on the server. When the time threshold is exceeded, the
// server stops the scan and sends back whatever Results it has accumulated within
// that time period (may be empty). Since heartbeat messages have the potential to
// create partial Results (in the event that the timeout occurs in the middle of a
// row), we must only generate heartbeat messages when the client can handle both
// heartbeats AND partials
boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
// Default value of timeLimit is negative to indicate no timeLimit should be
// enforced.
long timeLimit = -1;
// Set the time limit to be half of the more restrictive timeout value (one of the
// timeout values must be positive). In the event that both values are positive, the
// more restrictive of the two is used to calculate the limit.
if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
long timeLimitDelta;
if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
} else {
timeLimitDelta =
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediatate timeout before scanning any data).
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
timeLimit = System.currentTimeMillis() + timeLimitDelta;
}
final LimitScope sizeScope =
allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
final LimitScope timeScope =
allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
// Configure with limits for this RPC. Set keep progress true since size progress
// towards size limit should be kept between calls to nextRaw
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
contextBuilder.setSizeLimit(sizeScope, maxResultSize);
contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit);
ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false;
while (i < rows) {
// Stop collecting results if we have exceeded maxResultSize
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) {
builder.setMoreResultsInRegion(true);
break;
}
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
// batch limit is a limit on the number of cells per Result. Thus, if progress is
// being tracked (i.e. scannerContext.keepProgress() is true) then we need to
@ -2310,14 +2369,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
results.add(Result.create(values, null, stale, partial));
i++;
}
if (!moreRows) {
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
boolean rowLimitReached = i >= rows;
limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
if (limitReached || !moreRows) {
if (LOG.isTraceEnabled()) {
LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
+ moreRows + " scannerContext: " + scannerContext);
}
// We only want to mark a ScanResponse as a heartbeat message in the event that
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
if (moreRows) {
// Heartbeat messages occur when the time limit has been reached.
builder.setHeartbeatMessage(timeLimitReached);
}
break;
}
values.clear();
}
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows ||
moreRows) {
if (limitReached || moreRows) {
// We stopped prematurely
builder.setMoreResultsInRegion(true);
} else {

View File

@ -101,7 +101,7 @@ public class ScannerContext {
if (limitsToCopy != null) this.limits.copy(limitsToCopy);
// Progress fields are initialized to 0
progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0);
progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE;
@ -137,6 +137,13 @@ public class ScannerContext {
progress.setSize(currentSize + size);
}
/**
* Update the time progress with {@link System#currentTimeMillis()}
*/
void updateTimeProgress() {
progress.setTime(System.currentTimeMillis());
}
int getBatchProgress() {
return progress.getBatch();
}
@ -145,9 +152,14 @@ public class ScannerContext {
return progress.getSize();
}
void setProgress(int batchProgress, long sizeProgress) {
long getTimeProgress() {
return progress.getTime();
}
void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
setBatchProgress(batchProgress);
setSizeProgress(sizeProgress);
setTimeProgress(timeProgress);
}
void setSizeProgress(long sizeProgress) {
@ -158,12 +170,16 @@ public class ScannerContext {
progress.setBatch(batchProgress);
}
void setTimeProgress(long timeProgress) {
progress.setTime(timeProgress);
}
/**
* Clear away any progress that has been made so far. All progress fields are reset to initial
* values
*/
void clearProgress() {
progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0);
progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
}
/**
@ -172,7 +188,7 @@ public class ScannerContext {
* allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
* new state, thus preserving the immutability of {@link NoLimitScannerContext}
* @param state
* @return The state that
* @return The state that was passed in.
*/
NextState setScannerState(NextState state) {
if (!NextState.isValidState(state)) {
@ -188,7 +204,8 @@ public class ScannerContext {
* reached in the middle of a row.
*/
boolean partialResultFormed() {
return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW;
return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
|| scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW;
}
/**
@ -207,12 +224,20 @@ public class ScannerContext {
return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
}
/**
* @param checkerScope
* @return true if the time limit can be enforced in the checker's scope
*/
boolean hasTimeLimit(LimitScope checkerScope) {
return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
}
/**
* @param checkerScope
* @return true if any limit can be enforced within the checker's scope
*/
boolean hasAnyLimit(LimitScope checkerScope) {
return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope);
return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope);
}
/**
@ -222,6 +247,13 @@ public class ScannerContext {
limits.setSizeScope(scope);
}
/**
* @param scope The scope in which the time limit will be enforced
*/
void setTimeLimitScope(LimitScope scope) {
limits.setTimeScope(scope);
}
int getBatchLimit() {
return limits.getBatch();
}
@ -230,6 +262,10 @@ public class ScannerContext {
return limits.getSize();
}
long getTimeLimit() {
return limits.getTime();
}
/**
* @param checkerScope The scope that the limit is being checked from
* @return true when the limit is enforceable from the checker's scope and it has been reached
@ -246,12 +282,22 @@ public class ScannerContext {
return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
}
/**
* @param checkerScope The scope that the limit is being checked from. The time limit is always
* checked against {@link System#currentTimeMillis()}
* @return true when the limit is enforceable from the checker's scope and it has been reached
*/
boolean checkTimeLimit(LimitScope checkerScope) {
return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
}
/**
* @param checkerScope The scope that the limits are being checked from
* @return true when some limit is enforceable from the checker's scope and it has been reached
*/
boolean checkAnyLimitReached(LimitScope checkerScope) {
return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope);
return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope)
|| checkTimeLimit(checkerScope);
}
@Override
@ -305,6 +351,12 @@ public class ScannerContext {
return this;
}
public Builder setTimeLimit(LimitScope timeScope, long timeLimit) {
limits.setTime(timeLimit);
limits.setTimeScope(timeScope);
return this;
}
public Builder setBatchLimit(int batchLimit) {
limits.setBatch(batchLimit);
return this;
@ -328,6 +380,13 @@ public class ScannerContext {
* of a row and thus a partial results was formed
*/
SIZE_LIMIT_REACHED_MID_ROW(true, true),
TIME_LIMIT_REACHED(true, true),
/**
* Special case of time limit reached to indicate that the time limit was reached in the middle
* of a row and thus a partial results was formed
*/
TIME_LIMIT_REACHED_MID_ROW(true, true),
BATCH_LIMIT_REACHED(true, true);
private boolean moreValues;
@ -419,6 +478,7 @@ public class ScannerContext {
*/
private static int DEFAULT_BATCH = -1;
private static long DEFAULT_SIZE = -1L;
private static long DEFAULT_TIME = -1L;
/**
* Default scope that is assigned to a limit if a scope is not specified.
@ -432,19 +492,23 @@ public class ScannerContext {
LimitScope sizeScope = DEFAULT_SCOPE;
long size = DEFAULT_SIZE;
LimitScope timeScope = DEFAULT_SCOPE;
long time = DEFAULT_TIME;
/**
* Fields keep their default values.
*/
LimitFields() {
}
LimitFields(int batch, LimitScope sizeScope, long size) {
setFields(batch, sizeScope, size);
LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
setFields(batch, sizeScope, size, timeScope, time);
}
void copy(LimitFields limitsToCopy) {
if (limitsToCopy != null) {
setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize());
setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
limitsToCopy.getTimeScope(), limitsToCopy.getTime());
}
}
@ -454,10 +518,12 @@ public class ScannerContext {
* @param sizeScope
* @param size
*/
void setFields(int batch, LimitScope sizeScope, long size) {
void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
setBatch(batch);
setSizeScope(sizeScope);
setSize(size);
setTimeScope(timeScope);
setTime(time);
}
int getBatch() {
@ -506,6 +572,36 @@ public class ScannerContext {
return this.sizeScope.canEnforceLimitFromScope(checkerScope);
}
long getTime() {
return this.time;
}
void setTime(long time) {
this.time = time;
}
/**
* @return {@link LimitScope} indicating scope in which the time limit is enforced
*/
LimitScope getTimeScope() {
return this.timeScope;
}
/**
* Change the scope in which the time limit is enforced
*/
void setTimeScope(LimitScope scope) {
this.timeScope = scope;
}
/**
* @param checkerScope
* @return true when the limit can be enforced from the scope of the checker
*/
boolean canEnforceTimeLimitFromScope(LimitScope checkerScope) {
return this.sizeScope.canEnforceLimitFromScope(checkerScope);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -520,6 +616,12 @@ public class ScannerContext {
sb.append(", sizeScope:");
sb.append(sizeScope);
sb.append(", time:");
sb.append(time);
sb.append(", timeScope:");
sb.append(timeScope);
sb.append("}");
return sb.toString();
}

View File

@ -83,6 +83,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final long now;
protected final int minVersions;
protected final long maxRowSize;
protected final long cellsPerHeartbeatCheck;
/**
* The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@ -100,6 +101,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected static boolean lazySeekEnabledGlobally =
LAZY_SEEK_ENABLED_BY_DEFAULT;
/**
* The number of cells scanned in between timeout checks. Specifying a larger value means that
* timeout checks will occur less frequently. Specifying a small value will lead to more frequent
* timeout checks.
*/
public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
"hbase.cells.scanned.per.heartbeat.check";
/**
* Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
*/
public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
// if heap == null and lastTop != null, you need to reseek given the key below
protected Cell lastTop = null;
@ -137,9 +151,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.maxRowSize =
conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
long tmpCellsPerTimeoutCheck =
conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
this.cellsPerHeartbeatCheck =
tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
: DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
} else {
this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
this.scanUsePread = scan.isSmall();
this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
}
// We look up row-column Bloom filters for multi-column queries as part of
@ -458,6 +480,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
lock.lock();
try {
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
@ -507,6 +530,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
long totalBytesRead = 0;
LOOP: while((cell = this.heap.peek()) != null) {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
}
if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
checkScanOrder(prevCell, cell, comparator);
prevCell = cell;

View File

@ -217,7 +217,8 @@ public class TestPartialResultsFromClientSide {
count++;
}
assertTrue(scanner2.next() == null);
r2 = scanner2.next();
assertTrue("r2: " + r2 + " Should be null", r2 == null);
scanner1.close();
scanner2.close();

View File

@ -143,7 +143,6 @@ public class TestCoprocessorInterface {
public int getBatch() {
return delegate.getBatch();
}
}
public static class CoprocessorImpl extends BaseRegionObserver {

View File

@ -0,0 +1,538 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Here we test to make sure that scans return the expected Results when the server is sending the
* Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
* the scanner on the client side from timing out). A heartbeat message is sent from the server to
* the client when the server has exceeded the time limit during the processing of the scan. When
* the time limit is reached, the server will return to the Client whatever Results it has
* accumulated (potentially empty).
*/
@Category(MediumTests.class)
public class TestScannerHeartbeatMessages {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Table TABLE = null;
/**
* Table configuration
*/
private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
private static int NUM_ROWS = 10;
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
private static int NUM_FAMILIES = 3;
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
private static int NUM_QUALIFIERS = 3;
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
private static int VALUE_SIZE = 128;
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
// Time, in milliseconds, that the client will wait for a response from the server before timing
// out. This value is used server side to determine when it is necessary to send a heartbeat
// message to the client
private static int CLIENT_TIMEOUT = 500;
// The server limits itself to running for half of the CLIENT_TIMEOUT value.
private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2;
// By default, at most one row's worth of cells will be retrieved before the time limit is reached
private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
// By default, at most cells for two column families are retrieved before the time limit is
// reached
private static int DEFAULT_CF_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger) HeartbeatRPCServices.LOG).getLogger().setLevel(Level.ALL);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
// Check the timeout condition after every cell
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
}
static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
return ht;
}
/**
* Make puts to put the input value into each combination of row, family, and qualifier
* @param rows
* @param families
* @param qualifiers
* @param value
* @return
* @throws IOException
*/
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] value) throws IOException {
Put put;
ArrayList<Put> puts = new ArrayList<>();
for (int row = 0; row < rows.length; row++) {
put = new Put(rows[row]);
for (int fam = 0; fam < families.length; fam++) {
for (int qual = 0; qual < qualifiers.length; qual++) {
KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
put.add(kv);
}
}
puts.add(put);
}
return puts;
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setupBeforeTest() throws Exception {
disableSleeping();
}
@After
public void teardownAfterTest() throws Exception {
disableSleeping();
}
/**
* Test a variety of scan configurations to ensure that they return the expected Results when
* heartbeat messages are necessary. These tests are accumulated under one test case to ensure
* that they don't run in parallel. If the tests ran in parallel, they may conflict with each
* other due to changing static variables
*/
@Test
public void testScannerHeartbeatMessages() throws Exception {
testImportanceOfHeartbeats(testHeartbeatBetweenRows());
testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
}
/**
* Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
* when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
* disabled, the test should throw an exception.
* @param testCallable
* @throws InterruptedException
*/
public void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
HeartbeatRPCServices.heartbeatsEnabled = true;
try {
testCallable.call();
} catch (Exception e) {
fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
+ ExceptionUtils.getStackTrace(e));
}
HeartbeatRPCServices.heartbeatsEnabled = false;
try {
testCallable.call();
} catch (Exception e) {
return;
} finally {
HeartbeatRPCServices.heartbeatsEnabled = true;
}
fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
+ " is not thrown, the test case is not testing the importance of heartbeat messages");
}
/**
* Test the case that the time limit for the scan is reached after each full row of cells is
* fetched.
* @throws Exception
*/
public Callable<Void> testHeartbeatBetweenRows() throws Exception {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
// Configure the scan so that it can read the entire table in a single RPC. We want to test
// the case where a scan stops on the server side due to a time limit
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
return null;
}
};
}
/**
* Test the case that the time limit for scans is reached in between column families
* @throws Exception
*/
public Callable<Void> testHeartbeatBetweenColumnFamilies() throws Exception {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
// Configure the scan so that it can read the entire table in a single RPC. We want to test
// the case where a scan stops on the server side due to a time limit
Scan baseScan = new Scan();
baseScan.setMaxResultSize(Long.MAX_VALUE);
baseScan.setCaching(Integer.MAX_VALUE);
// Copy the scan before each test. When a scan object is used by a scanner, some of its
// fields may be changed such as start row
Scan scanCopy = new Scan(baseScan);
testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
scanCopy = new Scan(baseScan);
testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
return null;
}
};
}
/**
* Test the equivalence of a scan versus the same scan executed when heartbeat messages are
* necessary
* @param scan The scan configuration being tested
* @param rowSleepTime The time to sleep between fetches of row cells
* @param cfSleepTime The time to sleep between fetches of column family cells
* @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
* that column family are fetched
* @throws Exception
*/
public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
disableSleeping();
final ResultScanner scanner = TABLE.getScanner(scan);
final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
Result r1 = null;
Result r2 = null;
while ((r1 = scanner.next()) != null) {
// Enforce the specified sleep conditions during calls to the heartbeat scanner
configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
r2 = scannerWithHeartbeats.next();
disableSleeping();
assertTrue(r2 != null);
try {
Result.compareResults(r1, r2);
} catch (Exception e) {
fail(e.getMessage());
}
}
assertTrue(scannerWithHeartbeats.next() == null);
scanner.close();
scannerWithHeartbeats.close();
}
/**
* Helper method for setting the time to sleep between rows and column families. If a sleep time
* is negative then that sleep will be disabled
* @param rowSleepTime
* @param cfSleepTime
*/
private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
HeartbeatHRegion.rowSleepTime = rowSleepTime;
HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
}
/**
* Disable the sleeping mechanism server side.
*/
private static void disableSleeping() {
HeartbeatHRegion.sleepBetweenRows = false;
HeartbeatHRegion.sleepBetweenColumnFamilies = false;
}
/**
* Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
* {@link RSRpcServices} to allow us to toggle support for heartbeat messages
*/
private static class HeartbeatHRegionServer extends HRegionServer {
public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException {
super(conf, csm);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new HeartbeatRPCServices(this);
}
}
/**
* Custom RSRpcServices instance that allows heartbeat support to be toggled
*/
private static class HeartbeatRPCServices extends RSRpcServices {
private static boolean heartbeatsEnabled = true;
public HeartbeatRPCServices(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ScanResponse scan(RpcController controller, ScanRequest request)
throws ServiceException {
ScanRequest.Builder builder = ScanRequest.newBuilder(request);
builder.setClientHandlesHeartbeats(heartbeatsEnabled);
return super.scan(controller, builder.build());
}
}
/**
* Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
* between fetches of row Results and/or column family cells. Useful for emulating an instance
* where the server is taking a long time to process a client's scan request
*/
private static class HeartbeatHRegion extends HRegion {
// Row sleeps occur AFTER each row worth of cells is retrieved.
private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
private static boolean sleepBetweenRows = false;
// The sleep for column families can be initiated before or after we fetch the cells for the
// column family. If the sleep occurs BEFORE then the time limits will be reached inside
// StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
// limit will be reached inside RegionScanner after all the cells for a column family have been
// retrieved.
private static boolean sleepBeforeColumnFamily = false;
private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
private static boolean sleepBetweenColumnFamilies = false;
public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
}
public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
HTableDescriptor htd, RegionServerServices rsServices) {
super(fs, wal, confParam, htd, rsServices);
}
private static void columnFamilySleep() {
if (HeartbeatHRegion.sleepBetweenColumnFamilies) {
try {
Thread.sleep(HeartbeatHRegion.columnFamilySleepTime);
} catch (InterruptedException e) {
}
}
}
private static void rowSleep() {
try {
if (HeartbeatHRegion.sleepBetweenRows) {
Thread.sleep(HeartbeatHRegion.rowSleepTime);
}
} catch (InterruptedException e) {
}
}
// Instantiate the custom heartbeat region scanners
@Override
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
if (scan.isReversed()) {
if (scan.getFilter() != null) {
scan.getFilter().setReversed(true);
}
return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
}
return new HeartbeatRegionScanner(scan, additionalScanners, this);
}
}
/**
* Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
* and/or column family cells
*/
private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
HRegion region) throws IOException {
super(scan, additionalScanners, region);
}
@Override
public boolean nextRaw(List<Cell> outResults, ScannerContext context)
throws IOException {
boolean moreRows = super.nextRaw(outResults, context);
HeartbeatHRegion.rowSleep();
return moreRows;
}
@Override
protected void initializeKVHeap(List<KeyValueScanner> scanners,
List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
this.storeHeap = new HeartbeatReversedKVHeap(scanners, region.getComparator());
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, region.getComparator());
}
}
}
/**
* Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
* column family cells
*/
private static class HeartbeatRegionScanner extends RegionScannerImpl {
HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
region.super(scan, additionalScanners, region);
}
@Override
public boolean nextRaw(List<Cell> outResults, ScannerContext context)
throws IOException {
boolean moreRows = super.nextRaw(outResults, context);
HeartbeatHRegion.rowSleep();
return moreRows;
}
@Override
protected void initializeKVHeap(List<KeyValueScanner> scanners,
List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
this.storeHeap = new HeartbeatKVHeap(scanners, region.getComparator());
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getComparator());
}
}
}
/**
* Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
* cells. Useful for testing
*/
private static final class HeartbeatKVHeap extends KeyValueHeap {
public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator)
throws IOException {
super(scanners, comparator);
}
HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
throws IOException {
super(scanners, comparator);
}
@Override
public boolean next(List<Cell> result, ScannerContext context)
throws IOException {
if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
boolean moreRows = super.next(result, context);
if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
return moreRows;
}
}
/**
* Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
* cells.
*/
private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
KVComparator comparator) throws IOException {
super(scanners, comparator);
}
@Override
public boolean next(List<Cell> result, ScannerContext context)
throws IOException {
if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
boolean moreRows = super.next(result, context);
if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
return moreRows;
}
}
}

View File

@ -778,7 +778,6 @@ public class TestStripeCompactionPolicy {
public boolean next(List<Cell> results) throws IOException {
if (kvs.isEmpty()) return false;
results.add(kvs.remove(0));
return !kvs.isEmpty();
}