HBASE-15146 Don't block on Reader threads queueing to a scheduler queue
This commit is contained in:
parent
47c4147940
commit
138b754671
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class CallQueueTooBigException extends IOException {
|
||||
public CallQueueTooBigException() {
|
||||
super();
|
||||
}
|
||||
}
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -1174,9 +1175,9 @@ class AsyncProcess {
|
|||
byte[] row = e.getValue().iterator().next().getAction().getRow();
|
||||
// Do not use the exception for updating cache because it might be coming from
|
||||
// any of the regions in the MultiAction.
|
||||
// TODO: depending on type of exception we might not want to update cache at all?
|
||||
if (tableName != null) {
|
||||
connection.updateCachedLocations(tableName, regionName, row, null, server);
|
||||
connection.updateCachedLocations(tableName, regionName, row,
|
||||
ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
|
||||
}
|
||||
for (Action<Row> action : e.getValue()) {
|
||||
Retry retry = manageError(
|
||||
|
@ -1293,7 +1294,7 @@ class AsyncProcess {
|
|||
// Failure: retry if it's make sense else update the errors lists
|
||||
if (result == null || result instanceof Throwable) {
|
||||
Row row = sentAction.getAction();
|
||||
throwable = ConnectionImplementation.findException(result);
|
||||
throwable = ClientExceptionsUtil.findException(result);
|
||||
// Register corresponding failures once per server/once per region.
|
||||
if (!regionFailureRegistered) {
|
||||
regionFailureRegistered = true;
|
||||
|
@ -1316,7 +1317,7 @@ class AsyncProcess {
|
|||
++failed;
|
||||
}
|
||||
} else {
|
||||
|
||||
|
||||
if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
|
||||
AsyncProcess.this.connection.getConnectionMetrics().
|
||||
updateServerStats(server, regionName, result);
|
||||
|
|
|
@ -35,10 +35,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.RetryImmediatelyException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
|
@ -48,8 +45,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
|
||||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
|
@ -68,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilit
|
|||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
||||
import org.apache.hadoop.hbase.quotas.ThrottlingException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -282,47 +278,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return ng;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look for an exception we know in the remote exception:
|
||||
* - hadoop.ipc wrapped exceptions
|
||||
* - nested exceptions
|
||||
*
|
||||
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
|
||||
* ThrottlingException
|
||||
* @return null if we didn't find the exception, the exception otherwise.
|
||||
*/
|
||||
public static Throwable findException(Object exception) {
|
||||
if (exception == null || !(exception instanceof Throwable)) {
|
||||
return null;
|
||||
}
|
||||
Throwable cur = (Throwable) exception;
|
||||
while (cur != null) {
|
||||
if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|
||||
|| cur instanceof RetryImmediatelyException) {
|
||||
return cur;
|
||||
}
|
||||
if (cur instanceof RemoteException) {
|
||||
RemoteException re = (RemoteException) cur;
|
||||
cur = re.unwrapRemoteException(
|
||||
RegionOpeningException.class, RegionMovedException.class,
|
||||
RegionTooBusyException.class);
|
||||
if (cur == null) {
|
||||
cur = re.unwrapRemoteException();
|
||||
}
|
||||
// unwrapRemoteException can return the exception given as a parameter when it cannot
|
||||
// unwrap it. In this case, there is no need to look further
|
||||
// noinspection ObjectEquality
|
||||
if (cur == re) {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
cur = cur.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(String tableName) throws IOException {
|
||||
|
@ -1908,10 +1863,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
|
||||
HRegionInfo regionInfo = oldLocation.getRegionInfo();
|
||||
Throwable cause = findException(exception);
|
||||
Throwable cause = ClientExceptionsUtil.findException(exception);
|
||||
if (cause != null) {
|
||||
if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException
|
||||
|| cause instanceof ThrottlingException || cause instanceof MultiActionResultTooLarge) {
|
||||
if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
|
||||
// We know that the region is still on this region server
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.exceptions;
|
||||
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.RetryImmediatelyException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.quotas.ThrottlingException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public final class ClientExceptionsUtil {
|
||||
|
||||
private ClientExceptionsUtil() {}
|
||||
|
||||
public static boolean isMetaClearingException(Throwable cur) {
|
||||
cur = findException(cur);
|
||||
|
||||
if (cur == null) {
|
||||
return true;
|
||||
}
|
||||
return !isSpecialException(cur) || (cur instanceof RegionMovedException);
|
||||
}
|
||||
|
||||
public static boolean isSpecialException(Throwable cur) {
|
||||
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|
||||
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|
||||
|| cur instanceof CallQueueTooBigException);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Look for an exception we know in the remote exception:
|
||||
* - hadoop.ipc wrapped exceptions
|
||||
* - nested exceptions
|
||||
*
|
||||
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
|
||||
* ThrottlingException
|
||||
* @return null if we didn't find the exception, the exception otherwise.
|
||||
*/
|
||||
public static Throwable findException(Object exception) {
|
||||
if (exception == null || !(exception instanceof Throwable)) {
|
||||
return null;
|
||||
}
|
||||
Throwable cur = (Throwable) exception;
|
||||
while (cur != null) {
|
||||
if (isSpecialException(cur)) {
|
||||
return cur;
|
||||
}
|
||||
if (cur instanceof RemoteException) {
|
||||
RemoteException re = (RemoteException) cur;
|
||||
cur = re.unwrapRemoteException(
|
||||
RegionOpeningException.class, RegionMovedException.class,
|
||||
RegionTooBusyException.class);
|
||||
if (cur == null) {
|
||||
cur = re.unwrapRemoteException();
|
||||
}
|
||||
// unwrapRemoteException can return the exception given as a parameter when it cannot
|
||||
// unwrap it. In this case, there is no need to look further
|
||||
// noinspection ObjectEquality
|
||||
if (cur == re) {
|
||||
return cur;
|
||||
}
|
||||
} else if (cur.getCause() != null) {
|
||||
cur = cur.getCause();
|
||||
} else {
|
||||
return cur;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -218,28 +219,35 @@ public class TestAsyncProcess {
|
|||
|
||||
static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
|
||||
|
||||
public CallerWithFailure() {
|
||||
private final IOException e;
|
||||
|
||||
public CallerWithFailure(IOException e) {
|
||||
super(100, 100, 9);
|
||||
this.e = e;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
throw new IOException("test");
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class AsyncProcessWithFailure extends MyAsyncProcess {
|
||||
|
||||
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
|
||||
private final IOException ioe;
|
||||
|
||||
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
|
||||
super(hc, conf, true);
|
||||
this.ioe = ioe;
|
||||
serverTrackerTimeout = 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
|
||||
callsCt.incrementAndGet();
|
||||
return new CallerWithFailure();
|
||||
return new CallerWithFailure(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -825,7 +833,7 @@ public class TestAsyncProcess {
|
|||
public void testGlobalErrors() throws IOException {
|
||||
ClusterConnection conn = new MyConnectionImpl(conf);
|
||||
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
|
||||
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf);
|
||||
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
|
||||
mutator.ap = ap;
|
||||
|
||||
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
|
||||
|
@ -842,6 +850,27 @@ public class TestAsyncProcess {
|
|||
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCallQueueTooLarge() throws IOException {
|
||||
ClusterConnection conn = new MyConnectionImpl(conf);
|
||||
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
|
||||
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
|
||||
mutator.ap = ap;
|
||||
|
||||
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
|
||||
|
||||
Put p = createPut(1, true);
|
||||
mutator.mutate(p);
|
||||
|
||||
try {
|
||||
mutator.flush();
|
||||
Assert.fail();
|
||||
} catch (RetriesExhaustedWithDetailsException expected) {
|
||||
}
|
||||
// Checking that the ErrorsServers came into play and didn't make us stop immediately
|
||||
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
|
||||
}
|
||||
/**
|
||||
* This test simulates multiple regions on 2 servers. We should have 2 multi requests and
|
||||
* 2 threads: 1 per server, this whatever the number of regions.
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.exceptions;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@SuppressWarnings("ThrowableInstanceNeverThrown")
|
||||
public class TestClientExceptionsUtil {
|
||||
|
||||
@Test
|
||||
public void testFindException() throws Exception {
|
||||
IOException ioe = new IOException("Tesst");
|
||||
ServiceException se = new ServiceException(ioe);
|
||||
assertEquals(ioe, ClientExceptionsUtil.findException(se));
|
||||
}
|
||||
}
|
|
@ -72,9 +72,9 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
int queueIndex = balancer.getNextQueue();
|
||||
queues.get(queueIndex).put(callTask);
|
||||
return queues.get(queueIndex).offer(callTask);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
|||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* A very simple {@code }RpcScheduler} that serves incoming requests in order.
|
||||
|
@ -34,6 +35,7 @@ public class FifoRpcScheduler extends RpcScheduler {
|
|||
|
||||
private final int handlerCount;
|
||||
private final int maxQueueLength;
|
||||
private final AtomicInteger queueSize = new AtomicInteger(0);
|
||||
private ThreadPoolExecutor executor;
|
||||
|
||||
public FifoRpcScheduler(Configuration conf, int handlerCount) {
|
||||
|
@ -65,14 +67,22 @@ public class FifoRpcScheduler extends RpcScheduler {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(final CallRunner task) throws IOException, InterruptedException {
|
||||
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
|
||||
// Executors provide no offer, so make our own.
|
||||
int queued = queueSize.getAndIncrement();
|
||||
if (maxQueueLength > 0 && queued >= maxQueueLength) {
|
||||
queueSize.decrementAndGet();
|
||||
return false;
|
||||
}
|
||||
executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
task.setStatus(RpcServer.getStatus());
|
||||
task.run();
|
||||
queueSize.decrementAndGet();
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -160,7 +160,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
RpcServer.Call call = callTask.getCall();
|
||||
int queueIndex;
|
||||
if (isWriteRequest(call.getHeader(), call.param)) {
|
||||
|
@ -170,7 +170,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
} else {
|
||||
queueIndex = numWriteQueues + readBalancer.getNextQueue();
|
||||
}
|
||||
queues.get(queueIndex).put(callTask);
|
||||
return queues.get(queueIndex).offer(callTask);
|
||||
}
|
||||
|
||||
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
||||
|
|
|
@ -86,7 +86,7 @@ public abstract class RpcExecutor {
|
|||
public abstract int getQueueLength();
|
||||
|
||||
/** Add the request to the executor queue */
|
||||
public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
|
||||
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
|
||||
|
||||
/** Returns the list of request queues */
|
||||
protected abstract List<BlockingQueue<CallRunner>> getQueues();
|
||||
|
|
|
@ -58,7 +58,7 @@ public abstract class RpcScheduler {
|
|||
*
|
||||
* @param task the request to be dispatched
|
||||
*/
|
||||
public abstract void dispatch(CallRunner task) throws IOException, InterruptedException;
|
||||
public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException;
|
||||
|
||||
/** Retrieves length of the general queue for metrics. */
|
||||
public abstract int getGeneralQueueLength();
|
||||
|
|
|
@ -67,6 +67,7 @@ import javax.security.sasl.SaslServer;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -1175,13 +1176,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public static class CallQueueTooBigException extends IOException {
|
||||
CallQueueTooBigException() {
|
||||
super();
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads calls from a connection and queues them for handling. */
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
|
||||
value="VO_VOLATILE_INCREMENT",
|
||||
|
@ -1880,7 +1874,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
: null;
|
||||
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
|
||||
totalRequestSize, traceInfo, this.addr);
|
||||
scheduler.dispatch(new CallRunner(RpcServer.this, call));
|
||||
|
||||
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
|
||||
callQueueSize.add(-1 * call.getSize());
|
||||
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
|
||||
InetSocketAddress address = getListenerAddress();
|
||||
setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
|
||||
"Call queue is full on " + (address != null ? address : "(channel closed)") +
|
||||
", too many items queued ?");
|
||||
responder.doRespond(call);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean authorizeConnection() throws IOException {
|
||||
|
|
|
@ -190,15 +190,15 @@ public class SimpleRpcScheduler extends RpcScheduler {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(CallRunner callTask) throws InterruptedException {
|
||||
public boolean dispatch(CallRunner callTask) throws InterruptedException {
|
||||
RpcServer.Call call = callTask.getCall();
|
||||
int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
|
||||
if (priorityExecutor != null && level > highPriorityLevel) {
|
||||
priorityExecutor.dispatch(callTask);
|
||||
return priorityExecutor.dispatch(callTask);
|
||||
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
|
||||
replicationExecutor.dispatch(callTask);
|
||||
return replicationExecutor.dispatch(callTask);
|
||||
} else {
|
||||
callExecutor.dispatch(callTask);
|
||||
return callExecutor.dispatch(callTask);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.Waiter;
|
|||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
|
@ -687,7 +688,7 @@ public class TestHCM {
|
|||
Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
|
||||
|
||||
// Check that we unserialized the exception as expected
|
||||
Throwable cause = ConnectionImplementation.findException(e.getCause(0));
|
||||
Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
|
||||
Assert.assertNotNull(cause);
|
||||
Assert.assertTrue(cause instanceof RegionMovedException);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue