From 5af414c93c6d6e8a0c66619ca2c2c91211211f35 Mon Sep 17 00:00:00 2001 From: Himadri Singh Date: Fri, 14 Feb 2014 13:09:24 +0530 Subject: [PATCH] configurable thread count --- .../coordinator/CostBalancerMultithreadStrategy.java | 6 ++++-- .../CostBalancerMultithreadStrategyFactory.java | 8 +++++++- .../io/druid/server/coordinator/DruidCoordinator.java | 2 +- .../druid/server/coordinator/DruidCoordinatorConfig.java | 4 ++++ .../server/coordination/CostBalancerStrategyTest.java | 6 ++++-- .../io/druid/server/coordinator/DruidCoordinatorTest.java | 6 ++++++ 6 files changed, 26 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java index 2fde07c1d43..d25abe17027 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java @@ -36,10 +36,12 @@ import java.util.concurrent.Executors; public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrategy { private static final EmittingLogger log = new EmittingLogger(CostBalancerMultithreadStrategy.class); + private final int threadCount; - public CostBalancerMultithreadStrategy(DateTime referenceTimestamp) + public CostBalancerMultithreadStrategy(DateTime referenceTimestamp, int threadCount) { super(referenceTimestamp); + this.threadCount = threadCount; } protected Pair chooseBestServer( @@ -50,7 +52,7 @@ public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrateg { Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(8)); + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount)); List>> futures = Lists.newArrayList(); for (final ServerHolder server : serverHolders) { diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java index d1373f49989..54eefb8fd22 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java @@ -22,10 +22,16 @@ import org.joda.time.DateTime; public class CostBalancerMultithreadStrategyFactory implements BalancerStrategyFactory { + private final int threadCount; + + public CostBalancerMultithreadStrategyFactory(int costBalancerStrategyThreadCount) + { + this.threadCount = costBalancerStrategyThreadCount; + } @Override public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) { - return new CostBalancerMultithreadStrategy(referenceTimestamp); + return new CostBalancerMultithreadStrategy(referenceTimestamp, threadCount); } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index bc9140b854e..770d867a8b9 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -745,7 +745,7 @@ public class DruidCoordinator else if (RANDOM.equals(config.getCoordinatorBalancerStrategy())) factory= new RandomBalancerStrategyFactory(); else if (COST_MULTI.equals(config.getCoordinatorBalancerStrategy())) - factory = new CostBalancerMultithreadStrategyFactory(); + factory = new CostBalancerMultithreadStrategyFactory(config.getCostBalancerStrategyThreadCount()); // Do coordinator stuff. DruidCoordinatorRuntimeParams params = diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java index 7b429cac705..9905c013b1e 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java @@ -46,6 +46,10 @@ public abstract class DruidCoordinatorConfig @Default("cost") public abstract String getCoordinatorBalancerStrategy(); + @Config("druid.coordinator.cost.balancer.threads") + @Default("4") + public abstract int getCostBalancerStrategyThreadCount(); + @Config("druid.coordinator.merge.on") public boolean isMergeSegments() { diff --git a/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java index 0bd1cbf7748..18962bfb8a1 100644 --- a/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java @@ -3,6 +3,8 @@ package io.druid.server.coordination; import com.google.api.client.util.Lists; import com.google.common.collect.Maps; import io.druid.client.DruidServer; +import io.druid.server.coordinator.BalancerStrategy; +import io.druid.server.coordinator.CostBalancerMultithreadStrategy; import io.druid.server.coordinator.CostBalancerStrategy; import io.druid.server.coordinator.LoadQueuePeonTester; import io.druid.server.coordinator.ServerHolder; @@ -77,7 +79,7 @@ public class CostBalancerStrategyTest public void testCostBalancerStrategy() throws InterruptedException { DataSegment segment = getSegment(1000); - CostBalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC)); + BalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC)); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); @@ -87,7 +89,7 @@ public class CostBalancerStrategyTest public void testCostBalancerMultithreadStrategy() throws InterruptedException { DataSegment segment = getSegment(1000); - CostBalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC)); + BalancerStrategy strategy = new CostBalancerMultithreadStrategy(DateTime.now(DateTimeZone.UTC), 8); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 74ea71ad105..f57e77afc3d 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -101,6 +101,12 @@ public class DruidCoordinatorTest { return null; } + + @Override + public int getCostBalancerStrategyThreadCount() + { + return 1; + } }, new ZkPathsConfig(){