From a4f77d49a5ae347c78e3d5934c4fc005d3914cb1 Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Tue, 10 Mar 2015 14:24:07 -0700 Subject: [PATCH] HBASE-13090 Progress heartbeats for long running scanners Signed-off-by: stack --- .../hadoop/hbase/client/ClientScanner.java | 49 +- .../hadoop/hbase/client/ScannerCallable.java | 23 + .../client/ScannerCallableWithReplicas.java | 10 + .../hbase/protobuf/RequestConverter.java | 3 + .../src/main/resources/hbase-default.xml | 12 + .../protobuf/generated/ClientProtos.java | 349 ++++++++++-- hbase-protocol/src/main/protobuf/Client.proto | 7 + .../hbase/client/ClientSideRegionScanner.java | 4 +- .../hadoop/hbase/regionserver/HRegion.java | 59 +- .../hbase/regionserver/KeyValueHeap.java | 2 + .../regionserver/NoLimitScannerContext.java | 28 +- .../hbase/regionserver/RSRpcServices.java | 94 ++- .../hbase/regionserver/ScannerContext.java | 124 +++- .../hbase/regionserver/StoreScanner.java | 31 + .../TestPartialResultsFromClientSide.java | 3 +- .../coprocessor/TestCoprocessorInterface.java | 1 - .../TestScannerHeartbeatMessages.java | 538 ++++++++++++++++++ .../TestStripeCompactionPolicy.java | 1 - 18 files changed, 1234 insertions(+), 104 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 0c28e0515e9..0ee29f2dc26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -394,7 +394,6 @@ public class ClientScanner extends AbstractClientScanner { // returns an empty array if scanning is to go on and we've just // exhausted current region. 
values = call(callable, caller, scannerTimeout); - // When the replica switch happens, we need to do certain operations // again. The callable will openScanner with the right startkey // but we need to pick up from there. Bypass the rest of the loop @@ -483,7 +482,8 @@ public class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. - List resultsToAddToCache = getResultsToAddToCache(values); + List resultsToAddToCache = + getResultsToAddToCache(values, callable.isHeartbeatMessage()); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { cache.add(rs); @@ -495,6 +495,19 @@ public class ClientScanner extends AbstractClientScanner { this.lastResult = rs; } } + + // Caller of this method just wants a Result. If we see a heartbeat message, it means + // processing of the scan is taking a long time server side. Rather than continue to + // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing + // unnecesary delays to the caller + if (callable.isHeartbeatMessage() && cache.size() > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Heartbeat message received and cache contains Results." + + " Breaking out of scan loop"); + } + break; + } + // We expect that the server won't have more results for us when we exhaust // the size (bytes or count) of the results returned. If the server *does* inform us that // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually @@ -508,20 +521,38 @@ public class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. 
- } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults + } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); } + /** + * @param remainingResultSize + * @param remainingRows + * @param regionHasMoreResults + * @return true when the current region has been exhausted. When the current region has been + * exhausted, the region must be changed before scanning can continue + */ + private boolean doneWithRegion(long remainingResultSize, int remainingRows, + boolean regionHasMoreResults) { + return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults; + } + /** * This method ensures all of our book keeping regarding partial results is kept up to date. This * method should be called once we know that the results we received back from the RPC request do * not contain errors. We return a list of results that should be added to the cache. In general, * this list will contain all NON-partial results from the input array (unless the client has * specified that they are okay with receiving partial results) + * @param resultsFromServer The array of {@link Result}s returned from the server + * @param heartbeatMessage Flag indicating whether or not the response received from the server + * represented a complete response, or a heartbeat message that was sent to keep the + * client-server connection alive * @return the list of results that should be added to the cache. * @throws IOException */ - protected List getResultsToAddToCache(Result[] resultsFromServer) throws IOException { + protected List + getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage) + throws IOException { int resultSize = resultsFromServer != null ? 
resultsFromServer.length : 0; List resultsToAddToCache = new ArrayList(resultSize); @@ -539,10 +570,14 @@ public class ClientScanner extends AbstractClientScanner { return resultsToAddToCache; } - // If no results were returned it indicates that we have the all the partial results necessary - // to construct the complete result. + // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive if (resultsFromServer == null || resultsFromServer.length == 0) { - if (!partialResults.isEmpty()) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partialResults.isEmpty() && !heartbeatMessage) { resultsToAddToCache.add(Result.createCompleteResult(partialResults)); clearPartialResults(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index ce61ef6be1f..19947729844 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -79,6 +79,12 @@ public class ScannerCallable extends RegionServerCallable { protected final int id; protected boolean serverHasMoreResultsContext; protected boolean serverHasMoreResults; + + /** + * Saves whether or not the most recent response from the server was a heartbeat message. 
+ * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} + */ + protected boolean heartbeatMessage = false; static { try { myAddress = DNS.getDefaultHost("default", "default"); @@ -194,6 +200,8 @@ public class ScannerCallable extends RegionServerCallable { } else { Result [] rrs = null; ScanRequest request = null; + // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server + setHeartbeatMessage(false); try { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); @@ -214,6 +222,7 @@ public class ScannerCallable extends RegionServerCallable { // See HBASE-5974 nextCallSeq++; long timestamp = System.currentTimeMillis(); + setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response); @@ -294,6 +303,20 @@ public class ScannerCallable extends RegionServerCallable { return null; } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations. 
+ */ + protected boolean isHeartbeatMessage() { + return heartbeatMessage; + } + + protected void setHeartbeatMessage(boolean heartbeatMessage) { + this.heartbeatMessage = heartbeatMessage; + } + private void incRPCcallsMetrics() { if (this.scanMetrics == null) { return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index e32a2d22051..c0335f96949 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -273,6 +273,16 @@ class ScannerCallableWithReplicas implements RetryingCallable { return replicaSwitched.get(); } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations. 
+ */ + public boolean isHeartbeatMessage() { + return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); + } + private int addCallsForCurrentReplica( ResultBoundedCompletionService> cs, RegionLocations rl) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 62262e00aa8..26314d98db8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -491,6 +491,7 @@ public final class RequestConverter { builder.setRegion(region); builder.setScan(ProtobufUtil.toScan(scan)); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } @@ -509,6 +510,7 @@ public final class RequestConverter { builder.setCloseScanner(closeScanner); builder.setScannerId(scannerId); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } @@ -529,6 +531,7 @@ public final class RequestConverter { builder.setScannerId(scannerId); builder.setNextCallSeq(nextCallSeq); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 2a98d7d69e2..86a5104613d 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -800,6 +800,18 @@ possible configurations would overwhelm and obscure the important. take for a remote call to time out. It uses pings to check connections but will eventually throw a TimeoutException. + + hbase.cells.scanned.per.heartbeat.check + 10000 + The number of cells scanned in between heartbeat checks. 
Heartbeat + checks occur during the processing of scans to determine whether or not the + server should stop scanning in order to send back a heartbeat message to the + client. Heartbeat messages are used to keep the client-server connection alive + during long running scans. Small values mean that the heartbeat checks will + occur more often and thus will provide a tighter bound on the execution time of + the scan. Larger values mean that the heartbeat checks occur less frequently + + hbase.rpc.shortoperation.timeout 10000 diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 60ab6515d8d..2991ece7ed4 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -16433,6 +16433,16 @@ public final class ClientProtos { * optional bool client_handles_partials = 7; */ boolean getClientHandlesPartials(); + + // optional bool client_handles_heartbeats = 8; + /** + * optional bool client_handles_heartbeats = 8; + */ + boolean hasClientHandlesHeartbeats(); + /** + * optional bool client_handles_heartbeats = 8; + */ + boolean getClientHandlesHeartbeats(); } /** * Protobuf type {@code ScanRequest} @@ -16549,6 +16559,11 @@ public final class ClientProtos { clientHandlesPartials_ = input.readBool(); break; } + case 64: { + bitField0_ |= 0x00000080; + clientHandlesHeartbeats_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -16713,6 +16728,22 @@ public final class ClientProtos { return clientHandlesPartials_; } + // optional bool client_handles_heartbeats = 8; + public static final int CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER = 8; + private boolean clientHandlesHeartbeats_; + /** + * optional bool client_handles_heartbeats = 8; + */ + public 
boolean hasClientHandlesHeartbeats() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean getClientHandlesHeartbeats() { + return clientHandlesHeartbeats_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -16721,6 +16752,7 @@ public final class ClientProtos { closeScanner_ = false; nextCallSeq_ = 0L; clientHandlesPartials_ = false; + clientHandlesHeartbeats_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16767,6 +16799,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeBool(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, clientHandlesHeartbeats_); + } getUnknownFields().writeTo(output); } @@ -16804,6 +16839,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, clientHandlesHeartbeats_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16862,6 +16901,11 @@ public final class ClientProtos { result = result && (getClientHandlesPartials() == other.getClientHandlesPartials()); } + result = result && (hasClientHandlesHeartbeats() == other.hasClientHandlesHeartbeats()); + if (hasClientHandlesHeartbeats()) { + result = result && (getClientHandlesHeartbeats() + == other.getClientHandlesHeartbeats()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -16903,6 +16947,10 @@ public final class ClientProtos { hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER; hash = (53 * 
hash) + hashBoolean(getClientHandlesPartials()); } + if (hasClientHandlesHeartbeats()) { + hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17049,6 +17097,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000020); clientHandlesPartials_ = false; bitField0_ = (bitField0_ & ~0x00000040); + clientHandlesHeartbeats_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -17113,6 +17163,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000040; } result.clientHandlesPartials_ = clientHandlesPartials_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -17150,6 +17204,9 @@ public final class ClientProtos { if (other.hasClientHandlesPartials()) { setClientHandlesPartials(other.getClientHandlesPartials()); } + if (other.hasClientHandlesHeartbeats()) { + setClientHandlesHeartbeats(other.getClientHandlesHeartbeats()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17588,6 +17645,39 @@ public final class ClientProtos { return this; } + // optional bool client_handles_heartbeats = 8; + private boolean clientHandlesHeartbeats_ ; + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean hasClientHandlesHeartbeats() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean getClientHandlesHeartbeats() { + return clientHandlesHeartbeats_; + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public Builder setClientHandlesHeartbeats(boolean value) { + bitField0_ |= 0x00000080; + clientHandlesHeartbeats_ = value; + onChanged(); + return this; + } + /** + * optional 
bool client_handles_heartbeats = 8; + */ + public Builder clearClientHandlesHeartbeats() { + bitField0_ = (bitField0_ & ~0x00000080); + clientHandlesHeartbeats_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -17806,6 +17896,30 @@ public final class ClientProtos { * */ boolean getMoreResultsInRegion(); + + // optional bool heartbeat_message = 9; + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean hasHeartbeatMessage(); + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean getHeartbeatMessage(); } /** * Protobuf type {@code ScanResponse} @@ -17939,6 +18053,11 @@ public final class ClientProtos { moreResultsInRegion_ = input.readBool(); break; } + case 72: { + bitField0_ |= 0x00000020; + heartbeatMessage_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -18252,6 +18371,36 @@ public final class ClientProtos { return moreResultsInRegion_; } + // optional bool heartbeat_message = 9; + public static final int HEARTBEAT_MESSAGE_FIELD_NUMBER = 9; + private boolean heartbeatMessage_; + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional bool heartbeat_message = 9; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + private void initFields() { cellsPerResult_ = java.util.Collections.emptyList(); scannerId_ = 0L; @@ -18261,6 +18410,7 @@ public final class ClientProtos { stale_ = false; partialFlagPerResult_ = java.util.Collections.emptyList(); moreResultsInRegion_ = false; + heartbeatMessage_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -18298,6 +18448,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeBool(8, moreResultsInRegion_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBool(9, heartbeatMessage_); + } getUnknownFields().writeTo(output); } @@ -18346,6 +18499,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(8, moreResultsInRegion_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(9, heartbeatMessage_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18400,6 +18557,11 @@ public final class ClientProtos { result = result && (getMoreResultsInRegion() == other.getMoreResultsInRegion()); } + result = result && (hasHeartbeatMessage() == other.hasHeartbeatMessage()); + if (hasHeartbeatMessage()) { + result = result && (getHeartbeatMessage() + == other.getHeartbeatMessage()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18445,6 +18607,10 @@ public final class ClientProtos { hash = (37 * hash) + MORE_RESULTS_IN_REGION_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getMoreResultsInRegion()); } + if (hasHeartbeatMessage()) { + hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getHeartbeatMessage()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -18581,6 +18747,8 @@ public 
final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000040); moreResultsInRegion_ = false; bitField0_ = (bitField0_ & ~0x00000080); + heartbeatMessage_ = false; + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -18648,6 +18816,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000010; } result.moreResultsInRegion_ = moreResultsInRegion_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000020; + } + result.heartbeatMessage_ = heartbeatMessage_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18725,6 +18897,9 @@ public final class ClientProtos { if (other.hasMoreResultsInRegion()) { setMoreResultsInRegion(other.getMoreResultsInRegion()); } + if (other.hasHeartbeatMessage()) { + setHeartbeatMessage(other.getHeartbeatMessage()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -19561,6 +19736,67 @@ public final class ClientProtos { return this; } + // optional bool heartbeat_message = 9; + private boolean heartbeatMessage_ ; + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder setHeartbeatMessage(boolean value) { + bitField0_ |= 0x00000100; + heartbeatMessage_ = value; + onChanged(); + return this; + } + /** + * optional bool heartbeat_message = 9; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder clearHeartbeatMessage() { + bitField0_ = (bitField0_ & ~0x00000100); + heartbeatMessage_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanResponse) } @@ -32690,63 +32926,64 @@ public final class ClientProtos { "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" + "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" + - "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", + "\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" + "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" + "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" + - "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" + - "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" + - "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" + - "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" + - "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" + - "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" + - "\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132", - "\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " + - ".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" + - "gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" + - "\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" + - "ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" + - "iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" + - "(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 
\002(\014" + - "\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " + - "\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" + - "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi", - "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" + - "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" + - "2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" + - "\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" + - "et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" + - "oprocessorServiceCall\"Y\n\014RegionAction\022 \n" + - "\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" + - "c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" + - "onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r", - "heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" + - "tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" + - "sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" + - "1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" + - "viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" + - "adStats\"f\n\022RegionActionResult\022-\n\021resultO" + - "rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" + - "exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" + - "Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" + - "tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ", - "\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" + - "onActionResult\030\001 \003(\0132\023.RegionActionResul" + - "t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" + - "TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" + - " 
\n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" + - "ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" + - "Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" + - "kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" + - "LoadHFileResponse\022F\n\013ExecService\022\032.Copro" + - "cessorServiceRequest\032\033.CoprocessorServic", - "eResponse\022R\n\027ExecRegionServerService\022\032.C" + - "oprocessorServiceRequest\032\033.CoprocessorSe" + - "rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." + - "MultiResponseBB\n*org.apache.hadoop.hbase" + - ".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" + - "\001" + "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" + + "ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" + + "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" + + "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" + + "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" + + "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" + + "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea", + "rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" + + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + + "5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" + + "est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" + + "\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" + + "(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " + + "\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" + + "(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" + + "\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" + + "viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai", + 
"r\"d\n\031CoprocessorServiceRequest\022 \n\006region" + + "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" + + ".CoprocessorServiceCall\"]\n\032CoprocessorSe" + + "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" + + "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" + + "\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" + + "\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" + + "service_call\030\004 \001(\0132\027.CoprocessorServiceC" + + "all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" + + "gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030", + "\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" + + "storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" + + "\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" + + "\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" + + "\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" + + "\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" + + "Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" + + "ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" + + ".ResultOrException\022!\n\texception\030\002 \001(\0132\016." 
+ + "NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA", + "ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" + + "\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" + + "\rMultiResponse\022/\n\022regionActionResult\030\001 \003" + + "(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" + + "(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" + + "E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" + + "t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" + + "t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" + + "\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" + + "oadHFileRequest\032\026.BulkLoadHFileResponse\022", + "F\n\013ExecService\022\032.CoprocessorServiceReque" + + "st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" + + "egionServerService\022\032.CoprocessorServiceR" + + "equest\032\033.CoprocessorServiceResponse\022&\n\005M" + + "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" + + "rg.apache.hadoop.hbase.protobuf.generate" + + "dB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -32842,13 +33079,13 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", }); internal_static_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_ScanResponse_fieldAccessorTable = new 
com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", }); internal_static_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_BulkLoadHFileRequest_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index e0c370b3c4f..3a48cc89ac3 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -275,6 +275,7 @@ message ScanRequest { optional bool close_scanner = 5; optional uint64 next_call_seq = 6; optional bool client_handles_partials = 7; + optional bool client_handles_heartbeats = 8; } /** @@ -313,6 +314,12 @@ message ScanResponse { // reasons such as the size in bytes or quantity of results accumulated. This field // will true when more results exist in the current region. optional bool more_results_in_region = 8; + + // This field is filled in if the server is sending back a heartbeat message. + // Heartbeat messages are sent back to the client to prevent the scanner from + // timing out. Seeing a heartbeat message communicates to the Client that the + // server would have continued to scan had the time limit not been reached. 
+ optional bool heartbeat_message = 9; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index 5809983385f..9d7bcc0a325 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.mortbay.log.Log; @@ -72,8 +71,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner { @Override public Result next() throws IOException { values.clear(); - - scanner.nextRaw(values, NoLimitScannerContext.getInstance()); + scanner.nextRaw(values); if (values.isEmpty()) { //we are done return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8103445786e..2afe4ad8d46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5288,8 +5288,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public synchronized boolean next(List outResults, ScannerContext scannerContext) - throws IOException { + public synchronized boolean next(List outResults, ScannerContext scannerContext) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. 
Could be caused by a very slow scanner " + @@ -5327,7 +5326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi moreValues = nextInternal(tmpList, scannerContext); outResults.addAll(tmpList); } - + // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows @@ -5395,6 +5394,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ScannerContext.NextState state = moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; return scannerContext.setScannerState(state).hasMoreValues(); + } else if (scannerContext.checkTimeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); } } while (moreCellsInRow); @@ -5443,6 +5446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // progress. int initialBatchProgress = scannerContext.getBatchProgress(); long initialSizeProgress = scannerContext.getSizeProgress(); + long initialTimeProgress = scannerContext.getTimeProgress(); // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. @@ -5454,7 +5458,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // progress should be kept. if (scannerContext.getKeepProgress()) { // Progress should be kept. Reset to initial values seen at start of method invocation. 
- scannerContext.setProgress(initialBatchProgress, initialSizeProgress); + scannerContext + .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress); } else { scannerContext.clearProgress(); } @@ -5502,6 +5507,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + " formed. Changing scope of limits that may create partials"); } scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); + scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); } // Check if we were getting data from the joinedHeap and hit the limit. @@ -5537,6 +5543,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } return true; } + Cell nextKv = this.storeHeap.peek(); stopRow = nextKv == null || isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength()); @@ -5550,12 +5557,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ret = filter.filterRowCellsWithRet(results); // We don't know how the results have changed after being filtered. Must set progress - // according to contents of results now. + // according to contents of results now. However, a change in the results should not + // affect the time progress. 
Thus preserve whatever time progress has been made + long timeProgress = scannerContext.getTimeProgress(); if (scannerContext.getKeepProgress()) { - scannerContext.setProgress(initialBatchProgress, initialSizeProgress); + scannerContext.setProgress(initialBatchProgress, initialSizeProgress, + initialTimeProgress); } else { scannerContext.clearProgress(); } + scannerContext.setTimeProgress(timeProgress); scannerContext.incrementBatchProgress(results.size()); for (Cell cell : results) { scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell)); @@ -5580,14 +5591,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // These values are not needed for filter to work, so we postpone their // fetch to (possibly) reduce amount of data loads from disk. if (this.joinedHeap != null) { - Cell nextJoinedKv = joinedHeap.peek(); - // If joinedHeap is pointing to some other row, try to seek to a correct one. - boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, - currentRow, offset, length)) - || (this.joinedHeap.requestSeek( - KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true) - && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(), - currentRow, offset, length)); + boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length); if (mayHaveData) { joinedContinuationRow = current; populateFromJoinedHeap(results, scannerContext); @@ -5630,6 +5634,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * @param currentRow + * @param offset + * @param length + * @return true when the joined heap may have data for the current row + * @throws IOException + */ + private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length) + throws IOException { + Cell nextJoinedKv = joinedHeap.peek(); + boolean matchCurrentRow = + nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, 
offset, length); + boolean matchAfterSeek = false; + + // If the next value in the joined heap does not match the current row, try to seek to the + // correct row + if (!matchCurrentRow) { + Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length); + boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true); + matchAfterSeek = + seekSuccessful && joinedHeap.peek() != null + && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length); + } + + return matchCurrentRow || matchAfterSeek; + } + /** * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines * both filterRow & filterRow(List kvs) functions. While 0.94 code or older, it may diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 761267f65ee..64334534435 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -144,6 +144,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner InternalScanner currentAsInternal = (InternalScanner)this.current; boolean moreCells = currentAsInternal.next(result, scannerContext); Cell pee = this.current.peek(); + /* * By definition, any InternalScanner must return false only when it has no * further rows to be fetched. So, we can close a scanner if it returns @@ -151,6 +152,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations. 
*/ + if (pee == null || !moreCells) { this.current.close(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java index 1484e8072e9..66ed6c087b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java @@ -68,7 +68,22 @@ public class NoLimitScannerContext extends ScannerContext { } @Override - void setProgress(int batchProgress, long sizeProgress) { + void setTimeProgress(long timeProgress) { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + + @Override + void updateTimeProgress() { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + + @Override + void setProgress(int batchProgress, long sizeProgress, long timeProgress) { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + + @Override + void clearProgress() { // Do nothing. NoLimitScannerContext instances are immutable post-construction } @@ -77,6 +92,11 @@ public class NoLimitScannerContext extends ScannerContext { // Do nothing. NoLimitScannerContext instances are immutable post-construction } + @Override + void setTimeLimitScope(LimitScope scope) { + // Do nothing. NoLimitScannerContext instances are immutable post-construction + } + @Override NextState setScannerState(NextState state) { // Do nothing. NoLimitScannerContext instances are immutable post-construction @@ -95,6 +115,12 @@ public class NoLimitScannerContext extends ScannerContext { return false; } + @Override + boolean checkTimeLimit(LimitScope checkerScope) { + // No limits can be specified, thus return false to indicate no limit has been reached. 
+ return false; + } + @Override boolean checkAnyLimitReached(LimitScope checkerScope) { return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0ef1d99a5c5..a10e0c37316 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -194,6 +194,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = "hbase.region.server.rpc.scheduler.factory.class"; + /** + * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This + * configuration exists to prevent the scenario where a time limit is specified to be so + * restrictive that the time limit is reached immediately (before any cells are scanned). + */ + private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = + "hbase.region.server.rpc.minimum.scan.time.limit.delta"; + /** + * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} + */ + private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; + // Request counter. (Includes requests that are not serviced by regions.) final Counter requestCount = new Counter(); // Server to handle client requests. @@ -215,6 +227,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private final int scannerLeaseTimeoutPeriod; + /** + * The RPC timeout period (milliseconds) + */ + private final int rpcTimeout; + + /** + * The minimum allowable delta to use for the scan limit + */ + private final long minimumScanTimeLimitDelta; + /** * Holder class which holds the RegionScanner and nextCallSeq together. 
*/ @@ -831,6 +853,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, maxScannerResultSize = rs.conf.getLong( HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); + rpcTimeout = rs.conf.getInt( + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + minimumScanTimeLimitDelta = rs.conf.getLong( + REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); // Set our address, however we need the final port that was given to rpcServer isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort()); @@ -2264,6 +2292,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean stale = (region.getRegionInfo().getReplicaId() != 0); boolean clientHandlesPartials = request.hasClientHandlesPartials() && request.getClientHandlesPartials(); + boolean clientHandlesHeartbeats = + request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); // On the server side we must ensure that the correct ordering of partial results is // returned to the client to allow them to properly reconstruct the partial results. @@ -2275,23 +2305,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler, clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; boolean moreRows = false; + // Heartbeat messages occur when the processing of the ScanRequest is exceeds a + // certain time threshold on the server. When the time threshold is exceeded, the + // server stops the scan and sends back whatever Results it has accumulated within + // that time period (may be empty). 
Since heartbeat messages have the potential to + // create partial Results (in the event that the timeout occurs in the middle of a + // row), we must only generate heartbeat messages when the client can handle both + // heartbeats AND partials + boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; + + // Default value of timeLimit is negative to indicate no timeLimit should be + // enforced. + long timeLimit = -1; + + // Set the time limit to be half of the more restrictive timeout value (one of the + // timeout values must be positive). In the event that both values are positive, the + // more restrictive of the two is used to calculate the limit. + if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { + long timeLimitDelta; + if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { + timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); + } else { + timeLimitDelta = + scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; + } + // Use half of whichever timeout value was more restrictive... But don't allow + // the time limit to be less than the allowable minimum (could cause an + // immediatate timeout before scanning any data). + timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); + timeLimit = System.currentTimeMillis() + timeLimitDelta; + } + final LimitScope sizeScope = allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + final LimitScope timeScope = + allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; // Configure with limits for this RPC. 
Set keep progress true since size progress // towards size limit should be kept between calls to nextRaw ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); contextBuilder.setSizeLimit(sizeScope, maxResultSize); contextBuilder.setBatchLimit(scanner.getBatch()); + contextBuilder.setTimeLimit(timeScope, timeLimit); ScannerContext scannerContext = contextBuilder.build(); + boolean limitReached = false; while (i < rows) { - // Stop collecting results if we have exceeded maxResultSize - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) { - builder.setMoreResultsInRegion(true); - break; - } - // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The // batch limit is a limit on the number of cells per Result. Thus, if progress is // being tracked (i.e. scannerContext.keepProgress() is true) then we need to @@ -2310,14 +2369,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, results.add(Result.create(values, null, stale, partial)); i++; } - if (!moreRows) { + + boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); + boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); + boolean rowLimitReached = i >= rows; + limitReached = sizeLimitReached || timeLimitReached || rowLimitReached; + + if (limitReached || !moreRows) { + if (LOG.isTraceEnabled()) { + LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + + moreRows + " scannerContext: " + scannerContext); + } + // We only want to mark a ScanResponse as a heartbeat message in the event that + // there are more values to be read server side. If there aren't more values, + // marking it as a heartbeat is wasteful because the client will need to issue + // another ScanRequest only to realize that they already have all the values + if (moreRows) { + // Heartbeat messages occur when the time limit has been reached. 
+ builder.setHeartbeatMessage(timeLimitReached); + } break; } values.clear(); } - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows || - moreRows) { + if (limitReached || moreRows) { // We stopped prematurely builder.setMoreResultsInRegion(true); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 6e487ca7a0c..7c8ff7d0a42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -101,7 +101,7 @@ public class ScannerContext { if (limitsToCopy != null) this.limits.copy(limitsToCopy); // Progress fields are initialized to 0 - progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0); + progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0); this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; @@ -137,6 +137,13 @@ public class ScannerContext { progress.setSize(currentSize + size); } + /** + * Update the time progress with {@link System#currentTimeMillis()} + */ + void updateTimeProgress() { + progress.setTime(System.currentTimeMillis()); + } + int getBatchProgress() { return progress.getBatch(); } @@ -145,9 +152,14 @@ public class ScannerContext { return progress.getSize(); } - void setProgress(int batchProgress, long sizeProgress) { + long getTimeProgress() { + return progress.getTime(); + } + + void setProgress(int batchProgress, long sizeProgress, long timeProgress) { setBatchProgress(batchProgress); setSizeProgress(sizeProgress); + setTimeProgress(timeProgress); } void setSizeProgress(long sizeProgress) { @@ -158,12 +170,16 @@ public class ScannerContext { progress.setBatch(batchProgress); } + void setTimeProgress(long timeProgress) { + progress.setTime(timeProgress); + } + /** * Clear away any progress that has been 
made so far. All progress fields are reset to initial * values */ void clearProgress() { - progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0); + progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0); } /** @@ -172,7 +188,7 @@ public class ScannerContext { * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the * new state, thus preserving the immutability of {@link NoLimitScannerContext} * @param state - * @return The state that + * @return The state that was passed in. */ NextState setScannerState(NextState state) { if (!NextState.isValidState(state)) { @@ -188,7 +204,8 @@ public class ScannerContext { * reached in the middle of a row. */ boolean partialResultFormed() { - return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW; + return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW + || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW; } /** @@ -207,12 +224,20 @@ public class ScannerContext { return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0; } + /** + * @param checkerScope + * @return true if the time limit can be enforced in the checker's scope + */ + boolean hasTimeLimit(LimitScope checkerScope) { + return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0; + } + /** * @param checkerScope * @return true if any limit can be enforced within the checker's scope */ boolean hasAnyLimit(LimitScope checkerScope) { - return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope); + return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope); } /** @@ -222,6 +247,13 @@ public class ScannerContext { limits.setSizeScope(scope); } + /** + * @param scope The scope in which the time limit will be enforced + */ + void setTimeLimitScope(LimitScope scope) { + limits.setTimeScope(scope); + } + int getBatchLimit() { return limits.getBatch(); } @@ -230,6 +262,10 @@ public class ScannerContext { return 
limits.getSize(); } + long getTimeLimit() { + return limits.getTime(); + } + /** * @param checkerScope The scope that the limit is being checked from * @return true when the limit is enforceable from the checker's scope and it has been reached @@ -246,12 +282,22 @@ public class ScannerContext { return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize(); } + /** + * @param checkerScope The scope that the limit is being checked from. The time limit is always + * checked against {@link System#currentTimeMillis()} + * @return true when the limit is enforceable from the checker's scope and it has been reached + */ + boolean checkTimeLimit(LimitScope checkerScope) { + return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime(); + } + /** * @param checkerScope The scope that the limits are being checked from * @return true when some limit is enforceable from the checker's scope and it has been reached */ boolean checkAnyLimitReached(LimitScope checkerScope) { - return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope); + return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope) + || checkTimeLimit(checkerScope); } @Override @@ -305,6 +351,12 @@ public class ScannerContext { return this; } + public Builder setTimeLimit(LimitScope timeScope, long timeLimit) { + limits.setTime(timeLimit); + limits.setTimeScope(timeScope); + return this; + } + public Builder setBatchLimit(int batchLimit) { limits.setBatch(batchLimit); return this; @@ -328,6 +380,13 @@ public class ScannerContext { * of a row and thus a partial results was formed */ SIZE_LIMIT_REACHED_MID_ROW(true, true), + TIME_LIMIT_REACHED(true, true), + + /** + * Special case of time limit reached to indicate that the time limit was reached in the middle + * of a row and thus a partial results was formed + */ + TIME_LIMIT_REACHED_MID_ROW(true, true), BATCH_LIMIT_REACHED(true, true); private boolean moreValues; @@ -419,6 +478,7 @@ public class ScannerContext { */ 
private static int DEFAULT_BATCH = -1; private static long DEFAULT_SIZE = -1L; + private static long DEFAULT_TIME = -1L; /** * Default scope that is assigned to a limit if a scope is not specified. @@ -432,19 +492,23 @@ public class ScannerContext { LimitScope sizeScope = DEFAULT_SCOPE; long size = DEFAULT_SIZE; + LimitScope timeScope = DEFAULT_SCOPE; + long time = DEFAULT_TIME; + /** * Fields keep their default values. */ LimitFields() { } - LimitFields(int batch, LimitScope sizeScope, long size) { - setFields(batch, sizeScope, size); + LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) { + setFields(batch, sizeScope, size, timeScope, time); } void copy(LimitFields limitsToCopy) { if (limitsToCopy != null) { - setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize()); + setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(), + limitsToCopy.getTimeScope(), limitsToCopy.getTime()); } } @@ -454,10 +518,12 @@ public class ScannerContext { * @param sizeScope * @param size */ - void setFields(int batch, LimitScope sizeScope, long size) { + void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) { setBatch(batch); setSizeScope(sizeScope); setSize(size); + setTimeScope(timeScope); + setTime(time); } int getBatch() { @@ -506,6 +572,36 @@ public class ScannerContext { return this.sizeScope.canEnforceLimitFromScope(checkerScope); } + long getTime() { + return this.time; + } + + void setTime(long time) { + this.time = time; + } + + /** + * @return {@link LimitScope} indicating scope in which the time limit is enforced + */ + LimitScope getTimeScope() { + return this.timeScope; + } + + /** + * Change the scope in which the time limit is enforced + */ + void setTimeScope(LimitScope scope) { + this.timeScope = scope; + } + + /** + * @param checkerScope + * @return true when the limit can be enforced from the scope of the checker + */ + boolean 
canEnforceTimeLimitFromScope(LimitScope checkerScope) { + return this.timeScope.canEnforceLimitFromScope(checkerScope); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -520,6 +616,12 @@ public class ScannerContext { sb.append(", sizeScope:"); sb.append(sizeScope); + sb.append(", time:"); + sb.append(time); + + sb.append(", timeScope:"); + sb.append(timeScope); + sb.append("}"); return sb.toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 665ed461dae..f7e06ef7a43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -83,6 +83,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected final long now; protected final int minVersions; protected final long maxRowSize; + protected final long cellsPerHeartbeatCheck; /** * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not @@ -100,6 +101,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; + /** + * The number of cells scanned in between timeout checks. Specifying a larger value means that + * timeout checks will occur less frequently. Specifying a small value will lead to more frequent + * timeout checks. + */ + public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = + "hbase.cells.scanned.per.heartbeat.check"; + + /** + * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. 
+ */ + public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; + // if heap == null and lastTop != null, you need to reseek given the key below protected Cell lastTop = null; @@ -137,9 +151,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.maxRowSize = conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT); this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + + long tmpCellsPerTimeoutCheck = + conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, + DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK); + this.cellsPerHeartbeatCheck = + tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck + : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; } else { this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT; this.scanUsePread = scan.isSmall(); + this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; } // We look up row-column Bloom filters for multi-column queries as part of @@ -458,6 +480,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean next(List outResult, ScannerContext scannerContext) throws IOException { lock.lock(); + try { if (scannerContext == null) { throw new IllegalArgumentException("Scanner context cannot be null"); @@ -507,6 +530,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner long totalBytesRead = 0; LOOP: while((cell = this.heap.peek()) != null) { + // Update and check the time limit based on the configured value of cellsPerTimeoutCheck + if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { + scannerContext.updateTimeProgress(); + if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { + return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); + } + } + if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. 
checkScanOrder(prevCell, cell, comparator); prevCell = cell; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index eef955ecf0d..3794e590238 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -217,7 +217,8 @@ public class TestPartialResultsFromClientSide { count++; } - assertTrue(scanner2.next() == null); + r2 = scanner2.next(); + assertTrue("r2: " + r2 + " Should be null", r2 == null); scanner1.close(); scanner2.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 67eca80bd4d..b2b718ca42e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -143,7 +143,6 @@ public class TestCoprocessorInterface { public int getBatch() { return delegate.getBatch(); } - } public static class CoprocessorImpl extends BaseRegionObserver { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java new file mode 100644 index 00000000000..495be300b72 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.client.Table; +import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Here we test to make sure that scans return the expected Results when the server is sending the + * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent + * the scanner on the client side from timing out). A heartbeat message is sent from the server to + * the client when the server has exceeded the time limit during the processing of the scan. When + * the time limit is reached, the server will return to the Client whatever Results it has + * accumulated (potentially empty). 
+ */ +@Category(MediumTests.class) +public class TestScannerHeartbeatMessages { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Table TABLE = null; + + /** + * Table configuration + */ + private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); + + private static int NUM_ROWS = 10; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + private static int NUM_FAMILIES = 3; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + private static int NUM_QUALIFIERS = 3; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + private static int VALUE_SIZE = 128; + private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + // Time, in milliseconds, that the client will wait for a response from the server before timing + // out. This value is used server side to determine when it is necessary to send a heartbeat + // message to the client + private static int CLIENT_TIMEOUT = 500; + + // The server limits itself to running for half of the CLIENT_TIMEOUT value. 
+ private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2; + + // By default, at most one row's worth of cells will be retrieved before the time limit is reached + private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2; + // By default, at most cells for two column families are retrieved before the time limit is + // reached + private static int DEFAULT_CF_SLEEP_TIME = SERVER_TIME_LIMIT / 2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) HeartbeatRPCServices.LOG).getLogger().setLevel(Level.ALL); + Configuration conf = TEST_UTIL.getConfiguration(); + + conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); + conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); + + // Check the timeout condition after every cell + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); + TEST_UTIL.startMiniCluster(1); + + TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + static Table createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + Table ht = TEST_UTIL.createTable(name, families); + List puts = createPuts(rows, families, qualifiers, cellValue); + ht.put(puts); + + return ht; + } + + /** + * Make puts to put the input value into each combination of row, family, and qualifier + * @param rows + * @param families + * @param qualifiers + * @param value + * @return + * @throws IOException + */ + static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + Put put; + ArrayList puts = new ArrayList<>(); + + for (int row = 0; row < 
rows.length; row++) { + put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + + return puts; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setupBeforeTest() throws Exception { + disableSleeping(); + } + + @After + public void teardownAfterTest() throws Exception { + disableSleeping(); + } + + /** + * Test a variety of scan configurations to ensure that they return the expected Results when + * heartbeat messages are necessary. These tests are accumulated under one test case to ensure + * that they don't run in parallel. If the tests ran in parallel, they may conflict with each + * other due to changing static variables + */ + @Test + public void testScannerHeartbeatMessages() throws Exception { + testImportanceOfHeartbeats(testHeartbeatBetweenRows()); + testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies()); + } + + /** + * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass + * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are + * disabled, the test should throw an exception. + * @param testCallable + * @throws InterruptedException + */ + public void testImportanceOfHeartbeats(Callable testCallable) throws InterruptedException { + HeartbeatRPCServices.heartbeatsEnabled = true; + + try { + testCallable.call(); + } catch (Exception e) { + fail("Heartbeat messages are enabled, exceptions should NOT be thrown. 
Exception trace:" + + ExceptionUtils.getStackTrace(e)); + } + + HeartbeatRPCServices.heartbeatsEnabled = false; + try { + testCallable.call(); + } catch (Exception e) { + return; + } finally { + HeartbeatRPCServices.heartbeatsEnabled = true; + } + fail("Heartbeats messages are disabled, an exception should be thrown. If an exception " + + " is not thrown, the test case is not testing the importance of heartbeat messages"); + } + + /** + * Test the case that the time limit for the scan is reached after each full row of cells is + * fetched. + * @throws Exception + */ + public Callable testHeartbeatBetweenRows() throws Exception { + return new Callable() { + + @Override + public Void call() throws Exception { + // Configure the scan so that it can read the entire table in a single RPC. We want to test + // the case where a scan stops on the server side due to a time limit + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + + testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false); + return null; + } + }; + } + + /** + * Test the case that the time limit for scans is reached in between column families + * @throws Exception + */ + public Callable testHeartbeatBetweenColumnFamilies() throws Exception { + return new Callable() { + @Override + public Void call() throws Exception { + // Configure the scan so that it can read the entire table in a single RPC. We want to test + // the case where a scan stops on the server side due to a time limit + Scan baseScan = new Scan(); + baseScan.setMaxResultSize(Long.MAX_VALUE); + baseScan.setCaching(Integer.MAX_VALUE); + + // Copy the scan before each test. 
When a scan object is used by a scanner, some of its + // fields may be changed such as start row + Scan scanCopy = new Scan(baseScan); + testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false); + scanCopy = new Scan(baseScan); + testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true); + return null; + } + }; + } + + /** + * Test the equivalence of a scan versus the same scan executed when heartbeat messages are + * necessary + * @param scan The scan configuration being tested + * @param rowSleepTime The time to sleep between fetches of row cells + * @param cfSleepTime The time to sleep between fetches of column family cells + * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for + * that column family are fetched + * @throws Exception + */ + public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, + int cfSleepTime, boolean sleepBeforeCf) throws Exception { + disableSleeping(); + final ResultScanner scanner = TABLE.getScanner(scan); + final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan); + + Result r1 = null; + Result r2 = null; + + while ((r1 = scanner.next()) != null) { + // Enforce the specified sleep conditions during calls to the heartbeat scanner + configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf); + r2 = scannerWithHeartbeats.next(); + disableSleeping(); + + assertTrue(r2 != null); + try { + Result.compareResults(r1, r2); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + assertTrue(scannerWithHeartbeats.next() == null); + scanner.close(); + scannerWithHeartbeats.close(); + } + + /** + * Helper method for setting the time to sleep between rows and column families. 
If a sleep time + * is negative then that sleep will be disabled + * @param rowSleepTime + * @param cfSleepTime + */ + private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { + HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; + HeartbeatHRegion.rowSleepTime = rowSleepTime; + + HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0; + HeartbeatHRegion.columnFamilySleepTime = cfSleepTime; + HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf; + } + + /** + * Disable the sleeping mechanism server side. + */ + private static void disableSleeping() { + HeartbeatHRegion.sleepBetweenRows = false; + HeartbeatHRegion.sleepBetweenColumnFamilies = false; + } + + /** + * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of + * {@link RSRpcServices} to allow us to toggle support for heartbeat messages + */ + private static class HeartbeatHRegionServer extends HRegionServer { + public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm) + throws IOException { + super(conf, csm); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new HeartbeatRPCServices(this); + } + } + + /** + * Custom RSRpcServices instance that allows heartbeat support to be toggled + */ + private static class HeartbeatRPCServices extends RSRpcServices { + private static boolean heartbeatsEnabled = true; + + public HeartbeatRPCServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public ScanResponse scan(RpcController controller, ScanRequest request) + throws ServiceException { + ScanRequest.Builder builder = ScanRequest.newBuilder(request); + builder.setClientHandlesHeartbeats(heartbeatsEnabled); + return super.scan(controller, builder.build()); + } + } + + /** + * Custom HRegion class that instantiates {@link 
RegionScanner}s with configurable sleep times + * between fetches of row Results and/or column family cells. Useful for emulating an instance + * where the server is taking a long time to process a client's scan request + */ + private static class HeartbeatHRegion extends HRegion { + // Row sleeps occur AFTER each row worth of cells is retrieved. + private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; + private static boolean sleepBetweenRows = false; + + // The sleep for column families can be initiated before or after we fetch the cells for the + // column family. If the sleep occurs BEFORE then the time limits will be reached inside + // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time + // limit will be reached inside RegionScanner after all the cells for a column family have been + // retrieved. + private static boolean sleepBeforeColumnFamily = false; + private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; + private static boolean sleepBetweenColumnFamilies = false; + + public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, + HTableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + private static void columnFamilySleep() { + if (HeartbeatHRegion.sleepBetweenColumnFamilies) { + try { + Thread.sleep(HeartbeatHRegion.columnFamilySleepTime); + } catch (InterruptedException e) { + } + } + } + + private static void rowSleep() { + try { + if (HeartbeatHRegion.sleepBetweenRows) { + Thread.sleep(HeartbeatHRegion.rowSleepTime); + } + } catch (InterruptedException e) { + } + } + + // Instantiate the custom heartbeat region scanners + @Override + protected RegionScanner 
instantiateRegionScanner(Scan scan, + List additionalScanners) throws IOException { + if (scan.isReversed()) { + if (scan.getFilter() != null) { + scan.getFilter().setReversed(true); + } + return new HeartbeatReversedRegionScanner(scan, additionalScanners, this); + } + return new HeartbeatRegionScanner(scan, additionalScanners, this); + } + } + + /** + * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results + * and/or column family cells + */ + private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl { + HeartbeatReversedRegionScanner(Scan scan, List additionalScanners, + HRegion region) throws IOException { + super(scan, additionalScanners, region); + } + + @Override + public boolean nextRaw(List outResults, ScannerContext context) + throws IOException { + boolean moreRows = super.nextRaw(outResults, context); + HeartbeatHRegion.rowSleep(); + return moreRows; + } + + @Override + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) throws IOException { + this.storeHeap = new HeartbeatReversedKVHeap(scanners, region.getComparator()); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, region.getComparator()); + } + } + } + + /** + * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or + * column family cells + */ + private static class HeartbeatRegionScanner extends RegionScannerImpl { + HeartbeatRegionScanner(Scan scan, List additionalScanners, HRegion region) + throws IOException { + region.super(scan, additionalScanners, region); + } + + @Override + public boolean nextRaw(List outResults, ScannerContext context) + throws IOException { + boolean moreRows = super.nextRaw(outResults, context); + HeartbeatHRegion.rowSleep(); + return moreRows; + } + + @Override + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) throws IOException { + 
this.storeHeap = new HeartbeatKVHeap(scanners, region.getComparator()); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getComparator()); + } + } + } + + /** + * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family + * cells. Useful for testing + */ + private static final class HeartbeatKVHeap extends KeyValueHeap { + public HeartbeatKVHeap(List scanners, KVComparator comparator) + throws IOException { + super(scanners, comparator); + } + + HeartbeatKVHeap(List scanners, KVScannerComparator comparator) + throws IOException { + super(scanners, comparator); + } + + @Override + public boolean next(List result, ScannerContext context) + throws IOException { + if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + boolean moreRows = super.next(result, context); + if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + return moreRows; + } + } + + /** + * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family + * cells. 
+ */ + private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { + public HeartbeatReversedKVHeap(List scanners, + KVComparator comparator) throws IOException { + super(scanners, comparator); + } + + @Override + public boolean next(List result, ScannerContext context) + throws IOException { + if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + boolean moreRows = super.next(result, context); + if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + return moreRows; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index ab9ee79db5e..5e62af1ec3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -778,7 +778,6 @@ public class TestStripeCompactionPolicy { public boolean next(List results) throws IOException { if (kvs.isEmpty()) return false; results.add(kvs.remove(0)); - return !kvs.isEmpty(); }