HBASE-6908: Add pluggable rpc queue implementation (#3522)
Can pass in a FQCN to load as the call queue implementation. Standardized arguments to the constructor are the max queue length, the PriorityFunction, and the Configuration. `PluggableBlockingQueue` abstract class provided to help guide the correct constructor signature Hard fails if the class fails to load as a `BlockingQueue<CallRunner>` Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
6a1382a503
commit
7f614ce77e
|
@ -0,0 +1,55 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract class template for defining a pluggable blocking queue implementation to be used
|
||||||
|
* by the 'pluggable' call queue type in the RpcExecutor.
|
||||||
|
*
|
||||||
|
* The intention is that the constructor shape helps re-inforce the expected parameters needed
|
||||||
|
* to match up to how the RpcExecutor will instantiate instances of the queue.
|
||||||
|
*
|
||||||
|
* If the implementation class implements the
|
||||||
|
* {@link org.apache.hadoop.hbase.conf.ConfigurationObserver} interface, it will also be wired
|
||||||
|
* into configuration changes.
|
||||||
|
*
|
||||||
|
* Instantiation requires a constructor with {@code
|
||||||
|
* final int maxQueueLength,
|
||||||
|
* final PriorityFunction priority,
|
||||||
|
* final Configuration conf)}
|
||||||
|
* as the arguments.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public abstract class PluggableBlockingQueue implements BlockingQueue<CallRunner> {
|
||||||
|
protected final int maxQueueLength;
|
||||||
|
protected final PriorityFunction priority;
|
||||||
|
protected final Configuration conf;
|
||||||
|
|
||||||
|
public PluggableBlockingQueue(final int maxQueueLength,
|
||||||
|
final PriorityFunction priority, final Configuration conf) {
|
||||||
|
this.maxQueueLength = maxQueueLength;
|
||||||
|
this.priority = priority;
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal runtime error type to indicate the RpcExecutor failed to execute a `pluggable`
|
||||||
|
* call queue type. Either the FQCN for the class was missing in Configuration, not found on the
|
||||||
|
* classpath, or is not a subtype of {@code BlockingQueue<CallRunner>}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class PluggableRpcQueueNotFound extends RuntimeException {
|
||||||
|
public PluggableRpcQueueNotFound(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
@ -33,6 +34,7 @@ import java.util.HashMap;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -67,6 +69,7 @@ public abstract class RpcExecutor {
|
||||||
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
||||||
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
||||||
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
||||||
|
public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable";
|
||||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||||
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
|
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
|
||||||
|
|
||||||
|
@ -79,6 +82,9 @@ public abstract class RpcExecutor {
|
||||||
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
||||||
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
|
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
|
||||||
|
|
||||||
|
public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
|
||||||
|
"hbase.ipc.server.callqueue.pluggable.queue.class.name";
|
||||||
|
|
||||||
private LongAdder numGeneralCallsDropped = new LongAdder();
|
private LongAdder numGeneralCallsDropped = new LongAdder();
|
||||||
private LongAdder numLifoModeSwitches = new LongAdder();
|
private LongAdder numLifoModeSwitches = new LongAdder();
|
||||||
|
|
||||||
|
@ -150,13 +156,23 @@ public abstract class RpcExecutor {
|
||||||
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
||||||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||||
queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
|
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
|
||||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
|
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
|
||||||
queueClass = AdaptiveLifoCoDelCallQueue.class;
|
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
|
||||||
|
} else if (isPluggableQueueType(callQueueType)) {
|
||||||
|
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = getPluggableQueueClass();
|
||||||
|
|
||||||
|
if (!pluggableQueueClass.isPresent()) {
|
||||||
|
throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call"
|
||||||
|
+ " queue type required");
|
||||||
|
} else {
|
||||||
|
this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf };
|
||||||
|
this.queueClass = pluggableQueueClass.get();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this.name += ".Fifo";
|
this.name += ".Fifo";
|
||||||
queueInitArgs = new Object[] { maxQueueLength };
|
this.queueInitArgs = new Object[] { maxQueueLength };
|
||||||
queueClass = LinkedBlockingQueue.class;
|
this.queueClass = LinkedBlockingQueue.class;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Instantiated {} with queueClass={}; " +
|
LOG.info("Instantiated {} with queueClass={}; " +
|
||||||
|
@ -445,6 +461,35 @@ public abstract class RpcExecutor {
|
||||||
return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
|
return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean isPluggableQueueType(String callQueueType) {
|
||||||
|
return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
|
||||||
|
String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);
|
||||||
|
|
||||||
|
if (queueClassName == null) {
|
||||||
|
LOG.error("Pluggable queue class config at " + PLUGGABLE_CALL_QUEUE_CLASS_NAME +
|
||||||
|
" was not found");
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Class<?> clazz = Class.forName(queueClassName);
|
||||||
|
|
||||||
|
if (BlockingQueue.class.isAssignableFrom(clazz)) {
|
||||||
|
return Optional.of((Class<? extends BlockingQueue<CallRunner>>) clazz);
|
||||||
|
} else {
|
||||||
|
LOG.error("Pluggable Queue class " + queueClassName +
|
||||||
|
" does not extend BlockingQueue<CallRunner>");
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
} catch (ClassNotFoundException exception) {
|
||||||
|
LOG.error("Could not find " + queueClassName + " on the classpath to load.");
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public long getNumGeneralCallsDropped() {
|
public long getNumGeneralCallsDropped() {
|
||||||
return numGeneralCallsDropped.longValue();
|
return numGeneralCallsDropped.longValue();
|
||||||
}
|
}
|
||||||
|
@ -522,6 +567,8 @@ public abstract class RpcExecutor {
|
||||||
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
|
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
|
||||||
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
|
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
|
||||||
codelLifoThreshold);
|
codelLifoThreshold);
|
||||||
|
} else if (queue instanceof ConfigurationObserver) {
|
||||||
|
((ConfigurationObserver)queue).onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,7 +154,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
||||||
|
|
||||||
String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
|
String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
|
||||||
RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
|
RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
|
||||||
if (RpcExecutor.isCodelQueueType(callQueueType)) {
|
if (RpcExecutor.isCodelQueueType(callQueueType) ||
|
||||||
|
RpcExecutor.isPluggableQueueType(callQueueType)) {
|
||||||
callExecutor.onConfigurationChange(conf);
|
callExecutor.onConfigurationChange(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,155 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
|
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of the PluggableBlockingQueue abstract class.
|
||||||
|
*
|
||||||
|
* Used to verify that the pluggable call queue type for the RpcExecutor can load correctly
|
||||||
|
* via the FQCN reflection semantics.
|
||||||
|
*/
|
||||||
|
public class TestPluggableQueueImpl extends PluggableBlockingQueue implements
|
||||||
|
ConfigurationObserver {
|
||||||
|
|
||||||
|
private final BoundedPriorityBlockingQueue<CallRunner> inner;
|
||||||
|
private static boolean configurationRecentlyChanged = false;
|
||||||
|
|
||||||
|
public TestPluggableQueueImpl(int maxQueueLength, PriorityFunction priority, Configuration conf) {
|
||||||
|
super(maxQueueLength, priority, conf);
|
||||||
|
Comparator<CallRunner> comparator = Comparator.comparingInt(r -> r.getRpcCall().getPriority());
|
||||||
|
inner = new BoundedPriorityBlockingQueue<>(maxQueueLength, comparator);
|
||||||
|
configurationRecentlyChanged = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean add(CallRunner callRunner) {
|
||||||
|
return inner.add(callRunner);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean offer(CallRunner callRunner) {
|
||||||
|
return inner.offer(callRunner);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public CallRunner remove() {
|
||||||
|
return inner.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public CallRunner poll() {
|
||||||
|
return inner.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public CallRunner element() {
|
||||||
|
return inner.element();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public CallRunner peek() {
|
||||||
|
return inner.peek();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void put(CallRunner callRunner) throws InterruptedException {
|
||||||
|
inner.put(callRunner);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
|
||||||
|
throws InterruptedException {
|
||||||
|
return inner.offer(callRunner, timeout, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public CallRunner take() throws InterruptedException {
|
||||||
|
return inner.take();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
|
return inner.poll(timeout, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int remainingCapacity() {
|
||||||
|
return inner.remainingCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean remove(Object o) {
|
||||||
|
return inner.remove(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean containsAll(Collection<?> c) {
|
||||||
|
return inner.containsAll(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean addAll(Collection<? extends CallRunner> c) {
|
||||||
|
return inner.addAll(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean removeAll(Collection<?> c) {
|
||||||
|
return inner.removeAll(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean retainAll(Collection<?> c) {
|
||||||
|
return inner.retainAll(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void clear() {
|
||||||
|
inner.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int size() {
|
||||||
|
return inner.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isEmpty() {
|
||||||
|
return inner.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean contains(Object o) {
|
||||||
|
return inner.contains(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Iterator<CallRunner> iterator() {
|
||||||
|
return inner.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Object[] toArray() {
|
||||||
|
return inner.toArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public <T> T[] toArray(T[] a) {
|
||||||
|
return inner.toArray(a);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int drainTo(Collection<? super CallRunner> c) {
|
||||||
|
return inner.drainTo(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int drainTo(Collection<? super CallRunner> c, int maxElements) {
|
||||||
|
return inner.drainTo(c, maxElements);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean hasObservedARecentConfigurationChange() {
|
||||||
|
return configurationRecentlyChanged;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void onConfigurationChange(Configuration conf) {
|
||||||
|
configurationRecentlyChanged = true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
@ -247,10 +248,84 @@ public class TestSimpleRpcScheduler {
|
||||||
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
|
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPluggableRpcQueue() throws Exception {
|
||||||
|
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
|
||||||
|
"org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
|
||||||
|
|
||||||
|
try {
|
||||||
|
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
|
||||||
|
"MissingClass");
|
||||||
|
fail("Expected a PluggableRpcQueueNotFound for unloaded class");
|
||||||
|
} catch (PluggableRpcQueueNotFound e) {
|
||||||
|
// expected
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
|
||||||
|
"org.apache.hadoop.hbase.ipc.SimpleRpcServer");
|
||||||
|
fail("Expected a PluggableRpcQueueNotFound for incompatible class");
|
||||||
|
} catch (PluggableRpcQueueNotFound e) {
|
||||||
|
// expected
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
|
||||||
|
|
||||||
|
Configuration schedConf = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
|
||||||
|
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
|
||||||
|
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
|
||||||
|
RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
|
||||||
|
schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
|
||||||
|
"org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
|
||||||
|
|
||||||
|
PriorityFunction priority = mock(PriorityFunction.class);
|
||||||
|
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
|
||||||
|
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
|
||||||
|
HConstants.QOS_THRESHOLD);
|
||||||
|
try {
|
||||||
|
scheduler.start();
|
||||||
|
|
||||||
|
CallRunner putCallTask = mock(CallRunner.class);
|
||||||
|
ServerCall putCall = mock(ServerCall.class);
|
||||||
|
putCall.param = RequestConverter.buildMutateRequest(
|
||||||
|
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
|
||||||
|
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
|
||||||
|
when(putCallTask.getRpcCall()).thenReturn(putCall);
|
||||||
|
when(putCall.getHeader()).thenReturn(putHead);
|
||||||
|
|
||||||
|
assertTrue(scheduler.dispatch(putCallTask));
|
||||||
|
|
||||||
|
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
|
||||||
|
scheduler.onConfigurationChange(schedConf);
|
||||||
|
assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
|
||||||
|
waitUntilQueueEmpty(scheduler);
|
||||||
|
} finally {
|
||||||
|
scheduler.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void testRpcScheduler(final String queueType) throws Exception {
|
private void testRpcScheduler(final String queueType) throws Exception {
|
||||||
|
testRpcScheduler(queueType, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
Configuration schedConf = HBaseConfiguration.create();
|
Configuration schedConf = HBaseConfiguration.create();
|
||||||
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
|
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
|
||||||
|
|
||||||
|
if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
|
||||||
|
schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass);
|
||||||
|
}
|
||||||
|
|
||||||
PriorityFunction priority = mock(PriorityFunction.class);
|
PriorityFunction priority = mock(PriorityFunction.class);
|
||||||
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
|
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue