HBASE-16285 Drop RPC requests if it must be considered as timeout at client
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
2d203e6053
commit
ede9940a7b
|
@ -93,6 +93,11 @@ public class CallRunner {
|
|||
}
|
||||
return;
|
||||
}
|
||||
call.startTime = System.currentTimeMillis();
|
||||
if (call.startTime > call.deadline) {
|
||||
RpcServer.LOG.info("Dropping timed out call: " + call);
|
||||
return;
|
||||
}
|
||||
this.status.setStatus("Setting up call");
|
||||
this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
|
||||
if (RpcServer.LOG.isTraceEnabled()) {
|
||||
|
@ -116,7 +121,7 @@ public class CallRunner {
|
|||
}
|
||||
// make the call
|
||||
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
|
||||
call.timestamp, this.status, call.timeout);
|
||||
call.timestamp, this.status, call.startTime, call.timeout);
|
||||
} catch (Throwable e) {
|
||||
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
|
||||
errorThrowable = e;
|
||||
|
|
|
@ -75,12 +75,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.Operation;
|
||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
|
@ -99,7 +96,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
import org.apache.hadoop.hbase.security.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||
|
@ -312,6 +308,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
protected long timestamp; // the time received when response is null
|
||||
// the time served when response is not null
|
||||
protected int timeout;
|
||||
protected long startTime;
|
||||
protected long deadline;// the deadline to handle this call, if exceed we can drop it.
|
||||
|
||||
/**
|
||||
* Chain of buffers to send as response.
|
||||
*/
|
||||
|
@ -354,6 +353,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
this.retryImmediatelySupported =
|
||||
connection == null? null: connection.retryImmediatelySupported;
|
||||
this.timeout = timeout;
|
||||
this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1894,7 +1894,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
|
||||
: null;
|
||||
int timeout = 0;
|
||||
if (header.hasTimeout()){
|
||||
if (header.hasTimeout() && header.getTimeout() > 0){
|
||||
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
|
||||
}
|
||||
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
|
||||
|
@ -2187,7 +2187,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
|
||||
throws IOException {
|
||||
return call(service, md, param, cellScanner, receiveTime, status, 0);
|
||||
return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2195,10 +2195,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
* the return response has protobuf response payload. On failure, the
|
||||
* exception name and the stack trace are returned in the protobuf response.
|
||||
*/
|
||||
@Override
|
||||
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
|
||||
int timeout)
|
||||
long startTime, int timeout)
|
||||
throws IOException {
|
||||
try {
|
||||
status.setRPC(md.getName(), new Object[]{param}, receiveTime);
|
||||
|
@ -2206,7 +2205,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
status.setRPCPacket(param);
|
||||
status.resume("Servicing call");
|
||||
//get an instance of the method arg type
|
||||
long startTime = System.currentTimeMillis();
|
||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
|
||||
controller.setCallTimeout(timeout);
|
||||
Message result = service.callBlockingMethod(md, controller, param);
|
||||
|
|
|
@ -48,14 +48,17 @@ public interface RpcServerInterface {
|
|||
void setSocketSendBufSize(int size);
|
||||
InetSocketAddress getListenerAddress();
|
||||
|
||||
/**
|
||||
* @deprecated As of release 1.3, this will be removed in HBase 3.0
|
||||
*/
|
||||
@Deprecated
|
||||
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
|
||||
throws IOException, ServiceException;
|
||||
|
||||
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
|
||||
int timeout)
|
||||
throws IOException, ServiceException;
|
||||
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
|
||||
CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
|
||||
int timeout) throws IOException, ServiceException;
|
||||
|
||||
void setErrorHandler(HBaseRPCErrorHandler handler);
|
||||
HBaseRPCErrorHandler getErrorHandler();
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
|
@ -78,7 +79,11 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestRule;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* This class is for testing HBaseConnectionManager features
|
||||
|
@ -105,6 +110,7 @@ public class TestHCM {
|
|||
private static final byte[] ROW = Bytes.toBytes("bbb");
|
||||
private static final byte[] ROW_X = Bytes.toBytes("xxx");
|
||||
private static Random _randy = new Random();
|
||||
private static final int RPC_RETRY = 5;
|
||||
|
||||
/**
|
||||
* This copro sleeps 20 second. The first call it fails. The second time, it works.
|
||||
|
@ -155,12 +161,31 @@ public class TestHCM {
|
|||
}
|
||||
}
|
||||
|
||||
public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
|
||||
public static final int SLEEP_TIME = 2000;
|
||||
static final AtomicLong ct = new AtomicLong(0);
|
||||
@Override
|
||||
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Get get, final List<Cell> results) throws IOException {
|
||||
// After first sleep, all requests are timeout except the last retry. If we handle
|
||||
// all the following requests, finally the last request is also timeout. If we drop all
|
||||
// timeout requests, we can handle the last request immediately and it will not timeout.
|
||||
if (ct.incrementAndGet() <= 1) {
|
||||
Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
|
||||
} else {
|
||||
Threads.sleep(SLEEP_TIME);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
|
||||
// Up the handlers; this test needs more than usual.
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
|
||||
// simulate queue blocking in testDropTimeoutRequest
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
}
|
||||
|
||||
|
@ -442,6 +467,21 @@ public class TestHCM {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDropTimeoutRequest() throws Exception {
|
||||
// Simulate the situation that the server is slow and client retries for several times because
|
||||
// of timeout. When a request can be handled after waiting in the queue, we will drop it if
|
||||
// it has been considered as timeout at client. If we don't drop it, the server will waste time
|
||||
// on handling timeout requests and finally all requests timeout and client throws exception.
|
||||
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDropTimeputRequest");
|
||||
hdt.addCoprocessor(SleepLongerAtFirstCoprocessor.class.getName());
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
|
||||
t.setRpcTimeout(SleepLongerAtFirstCoprocessor.SLEEP_TIME * 2);
|
||||
t.get(new Get(FAM_NAM));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test starting from 0 index when RpcRetryingCaller calculate the backoff time.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue