HBASE-15146 Don't block on Reader threads queueing to a scheduler queue

This commit is contained in:
Elliott Clark 2016-01-20 17:43:22 -08:00
parent 630ad95c92
commit 421fe24e9b
14 changed files with 240 additions and 71 deletions

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CallQueueTooBigException extends IOException {
public CallQueueTooBigException() {
super();
}
}

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1293,7 +1294,7 @@ class AsyncProcess {
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
Row row = sentAction.getAction();
throwable = ConnectionManager.findException(result);
throwable = ClientExceptionsUtil.findException(result);
// Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) {
regionFailureRegistered = true;

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.ipc.RpcClient;
@ -2216,10 +2217,9 @@ class ConnectionManager {
}
HRegionInfo regionInfo = oldLocation.getRegionInfo();
Throwable cause = findException(exception);
Throwable cause = ClientExceptionsUtil.findException(exception);
if (cause != null) {
if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException
|| cause instanceof ThrottlingException) {
if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
// We know that the region is still on this region server
return;
}
@ -2698,46 +2698,4 @@ class ConnectionManager {
}
}
}
/**
* Look for an exception we know in the remote exception:
* - hadoop.ipc wrapped exceptions
* - nested exceptions
*
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
* ThrottlingException
* @return null if we didn't find the exception, the exception otherwise.
*/
public static Throwable findException(Object exception) {
if (exception == null || !(exception instanceof Throwable)) {
return null;
}
Throwable cur = (Throwable) exception;
while (cur != null) {
if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|| cur instanceof RetryImmediatelyException) {
return cur;
}
if (cur instanceof RemoteException) {
RemoteException re = (RemoteException) cur;
cur = re.unwrapRemoteException(
RegionOpeningException.class, RegionMovedException.class,
RegionTooBusyException.class);
if (cur == null) {
cur = re.unwrapRemoteException();
}
// unwrapRemoteException can return the exception given as a parameter when it cannot
// unwrap it. In this case, there is no need to look further
// noinspection ObjectEquality
if (cur == re) {
return null;
}
} else {
cur = cur.getCause();
}
}
return null;
}
}

View File

@ -0,0 +1,95 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.quotas.ThrottlingException;
import org.apache.hadoop.ipc.RemoteException;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ClientExceptionsUtil {
private ClientExceptionsUtil() {}
public static boolean isMetaClearingException(Throwable cur) {
cur = findException(cur);
if (cur == null) {
return true;
}
return !isSpecialException(cur) || (cur instanceof RegionMovedException);
}
public static boolean isSpecialException(Throwable cur) {
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|| cur instanceof CallQueueTooBigException);
}
/**
* Look for an exception we know in the remote exception:
* - hadoop.ipc wrapped exceptions
* - nested exceptions
*
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException /
* ThrottlingException
* @return null if we didn't find the exception, the exception otherwise.
*/
public static Throwable findException(Object exception) {
if (exception == null || !(exception instanceof Throwable)) {
return null;
}
Throwable cur = (Throwable) exception;
while (cur != null) {
if (isSpecialException(cur)) {
return cur;
}
if (cur instanceof RemoteException) {
RemoteException re = (RemoteException) cur;
cur = re.unwrapRemoteException(
RegionOpeningException.class, RegionMovedException.class,
RegionTooBusyException.class);
if (cur == null) {
cur = re.unwrapRemoteException();
}
// unwrapRemoteException can return the exception given as a parameter when it cannot
// unwrap it. In this case, there is no need to look further
// noinspection ObjectEquality
if (cur == re) {
return cur;
}
} else if (cur.getCause() != null) {
cur = cur.getCause();
} else {
return cur;
}
}
return null;
}
}

View File

@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
@ -215,28 +216,35 @@ public class TestAsyncProcess {
static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
public CallerWithFailure() {
private final IOException e;
public CallerWithFailure(IOException e) {
super(100, 100, 9);
this.e = e;
}
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
throws IOException, RuntimeException {
throw new IOException("test");
throw e;
}
}
static class AsyncProcessWithFailure extends MyAsyncProcess {
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
private final IOException ioe;
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
super(hc, conf, true);
this.ioe = ioe;
serverTrackerTimeout = 1;
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
callsCt.incrementAndGet();
return new CallerWithFailure();
return new CallerWithFailure(ioe);
}
}
@ -830,7 +838,7 @@ public class TestAsyncProcess {
public void testGlobalErrors() throws IOException {
ClusterConnection conn = new MyConnectionImpl(conf);
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
mutator.ap = ap;
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
@ -847,6 +855,27 @@ public class TestAsyncProcess {
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
@Test
public void testCallQueueTooLarge() throws IOException {
ClusterConnection conn = new MyConnectionImpl(conf);
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
mutator.ap = ap;
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
try {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
/**
* This test simulates multiple regions on 2 servers. We should have 2 multi requests and
* 2 threads: 1 per server, this whatever the number of regions.

View File

@ -0,0 +1,37 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;
import com.google.protobuf.ServiceException;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.*;
@SuppressWarnings("ThrowableInstanceNeverThrown")
public class TestClientExceptionsUtil {
@Test
public void testFindException() throws Exception {
IOException ioe = new IOException("Tesst");
ServiceException se = new ServiceException(ioe);
assertEquals(ioe, ClientExceptionsUtil.findException(se));
}
}

View File

@ -72,9 +72,9 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
}
@Override
public void dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
int queueIndex = balancer.getNextQueue();
queues.get(queueIndex).put(callTask);
return queues.get(queueIndex).offer(callTask);
}
@Override

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A very simple {@code }RpcScheduler} that serves incoming requests in order.
@ -34,6 +35,7 @@ public class FifoRpcScheduler extends RpcScheduler {
private final int handlerCount;
private final int maxQueueLength;
private final AtomicInteger queueSize = new AtomicInteger(0);
private ThreadPoolExecutor executor;
public FifoRpcScheduler(Configuration conf, int handlerCount) {
@ -65,14 +67,22 @@ public class FifoRpcScheduler extends RpcScheduler {
}
@Override
public void dispatch(final CallRunner task) throws IOException, InterruptedException {
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
// Executors provide no offer, so make our own.
int queued = queueSize.getAndIncrement();
if (maxQueueLength > 0 && queued >= maxQueueLength) {
queueSize.decrementAndGet();
return false;
}
executor.submit(new Runnable() {
@Override
public void run() {
task.setStatus(RpcServer.getStatus());
task.run();
queueSize.decrementAndGet();
}
});
return true;
}
@Override

View File

@ -160,7 +160,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
}
@Override
public void dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
@ -170,7 +170,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
}
queues.get(queueIndex).put(callTask);
return queues.get(queueIndex).offer(callTask);
}
private boolean isWriteRequest(final RequestHeader header, final Message param) {

View File

@ -86,7 +86,7 @@ public abstract class RpcExecutor {
public abstract int getQueueLength();
/** Add the request to the executor queue */
public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
/** Returns the list of request queues */
protected abstract List<BlockingQueue<CallRunner>> getQueues();

View File

@ -58,7 +58,7 @@ public abstract class RpcScheduler {
*
* @param task the request to be dispatched
*/
public abstract void dispatch(CallRunner task) throws IOException, InterruptedException;
public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException;
/** Retrieves length of the general queue for metrics. */
public abstract int getGeneralQueueLength();

View File

@ -67,6 +67,7 @@ import javax.security.sasl.SaslServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -1159,13 +1160,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
@SuppressWarnings("serial")
public static class CallQueueTooBigException extends IOException {
CallQueueTooBigException() {
super();
}
}
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT",
@ -1864,7 +1858,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
: null;
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize, traceInfo, this.addr);
scheduler.dispatch(new CallRunner(RpcServer.this, call));
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSize.add(-1 * call.getSize());
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
InetSocketAddress address = getListenerAddress();
setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + (address != null ? address : "(channel closed)") +
", too many items queued ?");
responder.doRespond(call);
}
}
private boolean authorizeConnection() throws IOException {

View File

@ -190,15 +190,15 @@ public class SimpleRpcScheduler extends RpcScheduler {
}
@Override
public void dispatch(CallRunner callTask) throws InterruptedException {
public boolean dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
if (priorityExecutor != null && level > highPriorityLevel) {
priorityExecutor.dispatch(callTask);
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
replicationExecutor.dispatch(callTask);
return replicationExecutor.dispatch(callTask);
} else {
callExecutor.dispatch(callTask);
return callExecutor.dispatch(callTask);
}
}

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementatio
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter;
@ -698,7 +699,7 @@ public class TestHCM {
Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
// Check that we unserialized the exception as expected
Throwable cause = ConnectionManager.findException(e.getCause(0));
Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
Assert.assertNotNull(cause);
Assert.assertTrue(cause instanceof RegionMovedException);
}