diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java new file mode 100644 index 00000000000..ed141530ebc --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Returned to the clients when their request was discarded due to server being overloaded. + * Clients should retry upon receiving it. + */ +@SuppressWarnings("serial") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CallDroppedException extends IOException { + public CallDroppedException() { + super(); + } + + // Absence of this constructor prevents proper unwrapping of + // remote exception on the client side + public CallDroppedException(String message) { + super(message); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java index 95ca9885d47..9f8b3869e69 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java @@ -31,6 +31,8 @@ public class CallQueueTooBigException extends IOException { super(); } + // Absence of this constructor prevents proper unwrapping of + // remote exception on the client side public CallQueueTooBigException(String message) { super(message); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index c87d6c7e0e6..fed87c119c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -175,7 +175,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { boolean isLocalException = !(t2 instanceof RemoteException); if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2)) || - ClientExceptionsUtil.isCallQueueTooBigException(t2)) { + ClientExceptionsUtil.isCallQueueTooBigException(t2) || + ClientExceptionsUtil.isCallDroppedException(t2)) { couldNotCommunicateWithServer.setValue(true); guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException)); handleFailureToServer(serverName, t2); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java index cf2a16f6fce..f367ed9ecae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -28,6 +28,7 @@ import java.net.SocketTimeoutException; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hbase.CallDroppedException; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.MultiActionResultTooLarge; @@ -61,7 +62,8 @@ public final class ClientExceptionsUtil { return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException - || cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException); + || cur instanceof CallQueueTooBigException || cur instanceof CallDroppedException + || cur instanceof NotServingRegionException); } @@ -118,6 +120,17 @@ public final class ClientExceptionsUtil { return (t instanceof CallQueueTooBigException); } + /** + * Checks if the exception is CallDroppedException (maybe wrapped + * into some RemoteException). + * @param t exception to check + * @return true if it's a CQTBE, false otherwise + */ + public static boolean isCallDroppedException(Throwable t) { + t = findException(t); + return (t instanceof CallDroppedException); + } + /** * Check if the exception is something that indicates that we cannot * contact/communicate with the server. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index 266c6a27480..08c488b434d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -56,16 +55,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { private AtomicLong numGeneralCallsDropped; private AtomicLong numLifoModeSwitches; - /** - * Lock held by take ops, all other locks are inside queue impl. - * - * NOTE: We want to have this lock so that in case when there're lot of already expired - * calls in the call queue a handler thread calling take() can just grab lock once and - * then fast-forward through the expired calls to the first non-expired without having - * to contend for locks on every element in underlying queue. - */ - private final ReentrantLock lock = new ReentrantLock(); - // Both are in milliseconds private volatile int codelTargetDelay; private volatile int codelInterval; @@ -120,25 +109,20 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { */ @Override public CallRunner take() throws InterruptedException { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - CallRunner cr; - while(true) { - if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { - numLifoModeSwitches.incrementAndGet(); - cr = queue.takeLast(); - } else { - cr = queue.takeFirst(); - } - if (needToDrop(cr)) { - numGeneralCallsDropped.incrementAndGet(); - } else { - return cr; - } + CallRunner cr; + while(true) { + if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { + numLifoModeSwitches.incrementAndGet(); + cr = queue.takeLast(); + } else { + cr = queue.takeFirst(); + } + if (needToDrop(cr)) { + numGeneralCallsDropped.incrementAndGet(); + cr.drop(); + } else { + return cr; } - } finally { - lock.unlock(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index a9cf0f1f253..3514245c0f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -21,6 +21,7 @@ import java.nio.channels.ClosedChannelException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CallDroppedException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -45,6 +46,9 @@ import com.google.protobuf.Message; public class CallRunner { private static final Log LOG = LogFactory.getLog(CallRunner.class); + private static final CallDroppedException CALL_DROPPED_EXCEPTION + = new CallDroppedException(); + private Call call; private RpcServerInterface rpcServer; private MonitoredRPCHandler status; @@ -161,4 +165,38 @@ public class CallRunner { cleanup(); } } + + /** + * When we want to drop this call because of server is overloaded. + */ + public void drop() { + try { + if (!call.connection.channel.isOpen()) { + if (RpcServer.LOG.isDebugEnabled()) { + RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call); + } + return; + } + + // Set the response + InetSocketAddress address = rpcServer.getListenerAddress(); + call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server " + + (address != null ? address : "(channel closed)") + " is overloaded, please retry."); + call.sendResponseIfReady(); + } catch (ClosedChannelException cce) { + InetSocketAddress address = rpcServer.getListenerAddress(); + RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + + "this means that the server " + (address != null ? address : "(channel closed)") + + " was processing a request but the client went away. The error message was: " + + cce.getMessage()); + } catch (Exception e) { + RpcServer.LOG.warn(Thread.currentThread().getName() + + ": caught: " + StringUtils.stringifyException(e)); + } finally { + if (!sucessful) { + this.rpcServer.addCallSize(call.getSize() * -1); + } + cleanup(); + } + } }