diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
index 2d4a430e2ad..d05eb579fcc 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
@@ -3895,6 +3895,16 @@ public final class RPCProtos {
*
*/
int getPriority();
+
+ // optional uint32 timeout = 7;
+ /**
+ * optional uint32 timeout = 7;
+ */
+ boolean hasTimeout();
+ /**
+ * optional uint32 timeout = 7;
+ */
+ int getTimeout();
}
/**
* Protobuf type {@code hbase.pb.RequestHeader}
@@ -3997,6 +4007,11 @@ public final class RPCProtos {
priority_ = input.readUInt32();
break;
}
+ case 56: {
+ bitField0_ |= 0x00000040;
+ timeout_ = input.readUInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4210,6 +4225,22 @@ public final class RPCProtos {
return priority_;
}
+ // optional uint32 timeout = 7;
+ public static final int TIMEOUT_FIELD_NUMBER = 7;
+ private int timeout_;
+ /**
+ * optional uint32 timeout = 7;
+ */
+ public boolean hasTimeout() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * optional uint32 timeout = 7;
+ */
+ public int getTimeout() {
+ return timeout_;
+ }
+
private void initFields() {
callId_ = 0;
traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance();
@@ -4217,6 +4248,7 @@ public final class RPCProtos {
requestParam_ = false;
cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance();
priority_ = 0;
+ timeout_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4248,6 +4280,9 @@ public final class RPCProtos {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeUInt32(6, priority_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeUInt32(7, timeout_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4281,6 +4316,10 @@ public final class RPCProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(6, priority_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(7, timeout_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4334,6 +4373,11 @@ public final class RPCProtos {
result = result && (getPriority()
== other.getPriority());
}
+ result = result && (hasTimeout() == other.hasTimeout());
+ if (hasTimeout()) {
+ result = result && (getTimeout()
+ == other.getTimeout());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -4371,6 +4415,10 @@ public final class RPCProtos {
hash = (37 * hash) + PRIORITY_FIELD_NUMBER;
hash = (53 * hash) + getPriority();
}
+ if (hasTimeout()) {
+ hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
+ hash = (53 * hash) + getTimeout();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -4506,6 +4554,8 @@ public final class RPCProtos {
bitField0_ = (bitField0_ & ~0x00000010);
priority_ = 0;
bitField0_ = (bitField0_ & ~0x00000020);
+ timeout_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -4566,6 +4616,10 @@ public final class RPCProtos {
to_bitField0_ |= 0x00000020;
}
result.priority_ = priority_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.timeout_ = timeout_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4602,6 +4656,9 @@ public final class RPCProtos {
if (other.hasPriority()) {
setPriority(other.getPriority());
}
+ if (other.hasTimeout()) {
+ setTimeout(other.getTimeout());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -5124,6 +5181,39 @@ public final class RPCProtos {
return this;
}
+ // optional uint32 timeout = 7;
+ private int timeout_ ;
+ /**
+ * optional uint32 timeout = 7;
+ */
+ public boolean hasTimeout() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * optional uint32 timeout = 7;
+ */
+ public int getTimeout() {
+ return timeout_;
+ }
+ /**
+ * optional uint32 timeout = 7;
+ */
+ public Builder setTimeout(int value) {
+ bitField0_ |= 0x00000040;
+ timeout_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional uint32 timeout = 7;
+ */
+ public Builder clearTimeout() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ timeout_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.RequestHeader)
}
@@ -6141,17 +6231,17 @@ public final class RPCProtos {
"llBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exception" +
"Response\022\034\n\024exception_class_name\030\001 \001(\t\022\023",
"\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001(\t\022\014\n" +
- "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\270\001\n\rRe" +
+ "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\311\001\n\rRe" +
"questHeader\022\017\n\007call_id\030\001 \001(\r\022&\n\ntrace_in" +
"fo\030\002 \001(\0132\022.hbase.pb.RPCTInfo\022\023\n\013method_n" +
"ame\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\0220\n\017cell" +
"_block_meta\030\005 \001(\0132\027.hbase.pb.CellBlockMe" +
- "ta\022\020\n\010priority\030\006 \001(\r\"\203\001\n\016ResponseHeader\022" +
- "\017\n\007call_id\030\001 \001(\r\022.\n\texception\030\002 \001(\0132\033.hb" +
- "ase.pb.ExceptionResponse\0220\n\017cell_block_m" +
- "eta\030\003 \001(\0132\027.hbase.pb.CellBlockMetaB<\n*or",
- "g.apache.hadoop.hbase.protobuf.generated" +
- "B\tRPCProtosH\001\240\001\001"
+ "ta\022\020\n\010priority\030\006 \001(\r\022\017\n\007timeout\030\007 \001(\r\"\203\001" +
+ "\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022.\n\texc" +
+ "eption\030\002 \001(\0132\033.hbase.pb.ExceptionRespons" +
+ "e\0220\n\017cell_block_meta\030\003 \001(\0132\027.hbase.pb.Ce",
+ "llBlockMetaB<\n*org.apache.hadoop.hbase.p" +
+ "rotobuf.generatedB\tRPCProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6187,7 +6277,7 @@ public final class RPCProtos {
internal_static_hbase_pb_RequestHeader_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_RequestHeader_descriptor,
- new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", });
+ new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", "Timeout", });
internal_static_hbase_pb_ResponseHeader_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_hbase_pb_ResponseHeader_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/RPC.proto b/hbase-protocol/src/main/protobuf/RPC.proto
index 59bb03d9e2e..8413d2590f3 100644
--- a/hbase-protocol/src/main/protobuf/RPC.proto
+++ b/hbase-protocol/src/main/protobuf/RPC.proto
@@ -125,6 +125,7 @@ message RequestHeader {
// 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
// See HConstants.
optional uint32 priority = 6;
+ optional uint32 timeout = 7;
}
message ResponseHeader {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 3514245c0f4..00e08c994b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -114,7 +114,7 @@ public class CallRunner {
}
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
- call.timestamp, this.status);
+ call.timestamp, this.status, call.timeout);
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index b9a9b2614fd..483ce86ef51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -264,6 +264,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
+ /**
+ * Minimum allowable timeout (in milliseconds) in rpc request's header. This
+ * configuration exists to prevent the rpc service regarding this request as timeout immediately.
+ */
+ private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
+ private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
+
/** Default value for above params */
private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
@@ -274,6 +281,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private final int maxRequestSize;
private final int warnResponseTime;
private final int warnResponseSize;
+
+ private final int minClientRequestTimeout;
+
private final Server server;
private final List services;
@@ -302,6 +312,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
+ protected int timeout;
/**
* Chain of buffers to send as response.
*/
@@ -325,7 +336,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
justification="Can't figure why this complaint is happening... see below")
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
- long size, TraceInfo tinfo, final InetAddress remoteAddress) {
+ long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
this.id = id;
this.service = service;
this.md = md;
@@ -343,6 +354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.remoteAddress = remoteAddress;
this.retryImmediatelySupported =
connection == null? null: connection.retryImmediatelySupported;
+ this.timeout = timeout;
}
/**
@@ -1275,13 +1287,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
- null, null, this, null, 0, null, null);
+ null, null, this, null, 0, null, null, 0);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
- 0, null, null);
+ 0, null, null, 0);
// was authentication allowed with a fallback to simple auth
private boolean authenticatedWithFallback;
@@ -1699,7 +1711,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
- Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
+ Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
@@ -1874,7 +1886,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null);
+ responder, totalRequestSize, null, null, 0);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
@@ -1923,7 +1935,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null);
+ responder, totalRequestSize, null, null, 0);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
@@ -1934,8 +1946,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
TraceInfo traceInfo = header.hasTraceInfo()
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null;
+ int timeout = 0;
+ if (header.hasTimeout()){
+ timeout = Math.max(minClientRequestTimeout, header.getTimeout());
+ }
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
- totalRequestSize, traceInfo, this.addr);
+ totalRequestSize, traceInfo, this.addr, timeout);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSize.add(-1 * call.getSize());
@@ -2087,7 +2103,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
-
+ this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
+ DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
// Start the listener here and let it bind to the port
@@ -2228,6 +2245,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.secretManager = (SecretManager) secretManager;
}
+ public Pair call(BlockingService service, MethodDescriptor md,
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+ throws IOException {
+ return call(service, md, param, cellScanner, receiveTime, status, 0);
+ }
+
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
@@ -2235,7 +2258,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
@Override
public Pair call(BlockingService service, MethodDescriptor md,
- Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
+ int timeout)
throws IOException {
try {
status.setRPC(md.getName(), new Object[]{param}, receiveTime);
@@ -2245,6 +2269,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
//get an instance of the method arg type
long startTime = System.currentTimeMillis();
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
+ controller.setCallTimeout(timeout);
Message result = service.callBlockingMethod(md, controller, param);
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index ab8b48586d6..dd7e584dee1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -52,6 +52,11 @@ public interface RpcServerInterface {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException, ServiceException;
+ Pair call(BlockingService service, MethodDescriptor md,
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
+ int timeout)
+ throws IOException, ServiceException;
+
void setErrorHandler(HBaseRPCErrorHandler handler);
HBaseRPCErrorHandler getErrorHandler();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 95ab36dfec0..ebd85bda3c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -2698,6 +2699,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
timeLimitDelta =
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
+ if (controller instanceof TimeLimitedRpcController) {
+ TimeLimitedRpcController timeLimitedRpcController =
+ (TimeLimitedRpcController)controller;
+ if (timeLimitedRpcController.getCallTimeout() > 0) {
+ timeLimitDelta = Math.min(timeLimitDelta,
+ timeLimitedRpcController.getCallTimeout());
+ }
+ }
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediatate timeout before scanning any data).
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index b8eacc70378..a958ee09d73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -102,16 +103,16 @@ public class TestScannerHeartbeatMessages {
private static int VALUE_SIZE = 128;
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
+
+ private static int SERVER_TIMEOUT = 6000;
+
// Time, in milliseconds, that the client will wait for a response from the server before timing
// out. This value is used server side to determine when it is necessary to send a heartbeat
// message to the client
- private static int CLIENT_TIMEOUT = 2000;
-
- // The server limits itself to running for half of the CLIENT_TIMEOUT value.
- private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2;
+ private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3;
// By default, at most one row's worth of cells will be retrieved before the time limit is reached
- private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
+ private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5;
// By default, at most cells for two column families are retrieved before the time limit is
// reached
private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES;
@@ -124,8 +125,8 @@ public class TestScannerHeartbeatMessages {
conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
- conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
- conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
+ conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
// Check the timeout condition after every cell
@@ -140,7 +141,7 @@ public class TestScannerHeartbeatMessages {
Table ht = TEST_UTIL.createTable(name, families);
List puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
-
+ ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
return ht;
}
@@ -282,7 +283,7 @@ public class TestScannerHeartbeatMessages {
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
try {
- Thread.sleep(SERVER_TIME_LIMIT + 10);
+ Thread.sleep(CLIENT_TIMEOUT/2 + 10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}