HBASE-15593 Time limit of scanning should be offered by client (Phil Yang)

This commit is contained in:
stack 2016-05-17 12:49:31 -07:00
parent 9269b8199e
commit e47bfb9078
7 changed files with 158 additions and 27 deletions

View File

@ -3895,6 +3895,16 @@ public final class RPCProtos {
* </pre>
*/
int getPriority();
// optional uint32 timeout = 7;
/**
* <code>optional uint32 timeout = 7;</code>
*/
boolean hasTimeout();
/**
* <code>optional uint32 timeout = 7;</code>
*/
int getTimeout();
}
/**
* Protobuf type {@code hbase.pb.RequestHeader}
@ -3997,6 +4007,11 @@ public final class RPCProtos {
priority_ = input.readUInt32();
break;
}
case 56: {
bitField0_ |= 0x00000040;
timeout_ = input.readUInt32();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -4210,6 +4225,22 @@ public final class RPCProtos {
return priority_;
}
// optional uint32 timeout = 7;
public static final int TIMEOUT_FIELD_NUMBER = 7;
private int timeout_;
/**
* <code>optional uint32 timeout = 7;</code>
*
* Reports whether the optional {@code timeout} field is present on this message,
* i.e. whether presence bit {@code 0x00000040} is set in {@code bitField0_}.
*
* @return {@code true} if the presence bit is set, {@code false} otherwise
*/
public boolean hasTimeout() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional uint32 timeout = 7;</code>
*
* Returns the stored {@code timeout_} value without consulting the presence bit;
* call {@link #hasTimeout()} first to tell an explicit value from an unset field.
* NOTE(review): units are not stated in the .proto; presumably milliseconds -- confirm
* against the RPC server code that consumes this field.
*
* @return the current value of {@code timeout_}
*/
public int getTimeout() {
return timeout_;
}
private void initFields() {
callId_ = 0;
traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance();
@ -4217,6 +4248,7 @@ public final class RPCProtos {
requestParam_ = false;
cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance();
priority_ = 0;
timeout_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -4248,6 +4280,9 @@ public final class RPCProtos {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeUInt32(6, priority_);
}
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeUInt32(7, timeout_);
}
getUnknownFields().writeTo(output);
}
@ -4281,6 +4316,10 @@ public final class RPCProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(6, priority_);
}
if (((bitField0_ & 0x00000040) == 0x00000040)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(7, timeout_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -4334,6 +4373,11 @@ public final class RPCProtos {
result = result && (getPriority()
== other.getPriority());
}
result = result && (hasTimeout() == other.hasTimeout());
if (hasTimeout()) {
result = result && (getTimeout()
== other.getTimeout());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -4371,6 +4415,10 @@ public final class RPCProtos {
hash = (37 * hash) + PRIORITY_FIELD_NUMBER;
hash = (53 * hash) + getPriority();
}
if (hasTimeout()) {
hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
hash = (53 * hash) + getTimeout();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -4506,6 +4554,8 @@ public final class RPCProtos {
bitField0_ = (bitField0_ & ~0x00000010);
priority_ = 0;
bitField0_ = (bitField0_ & ~0x00000020);
timeout_ = 0;
bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@ -4566,6 +4616,10 @@ public final class RPCProtos {
to_bitField0_ |= 0x00000020;
}
result.priority_ = priority_;
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
to_bitField0_ |= 0x00000040;
}
result.timeout_ = timeout_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -4602,6 +4656,9 @@ public final class RPCProtos {
if (other.hasPriority()) {
setPriority(other.getPriority());
}
if (other.hasTimeout()) {
setTimeout(other.getTimeout());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -5124,6 +5181,39 @@ public final class RPCProtos {
return this;
}
// optional uint32 timeout = 7;
private int timeout_ ;
/**
* <code>optional uint32 timeout = 7;</code>
*
* Reports whether {@code timeout} has been set on this builder, i.e. whether
* presence bit {@code 0x00000040} is set in the builder's {@code bitField0_}.
*
* @return {@code true} if the presence bit is set, {@code false} otherwise
*/
public boolean hasTimeout() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional uint32 timeout = 7;</code>
*
* Returns the builder's current {@code timeout_} value without consulting the
* presence bit; use {@link #hasTimeout()} to check whether it was explicitly set.
*
* @return the current value of {@code timeout_}
*/
public int getTimeout() {
return timeout_;
}
/**
* <code>optional uint32 timeout = 7;</code>
*
* Stores {@code value} as the timeout, sets presence bit {@code 0x00000040} so that
* {@link #hasTimeout()} reports {@code true}, and signals the change via
* {@code onChanged()}.
*
* @param value the timeout to store; it is not validated here
* @return this builder, for call chaining
*/
public Builder setTimeout(int value) {
bitField0_ |= 0x00000040;
timeout_ = value;
onChanged();
return this;
}
/**
* <code>optional uint32 timeout = 7;</code>
*
* Marks the timeout as unset: clears presence bit {@code 0x00000040}, resets
* {@code timeout_} to 0, and signals the change via {@code onChanged()}.
*
* @return this builder, for call chaining
*/
public Builder clearTimeout() {
bitField0_ = (bitField0_ & ~0x00000040);
timeout_ = 0;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:hbase.pb.RequestHeader)
}
@ -6141,17 +6231,17 @@ public final class RPCProtos {
"llBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exception" +
"Response\022\034\n\024exception_class_name\030\001 \001(\t\022\023",
"\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001(\t\022\014\n" +
"\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\270\001\n\rRe" +
"\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\311\001\n\rRe" +
"questHeader\022\017\n\007call_id\030\001 \001(\r\022&\n\ntrace_in" +
"fo\030\002 \001(\0132\022.hbase.pb.RPCTInfo\022\023\n\013method_n" +
"ame\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\0220\n\017cell" +
"_block_meta\030\005 \001(\0132\027.hbase.pb.CellBlockMe" +
"ta\022\020\n\010priority\030\006 \001(\r\"\203\001\n\016ResponseHeader\022" +
"\017\n\007call_id\030\001 \001(\r\022.\n\texception\030\002 \001(\0132\033.hb" +
"ase.pb.ExceptionResponse\0220\n\017cell_block_m" +
"eta\030\003 \001(\0132\027.hbase.pb.CellBlockMetaB<\n*or",
"g.apache.hadoop.hbase.protobuf.generated" +
"B\tRPCProtosH\001\240\001\001"
"ta\022\020\n\010priority\030\006 \001(\r\022\017\n\007timeout\030\007 \001(\r\"\203\001" +
"\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022.\n\texc" +
"eption\030\002 \001(\0132\033.hbase.pb.ExceptionRespons" +
"e\0220\n\017cell_block_meta\030\003 \001(\0132\027.hbase.pb.Ce",
"llBlockMetaB<\n*org.apache.hadoop.hbase.p" +
"rotobuf.generatedB\tRPCProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -6187,7 +6277,7 @@ public final class RPCProtos {
internal_static_hbase_pb_RequestHeader_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_RequestHeader_descriptor,
new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", });
new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", "Timeout", });
internal_static_hbase_pb_ResponseHeader_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_hbase_pb_ResponseHeader_fieldAccessorTable = new

View File

@ -125,6 +125,7 @@ message RequestHeader {
// 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
// See HConstants.
optional uint32 priority = 6;
optional uint32 timeout = 7;
}
message ResponseHeader {

View File

@ -114,7 +114,7 @@ public class CallRunner {
}
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
call.timestamp, this.status);
call.timestamp, this.status, call.timeout);
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;

View File

@ -264,6 +264,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
/**
* Minimum allowable timeout (in milliseconds) in an RPC request's header. This configuration
* exists to prevent the RPC service from treating the request as timed out immediately.
*/
private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
/** Default value for above params */
private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
@ -274,6 +281,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private final int maxRequestSize;
private final int warnResponseTime;
private final int warnResponseSize;
private final int minClientRequestTimeout;
private final Server server;
private final List<BlockingServiceAndInterface> services;
@ -302,6 +312,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
protected int timeout;
/**
* Chain of buffers to send as response.
*/
@ -325,7 +336,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
justification="Can't figure why this complaint is happening... see below")
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
long size, TraceInfo tinfo, final InetAddress remoteAddress) {
long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
this.id = id;
this.service = service;
this.md = md;
@ -343,6 +354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.remoteAddress = remoteAddress;
this.retryImmediatelySupported =
connection == null? null: connection.retryImmediatelySupported;
this.timeout = timeout;
}
/**
@ -1275,13 +1287,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
null, null, this, null, 0, null, null);
null, null, this, null, 0, null, null, 0);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
0, null, null);
0, null, null, 0);
// was authentication allowed with a fallback to simple auth
private boolean authenticatedWithFallback;
@ -1699,7 +1711,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
@ -1874,7 +1886,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null);
responder, totalRequestSize, null, null, 0);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
@ -1923,7 +1935,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null);
responder, totalRequestSize, null, null, 0);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
@ -1934,8 +1946,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
TraceInfo traceInfo = header.hasTraceInfo()
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null;
// Per-call time budget taken from the request header; 0 means "none requested".
// Only a strictly positive header value is honoured. A header that carries
// timeout == 0 (or a uint32 above Integer.MAX_VALUE, which surfaces here as a
// negative int) must not be raised to minClientRequestTimeout: that would turn
// "no limit" into a very tight one. Positive values are still floored at
// minClientRequestTimeout.
int timeout = 0;
if (header.hasTimeout() && header.getTimeout() > 0) {
  timeout = Math.max(minClientRequestTimeout, header.getTimeout());
}
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize, traceInfo, this.addr);
totalRequestSize, traceInfo, this.addr, timeout);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSize.add(-1 * call.getSize());
@ -2087,7 +2103,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
// Start the listener here and let it bind to the port
@ -2228,6 +2245,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
}
/**
 * Runs an RPC method without a client-supplied timeout.
 *
 * Convenience overload: forwards every argument unchanged to the seven-argument
 * {@code call} and passes {@code 0} as the timeout.
 *
 * @param service the service to forward
 * @param md descriptor of the method to forward
 * @param param the request message to forward
 * @param cellScanner the cell scanner to forward
 * @param receiveTime the receive time to forward
 * @param status the RPC status monitor to forward
 * @return whatever the seven-argument overload returns
 * @throws IOException if the seven-argument overload throws it
 */
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
    throws IOException {
  return call(service, md, param, cellScanner, receiveTime, status, 0);
}
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
@ -2235,7 +2258,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
int timeout)
throws IOException {
try {
status.setRPC(md.getName(), new Object[]{param}, receiveTime);
@ -2245,6 +2269,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
//get an instance of the method arg type
long startTime = System.currentTimeMillis();
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
controller.setCallTimeout(timeout);
Message result = service.callBlockingMethod(md, controller, param);
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);

View File

@ -52,6 +52,11 @@ public interface RpcServerInterface {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException, ServiceException;
/**
* Variant of {@code call} that additionally accepts a per-call timeout.
*
* @param service the service whose method is invoked
* @param md descriptor of the method to invoke
* @param param the request message
* @param cellScanner cell data accompanying the request
* @param receiveTime NOTE(review): presumably when the request was received -- confirm
* @param status monitor updated with the state of this RPC
* @param timeout NOTE(review): presumably the client-requested time limit in
*        milliseconds, with 0 meaning no limit -- confirm against implementations
* @return the response message paired with a cell scanner
* @throws IOException declared by this signature; conditions depend on the implementation
* @throws ServiceException declared by this signature; conditions depend on the implementation
*/
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
int timeout)
throws IOException, ServiceException;
void setErrorHandler(HBaseRPCErrorHandler handler);
HBaseRPCErrorHandler getErrorHandler();

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -2698,6 +2699,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
timeLimitDelta =
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
if (controller instanceof TimeLimitedRpcController) {
TimeLimitedRpcController timeLimitedRpcController =
(TimeLimitedRpcController)controller;
if (timeLimitedRpcController.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta,
timeLimitedRpcController.getCallTimeout());
}
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediate timeout before scanning any data).

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -102,16 +103,16 @@ public class TestScannerHeartbeatMessages {
private static int VALUE_SIZE = 128;
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
private static int SERVER_TIMEOUT = 6000;
// Time, in milliseconds, that the client will wait for a response from the server before timing
// out. This value is used server side to determine when it is necessary to send a heartbeat
// message to the client
private static int CLIENT_TIMEOUT = 2000;
// The server limits itself to running for half of the CLIENT_TIMEOUT value.
private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2;
private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3;
// By default, at most one row's worth of cells will be retrieved before the time limit is reached
private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5;
// By default, at most cells for two column families are retrieved before the time limit is
// reached
private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES;
@ -124,8 +125,8 @@ public class TestScannerHeartbeatMessages {
conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
// Check the timeout condition after every cell
@ -140,7 +141,7 @@ public class TestScannerHeartbeatMessages {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
return ht;
}
@ -282,7 +283,7 @@ public class TestScannerHeartbeatMessages {
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
try {
Thread.sleep(SERVER_TIME_LIMIT + 10);
Thread.sleep(CLIENT_TIMEOUT/2 + 10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}