HubSpot Addendum HBASE-26703: Add executor name to QueueBalancer constructor args
This commit is contained in:
parent
cf6efffc77
commit
66b2e38300
@ -47,7 +47,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
|||||||
final Configuration conf, final Abortable abortable) {
|
final Configuration conf, final Abortable abortable) {
|
||||||
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
|
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
|
||||||
initializeQueues(this.numCallQueues);
|
initializeQueues(this.numCallQueues);
|
||||||
this.balancer = getBalancer(conf, getQueues());
|
this.balancer = getBalancer(name, conf, getQueues());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -99,10 +99,10 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||||||
initializeQueues(numReadQueues);
|
initializeQueues(numReadQueues);
|
||||||
initializeQueues(numScanQueues);
|
initializeQueues(numScanQueues);
|
||||||
|
|
||||||
this.writeBalancer = getBalancer(conf, queues.subList(0, numWriteQueues));
|
this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
|
||||||
this.readBalancer = getBalancer(conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
|
this.readBalancer = getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
|
||||||
this.scanBalancer = numScanQueues > 0 ?
|
this.scanBalancer = numScanQueues > 0 ?
|
||||||
getBalancer(conf, queues.subList(numWriteQueues + numReadQueues,
|
getBalancer(name, conf, queues.subList(numWriteQueues + numReadQueues,
|
||||||
numWriteQueues + numReadQueues + numScanQueues)) :
|
numWriteQueues + numReadQueues + numScanQueues)) :
|
||||||
null;
|
null;
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ public class RandomQueueBalancer implements QueueBalancer {
|
|||||||
private final int queueSize;
|
private final int queueSize;
|
||||||
private final List<BlockingQueue<CallRunner>> queues;
|
private final List<BlockingQueue<CallRunner>> queues;
|
||||||
|
|
||||||
public RandomQueueBalancer(Configuration conf, List<BlockingQueue<CallRunner>> queues) {
|
public RandomQueueBalancer(Configuration conf, String name, List<BlockingQueue<CallRunner>> queues) {
|
||||||
this.queueSize = queues.size();
|
this.queueSize = queues.size();
|
||||||
this.queues = queues;
|
this.queues = queues;
|
||||||
}
|
}
|
||||||
|
@ -382,13 +382,13 @@ public abstract class RpcExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static QueueBalancer getBalancer(Configuration conf, List<BlockingQueue<CallRunner>> queues) {
|
public static QueueBalancer getBalancer(String name, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
|
||||||
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
|
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
|
||||||
if (queues.size() == 1) {
|
if (queues.size() == 1) {
|
||||||
return ONE_QUEUE;
|
return ONE_QUEUE;
|
||||||
} else {
|
} else {
|
||||||
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
|
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
|
||||||
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, queues);
|
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, name, queues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user