HBASE-11355 a couple of callQueue related improvements

This commit is contained in:
Matteo Bertozzi 2014-07-01 09:29:07 +02:00
parent 0167558eb3
commit 9a6a59c7b7
7 changed files with 582 additions and 89 deletions

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -26,13 +27,23 @@ import org.apache.hadoop.classification.InterfaceAudience;
public class ReflectionUtils { public class ReflectionUtils {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> T instantiateWithCustomCtor(String className, public static <T> T instantiateWithCustomCtor(String className,
Class<? >[] ctorArgTypes, Object[] ctorArgs) { Class<? >[] ctorArgTypes, Object[] ctorArgs) {
try { try {
Class<? extends T> resultType = (Class<? extends T>) Class.forName(className); Class<? extends T> resultType = (Class<? extends T>) Class.forName(className);
return resultType.getDeclaredConstructor(ctorArgTypes).newInstance(ctorArgs); Constructor<? extends T> ctor = resultType.getDeclaredConstructor(ctorArgTypes);
return instantiate(className, ctor, ctorArgs);
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"Unable to find " + className, e); "Unable to find " + className, e);
} catch (NoSuchMethodException e) {
throw new UnsupportedOperationException(
"Unable to find suitable constructor for class " + className, e);
}
}
private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
try {
return ctor.newInstance(ctorArgs);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"Unable to access specified class " + className, e); "Unable to access specified class " + className, e);
@ -42,9 +53,40 @@ public class ReflectionUtils {
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"Constructor threw an exception for " + className, e); "Constructor threw an exception for " + className, e);
} catch (NoSuchMethodException e) {
throw new UnsupportedOperationException(
"Unable to find suitable constructor for class " + className, e);
} }
} }
@SuppressWarnings("unchecked")
public static <T> T newInstance(Class<T> type, Object... params) {
return instantiate(type.getName(), findConstructor(type, params), params);
}
@SuppressWarnings("unchecked")
public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) {
Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors();
for (Constructor<T> ctor : constructors) {
Class<?>[] ctorParamTypes = ctor.getParameterTypes();
if (ctorParamTypes.length != paramTypes.length) {
continue;
}
boolean match = true;
for (int i = 0; i < ctorParamTypes.length && match; ++i) {
Class<?> paramType = paramTypes[i].getClass();
match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
(long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
(char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
(short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
(boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
(byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
}
if (match) {
return ctor;
}
}
throw new UnsupportedOperationException(
"Unable to find suitable constructor for class " + type.getName());
}
} }

View File

@ -180,6 +180,22 @@ possible configurations would overwhelm and obscure the important.
<description>Count of RPC Listener instances spun up on RegionServers. <description>Count of RPC Listener instances spun up on RegionServers.
Same property is used by the Master for count of master handlers.</description> Same property is used by the Master for count of master handlers.</description>
</property> </property>
<property>
<name>ipc.server.callqueue.handler.factor</name>
<value>0.1</value>
<description>Factor to determine the number of call queues.
A value of 0 means a single queue shared between all the handlers.
A value of 1 means that each handler has its own queue.</description>
</property>
<property>
<name>ipc.server.callqueue.read.share</name>
<value>0</value>
<description>Split the call queues into read and write queues.
A value of 0 indicate to not split the call queues.
A value of 0.5 means there will be the same number of read and write queues
A value of 1.0 means that all the queues except one are used to dispatch read requests.
</description>
</property>
<property> <property>
<name>hbase.regionserver.msginterval</name> <name>hbase.regionserver.msginterval</name>
<value>3000</value> <value>3000</value>

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import com.google.common.collect.Lists;
/**
* RPC Executor that dispatch the requests on multiple queues.
* Each handler has its own queue and there is no stealing.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MultipleQueueRpcExecutor extends RpcExecutor {
protected final List<BlockingQueue<CallRunner>> queues;
protected final Random balancer = new Random();
public MultipleQueueRpcExecutor(final String name, final int handlerCount,
final int numQueues, final int maxQueueLength) {
this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
}
public MultipleQueueRpcExecutor(final String name, final int handlerCount,
final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, Math.max(handlerCount, numQueues));
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
initializeQueues(numQueues, queueClass, initargs);
}
protected void initializeQueues(final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
for (int i = 0; i < numQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(queueClass, initargs));
}
}
@Override
public void dispatch(final CallRunner callTask) throws InterruptedException {
int queueIndex = balancer.nextInt(queues.size());
queues.get(queueIndex).put(callTask);
}
@Override
public int getQueueLength() {
int length = 0;
for (final BlockingQueue<CallRunner> queue: queues) {
length += queue.size();
}
return length;
}
@Override
protected List<BlockingQueue<CallRunner>> getQueues() {
return queues;
}
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import com.google.common.collect.Lists;
import com.google.protobuf.Message;
/**
* RPC Executor that uses different queues for reads and writes.
* Each handler has its own queue and there is no stealing.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RWQueueRpcExecutor extends RpcExecutor {
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
private final List<BlockingQueue<CallRunner>> queues;
private final Random balancer = new Random();
private final int writeHandlersCount;
private final int readHandlersCount;
private final int numWriteQueues;
private final int numReadQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, maxQueueLength,
LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
final int numWriteQueues, final int numReadQueues,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues;
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
for (int i = 0; i < numWriteQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
for (int i = 0; i < numReadQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
}
}
@Override
protected void startHandlers(final int port) {
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
}
@Override
public void dispatch(final CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
queueIndex = balancer.nextInt(numWriteQueues);
} else {
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
}
queues.get(queueIndex).put(callTask);
}
private boolean isWriteRequest(final RequestHeader header, final Message param) {
// TODO: Is there a better way to do this?
String methodName = header.getMethodName();
if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
MultiRequest multi = (MultiRequest)param;
for (RegionAction regionAction : multi.getRegionActionList()) {
for (Action action: regionAction.getActionList()) {
if (action.hasMutation()) {
return true;
}
}
}
}
return false;
}
@Override
public int getQueueLength() {
int length = 0;
for (final BlockingQueue<CallRunner> queue: queues) {
length += queue.size();
}
return length;
}
@Override
protected List<BlockingQueue<CallRunner>> getQueues() {
return queues;
}
/*
* Calculate the number of writers based on the "total count" and the read share.
* You'll get at least one writer.
*/
private static int calcNumWriters(final int count, final float readShare) {
return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
}
/*
* Calculate the number of readers based on the "total count" and the read share.
* You'll get at least one reader.
*/
private static int calcNumReaders(final int count, final float readShare) {
return count - calcNumWriters(count, readShare);
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class RpcExecutor {
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final List<Thread> handlers;
private final int handlerCount;
private final String name;
private boolean running;
public RpcExecutor(final String name, final int handlerCount) {
this.handlers = new ArrayList<Thread>(handlerCount);
this.handlerCount = handlerCount;
this.name = Strings.nullToEmpty(name);
}
public void start(final int port) {
running = true;
startHandlers(port);
}
public void stop() {
running = false;
for (Thread handler : handlers) {
handler.interrupt();
}
}
public int getActiveHandlerCount() {
return activeHandlerCount.get();
}
/** Returns the length of the pending queue */
public abstract int getQueueLength();
/** Add the request to the executor queue */
public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
/** Returns the list of request queues */
protected abstract List<BlockingQueue<CallRunner>> getQueues();
protected void startHandlers(final int port) {
List<BlockingQueue<CallRunner>> callQueues = getQueues();
startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
}
protected void startHandlers(final String nameSuffix, final int numHandlers,
final List<BlockingQueue<CallRunner>> callQueues,
final int qindex, final int qsize, final int port) {
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
for (int i = 0; i < numHandlers; i++) {
final int index = qindex + (i % qsize);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
consumerLoop(callQueues.get(index));
}
});
t.setDaemon(true);
t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
",queue=" + index + ",port=" + port);
t.start();
LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
handlers.add(t);
}
}
protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
boolean interrupted = false;
try {
while (running) {
try {
CallRunner task = myQueue.take();
try {
activeHandlerCount.incrementAndGet();
task.run();
} finally {
activeHandlerCount.decrementAndGet();
}
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -17,7 +17,11 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.Random;
import java.util.Comparator; import java.util.Comparator;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -43,7 +47,11 @@ import com.google.common.collect.Lists;
public class SimpleRpcScheduler implements RpcScheduler { public class SimpleRpcScheduler implements RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
/** If set to true, uses a priority queue and deprioritize long-running scans */ public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"ipc.server.callqueue.handler.factor";
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
@ -82,16 +90,11 @@ public class SimpleRpcScheduler implements RpcScheduler {
} }
private int port; private int port;
private final int handlerCount;
private final int priorityHandlerCount;
private final int replicationHandlerCount;
private final PriorityFunction priority; private final PriorityFunction priority;
final BlockingQueue<CallRunner> callQueue; private final RpcExecutor callExecutor;
final BlockingQueue<CallRunner> priorityCallQueue; private final RpcExecutor priorityExecutor;
final BlockingQueue<CallRunner> replicationQueue; private final RpcExecutor replicationExecutor;
private volatile boolean running = false;
private final List<Thread> handlers = Lists.newArrayList();
private AtomicInteger activeHandlerCount = new AtomicInteger(0);
/** What level a high priority call is at. */ /** What level a high priority call is at. */
private final int highPriorityLevel; private final int highPriorityLevel;
@ -112,25 +115,53 @@ public class SimpleRpcScheduler implements RpcScheduler {
int highPriorityLevel) { int highPriorityLevel) {
int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount;
this.replicationHandlerCount = replicationHandlerCount;
this.priority = priority; this.priority = priority;
this.highPriorityLevel = highPriorityLevel; this.highPriorityLevel = highPriorityLevel;
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
LOG.debug("Using " + callQueueType + " as user call queue"); float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
this.callQueue = new BoundedPriorityBlockingQueue<CallRunner>(maxQueueLength, float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
new CallPriorityComparator(conf, this.priority)); int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
if (numCallQueues > 1 && callqReadShare > 0) {
// multiple read/write queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
callqReadShare, maxQueueLength);
}
} else if (numCallQueues > 1) {
// multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues,
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
callExecutor = new MultipleQueueRpcExecutor("default", handlerCount,
numCallQueues, maxQueueLength);
}
} else { } else {
this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength); // Single queue
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new SingleQueueRpcExecutor("default", handlerCount,
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength);
}
} }
this.priorityCallQueue = priorityHandlerCount > 0
? new LinkedBlockingQueue<CallRunner>(maxQueueLength) this.priorityExecutor = priorityHandlerCount > 0
? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength)
: null; : null;
this.replicationQueue = replicationHandlerCount > 0 this.replicationExecutor = replicationHandlerCount > 0
? new LinkedBlockingQueue<CallRunner>(maxQueueLength) ? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength)
: null; : null;
} }
@ -141,96 +172,51 @@ public class SimpleRpcScheduler implements RpcScheduler {
@Override @Override
public void start() { public void start() {
running = true; callExecutor.start(port);
startHandlers(handlerCount, callQueue, null); if (priorityExecutor != null) priorityExecutor.start(port);
if (priorityCallQueue != null) { if (replicationExecutor != null) replicationExecutor.start(port);
startHandlers(priorityHandlerCount, priorityCallQueue, "Priority.");
}
if (replicationQueue != null) {
startHandlers(replicationHandlerCount, replicationQueue, "Replication.");
}
}
private void startHandlers(
int handlerCount,
final BlockingQueue<CallRunner> callQueue,
String threadNamePrefix) {
for (int i = 0; i < handlerCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
consumerLoop(callQueue);
}
});
t.setDaemon(true);
t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port);
t.start();
handlers.add(t);
}
} }
@Override @Override
public void stop() { public void stop() {
running = false; callExecutor.stop();
for (Thread handler : handlers) { if (priorityExecutor != null) priorityExecutor.stop();
handler.interrupt(); if (replicationExecutor != null) replicationExecutor.stop();
}
} }
@Override @Override
public void dispatch(CallRunner callTask) throws InterruptedException { public void dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall(); RpcServer.Call call = callTask.getCall();
int level = priority.getPriority(call.getHeader(), call.param); int level = priority.getPriority(call.getHeader(), call.param);
if (priorityCallQueue != null && level > highPriorityLevel) { if (priorityExecutor != null && level > highPriorityLevel) {
priorityCallQueue.put(callTask); priorityExecutor.dispatch(callTask);
} else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) { } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
replicationQueue.put(callTask); replicationExecutor.dispatch(callTask);
} else { } else {
callQueue.put(callTask); // queue the call; maybe blocked here callExecutor.dispatch(callTask);
} }
} }
@Override @Override
public int getGeneralQueueLength() { public int getGeneralQueueLength() {
return callQueue.size(); return callExecutor.getQueueLength();
} }
@Override @Override
public int getPriorityQueueLength() { public int getPriorityQueueLength() {
return priorityCallQueue == null ? 0 : priorityCallQueue.size(); return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
} }
@Override @Override
public int getReplicationQueueLength() { public int getReplicationQueueLength() {
return replicationQueue == null ? 0 : replicationQueue.size(); return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
} }
@Override @Override
public int getActiveRpcHandlerCount() { public int getActiveRpcHandlerCount() {
return activeHandlerCount.get(); return callExecutor.getActiveHandlerCount() +
} (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
(replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
boolean interrupted = false;
try {
while (running) {
try {
CallRunner task = myQueue.take();
try {
activeHandlerCount.incrementAndGet();
task.run();
} finally {
activeHandlerCount.decrementAndGet();
}
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} }
} }

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.collect.Lists;
/**
* RPC Executor that uses a single queue for all the requests.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SingleQueueRpcExecutor extends RpcExecutor {
private final BlockingQueue<CallRunner> queue;
public SingleQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength) {
this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength);
}
public SingleQueueRpcExecutor(final String name, final int handlerCount,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, handlerCount);
queue = (BlockingQueue<CallRunner>)ReflectionUtils.newInstance(queueClass, initargs);
}
@Override
public void dispatch(final CallRunner callTask) throws InterruptedException {
queue.put(callTask);
}
@Override
public int getQueueLength() {
return queue.size();
}
@Override
protected List<BlockingQueue<CallRunner>> getQueues() {
List<BlockingQueue<CallRunner>> list = new ArrayList<BlockingQueue<CallRunner>>(1);
list.add(queue);
return list;
}
}