From d8562052a4f5c956a514becf6439442763387e86 Mon Sep 17 00:00:00 2001 From: Nicolas Liochon Date: Thu, 24 Jul 2014 17:49:01 +0200 Subject: [PATCH] HBASE-11564 Improve cancellation management in the rpc layer --- .../hbase/client/RpcRetryingCaller.java | 25 +- .../RpcRetryingCallerWithReadReplicas.java | 264 ++++++++++++------ .../client/ScannerCallableWithReplicas.java | 2 +- .../apache/hadoop/hbase/ipc/RpcClient.java | 100 ++++--- .../hbase/ipc/TimeLimitedRpcController.java | 19 +- .../hadoop/hbase/util/ExceptionUtil.java | 3 +- .../IntegrationTestRegionReplicaPerf.java | 2 +- .../apache/hadoop/hbase/ipc/BufferChain.java | 3 +- .../hadoop/hbase/procedure/Subprocedure.java | 2 +- .../apache/hadoop/hbase/client/TestHCM.java | 2 +- .../org/apache/hadoop/hbase/ipc/TestIPC.java | 10 +- 11 files changed, 300 insertions(+), 132 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 7ab772e75a8..286b6fe9352 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -25,12 +25,12 @@ import java.lang.reflect.UndeclaredThrowableException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; @@ -60,6 +60,7 @@ public class RpcRetryingCaller { private final long pause; private final int retries; + private final AtomicBoolean cancelled = new AtomicBoolean(false); public RpcRetryingCaller(long pause, int retries) { this.pause = pause; @@ -70,6 +71,7 @@ public class RpcRetryingCaller { if (callTimeout <= 0) { return 0; } else { + if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; int remainingTime = (int) (callTimeout - (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)); if (remainingTime < MIN_RPC_TIMEOUT) { @@ -82,6 +84,13 @@ public class RpcRetryingCaller { } } + public void cancel(){ + cancelled.set(true); + synchronized (cancelled){ + cancelled.notifyAll(); + } + } + /** * Retries if invocation fails. * @param callTimeout Timeout for this call @@ -103,9 +112,11 @@ public class RpcRetryingCaller { } catch (Throwable t) { ExceptionUtil.rethrowIfInterrupt(t); if (LOG.isTraceEnabled()) { - LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" + - (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t); + LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + + (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + " ms ago, " + + "cancelled=" + cancelled.get(), t); } + // translateException throws exception when should not retry: i.e. when request is bad. 
t = translateException(t); callable.throwable(t, retries != 1); @@ -130,7 +141,13 @@ public class RpcRetryingCaller { } } try { - Thread.sleep(expectedSleep); + if (expectedSleep > 0) { + synchronized (cancelled) { + if (cancelled.get()) return null; + cancelled.wait(expectedSleep); + } + } + if (cancelled.get()) return null; } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index c0c75beef8a..1c733b62fc6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -21,18 +21,7 @@ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -47,10 +36,20 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.BoundedCompletionService; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -60,6 +59,7 @@ import com.google.protobuf.ServiceException; */ public class RpcRetryingCallerWithReadReplicas { static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class); + protected final ExecutorService pool; protected final ClusterConnection cConnection; protected final Configuration conf; @@ -69,12 +69,13 @@ public class RpcRetryingCallerWithReadReplicas { private final int callTimeout; private final int retries; private final RpcControllerFactory rpcControllerFactory; + private final RpcRetryingCallerFactory rpcRetryingCallerFactory; public RpcRetryingCallerWithReadReplicas( RpcControllerFactory rpcControllerFactory, TableName tableName, - ClusterConnection cConnection, final Get get, - ExecutorService pool, int retries, int callTimeout, - int timeBeforeReplicas) { + ClusterConnection cConnection, final Get get, + ExecutorService pool, int retries, int callTimeout, + int timeBeforeReplicas) { this.rpcControllerFactory = rpcControllerFactory; this.tableName 
= tableName; this.cConnection = cConnection; @@ -84,6 +85,7 @@ public class RpcRetryingCallerWithReadReplicas { this.retries = retries; this.callTimeout = callTimeout; this.timeBeforeReplicas = timeBeforeReplicas; + this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf); } /** @@ -94,12 +96,19 @@ public class RpcRetryingCallerWithReadReplicas { */ class ReplicaRegionServerCallable extends RegionServerCallable { final int id; + private final PayloadCarryingRpcController controller; public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); this.id = id; this.location = location; + this.controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + } + + public void startCancel() { + controller.startCancel(); } /** @@ -109,6 +118,8 @@ public class RpcRetryingCallerWithReadReplicas { */ @Override public void prepare(final boolean reload) throws IOException { + if (controller.isCanceled()) return; + if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -125,13 +136,14 @@ public class RpcRetryingCallerWithReadReplicas { } ServerName dest = location.getServerName(); - assert dest != null; setStub(cConnection.getClient(dest)); } @Override public Result call(int callTimeout) throws Exception { + if (controller.isCanceled()) return null; + if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -140,12 +152,13 @@ public class RpcRetryingCallerWithReadReplicas { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); controller.setCallTimeout(callTimeout); + try { ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) return null; + if (response == null) { + return null; + } return ProtobufUtil.toResult(response.getResult()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -153,23 +166,6 @@ public class RpcRetryingCallerWithReadReplicas { } } - /** - * Adapter to put the HBase retrying caller into a Java callable. - */ - class RetryingRPC implements Callable { - final RetryingCallable callable; - - RetryingRPC(RetryingCallable callable) { - this.callable = callable; - } - - @Override - public Result call() throws IOException { - return new RpcRetryingCallerFactory(conf).newCaller(). - callWithRetries(callable, callTimeout); - } - } - /** * Algo: * - we put the query into the execution pool. @@ -191,12 +187,9 @@ public class RpcRetryingCallerWithReadReplicas { throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, get.getRow()); - BoundedCompletionService cs = new BoundedCompletionService(pool, rl.size()); + ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size()); - List exceptions = null; - int submitted = 0, completed = 0; - // submit call for the primary replica. 
- submitted += addCallsForReplica(cs, rl, 0, 0); + addCallsForReplica(cs, rl, 0, 0); try { // wait for the timeout to see whether the primary responds back Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds @@ -204,11 +197,7 @@ public class RpcRetryingCallerWithReadReplicas { return f.get(); //great we got a response } } catch (ExecutionException e) { - // the primary call failed with RetriesExhaustedException or DoNotRetryIOException - // but the secondaries might still succeed. Continue on the replica RPCs. - exceptions = new ArrayList(rl.size()); - exceptions.add(e); - completed++; + throwEnrichedException(e, retries); } catch (CancellationException e) { throw new InterruptedIOException(); } catch (InterruptedException e) { @@ -216,20 +205,13 @@ public class RpcRetryingCallerWithReadReplicas { } // submit call for the all of the secondaries at once - // TODO: this may be an overkill for large region replication - submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1); + addCallsForReplica(cs, rl, 1, rl.size() - 1); try { - while (completed < submitted) { - try { - Future f = cs.take(); - return f.get(); // great we got an answer - } catch (ExecutionException e) { - // if not cancel or interrupt, wait until all RPC's are done - // one of the tasks failed. Save the exception for later. - if (exceptions == null) exceptions = new ArrayList(rl.size()); - exceptions.add(e); - completed++; - } + try { + Future f = cs.take(); + return f.get(); + } catch (ExecutionException e) { + throwEnrichedException(e, retries); } } catch (CancellationException e) { throw new InterruptedIOException(); @@ -238,12 +220,9 @@ public class RpcRetryingCallerWithReadReplicas { } finally { // We get there because we were interrupted or because one or more of the // calls succeeded or failed. In all case, we stop all our tasks. - cs.cancelAll(true); + cs.cancelAll(); } - if (exceptions != null && !exceptions.isEmpty()) { - throwEnrichedException(exceptions.get(0), retries, toString()); // just rethrow the first exception for now. - } return null; // unreachable } @@ -251,7 +230,7 @@ public class RpcRetryingCallerWithReadReplicas { * Extract the real exception from the ExecutionException, and throws what makes more * sense. */ - static void throwEnrichedException(ExecutionException e, int retries, String str) + static void throwEnrichedException(ExecutionException e, int retries) throws RetriesExhaustedException, DoNotRetryIOException { Throwable t = e.getCause(); assert t != null; // That's what ExecutionException is about: holding an exception @@ -266,7 +245,7 @@ public class RpcRetryingCallerWithReadReplicas { RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(t, - EnvironmentEdgeManager.currentTimeMillis(), str); + EnvironmentEdgeManager.currentTimeMillis(), null); List exceptions = Collections.singletonList(qt); @@ -277,26 +256,24 @@ public class RpcRetryingCallerWithReadReplicas { /** * Creates the calls and submit them * - * @param cs - the completion service to use for submitting - * @param rl - the region locations - * @param min - the id of the first replica, inclusive - * @param max - the id of the last replica, inclusive. - * @return the number of submitted calls + * @param cs - the completion service to use for submitting + * @param rl - the region locations + * @param min - the id of the first replica, inclusive + * @param max - the id of the last replica, inclusive. 
*/ - private int addCallsForReplica(BoundedCompletionService cs, - RegionLocations rl, int min, int max) { + private void addCallsForReplica(ResultBoundedCompletionService cs, + RegionLocations rl, int min, int max) { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); - RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica); - cs.submit(retryingOnReplica); + cs.submit(callOnReplica, callTimeout); } - return max - min + 1; } static RegionLocations getRegionLocations(boolean useCache, int replicaId, - ClusterConnection cConnection, TableName tableName, byte[] row) + ClusterConnection cConnection, TableName tableName, byte[] row) throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { + RegionLocations rl; try { rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId); @@ -315,4 +292,135 @@ public class RpcRetryingCallerWithReadReplicas { return rl; } + + + /** + * A completion service for the RpcRetryingCallerFactory. + * Keeps the list of the futures, and allows to cancel them all. + * This means as well that it can be used for a small set of tasks only. + *
Implementation is not Thread safe.
+   */
+  public class ResultBoundedCompletionService {
+    private final Executor executor;
+    private final QueueingFuture[] tasks; // all the tasks
+    private volatile QueueingFuture completed = null;
+
+    class QueueingFuture implements RunnableFuture<Result> {
+      private final ReplicaRegionServerCallable future;
+      private Result result = null;
+      private ExecutionException exeEx = null;
+      private volatile boolean canceled;
+      private final int callTimeout;
+      private final RpcRetryingCaller<Result> retryingCaller;
+
+
+      public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
+        this.future = future;
+        this.callTimeout = callTimeout;
+        this.retryingCaller = rpcRetryingCallerFactory.newCaller();
+      }
+
+      @Override
+      public void run() {
+        try {
+          if (!canceled) {
+            // Use the caller created in the constructor so that cancel() can reach it.
+            result =
+                retryingCaller.callWithRetries(future, callTimeout);
+          }
+        } catch (Throwable t) {
+          exeEx = new ExecutionException(t);
+        } finally {
+          if (!canceled && completed == null) {
+            completed = QueueingFuture.this;
+            synchronized (tasks) {
+              tasks.notify();
+            }
+          }
+        }
+      }
+
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        if (result != null || exeEx != null) return false;
+        retryingCaller.cancel();
+        future.startCancel();
+        canceled = true;
+        return true;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return canceled;
+      }
+
+      @Override
+      public boolean isDone() {
+        return result != null || exeEx != null;
+      }
+
+      @Override
+      public Result get() throws InterruptedException, ExecutionException {
+        try {
+          return get(1000, TimeUnit.DAYS);
+        } catch (TimeoutException e) {
+          throw new RuntimeException("Timed out after waiting for 1000 days?", e);
+        }
+      }
+
+      @Override
+      public Result get(long timeout, TimeUnit unit)
+          throws InterruptedException, ExecutionException, TimeoutException {
+        synchronized (tasks) {
+          if (result != null) {
+            return result;
+          }
+          if (exeEx != null) {
+            throw exeEx;
+          }
+          unit.timedWait(tasks, timeout);
+        }
+
+        if (result != null) {
+          return result;
+        }
+        if (exeEx != null) {
+          throw exeEx;
+        }
+
+        throw new TimeoutException();
+      }
+    }
+
+    public ResultBoundedCompletionService(Executor executor, int maxTasks) {
+      this.executor = executor;
+      this.tasks = new QueueingFuture[maxTasks];
+    }
+
+
+    public void submit(ReplicaRegionServerCallable task, int callTimeout) {
+      QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
+      executor.execute(newFuture);
+      tasks[task.id] = newFuture;
+    }
+
+    public QueueingFuture take() throws InterruptedException {
+      synchronized (tasks) {
+        if (completed == null) tasks.wait();
+      }
+      return completed;
+    }
+
+    public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
+      synchronized (tasks) {
+        if (completed == null) unit.timedWait(tasks, timeout);
+      }
+      return completed;
+    }
+
+    public void cancelAll() {
+      for (QueueingFuture future : tasks) {
+        if (future != null) future.cancel(true);
+      }
+    }
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 4c99e01e2e4..709ae62adb1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -198,7 +198,7 @@ class ScannerCallableWithReplicas implements RetryingCallable {
     if (exceptions != null && !exceptions.isEmpty()) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0), - retries, toString()); // just rethrow the first exception for now. + retries); // just rethrow the first exception for now. } return null; // unreachable } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 620fb18d11a..5959da39e57 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -24,6 +24,7 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; +import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; @@ -115,13 +116,13 @@ import java.util.concurrent.atomic.AtomicInteger; public class RpcClient { // The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under // o.a.h.hbase is set to DEBUG as default). - public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient"); + public static final Log LOG = LogFactory.getLog(RpcClient.class); protected final PoolMap connections; protected final AtomicInteger callIdCnt = new AtomicInteger(); protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs final protected Configuration conf; - protected final int minIdleTimeBeforeClose; // if the connection is iddle for more than this + protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this // time (in ms), it will be closed at any moment. final protected int maxRetries; //the max. no. of retries for socket connections final protected long failureSleep; // Time to sleep before retry on failure. @@ -168,7 +169,7 @@ public class RpcClient { "hbase.ipc.client.fallback-to-simple-auth-allowed"; public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; - public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt"; + public static final String SPECIFIC_WRITE_THREAD = "hbase.ipc.client.specificThreadForWriting"; /** * A class to manage a list of servers that failed recently. @@ -426,7 +427,10 @@ public class RpcClient { public CallFuture sendCall(Call call, int priority, Span span) throws InterruptedException, IOException { CallFuture cts = new CallFuture(call, priority, span); - callsToWrite.add(cts); + if (!callsToWrite.offer(cts)) { + throw new IOException("Can't add the call " + call.id + + " to the write queue. callsToWrite.size()=" + callsToWrite.size()); + } checkIsOpen(); // We check after the put, to be sure that the call we added won't stay // in the list while the cleanup was already done. return cts; @@ -448,7 +452,11 @@ public class RpcClient { public void remove(CallFuture cts){ callsToWrite.remove(cts); + + // By removing the call from the expected call list, we make the list smaller, but + // it means as well that we don't know how many calls we cancelled. 
calls.remove(cts.call.id); + cts.call.callComplete(); } /** @@ -464,7 +472,7 @@ public class RpcClient { markClosed(new InterruptedIOException()); } - if (cts == null || cts == CallFuture.DEATH_PILL){ + if (cts == null || cts == CallFuture.DEATH_PILL) { assert shouldCloseConnection.get(); break; } @@ -580,7 +588,7 @@ public class RpcClient { + ticket.getUserName()))); this.setDaemon(true); - if (conf.getBoolean(ALLOWS_INTERRUPTS, false)) { + if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) { callSender = new CallSender(getName(), conf); callSender.start(); } else { @@ -608,6 +616,7 @@ public class RpcClient { } + protected synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; @@ -717,7 +726,7 @@ public class RpcClient { * it is idle too long, it is marked as to be closed, * or the client is marked as not running. * - * Return true if it is time to read a response; false otherwise. + * @return true if it is time to read a response; false otherwise. */ protected synchronized boolean waitForWork() throws InterruptedException { // beware of the concurrent access to the calls list: we can add calls, but as well @@ -743,13 +752,18 @@ public class RpcClient { return true; } - // Connection is idle. - // We expect the number of calls to be zero here, but actually someone can - // adds a call at the any moment, as there is no synchronization between this task - // and adding new calls. It's not a big issue, but it will get an exception. - markClosed(new IOException( - "idle connection closed with " + calls.size() + " pending request(s)")); + if (EnvironmentEdgeManager.currentTimeMillis() >= waitUntil) { + // Connection is idle. + // We expect the number of calls to be zero here, but actually someone can + // adds a call at the any moment, as there is no synchronization between this task + // and adding new calls. It's not a big issue, but it will get an exception. + markClosed(new IOException( + "idle connection closed with " + calls.size() + " pending request(s)")); + return false; + } + // We can get here if we received a notification that there is some work to do but + // the work was cancelled. As we're not idle we continue to wait. return false; } @@ -767,15 +781,19 @@ public class RpcClient { while (waitForWork()) { // Wait here for work - read or close connection readResponse(); } + } catch (InterruptedException t) { + LOG.debug(getName() + ": interrupted while waiting for call responses"); + markClosed(ExceptionUtil.asInterrupt(t)); } catch (Throwable t) { - LOG.debug(getName() + ": unexpected exception receiving call responses", t); - markClosed(new IOException("Unexpected exception receiving call responses", t)); + LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t); + markClosed(new IOException("Unexpected throwable while waiting call responses", t)); } close(); - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": stopped, connections " + connections.size()); + } } private synchronized void disposeSasl() { @@ -1146,8 +1164,10 @@ public class RpcClient { // this connection. 
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; - LOG.debug("Unknown callId: " + id + ", skipping over this response of " + - whatIsLeftToRead + " bytes"); + if (LOG.isDebugEnabled()) { + LOG.debug("Unknown callId: " + id + ", skipping over this response of " + + whatIsLeftToRead + " bytes"); + } IOUtils.skipFully(in, whatIsLeftToRead); } if (responseHeader.hasException()) { @@ -1188,16 +1208,10 @@ public class RpcClient { } } finally { cleanupCalls(false); - if (expectedCall && !call.done) { - LOG.warn("Coding error: code should be true for callId=" + call.id + - ", server=" + getRemoteAddress() + - ", shouldCloseConnection=" + shouldCloseConnection.get()); - } } } /** - * @param e * @return True if the exception is a fatal connection exception. */ private boolean isFatalConnectionException(final ExceptionResponse e) { @@ -1225,7 +1239,7 @@ public class RpcClient { if (shouldCloseConnection.compareAndSet(false, true)) { if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": marking at should close, reason =" + e.getMessage()); + LOG.debug(getName() + ": marking at should close, reason: " + e.getMessage()); } if (callSender != null) { callSender.close(); @@ -1447,10 +1461,12 @@ public class RpcClient { } } - Pair call(MethodDescriptor md, Message param, CellScanner cells, + Pair call(PayloadCarryingRpcController pcrc, + MethodDescriptor md, Message param, CellScanner cells, Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout) throws InterruptedException, IOException { - return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS); + return + call(pcrc, md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS); } /** Make a call, passing param, to the IPC server running at @@ -1465,16 +1481,34 @@ public class RpcClient { * @throws InterruptedException * @throws IOException */ - Pair call(MethodDescriptor md, Message param, CellScanner cells, + Pair call(PayloadCarryingRpcController pcrc, MethodDescriptor md, + Message param, CellScanner cells, Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) throws IOException, InterruptedException { - Call call = new Call(md, param, cells, returnType, callTimeout); - Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor); + final Call call = new Call(md, param, cells, returnType, callTimeout); - CallFuture cts = null; - if (connection.callSender != null){ + final Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor); + + final CallFuture cts; + if (connection.callSender != null) { cts = connection.callSender.sendCall(call, priority, Trace.currentSpan()); + if (pcrc != null) { + pcrc.notifyOnCancel(new RpcCallback() { + @Override + public void run(Object parameter) { + connection.callSender.remove(cts); + call.callComplete(); + } + }); + if (pcrc.isCanceled()) { + // To finish if the call was cancelled before we set the notification (race condition) + call.callComplete(); + return new Pair(call.response, call.cells); + } + } + } else { + cts = null; connection.tracedWriteRequest(call, priority, Trace.currentSpan()); } @@ -1663,7 +1697,7 @@ public class RpcClient { } Pair val; try { - val = call(md, param, cells, returnType, ticket, isa, callTimeout, + val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout, pcrc != null? 
pcrc.getPriority(): HConstants.NORMAL_QOS); if (pcrc != null) { // Shove the results into controller so can be carried across the proxy/pb service void. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java index 511738825e6..ec98a5f26b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java @@ -22,12 +22,18 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + public class TimeLimitedRpcController implements RpcController { /** * The time, in ms before the call should expire. */ - protected Integer callTimeout; + protected volatile Integer callTimeout; + protected volatile boolean cancelled = false; + protected final AtomicReference> cancellationCb = + new AtomicReference>(null); public Integer getCallTimeout() { return callTimeout; @@ -53,12 +59,12 @@ public class TimeLimitedRpcController implements RpcController { @Override public boolean isCanceled() { - throw new UnsupportedOperationException(); + return cancelled; } @Override - public void notifyOnCancel(RpcCallback arg0) { - throw new UnsupportedOperationException(); + public void notifyOnCancel(RpcCallback cancellationCb) { + this.cancellationCb.set(cancellationCb); } @Override @@ -73,6 +79,9 @@ public class TimeLimitedRpcController implements RpcController { @Override public void startCancel() { - throw new UnsupportedOperationException(); + cancelled = true; + if (cancellationCb.get() != null) { + cancellationCb.get().run(null); + } } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java index 35bb2599537..d56055a668d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java @@ -59,7 +59,8 @@ public class ExceptionUtil { if (t instanceof InterruptedIOException) return (InterruptedIOException) t; if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) { - InterruptedIOException iie = new InterruptedIOException(); + InterruptedIOException iie = + new InterruptedIOException("Origin: " + t.getClass().getSimpleName()); iie.initCause(t); return iie; } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java index 8ea27bfd553..8471cb4d26a 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java @@ -149,7 +149,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0); // enable client-side settings - conf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true); + conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true); // TODO: expose these settings to CLI override conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout); conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout); diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java index 8b0b56809b4..940dbcaa9b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceAudience; */ @InterfaceAudience.Private class BufferChain { - private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0]; private final ByteBuffer[] buffers; private int remaining = 0; private int bufferOffset = 0; @@ -44,7 +43,7 @@ class BufferChain { bbs.add(b); this.remaining += b.remaining(); } - this.buffers = bbs.toArray(FOR_TOARRAY_TYPE); + this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java index fc234f653d5..9ef5d232d7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java @@ -143,7 +143,7 @@ abstract public class Subprocedure implements Callable { * * This would normally be executed by the ProcedureMemeber when a acquire message comes from the * coordinator. Rpcs are used to spend message back to the coordinator after different phases - * are executed. Any exceptions caught during the execution (except for InterrupedException) get + * are executed. Any exceptions caught during the execution (except for InterruptedException) get * converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted( * Subprocedure, ForeignException)}. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index e0f1dfb04e6..54232b6a43c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -343,7 +343,7 @@ public class TestHCM { c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0); // don't wait between retries. c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire - c2.setBoolean(RpcClient.ALLOWS_INTERRUPTS, allowsInterrupt); + c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt); final HTable table = new HTable(c2, tableName.getBytes()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 128a91a51fa..138103eb990 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -190,7 +190,7 @@ public class TestIPC { MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); final String message = "hello"; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - Pair r = client.call(md, param, null, + Pair r = client.call(null, md, param, null, md.getOutputType().toProto(), User.getCurrent(), address, 0); assertTrue(r.getSecond() == null); // Silly assertion that the message is in the returned pb. 
@@ -229,7 +229,7 @@ public class TestIPC { InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - Pair r = client.call(md, param, CellUtil.createCellScanner(cells), + Pair r = client.call(null, md, param, CellUtil.createCellScanner(cells), md.getOutputType().toProto(), User.getCurrent(), address, 0); int index = 0; while (r.getSecond().advance()) { @@ -263,7 +263,7 @@ public class TestIPC { InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - client.call(md, param, null, null, User.getCurrent(), address, 0); + client.call(null, md, param, null, null, User.getCurrent(), address, 0); fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: " + e.toString()); @@ -287,7 +287,7 @@ public class TestIPC { MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); for (int i = 0; i < 10; i++) { - client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), + client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); @@ -342,7 +342,7 @@ public class TestIPC { } CellScanner cellScanner = CellUtil.createCellScanner(cells); Pair response = - client.call(md, builder.build(), cellScanner, param, user, address, 0); + client.call(null, md, builder.build(), cellScanner, param, user, address, 0); /* int count = 0; while (p.getSecond().advance()) {
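
For readers skimming the patch, the cancellation contract it introduces can be summarized outside HBase in a small stand-alone sketch. The class and method names below (SimpleCancellableController, CancellableRetrySleep, sleepUnlessCancelled) are hypothetical illustrations, not HBase APIs; only the pattern mirrors the patch: a volatile cancelled flag plus a single RpcCallback-style listener, as in TimeLimitedRpcController.startCancel()/notifyOnCancel(), and a retry pause that waits on a monitor instead of Thread.sleep() so cancel() can wake it, as in RpcRetryingCaller.cancel().

// Illustrative sketch only, not part of the patch. Hypothetical names; assumes Java 7+.
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class SimpleCancellableController {
  private volatile boolean cancelled = false;
  // At most one listener, registered by the rpc layer (compare RpcClient.call in the patch).
  private final AtomicReference<Runnable> cancellationCb = new AtomicReference<Runnable>(null);

  void notifyOnCancel(Runnable cb) {
    cancellationCb.set(cb);
    // The patch handles the "cancelled before the listener was registered" race on the
    // caller side by re-checking isCanceled() right after registering the callback.
  }

  void startCancel() {
    cancelled = true;                 // flag first, so late registrations can observe it
    Runnable cb = cancellationCb.get();
    if (cb != null) {
      cb.run();                       // e.g. drop the pending call from the write queue
    }
  }

  boolean isCanceled() {
    return cancelled;
  }
}

final class CancellableRetrySleep {
  private final AtomicBoolean cancelled = new AtomicBoolean(false);

  /** Wakes up any retry currently sleeping between attempts. */
  void cancel() {
    cancelled.set(true);
    synchronized (cancelled) {
      cancelled.notifyAll();
    }
  }

  /** Returns false if cancelled; otherwise sleeps up to pauseMs, like the retry loop above. */
  boolean sleepUnlessCancelled(long pauseMs) throws InterruptedException {
    synchronized (cancelled) {
      if (cancelled.get()) {
        return false;
      }
      cancelled.wait(pauseMs);        // replaces Thread.sleep(), so cancel() can cut it short
    }
    return !cancelled.get();
  }
}

The same two pieces appear in the patch: the controller callback lets RpcClient remove a queued call and complete it immediately, while the AtomicBoolean wait/notify pair lets a retrying caller exit its pause early instead of waiting out the full backoff.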