diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java index cda54245abb..6629868ef00 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.util; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import org.apache.hadoop.classification.InterfaceAudience; @@ -26,13 +27,23 @@ import org.apache.hadoop.classification.InterfaceAudience; public class ReflectionUtils { @SuppressWarnings("unchecked") public static T instantiateWithCustomCtor(String className, - Class[] ctorArgTypes, Object[] ctorArgs) { + Class[] ctorArgTypes, Object[] ctorArgs) { try { Class resultType = (Class) Class.forName(className); - return resultType.getDeclaredConstructor(ctorArgTypes).newInstance(ctorArgs); + Constructor ctor = resultType.getDeclaredConstructor(ctorArgTypes); + return instantiate(className, ctor, ctorArgs); } catch (ClassNotFoundException e) { throw new UnsupportedOperationException( "Unable to find " + className, e); + } catch (NoSuchMethodException e) { + throw new UnsupportedOperationException( + "Unable to find suitable constructor for class " + className, e); + } + } + + private static T instantiate(final String className, Constructor ctor, Object[] ctorArgs) { + try { + return ctor.newInstance(ctorArgs); } catch (IllegalAccessException e) { throw new UnsupportedOperationException( "Unable to access specified class " + className, e); @@ -42,9 +53,40 @@ public class ReflectionUtils { } catch (InvocationTargetException e) { throw new UnsupportedOperationException( "Constructor threw an exception for " + className, e); - } catch (NoSuchMethodException e) { - throw new UnsupportedOperationException( - "Unable to find suitable constructor for class " + className, e); } } + + @SuppressWarnings("unchecked") + public static T newInstance(Class type, Object... params) { + return instantiate(type.getName(), findConstructor(type, params), params); + } + + @SuppressWarnings("unchecked") + public static Constructor findConstructor(Class type, Object... paramTypes) { + Constructor[] constructors = (Constructor[])type.getConstructors(); + for (Constructor ctor : constructors) { + Class[] ctorParamTypes = ctor.getParameterTypes(); + if (ctorParamTypes.length != paramTypes.length) { + continue; + } + + boolean match = true; + for (int i = 0; i < ctorParamTypes.length && match; ++i) { + Class paramType = paramTypes[i].getClass(); + match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) : + ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) || + (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) || + (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) || + (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) || + (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) || + (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType))); + } + + if (match) { + return ctor; + } + } + throw new UnsupportedOperationException( + "Unable to find suitable constructor for class " + type.getName()); + } } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 43723f8c301..70d20a7660b 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -180,6 +180,22 @@ possible configurations would overwhelm and obscure the important. Count of RPC Listener instances spun up on RegionServers. Same property is used by the Master for count of master handlers. + + ipc.server.callqueue.handler.factor + 0.1 + Factor to determine the number of call queues. + A value of 0 means a single queue shared between all the handlers. + A value of 1 means that each handler has its own queue. + + + ipc.server.callqueue.read.share + 0 + Split the call queues into read and write queues. + A value of 0 indicate to not split the call queues. + A value of 0.5 means there will be the same number of read and write queues + A value of 1.0 means that all the queues except one are used to dispatch read requests. + + hbase.regionserver.msginterval 3000 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java new file mode 100644 index 00000000000..ab149065dc0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +import com.google.common.collect.Lists; + +/** + * RPC Executor that dispatch the requests on multiple queues. + * Each handler has its own queue and there is no stealing. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MultipleQueueRpcExecutor extends RpcExecutor { + protected final List> queues; + protected final Random balancer = new Random(); + + public MultipleQueueRpcExecutor(final String name, final int handlerCount, + final int numQueues, final int maxQueueLength) { + this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength); + } + + public MultipleQueueRpcExecutor(final String name, final int handlerCount, + final int numQueues, + final Class queueClass, Object... initargs) { + super(name, Math.max(handlerCount, numQueues)); + queues = new ArrayList>(numQueues); + initializeQueues(numQueues, queueClass, initargs); + } + + protected void initializeQueues(final int numQueues, + final Class queueClass, Object... initargs) { + for (int i = 0; i < numQueues; ++i) { + queues.add((BlockingQueue) + ReflectionUtils.newInstance(queueClass, initargs)); + } + } + + @Override + public void dispatch(final CallRunner callTask) throws InterruptedException { + int queueIndex = balancer.nextInt(queues.size()); + queues.get(queueIndex).put(callTask); + } + + @Override + public int getQueueLength() { + int length = 0; + for (final BlockingQueue queue: queues) { + length += queue.size(); + } + return length; + } + + @Override + protected List> getQueues() { + return queues; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java new file mode 100644 index 00000000000..1eb1a22499b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +import com.google.common.collect.Lists; +import com.google.protobuf.Message; + +/** + * RPC Executor that uses different queues for reads and writes. + * Each handler has its own queue and there is no stealing. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RWQueueRpcExecutor extends RpcExecutor { + private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class); + + private final List> queues; + private final Random balancer = new Random(); + private final int writeHandlersCount; + private final int readHandlersCount; + private final int numWriteQueues; + private final int numReadQueues; + + public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final int maxQueueLength) { + this(name, handlerCount, numQueues, readShare, maxQueueLength, + LinkedBlockingQueue.class); + } + + public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final int maxQueueLength, + final Class readQueueClass, Object... readQueueInitArgs) { + this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare), + calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), + LinkedBlockingQueue.class, new Object[] {maxQueueLength}, + readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs)); + } + + public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers, + final int numWriteQueues, final int numReadQueues, + final Class writeQueueClass, Object[] writeQueueInitArgs, + final Class readQueueClass, Object[] readQueueInitArgs) { + super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues)); + + this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues); + this.readHandlersCount = Math.max(readHandlers, numReadQueues); + this.numWriteQueues = numWriteQueues; + this.numReadQueues = numReadQueues; + + queues = new ArrayList>(writeHandlersCount + readHandlersCount); + LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount); + + for (int i = 0; i < numWriteQueues; ++i) { + queues.add((BlockingQueue) + ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs)); + } + + for (int i = 0; i < numReadQueues; ++i) { + queues.add((BlockingQueue) + ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs)); + } + } + + @Override + protected void startHandlers(final int port) { + startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port); + startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port); + } + + @Override + public void dispatch(final CallRunner callTask) throws InterruptedException { + RpcServer.Call call = callTask.getCall(); + int queueIndex; + if (isWriteRequest(call.getHeader(), call.param)) { + queueIndex = balancer.nextInt(numWriteQueues); + } else { + queueIndex = numWriteQueues + balancer.nextInt(numReadQueues); + } + queues.get(queueIndex).put(callTask); + } + + private boolean isWriteRequest(final RequestHeader header, final Message param) { + // TODO: Is there a better way to do this? + String methodName = header.getMethodName(); + if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) { + MultiRequest multi = (MultiRequest)param; + for (RegionAction regionAction : multi.getRegionActionList()) { + for (Action action: regionAction.getActionList()) { + if (action.hasMutation()) { + return true; + } + } + } + } + return false; + } + + @Override + public int getQueueLength() { + int length = 0; + for (final BlockingQueue queue: queues) { + length += queue.size(); + } + return length; + } + + @Override + protected List> getQueues() { + return queues; + } + + /* + * Calculate the number of writers based on the "total count" and the read share. + * You'll get at least one writer. + */ + private static int calcNumWriters(final int count, final float readShare) { + return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare))); + } + + /* + * Calculate the number of readers based on the "total count" and the read share. + * You'll get at least one reader. + */ + private static int calcNumReaders(final int count, final float readShare) { + return count - calcNumWriters(count, readShare); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java new file mode 100644 index 00000000000..84a71eafc62 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RpcExecutor { + private static final Log LOG = LogFactory.getLog(RpcExecutor.class); + + private final AtomicInteger activeHandlerCount = new AtomicInteger(0); + private final List handlers; + private final int handlerCount; + private final String name; + + private boolean running; + + public RpcExecutor(final String name, final int handlerCount) { + this.handlers = new ArrayList(handlerCount); + this.handlerCount = handlerCount; + this.name = Strings.nullToEmpty(name); + } + + public void start(final int port) { + running = true; + startHandlers(port); + } + + public void stop() { + running = false; + for (Thread handler : handlers) { + handler.interrupt(); + } + } + + public int getActiveHandlerCount() { + return activeHandlerCount.get(); + } + + /** Returns the length of the pending queue */ + public abstract int getQueueLength(); + + /** Add the request to the executor queue */ + public abstract void dispatch(final CallRunner callTask) throws InterruptedException; + + /** Returns the list of request queues */ + protected abstract List> getQueues(); + + protected void startHandlers(final int port) { + List> callQueues = getQueues(); + startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port); + } + + protected void startHandlers(final String nameSuffix, final int numHandlers, + final List> callQueues, + final int qindex, final int qsize, final int port) { + final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); + for (int i = 0; i < numHandlers; i++) { + final int index = qindex + (i % qsize); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + consumerLoop(callQueues.get(index)); + } + }); + t.setDaemon(true); + t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() + + ",queue=" + index + ",port=" + port); + t.start(); + LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index); + handlers.add(t); + } + } + + protected void consumerLoop(final BlockingQueue myQueue) { + boolean interrupted = false; + try { + while (running) { + try { + CallRunner task = myQueue.take(); + try { + activeHandlerCount.incrementAndGet(); + task.run(); + } finally { + activeHandlerCount.decrementAndGet(); + } + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 764900b670d..27f54275465 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; + +import java.util.Random; import java.util.Comparator; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -43,7 +47,11 @@ import com.google.common.collect.Lists; public class SimpleRpcScheduler implements RpcScheduler { public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); - /** If set to true, uses a priority queue and deprioritize long-running scans */ + public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share"; + public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = + "ipc.server.callqueue.handler.factor"; + + /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */ public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; @@ -82,16 +90,11 @@ public class SimpleRpcScheduler implements RpcScheduler { } private int port; - private final int handlerCount; - private final int priorityHandlerCount; - private final int replicationHandlerCount; private final PriorityFunction priority; - final BlockingQueue callQueue; - final BlockingQueue priorityCallQueue; - final BlockingQueue replicationQueue; - private volatile boolean running = false; - private final List handlers = Lists.newArrayList(); - private AtomicInteger activeHandlerCount = new AtomicInteger(0); + private final RpcExecutor callExecutor; + private final RpcExecutor priorityExecutor; + private final RpcExecutor replicationExecutor; + /** What level a high priority call is at. */ private final int highPriorityLevel; @@ -112,25 +115,53 @@ public class SimpleRpcScheduler implements RpcScheduler { int highPriorityLevel) { int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); - this.handlerCount = handlerCount; - this.priorityHandlerCount = priorityHandlerCount; - this.replicationHandlerCount = replicationHandlerCount; this.priority = priority; this.highPriorityLevel = highPriorityLevel; String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); - LOG.debug("Using " + callQueueType + " as user call queue"); - if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { - this.callQueue = new BoundedPriorityBlockingQueue(maxQueueLength, - new CallPriorityComparator(conf, this.priority)); + float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + + float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); + int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor)); + + LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues); + + if (numCallQueues > 1 && callqReadShare > 0) { + // multiple read/write queues + if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); + callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, + callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority); + } else { + callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, + callqReadShare, maxQueueLength); + } + } else if (numCallQueues > 1) { + // multiple queues + if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); + callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues, + BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + } else { + callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, + numCallQueues, maxQueueLength); + } } else { - this.callQueue = new LinkedBlockingQueue(maxQueueLength); + // Single queue + if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); + callExecutor = new SingleQueueRpcExecutor("default", handlerCount, + BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + } else { + callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength); + } } - this.priorityCallQueue = priorityHandlerCount > 0 - ? new LinkedBlockingQueue(maxQueueLength) + + this.priorityExecutor = priorityHandlerCount > 0 + ? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength) : null; - this.replicationQueue = replicationHandlerCount > 0 - ? new LinkedBlockingQueue(maxQueueLength) + this.replicationExecutor = replicationHandlerCount > 0 + ? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength) : null; } @@ -141,96 +172,51 @@ public class SimpleRpcScheduler implements RpcScheduler { @Override public void start() { - running = true; - startHandlers(handlerCount, callQueue, null); - if (priorityCallQueue != null) { - startHandlers(priorityHandlerCount, priorityCallQueue, "Priority."); - } - if (replicationQueue != null) { - startHandlers(replicationHandlerCount, replicationQueue, "Replication."); - } - } - - private void startHandlers( - int handlerCount, - final BlockingQueue callQueue, - String threadNamePrefix) { - for (int i = 0; i < handlerCount; i++) { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - consumerLoop(callQueue); - } - }); - t.setDaemon(true); - t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port); - t.start(); - handlers.add(t); - } + callExecutor.start(port); + if (priorityExecutor != null) priorityExecutor.start(port); + if (replicationExecutor != null) replicationExecutor.start(port); } @Override public void stop() { - running = false; - for (Thread handler : handlers) { - handler.interrupt(); - } + callExecutor.stop(); + if (priorityExecutor != null) priorityExecutor.stop(); + if (replicationExecutor != null) replicationExecutor.stop(); } @Override public void dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); int level = priority.getPriority(call.getHeader(), call.param); - if (priorityCallQueue != null && level > highPriorityLevel) { - priorityCallQueue.put(callTask); - } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) { - replicationQueue.put(callTask); + if (priorityExecutor != null && level > highPriorityLevel) { + priorityExecutor.dispatch(callTask); + } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { + replicationExecutor.dispatch(callTask); } else { - callQueue.put(callTask); // queue the call; maybe blocked here + callExecutor.dispatch(callTask); } } @Override public int getGeneralQueueLength() { - return callQueue.size(); + return callExecutor.getQueueLength(); } @Override public int getPriorityQueueLength() { - return priorityCallQueue == null ? 0 : priorityCallQueue.size(); + return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); } @Override public int getReplicationQueueLength() { - return replicationQueue == null ? 0 : replicationQueue.size(); + return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); } @Override public int getActiveRpcHandlerCount() { - return activeHandlerCount.get(); - } - - private void consumerLoop(BlockingQueue myQueue) { - boolean interrupted = false; - try { - while (running) { - try { - CallRunner task = myQueue.take(); - try { - activeHandlerCount.incrementAndGet(); - task.run(); - } finally { - activeHandlerCount.decrementAndGet(); - } - } catch (InterruptedException e) { - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } + return callExecutor.getActiveHandlerCount() + + (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) + + (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java new file mode 100644 index 00000000000..f195e0d9516 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Lists; + +/** + * RPC Executor that uses a single queue for all the requests. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SingleQueueRpcExecutor extends RpcExecutor { + private final BlockingQueue queue; + + public SingleQueueRpcExecutor(final String name, final int handlerCount, + final int maxQueueLength) { + this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength); + } + + public SingleQueueRpcExecutor(final String name, final int handlerCount, + final Class queueClass, Object... initargs) { + super(name, handlerCount); + queue = (BlockingQueue)ReflectionUtils.newInstance(queueClass, initargs); + } + + @Override + public void dispatch(final CallRunner callTask) throws InterruptedException { + queue.put(callTask); + } + + @Override + public int getQueueLength() { + return queue.size(); + } + + @Override + protected List> getQueues() { + List> list = new ArrayList>(1); + list.add(queue); + return list; + } +}