mirror of https://github.com/apache/druid.git
replace param BalancerCostAnalyzer getter with a factory
This commit is contained in:
parent
2d7113b263
commit
94b72e8878
|
@ -671,7 +671,7 @@ public class DruidMaster
|
||||||
.withDruidCluster(cluster)
|
.withDruidCluster(cluster)
|
||||||
.withDatabaseRuleManager(databaseRuleManager)
|
.withDatabaseRuleManager(databaseRuleManager)
|
||||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(DateTime.now()))
|
.withBalancerReferenceTimestamp(DateTime.now())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -26,6 +26,7 @@ import com.metamx.common.guava.Comparators;
|
||||||
import com.metamx.druid.client.DataSegment;
|
import com.metamx.druid.client.DataSegment;
|
||||||
import com.metamx.druid.client.DruidServer;
|
import com.metamx.druid.client.DruidServer;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -82,7 +83,8 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
||||||
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
|
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
|
||||||
{
|
{
|
||||||
final MasterStats stats = new MasterStats();
|
final MasterStats stats = new MasterStats();
|
||||||
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer();
|
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
||||||
|
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
|
||||||
final int maxSegmentsToMove = params.getMaxSegmentsToMove();
|
final int maxSegmentsToMove = params.getMaxSegmentsToMove();
|
||||||
|
|
||||||
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
|
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
|
||||||
|
|
|
@ -27,6 +27,7 @@ import com.metamx.druid.client.DruidDataSource;
|
||||||
import com.metamx.druid.db.DatabaseRuleManager;
|
import com.metamx.druid.db.DatabaseRuleManager;
|
||||||
import com.metamx.druid.master.rules.RuleMap;
|
import com.metamx.druid.master.rules.RuleMap;
|
||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -50,7 +51,7 @@ public class DruidMasterRuntimeParams
|
||||||
private final long mergeBytesLimit;
|
private final long mergeBytesLimit;
|
||||||
private final int mergeSegmentsLimit;
|
private final int mergeSegmentsLimit;
|
||||||
private final int maxSegmentsToMove;
|
private final int maxSegmentsToMove;
|
||||||
private final BalancerCostAnalyzer balancerCostAnalyzer;
|
private final DateTime balancerReferenceTimestamp;
|
||||||
|
|
||||||
public DruidMasterRuntimeParams(
|
public DruidMasterRuntimeParams(
|
||||||
long startTime,
|
long startTime,
|
||||||
|
@ -66,7 +67,7 @@ public class DruidMasterRuntimeParams
|
||||||
long mergeBytesLimit,
|
long mergeBytesLimit,
|
||||||
int mergeSegmentsLimit,
|
int mergeSegmentsLimit,
|
||||||
int maxSegmentsToMove,
|
int maxSegmentsToMove,
|
||||||
BalancerCostAnalyzer balancerCostAnalyzer
|
DateTime balancerReferenceTimestamp
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
|
@ -82,7 +83,7 @@ public class DruidMasterRuntimeParams
|
||||||
this.mergeBytesLimit = mergeBytesLimit;
|
this.mergeBytesLimit = mergeBytesLimit;
|
||||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||||
this.balancerCostAnalyzer = balancerCostAnalyzer;
|
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getStartTime()
|
public long getStartTime()
|
||||||
|
@ -150,9 +151,14 @@ public class DruidMasterRuntimeParams
|
||||||
return maxSegmentsToMove;
|
return maxSegmentsToMove;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BalancerCostAnalyzer getBalancerCostAnalyzer()
|
public DateTime getBalancerReferenceTimestamp()
|
||||||
{
|
{
|
||||||
return balancerCostAnalyzer;
|
return balancerReferenceTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BalancerCostAnalyzer getBalancerCostAnalyzer(DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return new BalancerCostAnalyzer(referenceTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasDeletionWaitTimeElapsed()
|
public boolean hasDeletionWaitTimeElapsed()
|
||||||
|
@ -181,7 +187,7 @@ public class DruidMasterRuntimeParams
|
||||||
mergeBytesLimit,
|
mergeBytesLimit,
|
||||||
mergeSegmentsLimit,
|
mergeSegmentsLimit,
|
||||||
maxSegmentsToMove,
|
maxSegmentsToMove,
|
||||||
balancerCostAnalyzer
|
balancerReferenceTimestamp
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,7 +206,7 @@ public class DruidMasterRuntimeParams
|
||||||
private long mergeBytesLimit;
|
private long mergeBytesLimit;
|
||||||
private int mergeSegmentsLimit;
|
private int mergeSegmentsLimit;
|
||||||
private int maxSegmentsToMove;
|
private int maxSegmentsToMove;
|
||||||
private BalancerCostAnalyzer balancerCostAnalyzer;
|
private DateTime balancerReferenceTimestamp;
|
||||||
|
|
||||||
Builder()
|
Builder()
|
||||||
{
|
{
|
||||||
|
@ -217,7 +223,7 @@ public class DruidMasterRuntimeParams
|
||||||
this.mergeBytesLimit = 0;
|
this.mergeBytesLimit = 0;
|
||||||
this.mergeSegmentsLimit = 0;
|
this.mergeSegmentsLimit = 0;
|
||||||
this.maxSegmentsToMove = 0;
|
this.maxSegmentsToMove = 0;
|
||||||
this.balancerCostAnalyzer = null;
|
this.balancerReferenceTimestamp = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Builder(
|
Builder(
|
||||||
|
@ -234,7 +240,7 @@ public class DruidMasterRuntimeParams
|
||||||
long mergeBytesLimit,
|
long mergeBytesLimit,
|
||||||
int mergeSegmentsLimit,
|
int mergeSegmentsLimit,
|
||||||
int maxSegmentsToMove,
|
int maxSegmentsToMove,
|
||||||
BalancerCostAnalyzer balancerCostAnalyzer
|
DateTime balancerReferenceTimestamp
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
|
@ -250,7 +256,7 @@ public class DruidMasterRuntimeParams
|
||||||
this.mergeBytesLimit = mergeBytesLimit;
|
this.mergeBytesLimit = mergeBytesLimit;
|
||||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||||
this.balancerCostAnalyzer = balancerCostAnalyzer;
|
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DruidMasterRuntimeParams build()
|
public DruidMasterRuntimeParams build()
|
||||||
|
@ -269,7 +275,7 @@ public class DruidMasterRuntimeParams
|
||||||
mergeBytesLimit,
|
mergeBytesLimit,
|
||||||
mergeSegmentsLimit,
|
mergeSegmentsLimit,
|
||||||
maxSegmentsToMove,
|
maxSegmentsToMove,
|
||||||
balancerCostAnalyzer
|
balancerReferenceTimestamp
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -351,9 +357,9 @@ public class DruidMasterRuntimeParams
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder withBalancerCostAnalyzer(BalancerCostAnalyzer balancerCostAnalyzer)
|
public Builder withBalancerReferenceTimestamp(DateTime balancerReferenceTimestamp)
|
||||||
{
|
{
|
||||||
this.balancerCostAnalyzer = balancerCostAnalyzer;
|
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import com.metamx.druid.master.LoadPeonCallback;
|
||||||
import com.metamx.druid.master.MasterStats;
|
import com.metamx.druid.master.MasterStats;
|
||||||
import com.metamx.druid.master.ServerHolder;
|
import com.metamx.druid.master.ServerHolder;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -86,7 +87,8 @@ public abstract class LoadRule implements Rule
|
||||||
|
|
||||||
List<ServerHolder> assignedServers = Lists.newArrayList();
|
List<ServerHolder> assignedServers = Lists.newArrayList();
|
||||||
while (totalReplicants < expectedReplicants) {
|
while (totalReplicants < expectedReplicants) {
|
||||||
BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer();
|
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
||||||
|
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
|
||||||
ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);
|
ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);
|
||||||
|
|
||||||
if (holder == null) {
|
if (holder == null) {
|
||||||
|
|
|
@ -224,7 +224,7 @@ public class DruidMasterBalancerTest
|
||||||
.withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon))
|
.withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon))
|
||||||
.withAvailableSegments(segments.values())
|
.withAvailableSegments(segments.values())
|
||||||
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
params = new DruidMasterBalancer(master).run(params);
|
params = new DruidMasterBalancer(master).run(params);
|
||||||
|
@ -373,7 +373,7 @@ public class DruidMasterBalancerTest
|
||||||
.withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon))
|
.withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon))
|
||||||
.withAvailableSegments(segments.values())
|
.withAvailableSegments(segments.values())
|
||||||
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
params = new DruidMasterBalancer(master).run(params);
|
params = new DruidMasterBalancer(master).run(params);
|
||||||
|
|
|
@ -176,7 +176,7 @@ public class DruidMasterRuleRunnerTest
|
||||||
.withDatabaseRuleManager(databaseRuleManager)
|
.withDatabaseRuleManager(databaseRuleManager)
|
||||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||||
.withMaxSegmentsToMove(5)
|
.withMaxSegmentsToMove(5)
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||||
|
@ -266,7 +266,7 @@ public class DruidMasterRuleRunnerTest
|
||||||
.withAvailableSegments(availableSegments)
|
.withAvailableSegments(availableSegments)
|
||||||
.withDatabaseRuleManager(databaseRuleManager)
|
.withDatabaseRuleManager(databaseRuleManager)
|
||||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||||
|
@ -352,7 +352,7 @@ public class DruidMasterRuleRunnerTest
|
||||||
.withAvailableSegments(availableSegments)
|
.withAvailableSegments(availableSegments)
|
||||||
.withDatabaseRuleManager(databaseRuleManager)
|
.withDatabaseRuleManager(databaseRuleManager)
|
||||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||||
|
@ -414,7 +414,7 @@ public class DruidMasterRuleRunnerTest
|
||||||
.withAvailableSegments(availableSegments)
|
.withAvailableSegments(availableSegments)
|
||||||
.withDatabaseRuleManager(databaseRuleManager)
|
.withDatabaseRuleManager(databaseRuleManager)
|
||||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
@ -679,7 +679,7 @@ public class DruidMasterRuleRunnerTest
|
||||||
.withAvailableSegments(availableSegments)
|
.withAvailableSegments(availableSegments)
|
||||||
.withDatabaseRuleManager(databaseRuleManager)
|
.withDatabaseRuleManager(databaseRuleManager)
|
||||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||||
|
@ -756,7 +756,7 @@ public class DruidMasterRuleRunnerTest
|
||||||
.withAvailableSegments(availableSegments)
|
.withAvailableSegments(availableSegments)
|
||||||
.withDatabaseRuleManager(databaseRuleManager)
|
.withDatabaseRuleManager(databaseRuleManager)
|
||||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||||
|
|
Loading…
Reference in New Issue