HBASE-16505 Pass deadline to HRegion operations

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Phil Yang 2016-10-10 18:35:37 +08:00 committed by Michael Stack
parent e868d9586f
commit b8173a548c
3 changed files with 20 additions and 3 deletions

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.Pair;
@ -95,7 +96,7 @@ public class CallRunner {
}
call.startTime = System.currentTimeMillis();
if (call.startTime > call.deadline) {
RpcServer.LOG.info("Dropping timed out call: " + call);
RpcServer.LOG.warn("Dropping timed out call: " + call);
return;
}
this.status.setStatus("Setting up call");
@ -122,6 +123,9 @@ public class CallRunner {
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
call.timestamp, this.status, call.startTime, call.timeout);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
return;
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;

View File

@ -93,4 +93,11 @@ public interface RpcCallContext {
long getResponseBlockSize();
void incrementResponseBlockSize(long blockSize);
/**
* Return the deadline of this call. If we can not complete this call in time, we can throw a
* TimeoutIOException and RPCServer will drop it.
* @return The system timestamp of deadline.
*/
long getDeadline();
}

View File

@ -401,7 +401,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return "callId: " + this.id + " service: " + serviceName +
" methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
" size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
" connection: " + connection.toString();
" connection: " + connection.toString() +
" deadline: " + deadline;
}
String toTraceString() {
@ -612,6 +613,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
responseBlockSize += blockSize;
}
@Override
public long getDeadline() {
return deadline;
}
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;