mirror of https://github.com/apache/druid.git
new interval based cost function (#2972)
* new interval based cost function Addresses issues with balancing of segments in the existing cost function - `gapPenalty` led to clusters of segments ~30 days apart - `recencyPenalty` caused imbalance among recent segments - size-based cost could be skewed by compression New cost function is purely based on segment intervals: - assumes each time-slice of a partition is a constant cost - cost is additive, i.e. cost(A, B union C) = cost(A, B) + cost(A, C) - cost decays exponentially based on distance between time-slices * comments and formatting * add more comments to explain the calculation
This commit is contained in:
parent
2863adef60
commit
e79284da59
|
@ -51,6 +51,11 @@
|
||||||
<artifactId>druid-processing</artifactId>
|
<artifactId>druid-processing</artifactId>
|
||||||
<version>${project.parent.version}</version>
|
<version>${project.parent.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-server</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.github.wnameless</groupId>
|
<groupId>com.github.wnameless</groupId>
|
||||||
<artifactId>json-flattener</artifactId>
|
<artifactId>json-flattener</artifactId>
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.coordinator;
|
||||||
|
|
||||||
|
import io.druid.timeline.DataSegment;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.openjdk.jmh.annotations.Benchmark;
|
||||||
|
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||||
|
import org.openjdk.jmh.annotations.Fork;
|
||||||
|
import org.openjdk.jmh.annotations.Mode;
|
||||||
|
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||||
|
import org.openjdk.jmh.annotations.Scope;
|
||||||
|
import org.openjdk.jmh.annotations.Setup;
|
||||||
|
import org.openjdk.jmh.annotations.State;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@State(Scope.Benchmark)
|
||||||
|
public class CostBalancerStrategyBenchmark
|
||||||
|
{
|
||||||
|
private final static DateTime t0 = new DateTime("2016-01-01T01:00:00Z");
|
||||||
|
|
||||||
|
private List<DataSegment> segments;
|
||||||
|
private DataSegment segment;
|
||||||
|
|
||||||
|
int x1 = 2;
|
||||||
|
int y0 = 3;
|
||||||
|
int y1 = 4;
|
||||||
|
|
||||||
|
int n = 10000;
|
||||||
|
|
||||||
|
@Setup
|
||||||
|
public void setupDummyCluster()
|
||||||
|
{
|
||||||
|
segment = createSegment(t0);
|
||||||
|
|
||||||
|
Random r = new Random(1234);
|
||||||
|
segments = new ArrayList<>(n);
|
||||||
|
for(int i = 0; i < n; ++i) {
|
||||||
|
final DateTime t = t0.minusHours(r.nextInt(365 * 24) - 365*12);
|
||||||
|
segments.add(createSegment(t));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
DataSegment createSegment(DateTime t)
|
||||||
|
{
|
||||||
|
return new DataSegment(
|
||||||
|
"test",
|
||||||
|
new Interval(t, t.plusHours(1)),
|
||||||
|
"v1",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
0,
|
||||||
|
0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
|
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||||
|
@Fork(1)
|
||||||
|
public double measureCostStrategySingle() throws InterruptedException
|
||||||
|
{
|
||||||
|
double totalCost = 0;
|
||||||
|
for(DataSegment s : segments) {
|
||||||
|
totalCost += CostBalancerStrategy.computeJointSegmentsCost(segment, s);
|
||||||
|
}
|
||||||
|
return totalCost;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
|
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||||
|
@Fork(1)
|
||||||
|
public double measureIntervalPenalty() throws InterruptedException
|
||||||
|
{
|
||||||
|
return CostBalancerStrategy.intervalCost(x1, y0, y1);
|
||||||
|
}
|
||||||
|
}
|
5
pom.xml
5
pom.xml
|
@ -556,6 +556,11 @@
|
||||||
<artifactId>derbyclient</artifactId>
|
<artifactId>derbyclient</artifactId>
|
||||||
<version>10.11.1.1</version>
|
<version>10.11.1.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-math3</artifactId>
|
||||||
|
<version>3.6.1</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Test Scope -->
|
<!-- Test Scope -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -173,6 +173,10 @@
|
||||||
<groupId>org.apache.derby</groupId>
|
<groupId>org.apache.derby</groupId>
|
||||||
<artifactId>derbyclient</artifactId>
|
<artifactId>derbyclient</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-math3</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Tests -->
|
<!-- Tests -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -19,44 +19,162 @@
|
||||||
|
|
||||||
package io.druid.server.coordinator;
|
package io.druid.server.coordinator;
|
||||||
|
|
||||||
|
import com.google.common.base.Predicates;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import com.metamx.common.Pair;
|
import com.metamx.common.Pair;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import org.joda.time.DateTime;
|
import org.apache.commons.math3.util.FastMath;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
|
|
||||||
public class CostBalancerStrategy implements BalancerStrategy
|
public class CostBalancerStrategy implements BalancerStrategy
|
||||||
{
|
{
|
||||||
private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
|
private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
|
||||||
private static final long DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
|
|
||||||
private static final long SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
|
|
||||||
private static final long THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
|
|
||||||
private final long referenceTimestamp;
|
|
||||||
private final ListeningExecutorService exec;
|
|
||||||
|
|
||||||
public static long gapMillis(Interval interval1, Interval interval2)
|
private static final double HALF_LIFE = 24.0; // cost function half-life in hours
|
||||||
|
static final double LAMBDA = Math.log(2) / HALF_LIFE;
|
||||||
|
static final double INV_LAMBDA_SQUARE = 1 / (LAMBDA * LAMBDA);
|
||||||
|
|
||||||
|
private static final double MILLIS_IN_HOUR = 3_600_000.0;
|
||||||
|
private static final double MILLIS_FACTOR = MILLIS_IN_HOUR / LAMBDA;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This defines the unnormalized cost function between two segments.
|
||||||
|
*
|
||||||
|
* See https://github.com/druid-io/druid/pull/2972 for more details about the cost function.
|
||||||
|
*
|
||||||
|
* intervalCost: segments close together are more likely to be queried together
|
||||||
|
*
|
||||||
|
* multiplier: if two segments belong to the same data source, they are more likely to be involved
|
||||||
|
* in the same queries
|
||||||
|
*
|
||||||
|
* @param segmentA The first DataSegment.
|
||||||
|
* @param segmentB The second DataSegment.
|
||||||
|
*
|
||||||
|
* @return the joint cost of placing the two DataSegments together on one node.
|
||||||
|
*/
|
||||||
|
public static double computeJointSegmentsCost(final DataSegment segmentA, final DataSegment segmentB)
|
||||||
{
|
{
|
||||||
if (interval1.getStartMillis() > interval2.getEndMillis()) {
|
final Interval intervalA = segmentA.getInterval();
|
||||||
return interval1.getStartMillis() - interval2.getEndMillis();
|
final Interval intervalB = segmentB.getInterval();
|
||||||
} else if (interval2.getStartMillis() > interval1.getEndMillis()) {
|
|
||||||
return interval2.getStartMillis() - interval1.getEndMillis();
|
final double t0 = intervalA.getStartMillis();
|
||||||
} else {
|
final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR;
|
||||||
|
final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR;
|
||||||
|
final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR;
|
||||||
|
|
||||||
|
// constant cost-multiplier for segments of the same datsource
|
||||||
|
final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
|
||||||
|
|
||||||
|
return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes the joint cost of two intervals X = [x_0 = 0, x_1) and Y = [y_0, y_1)
|
||||||
|
*
|
||||||
|
* cost(X, Y) = \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy $$
|
||||||
|
*
|
||||||
|
* lambda = 1 in this particular implementation
|
||||||
|
*
|
||||||
|
* Other values of lambda can be calculated by multiplying inputs by lambda
|
||||||
|
* and multiplying the result by 1 / lambda ^ 2
|
||||||
|
*
|
||||||
|
* Interval start and end are all relative to x_0.
|
||||||
|
* Therefore this function assumes x_0 = 0, x1 >= 0, and y1 > y0
|
||||||
|
*
|
||||||
|
* @param x1 end of interval X
|
||||||
|
* @param y0 start of interval Y
|
||||||
|
* @param y1 end o interval Y
|
||||||
|
*
|
||||||
|
* @return joint cost of X and Y
|
||||||
|
*/
|
||||||
|
public static double intervalCost(double x1, double y0, double y1)
|
||||||
|
{
|
||||||
|
if (x1 == 0 || y1 == y0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cost(X, Y) = cost(Y, X), so we swap X and Y to
|
||||||
|
// have x_0 <= y_0 and simplify the calculations below
|
||||||
|
if (y0 < 0) {
|
||||||
|
// swap X and Y
|
||||||
|
double tmp = x1;
|
||||||
|
x1 = y1 - y0;
|
||||||
|
y1 = tmp - y0;
|
||||||
|
y0 = -y0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CostBalancerStrategy(DateTime referenceTimestamp, ListeningExecutorService exec)
|
// since x_0 <= y_0, Y must overlap X if y_0 < x_1
|
||||||
|
if (y0 < x1) {
|
||||||
|
/**
|
||||||
|
* We have two possible cases of overlap:
|
||||||
|
*
|
||||||
|
* X = [ A )[ B )[ C ) or [ A )[ B )
|
||||||
|
* Y = [ ) [ )[ C )
|
||||||
|
*
|
||||||
|
* A is empty if y0 = 0
|
||||||
|
* C is empty if y1 = x1
|
||||||
|
*
|
||||||
|
* cost(X, Y) = cost(A, Y) + cost(B, C) + cost(B, B)
|
||||||
|
*
|
||||||
|
* cost(A, Y) and cost(B, C) can be calculated using the non-overlapping case,
|
||||||
|
* which reduces the overlapping case to computing
|
||||||
|
*
|
||||||
|
* cost(B, B) = \int_0^{\beta} \int_{0}^{\beta} e^{-|x-y|}dxdy
|
||||||
|
* = 2 \cdot (\beta + e^{-\beta} - 1)
|
||||||
|
*
|
||||||
|
* where \beta is the length of interval B
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
final double beta; // b1 - y0, length of interval B
|
||||||
|
final double gamma; // c1 - y0, length of interval C
|
||||||
|
if (y1 <= x1) {
|
||||||
|
beta = y1 - y0;
|
||||||
|
gamma = x1 - y0;
|
||||||
|
} else {
|
||||||
|
beta = x1 - y0;
|
||||||
|
gamma = y1 - y0;
|
||||||
|
}
|
||||||
|
return intervalCost(y0, y0, y1) + // cost(A, Y)
|
||||||
|
intervalCost(beta, beta, gamma) + // cost(B, C)
|
||||||
|
2 * (beta + FastMath.exp(-beta) - 1); // cost(B, B)
|
||||||
|
} else {
|
||||||
|
/**
|
||||||
|
* In the case where there is no overlap:
|
||||||
|
*
|
||||||
|
* Given that x_0 <= y_0,
|
||||||
|
* then x <= y must be true for all x in [x_0, x_1] and y in [y_0, y_1).
|
||||||
|
*
|
||||||
|
* therefore,
|
||||||
|
*
|
||||||
|
* cost(X, Y) = \int_0^{x_1} \int_{y_0}^{y_1} e^{-|x-y|} dxdy
|
||||||
|
* = \int_0^{x_1} \int_{y_0}^{y_1} e^{x-y} dxdy
|
||||||
|
* = (e^{-y_1} - e^{-y_0}) - (e^{x_1-y_1} - e^{x_1-y_0})
|
||||||
|
*
|
||||||
|
* Note, this expression could be further reduced by factoring out (e^{x_1} - 1),
|
||||||
|
* but we prefer to keep the smaller values x_1 - y_0 and x_1 - y_1 in the exponent
|
||||||
|
* to avoid numerical overflow caused by calculating e^{x_1}
|
||||||
|
*/
|
||||||
|
final double exy0 = FastMath.exp(x1 - y0);
|
||||||
|
final double exy1 = FastMath.exp(x1 - y1);
|
||||||
|
final double ey0 = FastMath.exp(0f - y0);
|
||||||
|
final double ey1 = FastMath.exp(0f - y1);
|
||||||
|
|
||||||
|
return (ey1 - ey0) - (exy1 - exy0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ListeningExecutorService exec;
|
||||||
|
|
||||||
|
public CostBalancerStrategy(ListeningExecutorService exec)
|
||||||
{
|
{
|
||||||
this.referenceTimestamp = referenceTimestamp.getMillis();
|
|
||||||
this.exec = exec;
|
this.exec = exec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,51 +199,15 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
return chooseBestServer(proposalSegment, serverHolders, true).rhs;
|
return chooseBestServer(proposalSegment, serverHolders, true).rhs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
static double computeJointSegmentsCost(final DataSegment segment, final Iterable<DataSegment> segmentSet)
|
||||||
* This defines the unnormalized cost function between two segments. There is a base cost given by
|
|
||||||
* the minimum size of the two segments and additional penalties.
|
|
||||||
* recencyPenalty: it is more likely that recent segments will be queried together
|
|
||||||
* dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved
|
|
||||||
* in the same queries
|
|
||||||
* gapPenalty: it is more likely that segments close together in time will be queried together
|
|
||||||
*
|
|
||||||
* @param segment1 The first DataSegment.
|
|
||||||
* @param segment2 The second DataSegment.
|
|
||||||
*
|
|
||||||
* @return The joint cost of placing the two DataSegments together on one node.
|
|
||||||
*/
|
|
||||||
public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
|
|
||||||
{
|
{
|
||||||
final long gapMillis = gapMillis(segment1.getInterval(), segment2.getInterval());
|
double totalCost = 0;
|
||||||
|
for (DataSegment s : segmentSet) {
|
||||||
final double baseCost = Math.min(segment1.getSize(), segment2.getSize());
|
totalCost += computeJointSegmentsCost(segment, s);
|
||||||
double recencyPenalty = 1;
|
}
|
||||||
double dataSourcePenalty = 1;
|
return totalCost;
|
||||||
double gapPenalty = 1;
|
|
||||||
|
|
||||||
if (segment1.getDataSource().equals(segment2.getDataSource())) {
|
|
||||||
dataSourcePenalty = 2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
double segment1diff = referenceTimestamp - segment1.getInterval().getEndMillis();
|
|
||||||
double segment2diff = referenceTimestamp - segment2.getInterval().getEndMillis();
|
|
||||||
if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) {
|
|
||||||
recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** gap is null if the two segment intervals overlap or if they're adjacent */
|
|
||||||
if (gapMillis == 0) {
|
|
||||||
gapPenalty = 2;
|
|
||||||
} else {
|
|
||||||
if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
|
|
||||||
gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
|
|
||||||
|
|
||||||
return cost;
|
|
||||||
}
|
|
||||||
|
|
||||||
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
|
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
|
||||||
{
|
{
|
||||||
|
@ -144,11 +226,9 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
{
|
{
|
||||||
double cost = 0;
|
double cost = 0;
|
||||||
for (ServerHolder server : serverHolders) {
|
for (ServerHolder server : serverHolders) {
|
||||||
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
|
Iterable<DataSegment> segments = server.getServer().getSegments().values();
|
||||||
for (int i = 0; i < segments.length; ++i) {
|
for (DataSegment s : segments) {
|
||||||
for (int j = i; j < segments.length; ++j) {
|
cost += computeJointSegmentsCost(s, segments);
|
||||||
cost += computeJointSegmentCosts(segments[i], segments[j]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return cost;
|
return cost;
|
||||||
|
@ -169,7 +249,7 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
double cost = 0;
|
double cost = 0;
|
||||||
for (ServerHolder server : serverHolders) {
|
for (ServerHolder server : serverHolders) {
|
||||||
for (DataSegment segment : server.getServer().getSegments().values()) {
|
for (DataSegment segment : server.getServer().getSegments().values()) {
|
||||||
cost += computeJointSegmentCosts(segment, segment);
|
cost += computeJointSegmentsCost(segment, segment);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return cost;
|
return cost;
|
||||||
|
@ -211,17 +291,20 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
|
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
|
||||||
double cost = 0f;
|
double cost = 0d;
|
||||||
|
|
||||||
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
|
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
|
||||||
for (DataSegment segment : server.getServer().getSegments().values()) {
|
cost += computeJointSegmentsCost(
|
||||||
if (!proposalSegment.equals(segment)) {
|
proposalSegment,
|
||||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
Iterables.filter(
|
||||||
}
|
server.getServer().getSegments().values(),
|
||||||
}
|
Predicates.not(Predicates.equalTo(proposalSegment))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
/** plus the costs of segments that will be loaded */
|
/** plus the costs of segments that will be loaded */
|
||||||
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
|
cost += computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsToLoad());
|
||||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
|
||||||
}
|
|
||||||
return cost;
|
return cost;
|
||||||
}
|
}
|
||||||
return Double.POSITIVE_INFINITY;
|
return Double.POSITIVE_INFINITY;
|
||||||
|
|
|
@ -35,9 +35,9 @@ public class CostBalancerStrategyFactory implements BalancerStrategyFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
|
public CostBalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
|
||||||
{
|
{
|
||||||
return new CostBalancerStrategy(referenceTimestamp, exec);
|
return new CostBalancerStrategy(exec);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -17,13 +17,10 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package io.druid.server.coordination;
|
package io.druid.server.coordinator;
|
||||||
|
|
||||||
import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
|
import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
|
||||||
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
|
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
|
||||||
import io.druid.server.coordinator.CostBalancerStrategy;
|
|
||||||
import io.druid.server.coordinator.CostBalancerStrategyFactory;
|
|
||||||
import io.druid.server.coordinator.ServerHolder;
|
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
@ -58,7 +55,7 @@ public class CostBalancerStrategyBenchmark extends AbstractBenchmark
|
||||||
|
|
||||||
public CostBalancerStrategyBenchmark(CostBalancerStrategyFactory factory)
|
public CostBalancerStrategyBenchmark(CostBalancerStrategyFactory factory)
|
||||||
{
|
{
|
||||||
this.strategy = (CostBalancerStrategy) factory.createBalancerStrategy(DateTime.now());
|
this.strategy = factory.createBalancerStrategy(DateTime.now());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<ServerHolder> serverHolderList;
|
private static List<ServerHolder> serverHolderList;
|
||||||
|
@ -99,16 +96,4 @@ public class CostBalancerStrategyBenchmark extends AbstractBenchmark
|
||||||
}
|
}
|
||||||
sum = diff;
|
sum = diff;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BenchmarkOptions(warmupRounds = 1000, benchmarkRounds = 1000000)
|
|
||||||
@Test
|
|
||||||
public void testBalancerGapMillis()
|
|
||||||
{
|
|
||||||
long diff = 0;
|
|
||||||
for (int i = 0; i < 1000; i++) {
|
|
||||||
diff = diff + CostBalancerStrategy.gapMillis(interval1, interval2);
|
|
||||||
}
|
|
||||||
sum = diff;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package io.druid.server.coordination;
|
package io.druid.server.coordinator;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
@ -25,18 +25,12 @@ import com.google.common.collect.Maps;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import io.druid.client.ImmutableDruidDataSource;
|
import io.druid.client.ImmutableDruidDataSource;
|
||||||
import io.druid.client.ImmutableDruidServer;
|
import io.druid.client.ImmutableDruidServer;
|
||||||
import io.druid.concurrent.Execs;
|
import io.druid.server.coordination.DruidServerMetadata;
|
||||||
import io.druid.server.coordinator.BalancerStrategy;
|
|
||||||
import io.druid.server.coordinator.CostBalancerStrategy;
|
|
||||||
import io.druid.server.coordinator.LoadQueuePeonTester;
|
|
||||||
import io.druid.server.coordinator.ServerHolder;
|
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.DateTimeZone;
|
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -45,7 +39,7 @@ import java.util.concurrent.Executors;
|
||||||
|
|
||||||
public class CostBalancerStrategyTest
|
public class CostBalancerStrategyTest
|
||||||
{
|
{
|
||||||
private static final Interval day = DateTime.now().toDateMidnight().toInterval();
|
private static final Interval day = new Interval("2015-01-01T00/2015-01-01T01");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create Druid cluster with serverCount servers having maxSegments segments each, and 1 server with 98 segment
|
* Create Druid cluster with serverCount servers having maxSegments segments each, and 1 server with 98 segment
|
||||||
|
@ -125,9 +119,7 @@ public class CostBalancerStrategyTest
|
||||||
List<ServerHolder> serverHolderList = setupDummyCluster(10, 20);
|
List<ServerHolder> serverHolderList = setupDummyCluster(10, 20);
|
||||||
DataSegment segment = getSegment(1000);
|
DataSegment segment = getSegment(1000);
|
||||||
|
|
||||||
final DateTime referenceTimestamp = new DateTime("2014-01-01");
|
|
||||||
BalancerStrategy strategy = new CostBalancerStrategy(
|
BalancerStrategy strategy = new CostBalancerStrategy(
|
||||||
referenceTimestamp,
|
|
||||||
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
|
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
|
||||||
);
|
);
|
||||||
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
|
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
|
||||||
|
@ -143,7 +135,6 @@ public class CostBalancerStrategyTest
|
||||||
|
|
||||||
final DateTime referenceTimestamp = new DateTime("2014-01-01");
|
final DateTime referenceTimestamp = new DateTime("2014-01-01");
|
||||||
BalancerStrategy strategy = new CostBalancerStrategy(
|
BalancerStrategy strategy = new CostBalancerStrategy(
|
||||||
referenceTimestamp,
|
|
||||||
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
|
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
|
||||||
);
|
);
|
||||||
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
|
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
|
||||||
|
@ -156,10 +147,9 @@ public class CostBalancerStrategyTest
|
||||||
{
|
{
|
||||||
DateTime referenceTime = new DateTime("2014-01-01T00:00:00");
|
DateTime referenceTime = new DateTime("2014-01-01T00:00:00");
|
||||||
CostBalancerStrategy strategy = new CostBalancerStrategy(
|
CostBalancerStrategy strategy = new CostBalancerStrategy(
|
||||||
referenceTime,
|
|
||||||
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
|
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
|
||||||
);
|
);
|
||||||
double segmentCost = strategy.computeJointSegmentCosts(
|
double segmentCost = strategy.computeJointSegmentsCost(
|
||||||
getSegment(
|
getSegment(
|
||||||
100,
|
100,
|
||||||
"DUMMY",
|
"DUMMY",
|
||||||
|
@ -172,12 +162,54 @@ public class CostBalancerStrategyTest
|
||||||
101,
|
101,
|
||||||
"DUMMY",
|
"DUMMY",
|
||||||
new Interval(
|
new Interval(
|
||||||
referenceTime.minusDays(2),
|
referenceTime.minusHours(2),
|
||||||
referenceTime.minusDays(2).plusHours(1)
|
referenceTime.minusHours(2).plusHours(1)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
Assert.assertEquals(138028.62811791385d, segmentCost, 0);
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
CostBalancerStrategy.INV_LAMBDA_SQUARE * CostBalancerStrategy.intervalCost(
|
||||||
|
1 * CostBalancerStrategy.LAMBDA,
|
||||||
|
-2 * CostBalancerStrategy.LAMBDA,
|
||||||
|
-1 * CostBalancerStrategy.LAMBDA
|
||||||
|
) * 2,
|
||||||
|
segmentCost, 1e-6);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIntervalCost() throws Exception
|
||||||
|
{
|
||||||
|
// additivity
|
||||||
|
Assert.assertEquals(CostBalancerStrategy.intervalCost(1, 1, 3),
|
||||||
|
CostBalancerStrategy.intervalCost(1, 1, 2) +
|
||||||
|
CostBalancerStrategy.intervalCost(1, 2, 3), 1e-6);
|
||||||
|
|
||||||
|
Assert.assertEquals(CostBalancerStrategy.intervalCost(2, 1, 3),
|
||||||
|
CostBalancerStrategy.intervalCost(2, 1, 2) +
|
||||||
|
CostBalancerStrategy.intervalCost(2, 2, 3), 1e-6);
|
||||||
|
|
||||||
|
Assert.assertEquals(CostBalancerStrategy.intervalCost(3, 1, 2),
|
||||||
|
CostBalancerStrategy.intervalCost(1, 1, 2) +
|
||||||
|
CostBalancerStrategy.intervalCost(1, 0, 1) +
|
||||||
|
CostBalancerStrategy.intervalCost(1, 1, 2), 1e-6);
|
||||||
|
|
||||||
|
// no overlap [0, 1) [1, 2)
|
||||||
|
Assert.assertEquals(0.3995764, CostBalancerStrategy.intervalCost(1, 1, 2), 1e-6);
|
||||||
|
// no overlap [0, 1) [-1, 0)
|
||||||
|
Assert.assertEquals(0.3995764, CostBalancerStrategy.intervalCost(1, -1, 0), 1e-6);
|
||||||
|
|
||||||
|
// exact overlap [0, 1), [0, 1)
|
||||||
|
Assert.assertEquals(0.7357589, CostBalancerStrategy.intervalCost(1, 0, 1), 1e-6);
|
||||||
|
// exact overlap [0, 2), [0, 2)
|
||||||
|
Assert.assertEquals(2.270671, CostBalancerStrategy.intervalCost(2, 0, 2), 1e-6);
|
||||||
|
// partial overlap [0, 2), [1, 3)
|
||||||
|
Assert.assertEquals(1.681908, CostBalancerStrategy.intervalCost(2, 1, 3), 1e-6);
|
||||||
|
// partial overlap [0, 2), [1, 2)
|
||||||
|
Assert.assertEquals(1.135335, CostBalancerStrategy.intervalCost(2, 1, 2), 1e-6);
|
||||||
|
// partial overlap [0, 2), [0, 1)
|
||||||
|
Assert.assertEquals(1.135335, CostBalancerStrategy.intervalCost(2, 0, 1), 1e-6);
|
||||||
|
// partial overlap [0, 3), [1, 2)
|
||||||
|
Assert.assertEquals(1.534912, CostBalancerStrategy.intervalCost(3, 1, 2), 1e-6);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue