HBASE-16285 Drop RPC requests if it must be considered as timeout at client

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Phil Yang 2016-08-04 15:55:18 +08:00 committed by zhangduo
parent cebba7b4d8
commit 38044ada32
4 changed files with 65 additions and 11 deletions

View File

@ -93,6 +93,11 @@ public class CallRunner {
} }
return; return;
} }
call.startTime = System.currentTimeMillis();
if (call.startTime > call.deadline) {
RpcServer.LOG.info("Drop timeout call: " + call);
return;
}
this.status.setStatus("Setting up call"); this.status.setStatus("Setting up call");
this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
if (RpcServer.LOG.isTraceEnabled()) { if (RpcServer.LOG.isTraceEnabled()) {
@ -116,7 +121,7 @@ public class CallRunner {
} }
// make the call // make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
call.timestamp, this.status, call.timeout); call.timestamp, this.status, call.startTime, call.timeout);
} catch (Throwable e) { } catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e; errorThrowable = e;

View File

@ -266,6 +266,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
/** /**
* Minimum allowable timeout (in milliseconds) in rpc request's header. This * Minimum allowable timeout (in milliseconds) in rpc request's header. This
* configuration exists to prevent the rpc service regarding this request as timeout immediately. * configuration exists to prevent the rpc service regarding this request as timeout immediately.
@ -315,6 +316,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected long timestamp; // the time received when response is null protected long timestamp; // the time received when response is null
// the time served when response is not null // the time served when response is not null
protected int timeout; protected int timeout;
protected long startTime;
protected long deadline;// the deadline to handle this call, if exceed we can drop it.
/** /**
* Chain of buffers to send as response. * Chain of buffers to send as response.
*/ */
@ -356,6 +360,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.retryImmediatelySupported = this.retryImmediatelySupported =
connection == null? null: connection.retryImmediatelySupported; connection == null? null: connection.retryImmediatelySupported;
this.timeout = timeout; this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
} }
/** /**
@ -1933,7 +1938,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null; : null;
int timeout = 0; int timeout = 0;
if (header.hasTimeout()){ if (header.hasTimeout() && header.getTimeout() > 0){
timeout = Math.max(minClientRequestTimeout, header.getTimeout()); timeout = Math.max(minClientRequestTimeout, header.getTimeout());
} }
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
@ -2239,7 +2244,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException { throws IOException {
return call(service, md, param, cellScanner, receiveTime, status, 0); return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
} }
/** /**
@ -2250,7 +2255,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override @Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
int timeout) long startTime, int timeout)
throws IOException { throws IOException {
try { try {
status.setRPC(md.getName(), new Object[]{param}, receiveTime); status.setRPC(md.getName(), new Object[]{param}, receiveTime);
@ -2258,7 +2263,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
status.setRPCPacket(param); status.setRPCPacket(param);
status.resume("Servicing call"); status.resume("Servicing call");
//get an instance of the method arg type //get an instance of the method arg type
long startTime = System.currentTimeMillis();
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
controller.setCallTimeout(timeout); controller.setCallTimeout(timeout);
Message result = service.callBlockingMethod(md, controller, param); Message result = service.callBlockingMethod(md, controller, param);

View File

@ -48,14 +48,17 @@ public interface RpcServerInterface {
void setSocketSendBufSize(int size); void setSocketSendBufSize(int size);
InetSocketAddress getListenerAddress(); InetSocketAddress getListenerAddress();
/**
* @deprecated As of release 1.3, this will be removed in HBase 3.0
*/
@Deprecated
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException, ServiceException; throws IOException, ServiceException;
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
int timeout) int timeout) throws IOException, ServiceException;
throws IOException, ServiceException;
void setErrorHandler(HBaseRPCErrorHandler handler); void setErrorHandler(HBaseRPCErrorHandler handler);
HBaseRPCErrorHandler getErrorHandler(); HBaseRPCErrorHandler getErrorHandler();

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import com.google.common.collect.Lists;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -83,7 +84,12 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
import com.google.common.collect.Lists; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* This class is for testing HBaseConnectionManager features * This class is for testing HBaseConnectionManager features
@ -110,6 +116,7 @@ public class TestHCM {
private static final byte[] ROW = Bytes.toBytes("bbb"); private static final byte[] ROW = Bytes.toBytes("bbb");
private static final byte[] ROW_X = Bytes.toBytes("xxx"); private static final byte[] ROW_X = Bytes.toBytes("xxx");
private static Random _randy = new Random(); private static Random _randy = new Random();
private static final int RPC_RETRY = 5;
/** /**
* This copro sleeps 20 second. The first call it fails. The second time, it works. * This copro sleeps 20 second. The first call it fails. The second time, it works.
@ -160,12 +167,32 @@ public class TestHCM {
} }
} }
public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
public static final int SLEEP_TIME = 2000;
static final AtomicLong ct = new AtomicLong(0);
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
// After first sleep, all requests are timeout except the last retry. If we handle
// all the following requests, finally the last request is also timeout. If we drop all
// timeout requests, we can handle the last request immediately and it will not timeout.
if (ct.incrementAndGet() <= 1) {
Threads.sleep(SLEEP_TIME * (RPC_RETRY-1) * 2);
} else {
Threads.sleep(SLEEP_TIME);
}
}
}
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
// Up the handlers; this test needs more than usual. // Up the handlers; this test needs more than usual.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
// simulate queue blocking in testDropTimeoutRequest
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
TEST_UTIL.startMiniCluster(2); TEST_UTIL.startMiniCluster(2);
} }
@ -438,6 +465,21 @@ public class TestHCM {
} }
} }
@Test
public void testDropTimeoutRequest() throws Exception {
// Simulate the situation that the server is slow and client retries for several times because
// of timeout. When a request can be handled after waiting in the queue, we will drop it if
// it has been considered as timeout at client. If we don't drop it, the server will waste time
// on handling timeout requests and finally all requests timeout and client throws exception.
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDropTimeputRequest");
hdt.addCoprocessor(SleepLongerAtFirstCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
t.setRpcTimeout(SleepLongerAtFirstCoprocessor.SLEEP_TIME * 2);
t.get(new Get(FAM_NAM));
}
}
/** /**
* Test starting from 0 index when RpcRetryingCaller calculate the backoff time. * Test starting from 0 index when RpcRetryingCaller calculate the backoff time.
*/ */