HBASE-11355 a couple of callQueue related improvements
This commit is contained in:
parent
92c52a727a
commit
0e8e41d0ef
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -26,13 +27,23 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
public class ReflectionUtils {
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T instantiateWithCustomCtor(String className,
|
||||
Class<? >[] ctorArgTypes, Object[] ctorArgs) {
|
||||
Class<? >[] ctorArgTypes, Object[] ctorArgs) {
|
||||
try {
|
||||
Class<? extends T> resultType = (Class<? extends T>) Class.forName(className);
|
||||
return resultType.getDeclaredConstructor(ctorArgTypes).newInstance(ctorArgs);
|
||||
Constructor<? extends T> ctor = resultType.getDeclaredConstructor(ctorArgTypes);
|
||||
return instantiate(className, ctor, ctorArgs);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to find " + className, e);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to find suitable constructor for class " + className, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
|
||||
try {
|
||||
return ctor.newInstance(ctorArgs);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to access specified class " + className, e);
|
||||
|
@ -42,9 +53,40 @@ public class ReflectionUtils {
|
|||
} catch (InvocationTargetException e) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Constructor threw an exception for " + className, e);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to find suitable constructor for class " + className, e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T newInstance(Class<T> type, Object... params) {
|
||||
return instantiate(type.getName(), findConstructor(type, params), params);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) {
|
||||
Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors();
|
||||
for (Constructor<T> ctor : constructors) {
|
||||
Class<?>[] ctorParamTypes = ctor.getParameterTypes();
|
||||
if (ctorParamTypes.length != paramTypes.length) {
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean match = true;
|
||||
for (int i = 0; i < ctorParamTypes.length && match; ++i) {
|
||||
Class<?> paramType = paramTypes[i].getClass();
|
||||
match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
|
||||
((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
|
||||
(long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
|
||||
(char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
|
||||
(short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
|
||||
(boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
|
||||
(byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
|
||||
}
|
||||
|
||||
if (match) {
|
||||
return ctor;
|
||||
}
|
||||
}
|
||||
throw new UnsupportedOperationException(
|
||||
"Unable to find suitable constructor for class " + type.getName());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -180,6 +180,22 @@ possible configurations would overwhelm and obscure the important.
|
|||
<description>Count of RPC Listener instances spun up on RegionServers.
|
||||
Same property is used by the Master for count of master handlers.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ipc.server.callqueue.handler.factor</name>
|
||||
<value>0.1</value>
|
||||
<description>Factor to determine the number of call queues.
|
||||
A value of 0 means a single queue shared between all the handlers.
|
||||
A value of 1 means that each handler has its own queue.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ipc.server.callqueue.read.share</name>
|
||||
<value>0</value>
|
||||
<description>Split the call queues into read and write queues.
|
||||
A value of 0 indicate to not split the call queues.
|
||||
A value of 0.5 means there will be the same number of read and write queues
|
||||
A value of 1.0 means that all the queues except one are used to dispatch read requests.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.regionserver.msginterval</name>
|
||||
<value>3000</value>
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* RPC Executor that dispatch the requests on multiple queues.
|
||||
* Each handler has its own queue and there is no stealing.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class MultipleQueueRpcExecutor extends RpcExecutor {
|
||||
protected final List<BlockingQueue<CallRunner>> queues;
|
||||
protected final Random balancer = new Random();
|
||||
|
||||
public MultipleQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final int numQueues, final int maxQueueLength) {
|
||||
this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
|
||||
}
|
||||
|
||||
public MultipleQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final int numQueues,
|
||||
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
||||
super(name, Math.max(handlerCount, numQueues));
|
||||
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
|
||||
initializeQueues(numQueues, queueClass, initargs);
|
||||
}
|
||||
|
||||
protected void initializeQueues(final int numQueues,
|
||||
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
||||
for (int i = 0; i < numQueues; ++i) {
|
||||
queues.add((BlockingQueue<CallRunner>)
|
||||
ReflectionUtils.newInstance(queueClass, initargs));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
int queueIndex = balancer.nextInt(queues.size());
|
||||
queues.get(queueIndex).put(callTask);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueLength() {
|
||||
int length = 0;
|
||||
for (final BlockingQueue<CallRunner> queue: queues) {
|
||||
length += queue.size();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<BlockingQueue<CallRunner>> getQueues() {
|
||||
return queues;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
/**
|
||||
* RPC Executor that uses different queues for reads and writes.
|
||||
* Each handler has its own queue and there is no stealing.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class RWQueueRpcExecutor extends RpcExecutor {
|
||||
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
|
||||
|
||||
private final List<BlockingQueue<CallRunner>> queues;
|
||||
private final Random balancer = new Random();
|
||||
private final int writeHandlersCount;
|
||||
private final int readHandlersCount;
|
||||
private final int numWriteQueues;
|
||||
private final int numReadQueues;
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final int maxQueueLength) {
|
||||
this(name, handlerCount, numQueues, readShare, maxQueueLength,
|
||||
LinkedBlockingQueue.class);
|
||||
}
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final int maxQueueLength,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
|
||||
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
|
||||
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
|
||||
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
|
||||
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
|
||||
}
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
|
||||
final int numWriteQueues, final int numReadQueues,
|
||||
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
|
||||
super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
|
||||
|
||||
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
|
||||
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
|
||||
this.numWriteQueues = numWriteQueues;
|
||||
this.numReadQueues = numReadQueues;
|
||||
|
||||
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
|
||||
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
|
||||
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
|
||||
|
||||
for (int i = 0; i < numWriteQueues; ++i) {
|
||||
queues.add((BlockingQueue<CallRunner>)
|
||||
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
|
||||
}
|
||||
|
||||
for (int i = 0; i < numReadQueues; ++i) {
|
||||
queues.add((BlockingQueue<CallRunner>)
|
||||
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void startHandlers(final int port) {
|
||||
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
|
||||
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
RpcServer.Call call = callTask.getCall();
|
||||
int queueIndex;
|
||||
if (isWriteRequest(call.getHeader(), call.param)) {
|
||||
queueIndex = balancer.nextInt(numWriteQueues);
|
||||
} else {
|
||||
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
|
||||
}
|
||||
queues.get(queueIndex).put(callTask);
|
||||
}
|
||||
|
||||
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
||||
// TODO: Is there a better way to do this?
|
||||
String methodName = header.getMethodName();
|
||||
if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
|
||||
MultiRequest multi = (MultiRequest)param;
|
||||
for (RegionAction regionAction : multi.getRegionActionList()) {
|
||||
for (Action action: regionAction.getActionList()) {
|
||||
if (action.hasMutation()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueLength() {
|
||||
int length = 0;
|
||||
for (final BlockingQueue<CallRunner> queue: queues) {
|
||||
length += queue.size();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<BlockingQueue<CallRunner>> getQueues() {
|
||||
return queues;
|
||||
}
|
||||
|
||||
/*
|
||||
* Calculate the number of writers based on the "total count" and the read share.
|
||||
* You'll get at least one writer.
|
||||
*/
|
||||
private static int calcNumWriters(final int count, final float readShare) {
|
||||
return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
|
||||
}
|
||||
|
||||
/*
|
||||
* Calculate the number of readers based on the "total count" and the read share.
|
||||
* You'll get at least one reader.
|
||||
*/
|
||||
private static int calcNumReaders(final int count, final float readShare) {
|
||||
return count - calcNumWriters(count, readShare);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class RpcExecutor {
|
||||
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
|
||||
|
||||
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||
private final List<Thread> handlers;
|
||||
private final int handlerCount;
|
||||
private final String name;
|
||||
|
||||
private boolean running;
|
||||
|
||||
public RpcExecutor(final String name, final int handlerCount) {
|
||||
this.handlers = new ArrayList<Thread>(handlerCount);
|
||||
this.handlerCount = handlerCount;
|
||||
this.name = Strings.nullToEmpty(name);
|
||||
}
|
||||
|
||||
public void start(final int port) {
|
||||
running = true;
|
||||
startHandlers(port);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
running = false;
|
||||
for (Thread handler : handlers) {
|
||||
handler.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
public int getActiveHandlerCount() {
|
||||
return activeHandlerCount.get();
|
||||
}
|
||||
|
||||
/** Returns the length of the pending queue */
|
||||
public abstract int getQueueLength();
|
||||
|
||||
/** Add the request to the executor queue */
|
||||
public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
|
||||
|
||||
/** Returns the list of request queues */
|
||||
protected abstract List<BlockingQueue<CallRunner>> getQueues();
|
||||
|
||||
protected void startHandlers(final int port) {
|
||||
List<BlockingQueue<CallRunner>> callQueues = getQueues();
|
||||
startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
|
||||
}
|
||||
|
||||
protected void startHandlers(final String nameSuffix, final int numHandlers,
|
||||
final List<BlockingQueue<CallRunner>> callQueues,
|
||||
final int qindex, final int qsize, final int port) {
|
||||
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
|
||||
for (int i = 0; i < numHandlers; i++) {
|
||||
final int index = qindex + (i % qsize);
|
||||
Thread t = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
consumerLoop(callQueues.get(index));
|
||||
}
|
||||
});
|
||||
t.setDaemon(true);
|
||||
t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
|
||||
",queue=" + index + ",port=" + port);
|
||||
t.start();
|
||||
LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
|
||||
handlers.add(t);
|
||||
}
|
||||
}
|
||||
|
||||
protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (running) {
|
||||
try {
|
||||
CallRunner task = myQueue.take();
|
||||
try {
|
||||
activeHandlerCount.incrementAndGet();
|
||||
task.run();
|
||||
} finally {
|
||||
activeHandlerCount.decrementAndGet();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,7 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.Comparator;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
@ -43,7 +47,11 @@ import com.google.common.collect.Lists;
|
|||
public class SimpleRpcScheduler implements RpcScheduler {
|
||||
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
|
||||
|
||||
/** If set to true, uses a priority queue and deprioritize long-running scans */
|
||||
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share";
|
||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
||||
"ipc.server.callqueue.handler.factor";
|
||||
|
||||
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
|
||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type";
|
||||
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
||||
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
||||
|
@ -82,16 +90,11 @@ public class SimpleRpcScheduler implements RpcScheduler {
|
|||
}
|
||||
|
||||
private int port;
|
||||
private final int handlerCount;
|
||||
private final int priorityHandlerCount;
|
||||
private final int replicationHandlerCount;
|
||||
private final PriorityFunction priority;
|
||||
final BlockingQueue<CallRunner> callQueue;
|
||||
final BlockingQueue<CallRunner> priorityCallQueue;
|
||||
final BlockingQueue<CallRunner> replicationQueue;
|
||||
private volatile boolean running = false;
|
||||
private final List<Thread> handlers = Lists.newArrayList();
|
||||
private AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||
private final RpcExecutor callExecutor;
|
||||
private final RpcExecutor priorityExecutor;
|
||||
private final RpcExecutor replicationExecutor;
|
||||
|
||||
/** What level a high priority call is at. */
|
||||
private final int highPriorityLevel;
|
||||
|
||||
|
@ -112,25 +115,53 @@ public class SimpleRpcScheduler implements RpcScheduler {
|
|||
int highPriorityLevel) {
|
||||
int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
|
||||
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
||||
this.handlerCount = handlerCount;
|
||||
this.priorityHandlerCount = priorityHandlerCount;
|
||||
this.replicationHandlerCount = replicationHandlerCount;
|
||||
this.priority = priority;
|
||||
this.highPriorityLevel = highPriorityLevel;
|
||||
|
||||
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
LOG.debug("Using " + callQueueType + " as user call queue");
|
||||
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
||||
this.callQueue = new BoundedPriorityBlockingQueue<CallRunner>(maxQueueLength,
|
||||
new CallPriorityComparator(conf, this.priority));
|
||||
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
|
||||
|
||||
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
|
||||
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
|
||||
|
||||
LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
|
||||
|
||||
if (numCallQueues > 1 && callqReadShare > 0) {
|
||||
// multiple read/write queues
|
||||
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
||||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
|
||||
callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
|
||||
} else {
|
||||
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
|
||||
callqReadShare, maxQueueLength);
|
||||
}
|
||||
} else if (numCallQueues > 1) {
|
||||
// multiple queues
|
||||
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
||||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||
callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues,
|
||||
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
|
||||
} else {
|
||||
callExecutor = new MultipleQueueRpcExecutor("default", handlerCount,
|
||||
numCallQueues, maxQueueLength);
|
||||
}
|
||||
} else {
|
||||
this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
|
||||
// Single queue
|
||||
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
||||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||
callExecutor = new SingleQueueRpcExecutor("default", handlerCount,
|
||||
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
|
||||
} else {
|
||||
callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength);
|
||||
}
|
||||
}
|
||||
this.priorityCallQueue = priorityHandlerCount > 0
|
||||
? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
|
||||
|
||||
this.priorityExecutor = priorityHandlerCount > 0
|
||||
? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength)
|
||||
: null;
|
||||
this.replicationQueue = replicationHandlerCount > 0
|
||||
? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
|
||||
this.replicationExecutor = replicationHandlerCount > 0
|
||||
? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength)
|
||||
: null;
|
||||
}
|
||||
|
||||
|
@ -141,96 +172,51 @@ public class SimpleRpcScheduler implements RpcScheduler {
|
|||
|
||||
@Override
|
||||
public void start() {
|
||||
running = true;
|
||||
startHandlers(handlerCount, callQueue, null);
|
||||
if (priorityCallQueue != null) {
|
||||
startHandlers(priorityHandlerCount, priorityCallQueue, "Priority.");
|
||||
}
|
||||
if (replicationQueue != null) {
|
||||
startHandlers(replicationHandlerCount, replicationQueue, "Replication.");
|
||||
}
|
||||
}
|
||||
|
||||
private void startHandlers(
|
||||
int handlerCount,
|
||||
final BlockingQueue<CallRunner> callQueue,
|
||||
String threadNamePrefix) {
|
||||
for (int i = 0; i < handlerCount; i++) {
|
||||
Thread t = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
consumerLoop(callQueue);
|
||||
}
|
||||
});
|
||||
t.setDaemon(true);
|
||||
t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port);
|
||||
t.start();
|
||||
handlers.add(t);
|
||||
}
|
||||
callExecutor.start(port);
|
||||
if (priorityExecutor != null) priorityExecutor.start(port);
|
||||
if (replicationExecutor != null) replicationExecutor.start(port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
running = false;
|
||||
for (Thread handler : handlers) {
|
||||
handler.interrupt();
|
||||
}
|
||||
callExecutor.stop();
|
||||
if (priorityExecutor != null) priorityExecutor.stop();
|
||||
if (replicationExecutor != null) replicationExecutor.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(CallRunner callTask) throws InterruptedException {
|
||||
RpcServer.Call call = callTask.getCall();
|
||||
int level = priority.getPriority(call.getHeader(), call.param);
|
||||
if (priorityCallQueue != null && level > highPriorityLevel) {
|
||||
priorityCallQueue.put(callTask);
|
||||
} else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {
|
||||
replicationQueue.put(callTask);
|
||||
if (priorityExecutor != null && level > highPriorityLevel) {
|
||||
priorityExecutor.dispatch(callTask);
|
||||
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
|
||||
replicationExecutor.dispatch(callTask);
|
||||
} else {
|
||||
callQueue.put(callTask); // queue the call; maybe blocked here
|
||||
callExecutor.dispatch(callTask);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getGeneralQueueLength() {
|
||||
return callQueue.size();
|
||||
return callExecutor.getQueueLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriorityQueueLength() {
|
||||
return priorityCallQueue == null ? 0 : priorityCallQueue.size();
|
||||
return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReplicationQueueLength() {
|
||||
return replicationQueue == null ? 0 : replicationQueue.size();
|
||||
return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getActiveRpcHandlerCount() {
|
||||
return activeHandlerCount.get();
|
||||
}
|
||||
|
||||
private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (running) {
|
||||
try {
|
||||
CallRunner task = myQueue.take();
|
||||
try {
|
||||
activeHandlerCount.incrementAndGet();
|
||||
task.run();
|
||||
} finally {
|
||||
activeHandlerCount.decrementAndGet();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
return callExecutor.getActiveHandlerCount() +
|
||||
(priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
|
||||
(replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* RPC Executor that uses a single queue for all the requests.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class SingleQueueRpcExecutor extends RpcExecutor {
|
||||
private final BlockingQueue<CallRunner> queue;
|
||||
|
||||
public SingleQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final int maxQueueLength) {
|
||||
this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength);
|
||||
}
|
||||
|
||||
public SingleQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
||||
super(name, handlerCount);
|
||||
queue = (BlockingQueue<CallRunner>)ReflectionUtils.newInstance(queueClass, initargs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
queue.put(callTask);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueLength() {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<BlockingQueue<CallRunner>> getQueues() {
|
||||
List<BlockingQueue<CallRunner>> list = new ArrayList<BlockingQueue<CallRunner>>(1);
|
||||
list.add(queue);
|
||||
return list;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue