HBASE-15593 Time limit of scanning should be offered by client
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
7b5d5394c0
commit
3bd9220f0c
|
@ -397,6 +397,8 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel {
|
|||
if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
||||
requestHeaderBuilder.setPriority(call.getPriority());
|
||||
}
|
||||
requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
|
||||
Integer.MAX_VALUE : (int)call.rpcTimeout);
|
||||
|
||||
RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
|
||||
|
||||
|
|
|
@ -907,6 +907,7 @@ public class RpcClientImpl extends AbstractRpcClient {
|
|||
if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
||||
builder.setPriority(priority);
|
||||
}
|
||||
builder.setTimeout(call.timeout);
|
||||
RequestHeader requestHeader = builder.build();
|
||||
|
||||
setupIOstreams();
|
||||
|
|
|
@ -3895,6 +3895,16 @@ public final class RPCProtos {
|
|||
* </pre>
|
||||
*/
|
||||
int getPriority();
|
||||
|
||||
// optional uint32 timeout = 7;
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
boolean hasTimeout();
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
int getTimeout();
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code hbase.pb.RequestHeader}
|
||||
|
@ -3997,6 +4007,11 @@ public final class RPCProtos {
|
|||
priority_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
case 56: {
|
||||
bitField0_ |= 0x00000040;
|
||||
timeout_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -4210,6 +4225,22 @@ public final class RPCProtos {
|
|||
return priority_;
|
||||
}
|
||||
|
||||
// optional uint32 timeout = 7;
|
||||
public static final int TIMEOUT_FIELD_NUMBER = 7;
|
||||
private int timeout_;
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
public boolean hasTimeout() {
|
||||
return ((bitField0_ & 0x00000040) == 0x00000040);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
public int getTimeout() {
|
||||
return timeout_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
callId_ = 0;
|
||||
traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance();
|
||||
|
@ -4217,6 +4248,7 @@ public final class RPCProtos {
|
|||
requestParam_ = false;
|
||||
cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance();
|
||||
priority_ = 0;
|
||||
timeout_ = 0;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -4248,6 +4280,9 @@ public final class RPCProtos {
|
|||
if (((bitField0_ & 0x00000020) == 0x00000020)) {
|
||||
output.writeUInt32(6, priority_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
output.writeUInt32(7, timeout_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -4281,6 +4316,10 @@ public final class RPCProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(6, priority_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(7, timeout_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -4334,6 +4373,11 @@ public final class RPCProtos {
|
|||
result = result && (getPriority()
|
||||
== other.getPriority());
|
||||
}
|
||||
result = result && (hasTimeout() == other.hasTimeout());
|
||||
if (hasTimeout()) {
|
||||
result = result && (getTimeout()
|
||||
== other.getTimeout());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -4371,6 +4415,10 @@ public final class RPCProtos {
|
|||
hash = (37 * hash) + PRIORITY_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getPriority();
|
||||
}
|
||||
if (hasTimeout()) {
|
||||
hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getTimeout();
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -4506,6 +4554,8 @@ public final class RPCProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
priority_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
timeout_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -4566,6 +4616,10 @@ public final class RPCProtos {
|
|||
to_bitField0_ |= 0x00000020;
|
||||
}
|
||||
result.priority_ = priority_;
|
||||
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
to_bitField0_ |= 0x00000040;
|
||||
}
|
||||
result.timeout_ = timeout_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -4602,6 +4656,9 @@ public final class RPCProtos {
|
|||
if (other.hasPriority()) {
|
||||
setPriority(other.getPriority());
|
||||
}
|
||||
if (other.hasTimeout()) {
|
||||
setTimeout(other.getTimeout());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -5124,6 +5181,39 @@ public final class RPCProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional uint32 timeout = 7;
|
||||
private int timeout_ ;
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
public boolean hasTimeout() {
|
||||
return ((bitField0_ & 0x00000040) == 0x00000040);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
public int getTimeout() {
|
||||
return timeout_;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
public Builder setTimeout(int value) {
|
||||
bitField0_ |= 0x00000040;
|
||||
timeout_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 timeout = 7;</code>
|
||||
*/
|
||||
public Builder clearTimeout() {
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
timeout_ = 0;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:hbase.pb.RequestHeader)
|
||||
}
|
||||
|
||||
|
@ -6141,17 +6231,17 @@ public final class RPCProtos {
|
|||
"llBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exception" +
|
||||
"Response\022\034\n\024exception_class_name\030\001 \001(\t\022\023",
|
||||
"\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001(\t\022\014\n" +
|
||||
"\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\270\001\n\rRe" +
|
||||
"\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\311\001\n\rRe" +
|
||||
"questHeader\022\017\n\007call_id\030\001 \001(\r\022&\n\ntrace_in" +
|
||||
"fo\030\002 \001(\0132\022.hbase.pb.RPCTInfo\022\023\n\013method_n" +
|
||||
"ame\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\0220\n\017cell" +
|
||||
"_block_meta\030\005 \001(\0132\027.hbase.pb.CellBlockMe" +
|
||||
"ta\022\020\n\010priority\030\006 \001(\r\"\203\001\n\016ResponseHeader\022" +
|
||||
"\017\n\007call_id\030\001 \001(\r\022.\n\texception\030\002 \001(\0132\033.hb" +
|
||||
"ase.pb.ExceptionResponse\0220\n\017cell_block_m" +
|
||||
"eta\030\003 \001(\0132\027.hbase.pb.CellBlockMetaB<\n*or",
|
||||
"g.apache.hadoop.hbase.protobuf.generated" +
|
||||
"B\tRPCProtosH\001\240\001\001"
|
||||
"ta\022\020\n\010priority\030\006 \001(\r\022\017\n\007timeout\030\007 \001(\r\"\203\001" +
|
||||
"\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022.\n\texc" +
|
||||
"eption\030\002 \001(\0132\033.hbase.pb.ExceptionRespons" +
|
||||
"e\0220\n\017cell_block_meta\030\003 \001(\0132\027.hbase.pb.Ce",
|
||||
"llBlockMetaB<\n*org.apache.hadoop.hbase.p" +
|
||||
"rotobuf.generatedB\tRPCProtosH\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -6187,7 +6277,7 @@ public final class RPCProtos {
|
|||
internal_static_hbase_pb_RequestHeader_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_hbase_pb_RequestHeader_descriptor,
|
||||
new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", });
|
||||
new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", "Timeout", });
|
||||
internal_static_hbase_pb_ResponseHeader_descriptor =
|
||||
getDescriptor().getMessageTypes().get(5);
|
||||
internal_static_hbase_pb_ResponseHeader_fieldAccessorTable = new
|
||||
|
|
|
@ -125,6 +125,7 @@ message RequestHeader {
|
|||
// 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
|
||||
// See HConstants.
|
||||
optional uint32 priority = 6;
|
||||
optional uint32 timeout = 7;
|
||||
}
|
||||
|
||||
message ResponseHeader {
|
||||
|
|
|
@ -114,7 +114,7 @@ public class CallRunner {
|
|||
}
|
||||
// make the call
|
||||
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
|
||||
call.timestamp, this.status);
|
||||
call.timestamp, this.status, call.timeout);
|
||||
} catch (Throwable e) {
|
||||
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
|
||||
errorThrowable = e;
|
||||
|
|
|
@ -264,6 +264,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
|
||||
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
|
||||
|
||||
/**
|
||||
* Minimum allowable timeout (in milliseconds) in rpc request's header. This
|
||||
* configuration exists to prevent the rpc service regarding this request as timeout immediately.
|
||||
*/
|
||||
private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
|
||||
private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
|
||||
|
||||
/** Default value for above params */
|
||||
private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
|
||||
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
|
||||
|
@ -274,6 +281,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
private final int maxRequestSize;
|
||||
private final int warnResponseTime;
|
||||
private final int warnResponseSize;
|
||||
|
||||
private final int minClientRequestTimeout;
|
||||
|
||||
private final Server server;
|
||||
private final List<BlockingServiceAndInterface> services;
|
||||
|
||||
|
@ -302,6 +312,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
protected Connection connection; // connection to client
|
||||
protected long timestamp; // the time received when response is null
|
||||
// the time served when response is not null
|
||||
protected int timeout;
|
||||
/**
|
||||
* Chain of buffers to send as response.
|
||||
*/
|
||||
|
@ -325,7 +336,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
justification="Can't figure why this complaint is happening... see below")
|
||||
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
|
||||
Message param, CellScanner cellScanner, Connection connection, Responder responder,
|
||||
long size, TraceInfo tinfo, final InetAddress remoteAddress) {
|
||||
long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
|
||||
this.id = id;
|
||||
this.service = service;
|
||||
this.md = md;
|
||||
|
@ -343,6 +354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
this.remoteAddress = remoteAddress;
|
||||
this.retryImmediatelySupported =
|
||||
connection == null? null: connection.retryImmediatelySupported;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1275,13 +1287,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
// Fake 'call' for failed authorization response
|
||||
private static final int AUTHORIZATION_FAILED_CALLID = -1;
|
||||
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
|
||||
null, null, this, null, 0, null, null);
|
||||
null, null, this, null, 0, null, null, 0);
|
||||
private ByteArrayOutputStream authFailedResponse =
|
||||
new ByteArrayOutputStream();
|
||||
// Fake 'call' for SASL context setup
|
||||
private static final int SASL_CALLID = -33;
|
||||
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
|
||||
0, null, null);
|
||||
0, null, null, 0);
|
||||
|
||||
// was authentication allowed with a fallback to simple auth
|
||||
private boolean authenticatedWithFallback;
|
||||
|
@ -1699,7 +1711,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
|
||||
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
|
||||
LOG.warn(msg);
|
||||
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
|
||||
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
|
||||
setupResponse(null, fakeCall, e, msg);
|
||||
responder.doRespond(fakeCall);
|
||||
// Returning -1 closes out the connection.
|
||||
|
@ -1874,7 +1886,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
|
||||
final Call callTooBig =
|
||||
new Call(id, this.service, null, null, null, null, this,
|
||||
responder, totalRequestSize, null, null);
|
||||
responder, totalRequestSize, null, null, 0);
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
|
||||
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
|
||||
|
@ -1923,7 +1935,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
|
||||
final Call readParamsFailedCall =
|
||||
new Call(id, this.service, null, null, null, null, this,
|
||||
responder, totalRequestSize, null, null);
|
||||
responder, totalRequestSize, null, null, 0);
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
setupResponse(responseBuffer, readParamsFailedCall, t,
|
||||
msg + "; " + t.getMessage());
|
||||
|
@ -1934,8 +1946,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
TraceInfo traceInfo = header.hasTraceInfo()
|
||||
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
|
||||
: null;
|
||||
int timeout = 0;
|
||||
if (header.hasTimeout()){
|
||||
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
|
||||
}
|
||||
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
|
||||
totalRequestSize, traceInfo, this.addr);
|
||||
totalRequestSize, traceInfo, this.addr, timeout);
|
||||
|
||||
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
|
||||
callQueueSize.add(-1 * call.getSize());
|
||||
|
@ -2087,7 +2103,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
||||
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
|
||||
|
||||
this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
|
||||
DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
|
||||
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
|
||||
|
||||
// Start the listener here and let it bind to the port
|
||||
|
@ -2228,6 +2245,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
|
||||
}
|
||||
|
||||
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
|
||||
throws IOException {
|
||||
return call(service, md, param, cellScanner, receiveTime, status, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a server side method, which is invoked over RPC. On success
|
||||
* the return response has protobuf response payload. On failure, the
|
||||
|
@ -2235,7 +2258,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
*/
|
||||
@Override
|
||||
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
|
||||
int timeout)
|
||||
throws IOException {
|
||||
try {
|
||||
status.setRPC(md.getName(), new Object[]{param}, receiveTime);
|
||||
|
@ -2245,6 +2269,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
//get an instance of the method arg type
|
||||
long startTime = System.currentTimeMillis();
|
||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
|
||||
controller.setCallTimeout(timeout);
|
||||
Message result = service.callBlockingMethod(md, controller, param);
|
||||
long endTime = System.currentTimeMillis();
|
||||
int processingTime = (int) (endTime - startTime);
|
||||
|
|
|
@ -52,6 +52,11 @@ public interface RpcServerInterface {
|
|||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
|
||||
throws IOException, ServiceException;
|
||||
|
||||
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
|
||||
int timeout)
|
||||
throws IOException, ServiceException;
|
||||
|
||||
void setErrorHandler(HBaseRPCErrorHandler handler);
|
||||
HBaseRPCErrorHandler getErrorHandler();
|
||||
|
||||
|
|
|
@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
|
||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
|
@ -2698,6 +2699,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
timeLimitDelta =
|
||||
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
|
||||
}
|
||||
if (controller instanceof TimeLimitedRpcController) {
|
||||
TimeLimitedRpcController timeLimitedRpcController =
|
||||
(TimeLimitedRpcController)controller;
|
||||
if (timeLimitedRpcController.getCallTimeout() > 0) {
|
||||
timeLimitDelta = Math.min(timeLimitDelta,
|
||||
timeLimitedRpcController.getCallTimeout());
|
||||
}
|
||||
}
|
||||
// Use half of whichever timeout value was more restrictive... But don't allow
|
||||
// the time limit to be less than the allowable minimum (could cause an
|
||||
// immediatate timeout before scanning any data).
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -102,16 +103,16 @@ public class TestScannerHeartbeatMessages {
|
|||
private static int VALUE_SIZE = 128;
|
||||
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
|
||||
|
||||
|
||||
private static int SERVER_TIMEOUT = 6000;
|
||||
|
||||
// Time, in milliseconds, that the client will wait for a response from the server before timing
|
||||
// out. This value is used server side to determine when it is necessary to send a heartbeat
|
||||
// message to the client
|
||||
private static int CLIENT_TIMEOUT = 2000;
|
||||
|
||||
// The server limits itself to running for half of the CLIENT_TIMEOUT value.
|
||||
private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2;
|
||||
private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3;
|
||||
|
||||
// By default, at most one row's worth of cells will be retrieved before the time limit is reached
|
||||
private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
|
||||
private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5;
|
||||
// By default, at most cells for two column families are retrieved before the time limit is
|
||||
// reached
|
||||
private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES;
|
||||
|
@ -124,8 +125,8 @@ public class TestScannerHeartbeatMessages {
|
|||
|
||||
conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
|
||||
conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
|
||||
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
|
||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
|
||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
|
||||
|
||||
// Check the timeout condition after every cell
|
||||
|
@ -140,7 +141,7 @@ public class TestScannerHeartbeatMessages {
|
|||
Table ht = TEST_UTIL.createTable(name, families);
|
||||
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
|
||||
ht.put(puts);
|
||||
|
||||
ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
|
||||
return ht;
|
||||
}
|
||||
|
||||
|
@ -282,7 +283,7 @@ public class TestScannerHeartbeatMessages {
|
|||
@Override
|
||||
public ReturnCode filterKeyValue(Cell v) throws IOException {
|
||||
try {
|
||||
Thread.sleep(SERVER_TIME_LIMIT + 10);
|
||||
Thread.sleep(CLIENT_TIMEOUT/2 + 10);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue