diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 71fe0270c55..1172adf7564 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index d33b11bc2b7..497216bb8f6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java
index 03b8987297b..585276ade91 100644
--- a/common/src/main/java/io/druid/db/DbConnector.java
+++ b/common/src/main/java/io/druid/db/DbConnector.java
@@ -87,7 +87,8 @@ public class DbConnector
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
- + " PRIMARY KEY (`id`)\n"
+ + " PRIMARY KEY (`id`),\n"
+ + " KEY (active, created_date(100))\n"
+ ")",
taskTableName
)
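
Reviewer note on the DbConnector change: the new composite `KEY (active, created_date(100))` lets MySQL answer the overlord's task-table polls (filter on `active`, sort on `created_date`) from the index rather than scanning `payload`-heavy rows. A minimal JDBC sketch of the access pattern that benefits; the table name `druid_tasks`, the connection string, and the exact query shape are illustrative assumptions, since the real table name comes from `taskTableName`:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Hedged sketch, not Druid code: the WHERE/ORDER BY shape below is what the
// new (active, created_date) key can satisfy without scanning the table.
public class ActiveTaskPollSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder connection details; "druid_tasks" is a hypothetical taskTableName.
    try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost/druid", "user", "pass");
         PreparedStatement stmt = conn.prepareStatement(
             "SELECT id, payload FROM druid_tasks WHERE active = ? ORDER BY created_date"
         )) {
      stmt.setBoolean(1, true);
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          System.out.println(rs.getString("id"));
        }
      }
    }
  }
}
```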
diff --git a/examples/pom.xml b/examples/pom.xml
index cb8c2bce6da..1c4451a81e8 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index 7b2bdf00e16..7f8bdae4830 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/hll/pom.xml b/hll/pom.xml
index f7d07eeeb49..e566b93bb84 100644
--- a/hll/pom.xml
+++ b/hll/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index e82fea270b1..6a960315b5f 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index c7818874001..128c692d459 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 5a3eb1c5e07..58c917c0a26 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -46,7 +46,6 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
-import org.apache.zookeeper.data.Stat;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes;
diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml
index bb872066635..a049ab9cb47 100644
--- a/kafka-eight/pom.xml
+++ b/kafka-eight/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml
index c4391660c05..d8af1ffe3a5 100644
--- a/kafka-seven/pom.xml
+++ b/kafka-seven/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/pom.xml b/pom.xml
index 8e4994a5f67..eed8fa2bd0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
diff --git a/processing/pom.xml b/processing/pom.xml
index dd86b28c470..e9f51ce1228 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
index 376a982ebe3..adc41b535d7 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -148,7 +148,7 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public int compare(Row left, Row right)
{
- return comparator.compare(left.getFloatMetric(column), right.getFloatMetric(column));
+ return comparator.compare(left.getRaw(column), right.getRaw(column));
}
};
}
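
Reviewer note on the DefaultLimitSpec fix: `getFloatMetric` coerced whatever column was being ordered into a float, so ordering a groupBy result by a string dimension either failed or compared meaningless coercions; `getRaw` hands the untyped column value to the comparator the limitSpec configured. A standalone illustration, with plain maps standing in for Druid's Row API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hedged illustration: rows are plain maps, and "lexicographic" plays
// the role of the configured column comparator. Comparing the raw value sorts
// string dimensions correctly; a float coercion would not.
public class RawOrderingSketch
{
  public static void main(String[] args)
  {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (String dim : new String[]{"banana", "apple", "cherry"}) {
      Map<String, Object> row = new HashMap<>();
      row.put("dim", dim);
      rows.add(row);
    }

    final Comparator<Object> lexicographic = new Comparator<Object>()
    {
      @Override
      public int compare(Object l, Object r)
      {
        return String.valueOf(l).compareTo(String.valueOf(r));
      }
    };

    Collections.sort(rows, new Comparator<Map<String, Object>>()
    {
      @Override
      public int compare(Map<String, Object> left, Map<String, Object> right)
      {
        // analogous to comparator.compare(left.getRaw(column), right.getRaw(column))
        return lexicographic.compare(left.get("dim"), right.get("dim"));
      }
    });

    System.out.println(rows); // [{dim=apple}, {dim=banana}, {dim=cherry}]
  }
}
```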
diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml
index 2eb140df098..c407d4387d0 100644
--- a/rabbitmq/pom.xml
+++ b/rabbitmq/pom.xml
@@ -9,7 +9,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/s3-extensions/pom.xml b/s3-extensions/pom.xml
index b359dc0f0d4..6170c861d14 100644
--- a/s3-extensions/pom.xml
+++ b/s3-extensions/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/server/pom.xml b/server/pom.xml
index 13c4fee7e9f..a6bf41d1f36 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index bb98def3612..6f707fd60d0 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -314,7 +314,7 @@ public class CachingClusteredClient implements QueryRunner
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals();
- if (server.isRealtime() || !populateCache || isBySegment) {
+ if (!server.isAssignable() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
} else {
resultSeqToAdd = toolChest.mergeSequences(
diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java
index 9cf0d992bd6..c33b9270b57 100644
--- a/server/src/main/java/io/druid/client/DruidServer.java
+++ b/server/src/main/java/io/druid/client/DruidServer.java
@@ -136,9 +136,9 @@ public class DruidServer implements Comparable
return Collections.unmodifiableMap(segments);
}
- public boolean isRealtime()
+ public boolean isAssignable()
{
- return getType().equalsIgnoreCase("realtime");
+ return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
}
public DataSegment getSegment(String segmentName)
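
Reviewer note on the predicate flip: `!isRealtime()` accepted every server type that wasn't literally "realtime", whereas `isAssignable()` whitelists only the two types the coordinator may hand segments to, so the callers in CachingClusteredClient, RealtimePlumber, DruidClusterBridge, and DruidCoordinator all tighten their behavior. A minimal sketch of the difference, using plain strings for server types (the real checks live on DruidServer):

```java
// Hedged sketch of the semantic change, not the actual DruidServer methods.
public class AssignableSketch
{
  static boolean isRealtime(String type)
  {
    return "realtime".equalsIgnoreCase(type);
  }

  static boolean isAssignable(String type)
  {
    return "historical".equalsIgnoreCase(type) || "bridge".equalsIgnoreCase(type);
  }

  public static void main(String[] args)
  {
    // A hypothetical "broker" entry shows the difference: not realtime, yet not assignable.
    String type = "broker";
    System.out.println(!isRealtime(type));  // true  (old check would accept it)
    System.out.println(isAssignable(type)); // false (new check rejects it)
  }
}
```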
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 2b4d1f5f44b..62b3d3d5f6d 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -704,7 +704,7 @@ public class RealtimePlumber implements Plumber
return ServerView.CallbackAction.UNREGISTER;
}
- if (server.isRealtime()) {
+ if (!server.isAssignable()) {
return ServerView.CallbackAction.CONTINUE;
}
diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java
index f4073a18678..a029fe855f5 100644
--- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java
+++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java
@@ -291,7 +291,7 @@ public class DruidClusterBridge
DruidServer input
)
{
- return !input.isRealtime();
+ return input.isAssignable();
}
}
);
diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
index b11d2758035..f5f202b5904 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
@@ -96,8 +96,8 @@ public class DruidCoordinator
private final Object lock = new Object();
private volatile boolean started = false;
+ private volatile int leaderCounter = 0;
private volatile boolean leader = false;
- private volatile AtomicReference<CoordinatorDynamicConfig> dynamicConfigs;
private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
@@ -184,7 +184,6 @@ public class DruidCoordinator
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.leaderLatch = new AtomicReference<>(null);
- this.dynamicConfigs = new AtomicReference<>(null);
this.loadManagementPeons = loadQueuePeonMap;
}
@@ -280,7 +279,11 @@ public class DruidCoordinator
public CoordinatorDynamicConfig getDynamicConfigs()
{
- return dynamicConfigs.get();
+ return configManager.watch(
+ CoordinatorDynamicConfig.CONFIG_KEY,
+ CoordinatorDynamicConfig.class,
+ new CoordinatorDynamicConfig.Builder().build()
+ ).get();
}
public void removeSegment(DataSegment segment)
@@ -533,6 +536,7 @@ public class DruidCoordinator
log.info("I am the leader of the coordinators, all must bow!");
try {
+ leaderCounter++;
leader = true;
databaseSegmentManager.start();
databaseRuleManager.start();
@@ -540,11 +544,6 @@ public class DruidCoordinator
serviceAnnouncer.announce(self);
final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
- dynamicConfigs = configManager.watch(
- CoordinatorDynamicConfig.CONFIG_KEY,
- CoordinatorDynamicConfig.class,
- new CoordinatorDynamicConfig.Builder().build()
- );
coordinatorRunnables.add(Pair.of(new CoordinatorHistoricalManagerRunnable(), config.getCoordinatorPeriod()));
if (indexingServiceClient != null) {
coordinatorRunnables.add(
@@ -562,6 +561,7 @@ public class DruidCoordinator
);
}
+ final int startingLeaderCounter = leaderCounter;
for (final Pair<? extends CoordinatorRunnable, Duration> coordinatorRunnable : coordinatorRunnables) {
ScheduledExecutors.scheduleWithFixedDelay(
exec,
@@ -574,10 +574,10 @@ public class DruidCoordinator
@Override
public ScheduledExecutors.Signal call()
{
- if (leader) {
+ if (leader && startingLeaderCounter == leaderCounter) {
theRunnable.run();
}
- if (leader) { // (We might no longer be leader)
+ if (leader && startingLeaderCounter == leaderCounter) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
@@ -610,6 +610,8 @@ public class DruidCoordinator
{
synchronized (lock) {
try {
+ leaderCounter++;
+
log.info("I am no longer the leader...");
for (String server : loadManagementPeons.keySet()) {
@@ -739,11 +741,10 @@ public class DruidCoordinator
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
- .withDynamicConfigs(dynamicConfigs.get())
+ .withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.build();
-
for (DruidCoordinatorHelper helper : helpers) {
params = helper.run(params);
}
@@ -777,7 +778,7 @@ public class DruidCoordinator
DruidServer input
)
{
- return !input.isRealtime();
+ return input.isAssignable();
}
}
);
@@ -826,7 +827,6 @@ public class DruidCoordinator
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTime.now())
- .withDynamicConfigs(dynamicConfigs.get())
.build();
}
},
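
Reviewer note on `leaderCounter`: together the hunks above implement a leadership-epoch guard. The counter bumps both when leadership is gained and when it is lost, each scheduled runnable captures the value current at scheduling time (`startingLeaderCounter`), and the periodic callback returns STOP once the captured epoch no longer matches, so runnables from a previous leader term cannot keep running after a quick lose/regain cycle. A self-contained sketch of the pattern; the names and scheduling details are mine, not the coordinator's:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hedged sketch of the epoch-guard pattern, not the actual coordinator code.
public class LeaderEpochSketch
{
  private volatile int leaderCounter = 0;
  private volatile boolean leader = false;
  private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);

  void becomeLeader()
  {
    leaderCounter++;
    leader = true;
    final int startingLeaderCounter = leaderCounter; // capture this term's epoch
    exec.scheduleWithFixedDelay(
        new Runnable()
        {
          @Override
          public void run()
          {
            // A runnable scheduled in term N goes quiet as soon as term N ends,
            // even if leadership has since been regained as a later term.
            if (leader && startingLeaderCounter == leaderCounter) {
              System.out.println("coordination work for term " + startingLeaderCounter);
            }
          }
        },
        0, 1, TimeUnit.SECONDS
    );
  }

  void stopBeingLeader()
  {
    leaderCounter++; // invalidates every runnable captured during the old term
    leader = false;
  }

  public static void main(String[] args) throws InterruptedException
  {
    LeaderEpochSketch sketch = new LeaderEpochSketch();
    sketch.becomeLeader();
    Thread.sleep(2500);
    sketch.stopBeingLeader(); // the old runnable now stops doing work
    sketch.becomeLeader();    // a fresh runnable is scheduled for the new term
    Thread.sleep(2500);
    sketch.exec.shutdown();
  }
}
```

The related `getDynamicConfigs` change fits the same theme: reading through `configManager.watch(...)` on demand, instead of caching an AtomicReference populated only on becoming leader, means dynamic configs are fresh and available even before leadership is acquired.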
diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
index a90eeba4cae..fddaf0d7c05 100644
--- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
+++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
@@ -83,12 +83,22 @@ public class SegmentReplicantLookup
}
public Map<String, Integer> getLoadingTiers(String segmentId)
- {
- Map<String, Integer> retVal = loadingSegments.row(segmentId);
- return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
- }
+ {
+ Map<String, Integer> retVal = loadingSegments.row(segmentId);
+ return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
+ }
- public int getClusterReplicants(String segmentId, String tier)
+ public int getLoadedReplicants(String segmentId)
+ {
+ Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
+ int retVal = 0;
+ for (Integer replicants : allTiers.values()) {
+ retVal += replicants;
+ }
+ return retVal;
+ }
+
+ public int getLoadedReplicants(String segmentId, String tier)
{
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
@@ -100,8 +110,23 @@ public class SegmentReplicantLookup
return (retVal == null) ? 0 : retVal;
}
+ public int getLoadingReplicants(String segmentId)
+ {
+ Map<String, Integer> allTiers = loadingSegments.row(segmentId);
+ int retVal = 0;
+ for (Integer replicants : allTiers.values()) {
+ retVal += replicants;
+ }
+ return retVal;
+ }
+
+ public int getTotalReplicants(String segmentId)
+ {
+ return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
+ }
+
public int getTotalReplicants(String segmentId, String tier)
{
- return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
+ return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}
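
Reviewer note on the renames: `getClusterReplicants` becomes `getLoadedReplicants` (replicas actually served), `getLoadingReplicants` counts replicas still queued on load peons, and the new no-tier `getTotalReplicants(segmentId)` overload sums both across every tier, which is the cluster-wide figure `LoadRule` now throttles against. A standalone sketch of the arithmetic, with plain maps standing in for the segment/tier tables:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the per-segment replica accounting; plain maps stand in for
// the Table<segmentId, tier, count> structures in SegmentReplicantLookup.
public class ReplicantAccountingSketch
{
  static int sumAcrossTiers(Map<String, Integer> countsByTier)
  {
    int total = 0;
    for (Integer replicants : countsByTier.values()) {
      total += replicants;
    }
    return total;
  }

  public static void main(String[] args)
  {
    Map<String, Integer> loadedByTier = new HashMap<>();
    loadedByTier.put("hot", 2);
    loadedByTier.put("_default_tier", 1);

    Map<String, Integer> loadingByTier = new HashMap<>();
    loadingByTier.put("hot", 1);

    int loaded = sumAcrossTiers(loadedByTier);   // 3
    int loading = sumAcrossTiers(loadingByTier); // 1
    System.out.println("total replicants = " + (loaded + loading)); // 4
  }
}
```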
diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
index f629318395b..cac7a5b8c28 100644
--- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
@@ -42,6 +42,8 @@ import java.util.Map;
public abstract class LoadRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
+ private static final String assignedCount = "assignedCount";
+ private static final String droppedCount = "droppedCount";
@Override
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
@@ -49,13 +51,17 @@ public abstract class LoadRule implements Rule
CoordinatorStats stats = new CoordinatorStats();
final Map<String, Integer> loadStatus = Maps.newHashMap();
+
+ int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
final String tier = entry.getKey();
- final int expectedReplicants = entry.getValue();
+ final int expectedReplicantsInTier = entry.getValue();
+ final int totalReplicantsInTier = params.getSegmentReplicantLookup()
+ .getTotalReplicants(segment.getIdentifier(), tier);
+ final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
+ .getLoadedReplicants(segment.getIdentifier(), tier);
- int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
-
- MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
+ final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return stats;
@@ -65,22 +71,21 @@ public abstract class LoadRule implements Rule
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
- stats.accumulate(
- assign(
- params.getReplicationManager(),
- tier,
- expectedReplicants,
- totalReplicants,
- strategy,
- serverHolderList,
- segment
- )
+ CoordinatorStats assignStats = assign(
+ params.getReplicationManager(),
+ tier,
+ totalReplicantsInCluster,
+ expectedReplicantsInTier,
+ totalReplicantsInTier,
+ strategy,
+ serverHolderList,
+ segment
);
+ stats.accumulate(assignStats);
+ totalReplicantsInCluster += assignStats.getPerTierStats().get(assignedCount).get(tier).get();
}
- int clusterReplicants = params.getSegmentReplicantLookup()
- .getClusterReplicants(segment.getIdentifier(), tier);
- loadStatus.put(tier, expectedReplicants - clusterReplicants);
+ loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
}
// Remove over-replication
stats.accumulate(drop(loadStatus, segment, params));
@@ -92,18 +97,21 @@ public abstract class LoadRule implements Rule
private CoordinatorStats assign(
final ReplicationThrottler replicationManager,
final String tier,
- final int expectedReplicants,
- int totalReplicants,
+ final int totalReplicantsInCluster,
+ final int expectedReplicantsInTier,
+ final int totalReplicantsInTier,
final BalancerStrategy strategy,
final List<ServerHolder> serverHolderList,
final DataSegment segment
)
{
final CoordinatorStats stats = new CoordinatorStats();
- stats.addToTieredStat("assignedCount", tier, 0);
+ stats.addToTieredStat(assignedCount, tier, 0);
- while (totalReplicants < expectedReplicants) {
- boolean replicate = totalReplicants > 0;
+ int currReplicantsInTier = totalReplicantsInTier;
+ int currTotalReplicantsInCluster = totalReplicantsInCluster;
+ while (currReplicantsInTier < expectedReplicantsInTier) {
+ boolean replicate = currTotalReplicantsInCluster > 0;
if (replicate && !replicationManager.canCreateReplicant(tier)) {
break;
@@ -116,7 +124,7 @@ public abstract class LoadRule implements Rule
"Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier,
segment.getIdentifier(),
- expectedReplicants
+ expectedReplicantsInTier
);
break;
}
@@ -143,8 +151,9 @@ public abstract class LoadRule implements Rule
}
);
- stats.addToTieredStat("assignedCount", tier, 1);
- ++totalReplicants;
+ stats.addToTieredStat(assignedCount, tier, 1);
+ ++currReplicantsInTier;
+ ++currTotalReplicantsInCluster;
}
return stats;
@@ -162,7 +171,7 @@ public abstract class LoadRule implements Rule
return stats;
}
- // Make sure we have enough actual replicants in the correct tiers in the cluster before doing anything
+ // Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
for (Integer leftToLoad : loadStatus.values()) {
if (leftToLoad > 0) {
return stats;
@@ -176,10 +185,10 @@ public abstract class LoadRule implements Rule
for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
final String tier = entry.getKey();
- int actualNumReplicantsForTier = entry.getValue();
+ int loadedNumReplicantsForTier = entry.getValue();
int expectedNumReplicantsForTier = getNumReplicants(tier);
- stats.addToTieredStat("droppedCount", tier, 0);
+ stats.addToTieredStat(droppedCount, tier, 0);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
@@ -188,7 +197,7 @@ public abstract class LoadRule implements Rule
}
List<ServerHolder> droppedServers = Lists.newArrayList();
- while (actualNumReplicantsForTier > expectedNumReplicantsForTier) {
+ while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) {
final ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@@ -224,8 +233,8 @@ public abstract class LoadRule implements Rule
}
}
);
- --actualNumReplicantsForTier;
- stats.addToTieredStat("droppedCount", tier, 1);
+ --loadedNumReplicantsForTier;
+ stats.addToTieredStat(droppedCount, tier, 1);
}
droppedServers.add(holder);
}
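
Reviewer note on the core behavior change: `replicate` used to be derived from the per-tier count, so the first copy placed in each tier bypassed the replication throttler; deriving it from a running cluster-wide count means only the very first copy of a segment anywhere is an unthrottled primary, and assignments made earlier in the same pass (`++currTotalReplicantsInCluster`) count against later tiers. A runnable sketch of that decision loop, with throttling reduced to a simple in-flight budget:

```java
// Hedged, self-contained sketch of the cross-tier throttle decision; the real
// ReplicationThrottler tracks per-tier replicant lifetimes, not a single budget.
public class CrossTierThrottleSketch
{
  public static void main(String[] args)
  {
    int replicationBudget = 7; // stands in for the throttler's max replicants

    int totalReplicantsInCluster = 0; // fresh segment: nothing loaded or loading
    for (int expectedReplicantsInTier : new int[]{1, 1}) { // e.g. hot, _default_tier
      int currReplicantsInTier = 0;
      while (currReplicantsInTier < expectedReplicantsInTier) {
        // Old code: replicate = currReplicantsInTier > 0 (a primary per tier).
        boolean replicate = totalReplicantsInCluster > 0;
        if (replicate && replicationBudget <= 0) {
          break; // throttled: only the first copy in the cluster skips the throttle
        }
        if (replicate) {
          replicationBudget--;
        }
        currReplicantsInTier++;
        totalReplicantsInCluster++; // visible to the next tier in this same pass
      }
    }
    System.out.println("placed " + totalReplicantsInCluster + " copies");
  }
}
```

This is also presumably why LoadRuleTest's throttler budget grows from 1 to 2 below: copies that used to count as per-tier primaries are now replicas.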
diff --git a/server/src/main/resources/static/js/handlers-0.0.2.js b/server/src/main/resources/static/js/handlers-0.0.2.js
index 8441fe9c561..139b1447dbc 100644
--- a/server/src/main/resources/static/js/handlers-0.0.2.js
+++ b/server/src/main/resources/static/js/handlers-0.0.2.js
@@ -8,7 +8,6 @@ $(document).ready(function() {
function handleTable(dontDisplay)
{
- console.log(type);
$.get(basePath + type + '?full', function(data) {
buildTable(data, $('#result_table'), dontDisplay);
@@ -76,7 +75,6 @@ $(document).ready(function() {
}
$('#view_button').click(function() {
- console.log("here");
type = $('#select_type').val() + '';
view = $('#select_view').val() + '';
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 2663a4a94e4..95d52c1fad1 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -979,6 +979,94 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.verify(mockPeon);
}
+ /**
+ * Nodes:
+ * hot - nothing loaded
+ * _default_tier - 1 segment loaded
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReplicantThrottleAcrossTiers() throws Exception
+ {
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
+ Lists.<Rule>newArrayList(
+ new IntervalLoadRule(
+ new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
+ ImmutableMap.of(
+ "hot", 1,
+ DruidServer.DEFAULT_TIER, 1
+ ),
+ null,
+ null
+ )
+ )
+ ).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ DruidCluster druidCluster = new DruidCluster(
+ ImmutableMap.of(
+ "hot",
+ MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+ Arrays.asList(
+ new ServerHolder(
+ new DruidServer(
+ "serverHot",
+ "hostHot",
+ 1000,
+ "historical",
+ "hot",
+ 0
+ ),
+ mockPeon
+ )
+ )
+ ),
+ DruidServer.DEFAULT_TIER,
+ MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+ Arrays.asList(
+ new ServerHolder(
+ new DruidServer(
+ "serverNorm",
+ "hostNorm",
+ 1000,
+ "historical",
+ DruidServer.DEFAULT_TIER,
+ 0
+ ),
+ mockPeon
+ )
+ )
+ )
+ )
+ );
+
+ DruidCoordinatorRuntimeParams params =
+ new DruidCoordinatorRuntimeParams.Builder()
+ .withDruidCluster(druidCluster)
+ .withAvailableSegments(availableSegments)
+ .withDatabaseRuleManager(databaseRuleManager)
+ .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+ .build();
+
+ DruidCoordinatorRuleRunner runner = new DruidCoordinatorRuleRunner(new ReplicationThrottler(7, 1), coordinator);
+ DruidCoordinatorRuntimeParams afterParams = runner.run(params);
+ CoordinatorStats stats = afterParams.getCoordinatorStats();
+
+ Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 24);
+ Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 7);
+ Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
+ Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+
+ EasyMock.verify(mockPeon);
+ }
+
@Test
public void testDropReplicantThrottle() throws Exception
{
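
A hedged reading of the numbers asserted in testReplicantThrottleAcrossTiers above: the fixture supplies 24 available segments and a throttler permitting 7 replicas, so under the new cluster-wide accounting every hot copy is a primary while every _default_tier copy must clear the throttle:

```java
// Assumed arithmetic behind the assertions (24 segments, ReplicationThrottler(7, 1)):
// hot:           24 first-in-cluster copies -> primaries, never throttled -> assignedCount == 24
// _default_tier: 24 candidate copies, every one a replica under cluster-wide
//                accounting, throttle budget 7 -> assignedCount == 7
```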
diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
index a52927d0b7d..3e75d1b79c3 100644
--- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
@@ -59,7 +59,7 @@ public class LoadRuleTest
public void setUp() throws Exception
{
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
- throttler = new ReplicationThrottler(1, 1);
+ throttler = new ReplicationThrottler(2, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
throttler.updateTerminationState(tier);
diff --git a/services/pom.xml b/services/pom.xml
index 365f7ed0224..1962aee650e 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -27,7 +27,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.54-SNAPSHOT</version>
+ <version>0.6.57-SNAPSHOT</version>