fixes to comments

This commit is contained in:
Himadri Singh 2014-02-14 12:42:46 +05:30
parent 6e0590f97b
commit 7dae293b5f
3 changed files with 62 additions and 89 deletions

View File

@ -197,6 +197,35 @@ public abstract class AbstractCostBalancerStrategy implements BalancerStrategy
normalization,
normalizedInitialCost
);
}
}
protected double computeCost(
final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer
)
{
final long proposalSegmentSize = proposalSegment.getSize();
if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment))
return Double.POSITIVE_INFINITY;
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
return cost;
}
return Double.POSITIVE_INFINITY;
}
}

View File

@ -20,18 +20,18 @@
package io.druid.server.coordinator;
import com.google.api.client.util.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrategy
{
@ -50,70 +50,38 @@ public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrateg
{
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
ExecutorService service = Executors.newCachedThreadPool();
List<Future<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(8));
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
for (final ServerHolder server : serverHolders) {
futures.add(service.submit(new CostCalculator(server, proposalSegment, includeCurrentServer)));
futures.add(
service.submit(
new Callable<Pair<Double, ServerHolder>>()
{
@Override
public Pair<Double, ServerHolder> call() throws Exception
{
return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
}
}
)
);
}
for (Future<Pair<Double, ServerHolder>> f : futures) {
try {
Pair<Double, ServerHolder> server = f.get();
final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);
try {
for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
if (server.lhs < bestServer.lhs) {
bestServer = server;
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (ExecutionException e) {
e.printStackTrace();
}
}
catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
}
service.shutdown();
return bestServer;
}
}
private final class CostCalculator implements Callable<Pair<Double, ServerHolder>>
{
private final ServerHolder server;
private final DataSegment proposalSegment;
private final boolean includeCurrentServer;
CostCalculator(final ServerHolder server, final DataSegment proposalSegment, final boolean includeCurrentServer)
{
this.server = server;
this.proposalSegment = proposalSegment;
this.includeCurrentServer = includeCurrentServer;
}
@Override
public Pair<Double, ServerHolder> call() throws Exception
{
if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
final long proposalSegmentSize = proposalSegment.getSize();
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
return Pair.of(Double.POSITIVE_INFINITY, null);
}
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
return Pair.of(cost, server);
}
return Pair.of(Double.POSITIVE_INFINITY, null);
}
}
}

View File

@ -23,9 +23,6 @@ import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
public class CostBalancerStrategy extends AbstractCostBalancerStrategy
{
@ -42,36 +39,15 @@ public class CostBalancerStrategy extends AbstractCostBalancerStrategy
boolean includeCurrentServer
)
{
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
final long proposalSegmentSize = proposalSegment.getSize();
for (ServerHolder server : serverHolders) {
if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
continue;
}
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
if (cost < bestServer.lhs) {
bestServer = Pair.of(cost, server);
}
double cost = computeCost(proposalSegment, server, includeCurrentServer);
if (cost < bestServer.lhs) {
bestServer = Pair.of(cost, server);
}
}
return bestServer;
}
}
}