From ede9940a7bd254d496b2ef493d4f35540184e96a Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Thu, 4 Aug 2016 15:38:38 +0800 Subject: [PATCH] HBASE-16285 Drop RPC requests if it must be considered as timeout at client Signed-off-by: zhangduo --- .../apache/hadoop/hbase/ipc/CallRunner.java | 7 ++- .../apache/hadoop/hbase/ipc/RpcServer.java | 16 +++---- .../hadoop/hbase/ipc/RpcServerInterface.java | 11 +++-- .../apache/hadoop/hbase/client/TestHCM.java | 44 ++++++++++++++++++- 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index e91699ad3e7..b2b3c66e1c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -93,6 +93,11 @@ public class CallRunner { } return; } + call.startTime = System.currentTimeMillis(); + if (call.startTime > call.deadline) { + RpcServer.LOG.info("Dropping timed out call: " + call); + return; + } this.status.setStatus("Setting up call"); this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); if (RpcServer.LOG.isTraceEnabled()) { @@ -116,7 +121,7 @@ public class CallRunner { } // make the call resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, - call.timestamp, this.status, call.timeout); + call.timestamp, this.status, call.startTime, call.timeout); } catch (Throwable e) { RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); errorThrowable = e; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 73226aa4c76..5b2aab14e23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -75,12 +75,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; @@ -99,7 +96,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; @@ -312,6 +308,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected long timestamp; // the time received when response is null // the time served when response is not null protected int timeout; + protected long startTime; + protected long deadline;// the deadline to handle this call, if exceed we can drop it. + /** * Chain of buffers to send as response. */ @@ -354,6 +353,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.retryImmediatelySupported = connection == null? null: connection.retryImmediatelySupported; this.timeout = timeout; + this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE; } /** @@ -1894,7 +1894,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) : null; int timeout = 0; - if (header.hasTimeout()){ + if (header.hasTimeout() && header.getTimeout() > 0){ timeout = Math.max(minClientRequestTimeout, header.getTimeout()); } Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, @@ -2187,7 +2187,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException { - return call(service, md, param, cellScanner, receiveTime, status, 0); + return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0); } /** @@ -2195,10 +2195,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * the return response has protobuf response payload. On failure, the * exception name and the stack trace are returned in the protobuf response. */ - @Override public Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, - int timeout) + long startTime, int timeout) throws IOException { try { status.setRPC(md.getName(), new Object[]{param}, receiveTime); @@ -2206,7 +2205,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { status.setRPCPacket(param); status.resume("Servicing call"); //get an instance of the method arg type - long startTime = System.currentTimeMillis(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); controller.setCallTimeout(timeout); Message result = service.callBlockingMethod(md, controller, param); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index dd7e584dee1..0388ea4205b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -48,14 +48,17 @@ public interface RpcServerInterface { void setSocketSendBufSize(int size); InetSocketAddress getListenerAddress(); + /** + * @deprecated As of release 1.3, this will be removed in HBase 3.0 + */ + @Deprecated Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException, ServiceException; - Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, - int timeout) - throws IOException, ServiceException; + Pair call(BlockingService service, MethodDescriptor md, Message param, + CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, + int timeout) throws IOException, ServiceException; void setErrorHandler(HBaseRPCErrorHandler handler); HBaseRPCErrorHandler getErrorHandler(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 4d47bdea4c5..bfd16a7b6ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.*; +import com.google.common.collect.Lists; import java.io.IOException; import java.lang.reflect.Field; @@ -78,7 +79,11 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * This class is for testing HBaseConnectionManager features @@ -105,6 +110,7 @@ public class TestHCM { private static final byte[] ROW = Bytes.toBytes("bbb"); private static final byte[] ROW_X = Bytes.toBytes("xxx"); private static Random _randy = new Random(); + private static final int RPC_RETRY = 5; /** * This copro sleeps 20 second. The first call it fails. The second time, it works. @@ -155,12 +161,31 @@ public class TestHCM { } } + public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 2000; + static final AtomicLong ct = new AtomicLong(0); + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + // After first sleep, all requests are timeout except the last retry. If we handle + // all the following requests, finally the last request is also timeout. If we drop all + // timeout requests, we can handle the last request immediately and it will not timeout. + if (ct.incrementAndGet() <= 1) { + Threads.sleep(SLEEP_TIME * RPC_RETRY * 2); + } else { + Threads.sleep(SLEEP_TIME); + } + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); // Up the handlers; this test needs more than usual. TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); + // simulate queue blocking in testDropTimeoutRequest + TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); TEST_UTIL.startMiniCluster(2); } @@ -442,6 +467,21 @@ public class TestHCM { } } + @Test + public void testDropTimeoutRequest() throws Exception { + // Simulate the situation that the server is slow and client retries for several times because + // of timeout. When a request can be handled after waiting in the queue, we will drop it if + // it has been considered as timeout at client. If we don't drop it, the server will waste time + // on handling timeout requests and finally all requests timeout and client throws exception. + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDropTimeputRequest"); + hdt.addCoprocessor(SleepLongerAtFirstCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setRpcTimeout(SleepLongerAtFirstCoprocessor.SLEEP_TIME * 2); + t.get(new Get(FAM_NAM)); + } + } + /** * Test starting from 0 index when RpcRetryingCaller calculate the backoff time. */