HBASE-11564 Improve cancellation management in the rpc layer

This commit is contained in:
Nicolas Liochon 2014-07-24 17:49:01 +02:00
parent 0409368454
commit d8562052a4
11 changed files with 300 additions and 132 deletions

View File

@ -25,12 +25,12 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException;
@ -60,6 +60,7 @@ public class RpcRetryingCaller<T> {
private final long pause;
private final int retries;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
public RpcRetryingCaller(long pause, int retries) {
this.pause = pause;
@ -70,6 +71,7 @@ public class RpcRetryingCaller<T> {
if (callTimeout <= 0) {
return 0;
} else {
if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
int remainingTime = (int) (callTimeout -
(EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
if (remainingTime < MIN_RPC_TIMEOUT) {
@ -82,6 +84,13 @@ public class RpcRetryingCaller<T> {
}
}
public void cancel(){
cancelled.set(true);
synchronized (cancelled){
cancelled.notifyAll();
}
}
/**
* Retries if invocation fails.
* @param callTimeout Timeout for this call
@ -103,9 +112,11 @@ public class RpcRetryingCaller<T> {
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
if (LOG.isTraceEnabled()) {
LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
(EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t);
LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
(EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + " ms ago, "
+ "cancelled=" + cancelled.get(), t);
}
// translateException throws exception when should not retry: i.e. when request is bad.
t = translateException(t);
callable.throwable(t, retries != 1);
@ -130,7 +141,13 @@ public class RpcRetryingCaller<T> {
}
}
try {
Thread.sleep(expectedSleep);
if (expectedSleep > 0) {
synchronized (cancelled) {
if (cancelled.get()) return null;
cancelled.wait(expectedSleep);
}
}
if (cancelled.get()) return null;
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
}

View File

@ -21,18 +21,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -47,10 +36,20 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@ -60,6 +59,7 @@ import com.google.protobuf.ServiceException;
*/
public class RpcRetryingCallerWithReadReplicas {
static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
protected final ExecutorService pool;
protected final ClusterConnection cConnection;
protected final Configuration conf;
@ -69,12 +69,13 @@ public class RpcRetryingCallerWithReadReplicas {
private final int callTimeout;
private final int retries;
private final RpcControllerFactory rpcControllerFactory;
private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
public RpcRetryingCallerWithReadReplicas(
RpcControllerFactory rpcControllerFactory, TableName tableName,
ClusterConnection cConnection, final Get get,
ExecutorService pool, int retries, int callTimeout,
int timeBeforeReplicas) {
ClusterConnection cConnection, final Get get,
ExecutorService pool, int retries, int callTimeout,
int timeBeforeReplicas) {
this.rpcControllerFactory = rpcControllerFactory;
this.tableName = tableName;
this.cConnection = cConnection;
@ -84,6 +85,7 @@ public class RpcRetryingCallerWithReadReplicas {
this.retries = retries;
this.callTimeout = callTimeout;
this.timeBeforeReplicas = timeBeforeReplicas;
this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
}
/**
@ -94,12 +96,19 @@ public class RpcRetryingCallerWithReadReplicas {
*/
class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
final int id;
private final PayloadCarryingRpcController controller;
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
this.id = id;
this.location = location;
this.controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
}
public void startCancel() {
controller.startCancel();
}
/**
@ -109,6 +118,8 @@ public class RpcRetryingCallerWithReadReplicas {
*/
@Override
public void prepare(final boolean reload) throws IOException {
if (controller.isCanceled()) return;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
@ -125,13 +136,14 @@ public class RpcRetryingCallerWithReadReplicas {
}
ServerName dest = location.getServerName();
assert dest != null;
setStub(cConnection.getClient(dest));
}
@Override
public Result call(int callTimeout) throws Exception {
if (controller.isCanceled()) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
@ -140,12 +152,13 @@ public class RpcRetryingCallerWithReadReplicas {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(reg, get);
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) return null;
if (response == null) {
return null;
}
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
@ -153,23 +166,6 @@ public class RpcRetryingCallerWithReadReplicas {
}
}
/**
* Adapter to put the HBase retrying caller into a Java callable.
*/
class RetryingRPC implements Callable<Result> {
final RetryingCallable<Result> callable;
RetryingRPC(RetryingCallable<Result> callable) {
this.callable = callable;
}
@Override
public Result call() throws IOException {
return new RpcRetryingCallerFactory(conf).<Result>newCaller().
callWithRetries(callable, callTimeout);
}
}
/**
* Algo:
* - we put the query into the execution pool.
@ -191,12 +187,9 @@ public class RpcRetryingCallerWithReadReplicas {
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID,
cConnection, tableName, get.getRow());
BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
List<ExecutionException> exceptions = null;
int submitted = 0, completed = 0;
// submit call for the primary replica.
submitted += addCallsForReplica(cs, rl, 0, 0);
addCallsForReplica(cs, rl, 0, 0);
try {
// wait for the timeout to see whether the primary responds back
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
@ -204,11 +197,7 @@ public class RpcRetryingCallerWithReadReplicas {
return f.get(); //great we got a response
}
} catch (ExecutionException e) {
// the primary call failed with RetriesExhaustedException or DoNotRetryIOException
// but the secondaries might still succeed. Continue on the replica RPCs.
exceptions = new ArrayList<ExecutionException>(rl.size());
exceptions.add(e);
completed++;
throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
@ -216,20 +205,13 @@ public class RpcRetryingCallerWithReadReplicas {
}
// submit call for the all of the secondaries at once
// TODO: this may be an overkill for large region replication
submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1);
addCallsForReplica(cs, rl, 1, rl.size() - 1);
try {
while (completed < submitted) {
try {
Future<Result> f = cs.take();
return f.get(); // great we got an answer
} catch (ExecutionException e) {
// if not cancel or interrupt, wait until all RPC's are done
// one of the tasks failed. Save the exception for later.
if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
exceptions.add(e);
completed++;
}
try {
Future<Result> f = cs.take();
return f.get();
} catch (ExecutionException e) {
throwEnrichedException(e, retries);
}
} catch (CancellationException e) {
throw new InterruptedIOException();
@ -238,12 +220,9 @@ public class RpcRetryingCallerWithReadReplicas {
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.
cs.cancelAll(true);
cs.cancelAll();
}
if (exceptions != null && !exceptions.isEmpty()) {
throwEnrichedException(exceptions.get(0), retries, toString()); // just rethrow the first exception for now.
}
return null; // unreachable
}
@ -251,7 +230,7 @@ public class RpcRetryingCallerWithReadReplicas {
* Extract the real exception from the ExecutionException, and throws what makes more
* sense.
*/
static void throwEnrichedException(ExecutionException e, int retries, String str)
static void throwEnrichedException(ExecutionException e, int retries)
throws RetriesExhaustedException, DoNotRetryIOException {
Throwable t = e.getCause();
assert t != null; // That's what ExecutionException is about: holding an exception
@ -266,7 +245,7 @@ public class RpcRetryingCallerWithReadReplicas {
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), str);
EnvironmentEdgeManager.currentTimeMillis(), null);
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
Collections.singletonList(qt);
@ -277,26 +256,24 @@ public class RpcRetryingCallerWithReadReplicas {
/**
* Creates the calls and submit them
*
* @param cs - the completion service to use for submitting
* @param rl - the region locations
* @param min - the id of the first replica, inclusive
* @param max - the id of the last replica, inclusive.
* @return the number of submitted calls
* @param cs - the completion service to use for submitting
* @param rl - the region locations
* @param min - the id of the first replica, inclusive
* @param max - the id of the last replica, inclusive.
*/
private int addCallsForReplica(BoundedCompletionService<Result> cs,
RegionLocations rl, int min, int max) {
private void addCallsForReplica(ResultBoundedCompletionService cs,
RegionLocations rl, int min, int max) {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
cs.submit(retryingOnReplica);
cs.submit(callOnReplica, callTimeout);
}
return max - min + 1;
}
static RegionLocations getRegionLocations(boolean useCache, int replicaId,
ClusterConnection cConnection, TableName tableName, byte[] row)
ClusterConnection cConnection, TableName tableName, byte[] row)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId);
@ -315,4 +292,135 @@ public class RpcRetryingCallerWithReadReplicas {
return rl;
}
/**
* A completion service for the RpcRetryingCallerFactory.
* Keeps the list of the futures, and allows to cancel them all.
* This means as well that it can be used for a small set of tasks only.
* <br>Implementation is not Thread safe.
*/
public class ResultBoundedCompletionService {
private final Executor executor;
private final QueueingFuture[] tasks; // all the tasks
private volatile QueueingFuture completed = null;
class QueueingFuture implements RunnableFuture<Result> {
private final ReplicaRegionServerCallable future;
private Result result = null;
private ExecutionException exeEx = null;
private volatile boolean canceled;
private final int callTimeout;
private final RpcRetryingCaller<Result> retryingCaller;
public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = rpcRetryingCallerFactory.<Result>newCaller();
}
@Override
public void run() {
try {
if (!canceled) {
result =
rpcRetryingCallerFactory.<Result>newCaller().callWithRetries(future, callTimeout);
}
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
if (!canceled && completed == null) {
completed = QueueingFuture.this;
synchronized (tasks) {
tasks.notify();
}
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (result != null || exeEx != null) return false;
retryingCaller.cancel();
future.startCancel();
canceled = true;
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return result != null || exeEx != null;
}
@Override
public Result get() throws InterruptedException, ExecutionException {
try {
return get(1000, TimeUnit.DAYS);
} catch (TimeoutException e) {
throw new RuntimeException("You did wait for 1000 days here?", e);
}
}
@Override
public Result get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
synchronized (tasks) {
if (result != null) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
unit.timedWait(tasks, timeout);
}
if (result != null) {
return result;
}
if (exeEx != null) {
throw exeEx;
}
throw new TimeoutException();
}
}
public ResultBoundedCompletionService(Executor executor, int maxTasks) {
this.executor = executor;
this.tasks = new QueueingFuture[maxTasks];
}
public void submit(ReplicaRegionServerCallable task, int callTimeout) {
QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
executor.execute(newFuture);
tasks[task.id] = newFuture;
}
public QueueingFuture take() throws InterruptedException {
synchronized (tasks) {
if (completed == null) tasks.wait();
}
return completed;
}
public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
if (completed == null) unit.timedWait(tasks, timeout);
}
return completed;
}
public void cancelAll() {
for (QueueingFuture future : tasks) {
if (future != null) future.cancel(true);
}
}
}
}

View File

@ -198,7 +198,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
if (exceptions != null && !exceptions.isEmpty()) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
retries, toString()); // just rethrow the first exception for now.
retries); // just rethrow the first exception for now.
}
return null; // unreachable
}

View File

@ -24,6 +24,7 @@ import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
@ -115,13 +116,13 @@ import java.util.concurrent.atomic.AtomicInteger;
public class RpcClient {
// The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under
// o.a.h.hbase is set to DEBUG as default).
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
public static final Log LOG = LogFactory.getLog(RpcClient.class);
protected final PoolMap<ConnectionId, Connection> connections;
protected final AtomicInteger callIdCnt = new AtomicInteger();
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf;
protected final int minIdleTimeBeforeClose; // if the connection is iddle for more than this
protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
// time (in ms), it will be closed at any moment.
final protected int maxRetries; //the max. no. of retries for socket connections
final protected long failureSleep; // Time to sleep before retry on failure.
@ -168,7 +169,7 @@ public class RpcClient {
"hbase.ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
public static final String SPECIFIC_WRITE_THREAD = "hbase.ipc.client.specificThreadForWriting";
/**
* A class to manage a list of servers that failed recently.
@ -426,7 +427,10 @@ public class RpcClient {
public CallFuture sendCall(Call call, int priority, Span span)
throws InterruptedException, IOException {
CallFuture cts = new CallFuture(call, priority, span);
callsToWrite.add(cts);
if (!callsToWrite.offer(cts)) {
throw new IOException("Can't add the call " + call.id +
" to the write queue. callsToWrite.size()=" + callsToWrite.size());
}
checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
// in the list while the cleanup was already done.
return cts;
@ -448,7 +452,11 @@ public class RpcClient {
public void remove(CallFuture cts){
callsToWrite.remove(cts);
// By removing the call from the expected call list, we make the list smaller, but
// it means as well that we don't know how many calls we cancelled.
calls.remove(cts.call.id);
cts.call.callComplete();
}
/**
@ -464,7 +472,7 @@ public class RpcClient {
markClosed(new InterruptedIOException());
}
if (cts == null || cts == CallFuture.DEATH_PILL){
if (cts == null || cts == CallFuture.DEATH_PILL) {
assert shouldCloseConnection.get();
break;
}
@ -580,7 +588,7 @@ public class RpcClient {
+ ticket.getUserName())));
this.setDaemon(true);
if (conf.getBoolean(ALLOWS_INTERRUPTS, false)) {
if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
callSender = new CallSender(getName(), conf);
callSender.start();
} else {
@ -608,6 +616,7 @@ public class RpcClient {
}
protected synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
@ -717,7 +726,7 @@ public class RpcClient {
* it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
*
* Return true if it is time to read a response; false otherwise.
* @return true if it is time to read a response; false otherwise.
*/
protected synchronized boolean waitForWork() throws InterruptedException {
// beware of the concurrent access to the calls list: we can add calls, but as well
@ -743,13 +752,18 @@ public class RpcClient {
return true;
}
// Connection is idle.
// We expect the number of calls to be zero here, but actually someone can
// adds a call at the any moment, as there is no synchronization between this task
// and adding new calls. It's not a big issue, but it will get an exception.
markClosed(new IOException(
"idle connection closed with " + calls.size() + " pending request(s)"));
if (EnvironmentEdgeManager.currentTimeMillis() >= waitUntil) {
// Connection is idle.
// We expect the number of calls to be zero here, but actually someone can
// adds a call at the any moment, as there is no synchronization between this task
// and adding new calls. It's not a big issue, but it will get an exception.
markClosed(new IOException(
"idle connection closed with " + calls.size() + " pending request(s)"));
return false;
}
// We can get here if we received a notification that there is some work to do but
// the work was cancelled. As we're not idle we continue to wait.
return false;
}
@ -767,15 +781,19 @@ public class RpcClient {
while (waitForWork()) { // Wait here for work - read or close connection
readResponse();
}
} catch (InterruptedException t) {
LOG.debug(getName() + ": interrupted while waiting for call responses");
markClosed(ExceptionUtil.asInterrupt(t));
} catch (Throwable t) {
LOG.debug(getName() + ": unexpected exception receiving call responses", t);
markClosed(new IOException("Unexpected exception receiving call responses", t));
LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
markClosed(new IOException("Unexpected throwable while waiting call responses", t));
}
close();
if (LOG.isDebugEnabled())
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": stopped, connections " + connections.size());
}
}
private synchronized void disposeSasl() {
@ -1146,8 +1164,10 @@ public class RpcClient {
// this connection.
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar;
LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
whatIsLeftToRead + " bytes");
if (LOG.isDebugEnabled()) {
LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
whatIsLeftToRead + " bytes");
}
IOUtils.skipFully(in, whatIsLeftToRead);
}
if (responseHeader.hasException()) {
@ -1188,16 +1208,10 @@ public class RpcClient {
}
} finally {
cleanupCalls(false);
if (expectedCall && !call.done) {
LOG.warn("Coding error: code should be true for callId=" + call.id +
", server=" + getRemoteAddress() +
", shouldCloseConnection=" + shouldCloseConnection.get());
}
}
}
/**
* @param e
* @return True if the exception is a fatal connection exception.
*/
private boolean isFatalConnectionException(final ExceptionResponse e) {
@ -1225,7 +1239,7 @@ public class RpcClient {
if (shouldCloseConnection.compareAndSet(false, true)) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": marking at should close, reason =" + e.getMessage());
LOG.debug(getName() + ": marking at should close, reason: " + e.getMessage());
}
if (callSender != null) {
callSender.close();
@ -1447,10 +1461,12 @@ public class RpcClient {
}
}
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
MethodDescriptor md, Message param, CellScanner cells,
Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout)
throws InterruptedException, IOException {
return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
return
call(pcrc, md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@ -1465,16 +1481,34 @@ public class RpcClient {
* @throws InterruptedException
* @throws IOException
*/
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
Message param, CellScanner cells,
Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
throws IOException, InterruptedException {
Call call = new Call(md, param, cells, returnType, callTimeout);
Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor);
final Call call = new Call(md, param, cells, returnType, callTimeout);
CallFuture cts = null;
if (connection.callSender != null){
final Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor);
final CallFuture cts;
if (connection.callSender != null) {
cts = connection.callSender.sendCall(call, priority, Trace.currentSpan());
if (pcrc != null) {
pcrc.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
connection.callSender.remove(cts);
call.callComplete();
}
});
if (pcrc.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.callComplete();
return new Pair<Message, CellScanner>(call.response, call.cells);
}
}
} else {
cts = null;
connection.tracedWriteRequest(call, priority, Trace.currentSpan());
}
@ -1663,7 +1697,7 @@ public class RpcClient {
}
Pair<Message, CellScanner> val;
try {
val = call(md, param, cells, returnType, ticket, isa, callTimeout,
val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
if (pcrc != null) {
// Shove the results into controller so can be carried across the proxy/pb service void.

View File

@ -22,12 +22,18 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class TimeLimitedRpcController implements RpcController {
/**
* The time, in ms before the call should expire.
*/
protected Integer callTimeout;
protected volatile Integer callTimeout;
protected volatile boolean cancelled = false;
protected final AtomicReference<RpcCallback<Object>> cancellationCb =
new AtomicReference<RpcCallback<Object>>(null);
public Integer getCallTimeout() {
return callTimeout;
@ -53,12 +59,12 @@ public class TimeLimitedRpcController implements RpcController {
@Override
public boolean isCanceled() {
throw new UnsupportedOperationException();
return cancelled;
}
@Override
public void notifyOnCancel(RpcCallback<Object> arg0) {
throw new UnsupportedOperationException();
public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
this.cancellationCb.set(cancellationCb);
}
@Override
@ -73,6 +79,9 @@ public class TimeLimitedRpcController implements RpcController {
@Override
public void startCancel() {
throw new UnsupportedOperationException();
cancelled = true;
if (cancellationCb.get() != null) {
cancellationCb.get().run(null);
}
}
}

View File

@ -59,7 +59,8 @@ public class ExceptionUtil {
if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
InterruptedIOException iie = new InterruptedIOException();
InterruptedIOException iie =
new InterruptedIOException("Origin: " + t.getClass().getSimpleName());
iie.initCause(t);
return iie;
}

View File

@ -149,7 +149,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
// enable client-side settings
conf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true);
conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
// TODO: expose these settings to CLI override
conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
class BufferChain {
private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
private final ByteBuffer[] buffers;
private int remaining = 0;
private int bufferOffset = 0;
@ -44,7 +43,7 @@ class BufferChain {
bbs.add(b);
this.remaining += b.remaining();
}
this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
}
/**

View File

@ -143,7 +143,7 @@ abstract public class Subprocedure implements Callable<Void> {
*
* This would normally be executed by the ProcedureMemeber when a acquire message comes from the
* coordinator. Rpcs are used to spend message back to the coordinator after different phases
* are executed. Any exceptions caught during the execution (except for InterrupedException) get
* are executed. Any exceptions caught during the execution (except for InterruptedException) get
* converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted(
* Subprocedure, ForeignException)}.
*/

View File

@ -343,7 +343,7 @@ public class TestHCM {
c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0); // don't wait between retries.
c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
c2.setBoolean(RpcClient.ALLOWS_INTERRUPTS, allowsInterrupt);
c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
final HTable table = new HTable(c2, tableName.getBytes());

View File

@ -190,7 +190,7 @@ public class TestIPC {
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
Pair<Message, CellScanner> r = client.call(md, param, null,
Pair<Message, CellScanner> r = client.call(null, md, param, null,
md.getOutputType().toProto(), User.getCurrent(), address, 0);
assertTrue(r.getSecond() == null);
// Silly assertion that the message is in the returned pb.
@ -229,7 +229,7 @@ public class TestIPC {
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
Pair<Message, CellScanner> r = client.call(md, param, CellUtil.createCellScanner(cells),
Pair<Message, CellScanner> r = client.call(null, md, param, CellUtil.createCellScanner(cells),
md.getOutputType().toProto(), User.getCurrent(), address, 0);
int index = 0;
while (r.getSecond().advance()) {
@ -263,7 +263,7 @@ public class TestIPC {
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
client.call(md, param, null, null, User.getCurrent(), address, 0);
client.call(null, md, param, null, null, User.getCurrent(), address, 0);
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
@ -287,7 +287,7 @@ public class TestIPC {
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
for (int i = 0; i < 10; i++) {
client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
@ -342,7 +342,7 @@ public class TestIPC {
}
CellScanner cellScanner = CellUtil.createCellScanner(cells);
Pair<Message, CellScanner> response =
client.call(md, builder.build(), cellScanner, param, user, address, 0);
client.call(null, md, builder.build(), cellScanner, param, user, address, 0);
/*
int count = 0;
while (p.getSecond().advance()) {