make rules assignable across tiers

This commit is contained in:
fjy 2013-12-16 15:54:42 -08:00
parent e309f48357
commit 27bba22e6f
8 changed files with 122 additions and 132 deletions

View File

@ -205,7 +205,7 @@ You are probably wondering, what are these [Granularities](Granularities.html) a
To issue the query and get some results, run the following in your command line:
```
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d ````timeseries_query.body
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries_query.body
```
Once again, you should get a JSON blob of text back with your results, that looks something like this:

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
@ -36,11 +37,9 @@ import io.druid.concurrent.Execs;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
@ -89,8 +88,10 @@ public class DatabaseRuleManager
final List<Rule> defaultRules = Arrays.<Rule>asList(
new ForeverLoadRule(
DruidServer.DEFAULT_NUM_REPLICANTS,
DruidServer.DEFAULT_TIER
ImmutableMap.<String, Integer>of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
)
);
final String version = new DateTime().toString();

View File

@ -58,6 +58,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorCleanup;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;

View File

@ -24,39 +24,20 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.Map;
/**
*/
public class ForeverLoadRule extends LoadRule
{
private final Integer replicants;
private final String tier;
private final Map<String, Integer> tieredReplicants;
@JsonCreator
public ForeverLoadRule(
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants
)
{
this.replicants = (replicants == null) ? 2 : replicants;
this.tier = tier;
}
@Override
public int getReplicants()
{
return replicants;
}
@Override
public int getReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@Override
public String getTier()
{
return null;
this.tieredReplicants = tieredReplicants;
}
@Override
@ -65,6 +46,19 @@ public class ForeverLoadRule extends LoadRule
return "loadForever";
}
@Override
@JsonProperty
public Map<String, Integer> getTieredReplicants()
{
return tieredReplicants;
}
@Override
public int getNumReplicants(String tier)
{
return tieredReplicants.get(tier);
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{

View File

@ -21,11 +21,14 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Map;
/**
*/
public class IntervalLoadRule extends LoadRule
@ -33,19 +36,25 @@ public class IntervalLoadRule extends LoadRule
private static final Logger log = new Logger(IntervalLoadRule.class);
private final Interval interval;
private final Integer replicants;
private final String tier;
private final Map<String, Integer> tieredReplicants;
@JsonCreator
public IntervalLoadRule(
@JsonProperty("interval") Interval interval,
@JsonProperty("load") Map<String, Integer> tieredReplicants,
// Replicants and tier are deprecated
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
)
{
this.interval = interval;
this.replicants = (replicants == null) ? 2 : replicants;
this.tier = tier;
if (tieredReplicants != null) {
this.tieredReplicants = tieredReplicants;
} else { // Backwards compatible
this.tieredReplicants = ImmutableMap.of(tier, replicants);
}
}
@Override
@ -55,24 +64,16 @@ public class IntervalLoadRule extends LoadRule
return "loadByInterval";
}
@Override
@JsonProperty
public int getReplicants()
public Map<String, Integer> getTieredReplicants()
{
return replicants;
return tieredReplicants;
}
@Override
public int getReplicants(String tier)
public int getNumReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@Override
@JsonProperty
public String getTier()
{
return tier;
return tieredReplicants.get(tier);
}
@JsonProperty

View File

@ -22,17 +22,16 @@ package io.druid.server.coordinator.rules;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -48,39 +47,46 @@ public abstract class LoadRule implements Rule
{
CoordinatorStats stats = new CoordinatorStats();
int expectedReplicants = getReplicants();
int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), getTier());
int clusterReplicants = params.getSegmentReplicantLookup().getClusterReplicants(segment.getIdentifier(), getTier());
for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
final String tier = entry.getKey();
final int expectedReplicants = entry.getValue();
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier());
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", getTier()).emit();
return stats;
int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
int clusterReplicants = params.getSegmentReplicantLookup()
.getClusterReplicants(segment.getIdentifier(), tier);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return stats;
}
final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
tier,
expectedReplicants,
totalReplicants,
strategy,
serverHolderList,
segment
)
);
}
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
}
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
expectedReplicants,
totalReplicants,
strategy,
serverHolderList,
segment
)
);
}
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
return stats;
}
private CoordinatorStats assign(
final ReplicationThrottler replicationManager,
final String tier,
final int expectedReplicants,
int totalReplicants,
final BalancerStrategy strategy,
@ -89,11 +95,12 @@ public abstract class LoadRule implements Rule
)
{
final CoordinatorStats stats = new CoordinatorStats();
stats.addToTieredStat("assignedCount", tier, 0);
while (totalReplicants < expectedReplicants) {
boolean replicate = totalReplicants > 0;
if (replicate && !replicationManager.canCreateReplicant(getTier())) {
if (replicate && !replicationManager.canCreateReplicant(tier)) {
break;
}
@ -101,8 +108,8 @@ public abstract class LoadRule implements Rule
if (holder == null) {
log.warn(
"Not enough %s servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
getTier(),
"Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier,
segment.getIdentifier(),
expectedReplicants
);
@ -111,7 +118,7 @@ public abstract class LoadRule implements Rule
if (replicate) {
replicationManager.registerReplicantCreation(
getTier(), segment.getIdentifier(), holder.getServer().getHost()
tier, segment.getIdentifier(), holder.getServer().getHost()
);
}
@ -123,7 +130,7 @@ public abstract class LoadRule implements Rule
public void execute()
{
replicationManager.unregisterReplicantCreation(
getTier(),
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
@ -131,7 +138,7 @@ public abstract class LoadRule implements Rule
}
);
stats.addToTieredStat("assignedCount", getTier(), 1);
stats.addToTieredStat("assignedCount", tier, 1);
++totalReplicants;
}
@ -152,17 +159,20 @@ public abstract class LoadRule implements Rule
return stats;
}
// Make sure we have enough actual replicants in the cluster before doing anything
// Make sure we have enough actual replicants in the correct tier in the cluster before doing anything
if (clusterReplicants < expectedReplicants) {
return stats;
}
Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
// Find all instances of this segment across tiers
Map<String, Integer> replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) {
String tier = entry.getKey();
int actualNumReplicantsForType = entry.getValue();
int expectedNumReplicantsForType = getReplicants(tier);
for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
final String tier = entry.getKey();
int actualNumReplicantsForTier = entry.getValue();
int expectedNumReplicantsForTier = getNumReplicants(tier);
stats.addToTieredStat("droppedCount", tier, 0);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
@ -171,7 +181,7 @@ public abstract class LoadRule implements Rule
}
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
while (actualNumReplicantsForTier > expectedNumReplicantsForTier) {
final ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@ -179,14 +189,14 @@ public abstract class LoadRule implements Rule
}
if (holder.isServingSegment(segment)) {
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier())) {
if (expectedNumReplicantsForTier > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(tier)) {
serverQueue.add(holder);
break;
}
replicationManager.registerReplicantTermination(
getTier(),
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
@ -200,14 +210,14 @@ public abstract class LoadRule implements Rule
public void execute()
{
replicationManager.unregisterReplicantTermination(
getTier(),
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
}
}
);
--actualNumReplicantsForType;
--actualNumReplicantsForTier;
stats.addToTieredStat("droppedCount", tier, 1);
}
droppedServers.add(holder);
@ -218,9 +228,7 @@ public abstract class LoadRule implements Rule
return stats;
}
public abstract int getReplicants();
public abstract Map<String, Integer> getTieredReplicants();
public abstract int getReplicants(String tier);
public abstract String getTier();
public abstract int getNumReplicants(String tier);
}

View File

@ -21,12 +21,15 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.Map;
/**
*/
public class PeriodLoadRule extends LoadRule
@ -34,19 +37,24 @@ public class PeriodLoadRule extends LoadRule
private static final Logger log = new Logger(PeriodLoadRule.class);
private final Period period;
private final Integer replicants;
private final String tier;
private final Map<String, Integer> tieredReplicants;
@JsonCreator
public PeriodLoadRule(
@JsonProperty("period") Period period,
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
// The following two vars need to be deprecated
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
)
{
this.period = period;
this.replicants = (replicants == null) ? 2 : replicants;
this.tier = tier;
if (tieredReplicants != null) {
this.tieredReplicants = tieredReplicants;
} else { // Backwards compatible
this.tieredReplicants = ImmutableMap.of(tier, replicants);
}
}
@Override
@ -62,22 +70,17 @@ public class PeriodLoadRule extends LoadRule
return period;
}
@Override
@JsonProperty
public int getReplicants()
public Map<String, Integer> getTieredReplicants()
{
return replicants;
return tieredReplicants;
}
@Override
public int getReplicants(String tier)
public int getNumReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@JsonProperty
public String getTier()
{
return tier;
return tieredReplicants.get(tier);
}
@Override

View File

@ -25,6 +25,8 @@ import com.google.common.collect.Range;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.Map;
/**
*/
public class SizeLoadRule extends LoadRule
@ -51,23 +53,15 @@ public class SizeLoadRule extends LoadRule
}
@Override
@JsonProperty
public int getReplicants()
public Map<String, Integer> getTieredReplicants()
{
return replicants;
return null;
}
@Override
public int getReplicants(String tier)
public int getNumReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@Override
@JsonProperty
public String getTier()
{
return tier;
return 0;
}
@Override
@ -76,18 +70,6 @@ public class SizeLoadRule extends LoadRule
return "loadBySize";
}
@JsonProperty
public long getLow()
{
return low;
}
@JsonProperty
public long getHigh()
{
return high;
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{