diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java index d05eb579fcc..2d4a430e2ad 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java @@ -3895,16 +3895,6 @@ public final class RPCProtos { * */ int getPriority(); - - // optional uint32 timeout = 7; - /** - * optional uint32 timeout = 7; - */ - boolean hasTimeout(); - /** - * optional uint32 timeout = 7; - */ - int getTimeout(); } /** * Protobuf type {@code hbase.pb.RequestHeader} @@ -4007,11 +3997,6 @@ public final class RPCProtos { priority_ = input.readUInt32(); break; } - case 56: { - bitField0_ |= 0x00000040; - timeout_ = input.readUInt32(); - break; - } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4225,22 +4210,6 @@ public final class RPCProtos { return priority_; } - // optional uint32 timeout = 7; - public static final int TIMEOUT_FIELD_NUMBER = 7; - private int timeout_; - /** - * optional uint32 timeout = 7; - */ - public boolean hasTimeout() { - return ((bitField0_ & 0x00000040) == 0x00000040); - } - /** - * optional uint32 timeout = 7; - */ - public int getTimeout() { - return timeout_; - } - private void initFields() { callId_ = 0; traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance(); @@ -4248,7 +4217,6 @@ public final class RPCProtos { requestParam_ = false; cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance(); priority_ = 0; - timeout_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4280,9 +4248,6 @@ public final class RPCProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt32(6, priority_); } - if (((bitField0_ & 0x00000040) == 0x00000040)) { - output.writeUInt32(7, timeout_); - } getUnknownFields().writeTo(output); } @@ -4316,10 +4281,6 @@ public final class RPCProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(6, priority_); } - if (((bitField0_ & 0x00000040) == 0x00000040)) { - size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(7, timeout_); - } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4373,11 +4334,6 @@ public final class RPCProtos { result = result && (getPriority() == other.getPriority()); } - result = result && (hasTimeout() == other.hasTimeout()); - if (hasTimeout()) { - result = result && (getTimeout() - == other.getTimeout()); - } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4415,10 +4371,6 @@ public final class RPCProtos { hash = (37 * hash) + PRIORITY_FIELD_NUMBER; hash = (53 * hash) + getPriority(); } - if (hasTimeout()) { - hash = (37 * hash) + TIMEOUT_FIELD_NUMBER; - hash = (53 * hash) + getTimeout(); - } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4554,8 +4506,6 @@ public final class RPCProtos { bitField0_ = (bitField0_ & ~0x00000010); priority_ = 0; bitField0_ = (bitField0_ & ~0x00000020); - timeout_ = 0; - bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -4616,10 +4566,6 @@ public final class RPCProtos { to_bitField0_ |= 0x00000020; } result.priority_ = priority_; - if (((from_bitField0_ & 0x00000040) == 0x00000040)) { - to_bitField0_ |= 0x00000040; - } - result.timeout_ = timeout_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4656,9 +4602,6 @@ public final class RPCProtos { if (other.hasPriority()) { setPriority(other.getPriority()); } - if (other.hasTimeout()) { - setTimeout(other.getTimeout()); - } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -5181,39 +5124,6 @@ public final class RPCProtos { return this; } - // optional uint32 timeout = 7; - private int timeout_ ; - /** - * optional uint32 timeout = 7; - */ - public boolean hasTimeout() { - return ((bitField0_ & 0x00000040) == 0x00000040); - } - /** - * optional uint32 timeout = 7; - */ - public int getTimeout() { - return timeout_; - } - /** - * optional uint32 timeout = 7; - */ - public Builder setTimeout(int value) { - bitField0_ |= 0x00000040; - timeout_ = value; - onChanged(); - return this; - } - /** - * optional uint32 timeout = 7; - */ - public Builder clearTimeout() { - bitField0_ = (bitField0_ & ~0x00000040); - timeout_ = 0; - onChanged(); - return this; - } - // @@protoc_insertion_point(builder_scope:hbase.pb.RequestHeader) } @@ -6231,17 +6141,17 @@ public final class RPCProtos { "llBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exception" + "Response\022\034\n\024exception_class_name\030\001 \001(\t\022\023", "\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001(\t\022\014\n" + - "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\311\001\n\rRe" + + "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\270\001\n\rRe" + "questHeader\022\017\n\007call_id\030\001 \001(\r\022&\n\ntrace_in" + "fo\030\002 \001(\0132\022.hbase.pb.RPCTInfo\022\023\n\013method_n" + "ame\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\0220\n\017cell" + "_block_meta\030\005 \001(\0132\027.hbase.pb.CellBlockMe" + - "ta\022\020\n\010priority\030\006 \001(\r\022\017\n\007timeout\030\007 \001(\r\"\203\001" + - "\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022.\n\texc" + - "eption\030\002 \001(\0132\033.hbase.pb.ExceptionRespons" + - "e\0220\n\017cell_block_meta\030\003 \001(\0132\027.hbase.pb.Ce", - "llBlockMetaB<\n*org.apache.hadoop.hbase.p" + - "rotobuf.generatedB\tRPCProtosH\001\240\001\001" + "ta\022\020\n\010priority\030\006 \001(\r\"\203\001\n\016ResponseHeader\022" + + "\017\n\007call_id\030\001 \001(\r\022.\n\texception\030\002 \001(\0132\033.hb" + + "ase.pb.ExceptionResponse\0220\n\017cell_block_m" + + "eta\030\003 \001(\0132\027.hbase.pb.CellBlockMetaB<\n*or", + "g.apache.hadoop.hbase.protobuf.generated" + + "B\tRPCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6277,7 +6187,7 @@ public final class RPCProtos { internal_static_hbase_pb_RequestHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_RequestHeader_descriptor, - new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", "Timeout", }); + new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", }); internal_static_hbase_pb_ResponseHeader_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_hbase_pb_ResponseHeader_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/RPC.proto b/hbase-protocol/src/main/protobuf/RPC.proto index 8413d2590f3..59bb03d9e2e 100644 --- a/hbase-protocol/src/main/protobuf/RPC.proto +++ b/hbase-protocol/src/main/protobuf/RPC.proto @@ -125,7 +125,6 @@ message RequestHeader { // 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL. // See HConstants. optional uint32 priority = 6; - optional uint32 timeout = 7; } message ResponseHeader { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 00e08c994b6..3514245c0f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -114,7 +114,7 @@ public class CallRunner { } // make the call resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, - call.timestamp, this.status, call.timeout); + call.timestamp, this.status); } catch (Throwable e) { RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); errorThrowable = e; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 483ce86ef51..b9a9b2614fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -264,13 +264,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; - /** - * Minimum allowable timeout (in milliseconds) in rpc request's header. This - * configuration exists to prevent the rpc service regarding this request as timeout immediately. - */ - private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; - private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; - /** Default value for above params */ private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds @@ -281,9 +274,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private final int maxRequestSize; private final int warnResponseTime; private final int warnResponseSize; - - private final int minClientRequestTimeout; - private final Server server; private final List services; @@ -312,7 +302,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected Connection connection; // connection to client protected long timestamp; // the time received when response is null // the time served when response is not null - protected int timeout; /** * Chain of buffers to send as response. */ @@ -336,7 +325,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { justification="Can't figure why this complaint is happening... see below") Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, - long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) { + long size, TraceInfo tinfo, final InetAddress remoteAddress) { this.id = id; this.service = service; this.md = md; @@ -354,7 +343,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.remoteAddress = remoteAddress; this.retryImmediatelySupported = connection == null? null: connection.retryImmediatelySupported; - this.timeout = timeout; } /** @@ -1287,13 +1275,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, - null, null, this, null, 0, null, null, 0); + null, null, this, null, 0, null, null); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null, - 0, null, null, 0); + 0, null, null); // was authentication allowed with a fallback to simple auth private boolean authenticatedWithFallback; @@ -1711,7 +1699,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0); + Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1886,7 +1874,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { final Call callTooBig = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null, 0); + responder, totalRequestSize, null, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, @@ -1935,7 +1923,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { final Call readParamsFailedCall = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null, 0); + responder, totalRequestSize, null, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage()); @@ -1946,12 +1934,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) : null; - int timeout = 0; - if (header.hasTimeout()){ - timeout = Math.max(minClientRequestTimeout, header.getTimeout()); - } Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, - totalRequestSize, traceInfo, this.addr, timeout); + totalRequestSize, traceInfo, this.addr); if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { callQueueSize.add(-1 * call.getSize()); @@ -2103,8 +2087,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); - this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, - DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); + this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); // Start the listener here and let it bind to the port @@ -2245,12 +2228,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.secretManager = (SecretManager) secretManager; } - public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { - return call(service, md, param, cellScanner, receiveTime, status, 0); - } - /** * This is a server side method, which is invoked over RPC. On success * the return response has protobuf response payload. On failure, the @@ -2258,8 +2235,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ @Override public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, - int timeout) + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException { try { status.setRPC(md.getName(), new Object[]{param}, receiveTime); @@ -2269,7 +2245,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { //get an instance of the method arg type long startTime = System.currentTimeMillis(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); - controller.setCallTimeout(timeout); Message result = service.callBlockingMethod(md, controller, param); long endTime = System.currentTimeMillis(); int processingTime = (int) (endTime - startTime); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index dd7e584dee1..ab8b48586d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -52,11 +52,6 @@ public interface RpcServerInterface { Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException, ServiceException; - Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, - int timeout) - throws IOException, ServiceException; - void setErrorHandler(HBaseRPCErrorHandler handler); HBaseRPCErrorHandler getErrorHandler(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index ebd85bda3c1..95ab36dfec0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -88,7 +88,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -2699,14 +2698,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; } - if (controller instanceof TimeLimitedRpcController) { - TimeLimitedRpcController timeLimitedRpcController = - (TimeLimitedRpcController)controller; - if (timeLimitedRpcController.getCallTimeout() > 0) { - timeLimitDelta = Math.min(timeLimitDelta, - timeLimitedRpcController.getCallTimeout()); - } - } // Use half of whichever timeout value was more restrictive... But don't allow // the time limit to be less than the allowable minimum (could cause an // immediatate timeout before scanning any data). diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index a958ee09d73..b8eacc70378 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -103,16 +102,16 @@ public class TestScannerHeartbeatMessages { private static int VALUE_SIZE = 128; private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); - - private static int SERVER_TIMEOUT = 6000; - // Time, in milliseconds, that the client will wait for a response from the server before timing // out. This value is used server side to determine when it is necessary to send a heartbeat // message to the client - private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3; + private static int CLIENT_TIMEOUT = 2000; + + // The server limits itself to running for half of the CLIENT_TIMEOUT value. + private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2; // By default, at most one row's worth of cells will be retrieved before the time limit is reached - private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5; + private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2; // By default, at most cells for two column families are retrieved before the time limit is // reached private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES; @@ -125,8 +124,8 @@ public class TestScannerHeartbeatMessages { conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // Check the timeout condition after every cell @@ -141,7 +140,7 @@ public class TestScannerHeartbeatMessages { Table ht = TEST_UTIL.createTable(name, families); List puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts); - ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + return ht; } @@ -283,7 +282,7 @@ public class TestScannerHeartbeatMessages { @Override public ReturnCode filterKeyValue(Cell v) throws IOException { try { - Thread.sleep(CLIENT_TIMEOUT/2 + 10); + Thread.sleep(SERVER_TIME_LIMIT + 10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }