HBASE-15703 Deadline scheduler needs to return to the client info about skipped calls, not just drop them

This commit is contained in:
Mikhail Antonov 2016-05-02 15:23:07 -07:00
parent bbc7b90335
commit 58c4c3d174
6 changed files with 112 additions and 31 deletions

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Returned to the clients when their request was discarded due to server being overloaded.
* Clients should retry upon receiving it.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CallDroppedException extends IOException {
public CallDroppedException() {
super();
}
// Absence of this constructor prevents proper unwrapping of
// remote exception on the client side
public CallDroppedException(String message) {
super(message);
}
}

View File

@ -31,6 +31,8 @@ public class CallQueueTooBigException extends IOException {
super();
}
// Absence of this constructor prevents proper unwrapping of
// remote exception on the client side
public CallQueueTooBigException(String message) {
super(message);
}

View File

@ -175,7 +175,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
boolean isLocalException = !(t2 instanceof RemoteException);
if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2)) ||
ClientExceptionsUtil.isCallQueueTooBigException(t2)) {
ClientExceptionsUtil.isCallQueueTooBigException(t2) ||
ClientExceptionsUtil.isCallDroppedException(t2)) {
couldNotCommunicateWithServer.setValue(true);
guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
handleFailureToServer(serverName, t2);

View File

@ -28,6 +28,7 @@ import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
@ -61,7 +62,8 @@ public final class ClientExceptionsUtil {
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|| cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
|| cur instanceof CallQueueTooBigException || cur instanceof CallDroppedException
|| cur instanceof NotServingRegionException);
}
@ -118,6 +120,17 @@ public final class ClientExceptionsUtil {
return (t instanceof CallQueueTooBigException);
}
/**
* Checks if the exception is CallDroppedException (maybe wrapped
* into some RemoteException).
* @param t exception to check
* @return true if it's a CQTBE, false otherwise
*/
public static boolean isCallDroppedException(Throwable t) {
t = findException(t);
return (t instanceof CallDroppedException);
}
/**
* Check if the exception is something that indicates that we cannot
* contact/communicate with the server.

View File

@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -56,16 +55,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicLong numGeneralCallsDropped;
private AtomicLong numLifoModeSwitches;
/**
* Lock held by take ops, all other locks are inside queue impl.
*
* NOTE: We want to have this lock so that in case when there're lot of already expired
* calls in the call queue a handler thread calling take() can just grab lock once and
* then fast-forward through the expired calls to the first non-expired without having
* to contend for locks on every element in underlying queue.
*/
private final ReentrantLock lock = new ReentrantLock();
// Both are in milliseconds
private volatile int codelTargetDelay;
private volatile int codelInterval;
@ -120,25 +109,20 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
*/
@Override
public CallRunner take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
CallRunner cr;
while(true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
numLifoModeSwitches.incrementAndGet();
cr = queue.takeLast();
} else {
cr = queue.takeFirst();
}
if (needToDrop(cr)) {
numGeneralCallsDropped.incrementAndGet();
} else {
return cr;
}
CallRunner cr;
while(true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
numLifoModeSwitches.incrementAndGet();
cr = queue.takeLast();
} else {
cr = queue.takeFirst();
}
if (needToDrop(cr)) {
numGeneralCallsDropped.incrementAndGet();
cr.drop();
} else {
return cr;
}
} finally {
lock.unlock();
}
}

View File

@ -21,6 +21,7 @@ import java.nio.channels.ClosedChannelException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -45,6 +46,9 @@ import com.google.protobuf.Message;
public class CallRunner {
private static final Log LOG = LogFactory.getLog(CallRunner.class);
private static final CallDroppedException CALL_DROPPED_EXCEPTION
= new CallDroppedException();
private Call call;
private RpcServerInterface rpcServer;
private MonitoredRPCHandler status;
@ -161,4 +165,38 @@ public class CallRunner {
cleanup();
}
}
/**
* When we want to drop this call because of server is overloaded.
*/
public void drop() {
try {
if (!call.connection.channel.isOpen()) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
}
return;
}
// Set the response
InetSocketAddress address = rpcServer.getListenerAddress();
call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
+ (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
call.sendResponseIfReady();
} catch (ClosedChannelException cce) {
InetSocketAddress address = rpcServer.getListenerAddress();
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
"this means that the server " + (address != null ? address : "(channel closed)") +
" was processing a request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
+ ": caught: " + StringUtils.stringifyException(e));
} finally {
if (!sucessful) {
this.rpcServer.addCallSize(call.getSize() * -1);
}
cleanup();
}
}
}