HBASE-12028 Abort the RegionServer, when it's handler threads die (Alicia Ying Shu)

This commit is contained in:
Enis Soztutar 2015-01-02 13:07:57 -08:00
parent a90e64c637
commit 820f629423
11 changed files with 343 additions and 37 deletions

View File

@ -867,6 +867,17 @@ public final class HConstants {
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
/*
* REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
* -1 => Disable aborting
* 0 => Abort if even a single handler has died
* 0.x => Abort only when this percent of handlers have died
* 1 => Abort only all of the handers have died
*/
public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
"hbase.regionserver.handler.abort.on.error.percent";
public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;
public static final String REGION_SERVER_META_HANDLER_COUNT =
"hbase.regionserver.metahandler.count";
public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;

View File

@ -1452,4 +1452,12 @@ possible configurations would overwhelm and obscure the important.
<name>hbase.http.staticuser.user</name>
<value>dr.stack</value>
</property>
<property>
<name>hbase.regionserver.handler.abort.on.error.percent</name>
<value>0.5</value>
<description>The percent of region server RPC threads failed to abort RS.
-1 Disable aborting; 0 Abort if even a single handler has died;
0.x Abort only when this percent of handlers have died;
1 Abort only all of the handers have died.</description>
</property>
</configuration>

View File

@ -22,9 +22,11 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@ -40,12 +42,23 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength) {
this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
this(name, handlerCount, numQueues, maxQueueLength, null, null);
}
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength, final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
}
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, Math.max(handlerCount, numQueues));
this(name, handlerCount, numQueues, null, null, queueClass, initargs);
}
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, Math.max(handlerCount, numQueues), conf, abortable);
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
this.balancer = getBalancer(numQueues);
initializeQueues(numQueues, queueClass, initargs);

View File

@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.ipc;
*/
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@ -111,6 +111,9 @@ public class CallRunner {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
if (e instanceof Error) {
throw (Error)e;
}
} finally {
if (traceScope != null) {
traceScope.close();

View File

@ -26,9 +26,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
@ -60,25 +62,35 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
final float readShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
conf, abortable, LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
LinkedBlockingQueue.class);
conf, abortable, LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
readQueueClass, readQueueInitArgs);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,

View File

@ -21,13 +21,17 @@ package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -41,15 +45,26 @@ public abstract class RpcExecutor {
private final List<Thread> handlers;
private final int handlerCount;
private final String name;
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
private boolean running;
private Configuration conf = null;
private Abortable abortable = null;
public RpcExecutor(final String name, final int handlerCount) {
this.handlers = new ArrayList<Thread>(handlerCount);
this.handlerCount = handlerCount;
this.name = Strings.nullToEmpty(name);
}
public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
final Abortable abortable) {
this(name, handlerCount);
this.conf = conf;
this.abortable = abortable;
}
public void start(final int port) {
running = true;
startHandlers(port);
@ -94,7 +109,7 @@ public abstract class RpcExecutor {
});
t.setDaemon(true);
t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
",queue=" + index + ",port=" + port);
",queue=" + index + ",port=" + port);
t.start();
LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
handlers.add(t);
@ -103,6 +118,9 @@ public abstract class RpcExecutor {
protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
boolean interrupted = false;
double handlerFailureThreshhold =
conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
try {
while (running) {
try {
@ -110,9 +128,30 @@ public abstract class RpcExecutor {
try {
activeHandlerCount.incrementAndGet();
task.run();
} catch (Throwable t) {
LOG.error("RpcServer handler thread throws exception: ", t);
throw t;
} catch (Throwable e) {
if (e instanceof Error) {
int failedCount = failedHandlerCount.incrementAndGet();
if (handlerFailureThreshhold >= 0
&& failedCount > handlerCount * handlerFailureThreshhold) {
String message =
"Number of failed RpcServer handler exceeded threshhold "
+ handlerFailureThreshhold + " with failed reason: "
+ StringUtils.stringifyException(e);
if (abortable != null) {
abortable.abort(message, e);
} else {
LOG.error("Received " + StringUtils.stringifyException(e)
+ " but not aborting due to abortable being null");
throw e;
}
} else {
LOG.warn("RpcServer handler threads encountered errors "
+ StringUtils.stringifyException(e));
}
} else {
LOG.warn("RpcServer handler threads encountered exceptions "
+ StringUtils.stringifyException(e));
}
} finally {
activeHandlerCount.decrementAndGet();
}

View File

@ -22,11 +22,12 @@ import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
/**
@ -93,6 +94,8 @@ public class SimpleRpcScheduler extends RpcScheduler {
/** What level a high priority call is at. */
private final int highPriorityLevel;
private Abortable abortable = null;
/**
* @param conf
* @param handlerCount the number of handler threads that will be used to process calls
@ -107,11 +110,13 @@ public class SimpleRpcScheduler extends RpcScheduler {
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
Abortable server,
int highPriorityLevel) {
int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
this.abortable = server;
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
@ -127,30 +132,41 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength);
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
}
} else {
// multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
numCallQueues, maxQueueLength);
numCallQueues, maxQueueLength, conf, abortable);
}
}
this.priorityExecutor =
priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
1, maxQueueLength) : null;
1, maxQueueLength, conf, abortable) : null;
this.replicationExecutor =
replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
replicationHandlerCount, 1, maxQueueLength) : null;
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
}
public SimpleRpcScheduler(
Configuration conf,
int handlerCount,
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
int highPriorityLevel) {
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
null, highPriorityLevel);
}
@Override

View File

@ -792,7 +792,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rpcServer = new RpcServer(rs, name, getServices(),
initialIsa, // BindAddress is IP we got for this server.
rs.conf,
rpcSchedulerFactory.create(rs.conf, this));
rpcSchedulerFactory.create(rs.conf, this, rs));
scannerLeaseTimeoutPeriod = rs.conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
@ -34,5 +35,9 @@ public interface RpcSchedulerFactory {
/**
* Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
*/
RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server);
@Deprecated
RpcScheduler create(Configuration conf, PriorityFunction priority);
}

View File

@ -17,11 +17,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
@ -32,17 +33,25 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
@Override
@Deprecated
public RpcScheduler create(Configuration conf, PriorityFunction priority) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
conf,
handlerCount,
conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
priority,
HConstants.QOS_THRESHOLD);
return create(conf, priority, null);
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
conf,
handlerCount,
conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
priority,
server,
HConstants.QOS_THRESHOLD);
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@Category({RPCTests.class, SmallTests.class})
public class TestRpcHandlerException {
public static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
static String example = "xyz";
static byte[] CELL_BYTES = example.getBytes();
static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
private final static Configuration CONF = HBaseConfiguration.create();
RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
// by passing in implementation of blocking interface. We use this service in all tests that
// follow.
private static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(new TestRpcServiceProtos
.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws Error, RuntimeException {
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
// If cells, scan them to check we are able to iterate what we were given and since
// this is
// an echo, just put them back on the controller creating a new block. Tests our
// block
// building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<Cell>();
try {
while (cellScanner.advance()) {
list.add(cellScanner.current());
throw new StackOverflowError();
}
} catch (StackOverflowError e) {
throw e;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
}
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
});
/**
* Instance of server. We actually don't do anything speical in here so could just use
* HBaseRpcServer directly.
*/
private static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
}
TestRpcServer(RpcScheduler scheduler) throws IOException {
super(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, scheduler);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return super.call(service, md, param, cellScanner, receiveTime, status);
}
}
/** Tests that the rpc scheduler is called when requests arrive.
* When Rpc handler thread dies, the client will hang and the test will fail.
* The test is meant to be a unit test to test the behavior.
*
* */
private class AbortServer implements Abortable {
private boolean aborted = false;
@Override
public void abort(String why, Throwable e) {
aborted = true;
}
@Override
public boolean isAborted() {
return aborted;
}
}
/* This is a unit test to make sure to abort region server when the number of Rpc handler thread
* caught errors exceeds the threshold. Client will hang when RS aborts.
*/
@Ignore
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
PriorityFunction qosFunction = mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
RpcServer rpcServer = new TestRpcServer(scheduler);
RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
try {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md
.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
} catch (Throwable e) {
assert(abortable.isAborted() == true);
} finally {
rpcServer.stop();
}
}
}