HBASE-15703 Deadline scheduler needs to return to the client info about skipped calls, not just drop them
This commit is contained in:
parent
36d634d353
commit
7e0e86072a
|
@ -0,0 +1,43 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Returned to the clients when their request was discarded due to server being overloaded.
|
||||
* Clients should retry upon receiving it.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class CallDroppedException extends IOException {
|
||||
public CallDroppedException() {
|
||||
super();
|
||||
}
|
||||
|
||||
// Absence of this constructor prevents proper unwrapping of
|
||||
// remote exception on the client side
|
||||
public CallDroppedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -31,6 +31,8 @@ public class CallQueueTooBigException extends IOException {
|
|||
super();
|
||||
}
|
||||
|
||||
// Absence of this constructor prevents proper unwrapping of
|
||||
// remote exception on the client side
|
||||
public CallQueueTooBigException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
|
|
@ -175,7 +175,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
|
|||
boolean isLocalException = !(t2 instanceof RemoteException);
|
||||
|
||||
if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2)) ||
|
||||
ClientExceptionsUtil.isCallQueueTooBigException(t2)) {
|
||||
ClientExceptionsUtil.isCallQueueTooBigException(t2) ||
|
||||
ClientExceptionsUtil.isCallDroppedException(t2)) {
|
||||
couldNotCommunicateWithServer.setValue(true);
|
||||
guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
|
||||
handleFailureToServer(serverName, t2);
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.net.SocketTimeoutException;
|
|||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.hbase.CallDroppedException;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||
|
@ -61,7 +62,8 @@ public final class ClientExceptionsUtil {
|
|||
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|
||||
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|
||||
|| cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
|
||||
|| cur instanceof CallQueueTooBigException || cur instanceof CallDroppedException
|
||||
|| cur instanceof NotServingRegionException);
|
||||
}
|
||||
|
||||
|
||||
|
@ -118,6 +120,17 @@ public final class ClientExceptionsUtil {
|
|||
return (t instanceof CallQueueTooBigException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the exception is CallDroppedException (maybe wrapped
|
||||
* into some RemoteException).
|
||||
* @param t exception to check
|
||||
* @return true if it's a CQTBE, false otherwise
|
||||
*/
|
||||
public static boolean isCallDroppedException(Throwable t) {
|
||||
t = findException(t);
|
||||
return (t instanceof CallDroppedException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the exception is something that indicates that we cannot
|
||||
* contact/communicate with the server.
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -56,16 +55,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
|
|||
private AtomicLong numGeneralCallsDropped;
|
||||
private AtomicLong numLifoModeSwitches;
|
||||
|
||||
/**
|
||||
* Lock held by take ops, all other locks are inside queue impl.
|
||||
*
|
||||
* NOTE: We want to have this lock so that in case when there're lot of already expired
|
||||
* calls in the call queue a handler thread calling take() can just grab lock once and
|
||||
* then fast-forward through the expired calls to the first non-expired without having
|
||||
* to contend for locks on every element in underlying queue.
|
||||
*/
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
// Both are in milliseconds
|
||||
private volatile int codelTargetDelay;
|
||||
private volatile int codelInterval;
|
||||
|
@ -120,25 +109,20 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
|
|||
*/
|
||||
@Override
|
||||
public CallRunner take() throws InterruptedException {
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
CallRunner cr;
|
||||
while(true) {
|
||||
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
|
||||
numLifoModeSwitches.incrementAndGet();
|
||||
cr = queue.takeLast();
|
||||
} else {
|
||||
cr = queue.takeFirst();
|
||||
}
|
||||
if (needToDrop(cr)) {
|
||||
numGeneralCallsDropped.incrementAndGet();
|
||||
} else {
|
||||
return cr;
|
||||
}
|
||||
CallRunner cr;
|
||||
while(true) {
|
||||
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
|
||||
numLifoModeSwitches.incrementAndGet();
|
||||
cr = queue.takeLast();
|
||||
} else {
|
||||
cr = queue.takeFirst();
|
||||
}
|
||||
if (needToDrop(cr)) {
|
||||
numGeneralCallsDropped.incrementAndGet();
|
||||
cr.drop();
|
||||
} else {
|
||||
return cr;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.nio.channels.ClosedChannelException;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.CallDroppedException;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -45,6 +46,9 @@ import com.google.protobuf.Message;
|
|||
public class CallRunner {
|
||||
private static final Log LOG = LogFactory.getLog(CallRunner.class);
|
||||
|
||||
private static final CallDroppedException CALL_DROPPED_EXCEPTION
|
||||
= new CallDroppedException();
|
||||
|
||||
private Call call;
|
||||
private RpcServerInterface rpcServer;
|
||||
private MonitoredRPCHandler status;
|
||||
|
@ -161,4 +165,38 @@ public class CallRunner {
|
|||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When we want to drop this call because of server is overloaded.
|
||||
*/
|
||||
public void drop() {
|
||||
try {
|
||||
if (!call.connection.channel.isOpen()) {
|
||||
if (RpcServer.LOG.isDebugEnabled()) {
|
||||
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Set the response
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
|
||||
+ (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
|
||||
call.sendResponseIfReady();
|
||||
} catch (ClosedChannelException cce) {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
|
||||
"this means that the server " + (address != null ? address : "(channel closed)") +
|
||||
" was processing a request but the client went away. The error message was: " +
|
||||
cce.getMessage());
|
||||
} catch (Exception e) {
|
||||
RpcServer.LOG.warn(Thread.currentThread().getName()
|
||||
+ ": caught: " + StringUtils.stringifyException(e));
|
||||
} finally {
|
||||
if (!sucessful) {
|
||||
this.rpcServer.addCallSize(call.getSize() * -1);
|
||||
}
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue