diff --git a/server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java index 25c0f63238c..3eeb1baa616 100644 --- a/server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java @@ -197,6 +197,35 @@ public abstract class AbstractCostBalancerStrategy implements BalancerStrategy normalization, normalizedInitialCost ); - } -} \ No newline at end of file + + protected double computeCost( + final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer + ) + { + final long proposalSegmentSize = proposalSegment.getSize(); + + if (includeCurrentServer || !server.isServingSegment(proposalSegment)) { + /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ + if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) + return Double.POSITIVE_INFINITY; + + /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */ + double cost = 0f; + /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */ + for (DataSegment segment : server.getServer().getSegments().values()) { + if (!proposalSegment.equals(segment)) { + cost += computeJointSegmentCosts(proposalSegment, segment); + } + } + /** plus the costs of segments that will be loaded */ + for (DataSegment segment : server.getPeon().getSegmentsToLoad()) { + cost += computeJointSegmentCosts(proposalSegment, segment); + } + return cost; + } + return Double.POSITIVE_INFINITY; + } + +} + diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java index 168bd5a221b..2fde07c1d43 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java @@ -20,18 +20,18 @@ package io.druid.server.coordinator; import com.google.api.client.util.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Pair; import com.metamx.emitter.EmittingLogger; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; -import org.joda.time.Interval; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrategy { @@ -50,70 +50,38 @@ public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrateg { Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); - ExecutorService service = Executors.newCachedThreadPool(); - List>> futures = Lists.newArrayList(); + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(8)); + List>> futures = Lists.newArrayList(); for (final ServerHolder server : serverHolders) { - futures.add(service.submit(new CostCalculator(server, proposalSegment, includeCurrentServer))); + futures.add( + service.submit( + new Callable>() + { + @Override + public Pair call() throws Exception + { + return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server); + } + } + ) + ); } - for (Future> f : futures) { - try { - Pair server = f.get(); + final ListenableFuture>> resultsFuture = Futures.allAsList(futures); + + try { + for (Pair server : resultsFuture.get()) { if (server.lhs < bestServer.lhs) { bestServer = server; } } - catch (InterruptedException e) { - e.printStackTrace(); - } - catch (ExecutionException e) { - e.printStackTrace(); - } + } + catch (Exception e) { + log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); } service.shutdown(); return bestServer; } +} - private final class CostCalculator implements Callable> - { - private final ServerHolder server; - private final DataSegment proposalSegment; - private final boolean includeCurrentServer; - - CostCalculator(final ServerHolder server, final DataSegment proposalSegment, final boolean includeCurrentServer) - { - this.server = server; - this.proposalSegment = proposalSegment; - this.includeCurrentServer = includeCurrentServer; - } - - @Override - public Pair call() throws Exception - { - if (includeCurrentServer || !server.isServingSegment(proposalSegment)) { - final long proposalSegmentSize = proposalSegment.getSize(); - /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ - if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { - return Pair.of(Double.POSITIVE_INFINITY, null); - } - - /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */ - double cost = 0f; - /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */ - for (DataSegment segment : server.getServer().getSegments().values()) { - if (!proposalSegment.equals(segment)) { - cost += computeJointSegmentCosts(proposalSegment, segment); - } - } - /** plus the costs of segments that will be loaded */ - for (DataSegment segment : server.getPeon().getSegmentsToLoad()) { - cost += computeJointSegmentCosts(proposalSegment, segment); - } - - return Pair.of(cost, server); - } - return Pair.of(Double.POSITIVE_INFINITY, null); - } - } -} \ No newline at end of file diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index 82f60261a2e..09f6efbb17b 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -23,9 +23,6 @@ import com.metamx.common.Pair; import com.metamx.emitter.EmittingLogger; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; -import org.joda.time.Interval; - -import java.util.List; public class CostBalancerStrategy extends AbstractCostBalancerStrategy { @@ -42,36 +39,15 @@ public class CostBalancerStrategy extends AbstractCostBalancerStrategy boolean includeCurrentServer ) { - Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); - final long proposalSegmentSize = proposalSegment.getSize(); for (ServerHolder server : serverHolders) { - if (includeCurrentServer || !server.isServingSegment(proposalSegment)) { - /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ - if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { - continue; - } - - /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */ - double cost = 0f; - /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */ - for (DataSegment segment : server.getServer().getSegments().values()) { - if (!proposalSegment.equals(segment)) { - cost += computeJointSegmentCosts(proposalSegment, segment); - } - } - /** plus the costs of segments that will be loaded */ - for (DataSegment segment : server.getPeon().getSegmentsToLoad()) { - cost += computeJointSegmentCosts(proposalSegment, segment); - } - - if (cost < bestServer.lhs) { - bestServer = Pair.of(cost, server); - } + double cost = computeCost(proposalSegment, server, includeCurrentServer); + if (cost < bestServer.lhs) { + bestServer = Pair.of(cost, server); } } - return bestServer; } -} \ No newline at end of file +} +