configurable thread count

This commit is contained in:
Himadri Singh 2014-02-14 13:09:24 +05:30
parent 7dae293b5f
commit 5af414c93c
6 changed files with 26 additions and 6 deletions

View File

@ -36,10 +36,12 @@ import java.util.concurrent.Executors;
public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrategy public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrategy
{ {
private static final EmittingLogger log = new EmittingLogger(CostBalancerMultithreadStrategy.class); private static final EmittingLogger log = new EmittingLogger(CostBalancerMultithreadStrategy.class);
private final int threadCount;
public CostBalancerMultithreadStrategy(DateTime referenceTimestamp) public CostBalancerMultithreadStrategy(DateTime referenceTimestamp, int threadCount)
{ {
super(referenceTimestamp); super(referenceTimestamp);
this.threadCount = threadCount;
} }
protected Pair<Double, ServerHolder> chooseBestServer( protected Pair<Double, ServerHolder> chooseBestServer(
@ -50,7 +52,7 @@ public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrateg
{ {
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null); Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(8)); ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList(); List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
for (final ServerHolder server : serverHolders) { for (final ServerHolder server : serverHolders) {

View File

@ -22,10 +22,16 @@ import org.joda.time.DateTime;
public class CostBalancerMultithreadStrategyFactory implements BalancerStrategyFactory public class CostBalancerMultithreadStrategyFactory implements BalancerStrategyFactory
{ {
private final int threadCount;
public CostBalancerMultithreadStrategyFactory(int costBalancerStrategyThreadCount)
{
this.threadCount = costBalancerStrategyThreadCount;
}
@Override @Override
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
{ {
return new CostBalancerMultithreadStrategy(referenceTimestamp); return new CostBalancerMultithreadStrategy(referenceTimestamp, threadCount);
} }
} }

View File

@ -745,7 +745,7 @@ public class DruidCoordinator
else if (RANDOM.equals(config.getCoordinatorBalancerStrategy())) else if (RANDOM.equals(config.getCoordinatorBalancerStrategy()))
factory= new RandomBalancerStrategyFactory(); factory= new RandomBalancerStrategyFactory();
else if (COST_MULTI.equals(config.getCoordinatorBalancerStrategy())) else if (COST_MULTI.equals(config.getCoordinatorBalancerStrategy()))
factory = new CostBalancerMultithreadStrategyFactory(); factory = new CostBalancerMultithreadStrategyFactory(config.getCostBalancerStrategyThreadCount());
// Do coordinator stuff. // Do coordinator stuff.
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams params =

View File

@ -46,6 +46,10 @@ public abstract class DruidCoordinatorConfig
@Default("cost") @Default("cost")
public abstract String getCoordinatorBalancerStrategy(); public abstract String getCoordinatorBalancerStrategy();
@Config("druid.coordinator.cost.balancer.threads")
@Default("4")
public abstract int getCostBalancerStrategyThreadCount();
@Config("druid.coordinator.merge.on") @Config("druid.coordinator.merge.on")
public boolean isMergeSegments() public boolean isMergeSegments()
{ {

View File

@ -3,6 +3,8 @@ package io.druid.server.coordination;
import com.google.api.client.util.Lists; import com.google.api.client.util.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import io.druid.client.DruidServer; import io.druid.client.DruidServer;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CostBalancerMultithreadStrategy;
import io.druid.server.coordinator.CostBalancerStrategy; import io.druid.server.coordinator.CostBalancerStrategy;
import io.druid.server.coordinator.LoadQueuePeonTester; import io.druid.server.coordinator.LoadQueuePeonTester;
import io.druid.server.coordinator.ServerHolder; import io.druid.server.coordinator.ServerHolder;
@ -77,7 +79,7 @@ public class CostBalancerStrategyTest
public void testCostBalancerStrategy() throws InterruptedException { public void testCostBalancerStrategy() throws InterruptedException {
DataSegment segment = getSegment(1000); DataSegment segment = getSegment(1000);
CostBalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC)); BalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC));
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
@ -87,7 +89,7 @@ public class CostBalancerStrategyTest
public void testCostBalancerMultithreadStrategy() throws InterruptedException { public void testCostBalancerMultithreadStrategy() throws InterruptedException {
DataSegment segment = getSegment(1000); DataSegment segment = getSegment(1000);
CostBalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC)); BalancerStrategy strategy = new CostBalancerMultithreadStrategy(DateTime.now(DateTimeZone.UTC), 8);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());

View File

@ -101,6 +101,12 @@ public class DruidCoordinatorTest
{ {
return null; return null;
} }
@Override
public int getCostBalancerStrategyThreadCount()
{
return 1;
}
}, },
new ZkPathsConfig(){ new ZkPathsConfig(){