From 27bba22e6fa8d2c19f770cd2af3cf1fd98647adc Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 16 Dec 2013 15:54:42 -0800 Subject: [PATCH] make rules assignable across tiers --- .../Tutorial:-A-First-Look-at-Druid.md | 2 +- .../java/io/druid/db/DatabaseRuleManager.java | 9 +- .../server/coordinator/DruidCoordinator.java | 1 + .../coordinator/rules/ForeverLoadRule.java | 42 +++---- .../coordinator/rules/IntervalLoadRule.java | 33 +++--- .../server/coordinator/rules/LoadRule.java | 106 ++++++++++-------- .../coordinator/rules/PeriodLoadRule.java | 31 ++--- .../coordinator/rules/SizeLoadRule.java | 30 +---- 8 files changed, 122 insertions(+), 132 deletions(-) diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md index e3fe41c51c6..1d5cf883267 100644 --- a/docs/content/Tutorial:-A-First-Look-at-Druid.md +++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md @@ -205,7 +205,7 @@ You are probably wondering, what are these [Granularities](Granularities.html) a To issue the query and get some results, run the following in your command line: ``` -curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d ````timeseries_query.body +curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries_query.body ``` Once again, you should get a JSON blob of text back with your results, that looks something like this: diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManager.java b/server/src/main/java/io/druid/db/DatabaseRuleManager.java index 9c5efde4fcd..f205542ba41 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManager.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -36,11 +37,9 @@ import io.druid.concurrent.Execs; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Json; import io.druid.server.coordinator.rules.ForeverLoadRule; -import io.druid.server.coordinator.rules.PeriodLoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; import org.joda.time.Duration; -import org.joda.time.Period; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -89,8 +88,10 @@ public class DatabaseRuleManager final List defaultRules = Arrays.asList( new ForeverLoadRule( - DruidServer.DEFAULT_NUM_REPLICANTS, - DruidServer.DEFAULT_TIER + ImmutableMap.of( + DruidServer.DEFAULT_TIER, + DruidServer.DEFAULT_NUM_REPLICANTS + ) ) ); final String version = new DateTime().toString(); diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 26e736ee3bf..41da48a9ed3 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -58,6 +58,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorCleanup; import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.server.coordinator.helper.DruidCoordinatorLogger; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; diff --git a/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java index d826c572ad0..9c3c5095066 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java @@ -24,39 +24,20 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; +import java.util.Map; + /** */ public class ForeverLoadRule extends LoadRule { - private final Integer replicants; - private final String tier; + private final Map tieredReplicants; @JsonCreator public ForeverLoadRule( - @JsonProperty("replicants") Integer replicants, - @JsonProperty("tier") String tier + @JsonProperty("tieredReplicants") Map tieredReplicants ) { - this.replicants = (replicants == null) ? 2 : replicants; - this.tier = tier; - } - - @Override - public int getReplicants() - { - return replicants; - } - - @Override - public int getReplicants(String tier) - { - return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0; - } - - @Override - public String getTier() - { - return null; + this.tieredReplicants = tieredReplicants; } @Override @@ -65,6 +46,19 @@ public class ForeverLoadRule extends LoadRule return "loadForever"; } + @Override + @JsonProperty + public Map getTieredReplicants() + { + return tieredReplicants; + } + + @Override + public int getNumReplicants(String tier) + { + return tieredReplicants.get(tier); + } + @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) { diff --git a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java index 4fe01521e66..ea3a36587d0 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java @@ -21,11 +21,14 @@ package io.druid.server.coordinator.rules; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.Map; + /** */ public class IntervalLoadRule extends LoadRule @@ -33,19 +36,25 @@ public class IntervalLoadRule extends LoadRule private static final Logger log = new Logger(IntervalLoadRule.class); private final Interval interval; - private final Integer replicants; - private final String tier; + private final Map tieredReplicants; @JsonCreator public IntervalLoadRule( @JsonProperty("interval") Interval interval, + @JsonProperty("load") Map tieredReplicants, + // Replicants and tier are deprecated @JsonProperty("replicants") Integer replicants, @JsonProperty("tier") String tier ) { this.interval = interval; - this.replicants = (replicants == null) ? 2 : replicants; - this.tier = tier; + + + if (tieredReplicants != null) { + this.tieredReplicants = tieredReplicants; + } else { // Backwards compatible + this.tieredReplicants = ImmutableMap.of(tier, replicants); + } } @Override @@ -55,24 +64,16 @@ public class IntervalLoadRule extends LoadRule return "loadByInterval"; } - @Override @JsonProperty - public int getReplicants() + public Map getTieredReplicants() { - return replicants; + return tieredReplicants; } @Override - public int getReplicants(String tier) + public int getNumReplicants(String tier) { - return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0; - } - - @Override - @JsonProperty - public String getTier() - { - return tier; + return tieredReplicants.get(tier); } @JsonProperty diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 98ee7cfb206..7127c9af98c 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -22,17 +22,16 @@ package io.druid.server.coordinator.rules; import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.emitter.EmittingLogger; +import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.server.coordinator.LoadPeonCallback; -import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -48,39 +47,46 @@ public abstract class LoadRule implements Rule { CoordinatorStats stats = new CoordinatorStats(); - int expectedReplicants = getReplicants(); - int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), getTier()); - int clusterReplicants = params.getSegmentReplicantLookup().getClusterReplicants(segment.getIdentifier(), getTier()); + for (Map.Entry entry : getTieredReplicants().entrySet()) { + final String tier = entry.getKey(); + final int expectedReplicants = entry.getValue(); - MinMaxPriorityQueue serverQueue = params.getDruidCluster().getServersByTier(getTier()); - if (serverQueue == null) { - log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", getTier()).emit(); - return stats; + int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier); + int clusterReplicants = params.getSegmentReplicantLookup() + .getClusterReplicants(segment.getIdentifier(), tier); + + MinMaxPriorityQueue serverQueue = params.getDruidCluster().getServersByTier(tier); + if (serverQueue == null) { + log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit(); + return stats; + } + + final List serverHolderList = Lists.newArrayList(serverQueue); + final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); + final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); + if (params.getAvailableSegments().contains(segment)) { + stats.accumulate( + assign( + params.getReplicationManager(), + tier, + expectedReplicants, + totalReplicants, + strategy, + serverHolderList, + segment + ) + ); + } + + stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params)); } - final List serverHolderList = new ArrayList(serverQueue); - final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); - final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); - if (params.getAvailableSegments().contains(segment)) { - stats.accumulate( - assign( - params.getReplicationManager(), - expectedReplicants, - totalReplicants, - strategy, - serverHolderList, - segment - ) - ); - } - - stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params)); - return stats; } private CoordinatorStats assign( final ReplicationThrottler replicationManager, + final String tier, final int expectedReplicants, int totalReplicants, final BalancerStrategy strategy, @@ -89,11 +95,12 @@ public abstract class LoadRule implements Rule ) { final CoordinatorStats stats = new CoordinatorStats(); + stats.addToTieredStat("assignedCount", tier, 0); while (totalReplicants < expectedReplicants) { boolean replicate = totalReplicants > 0; - if (replicate && !replicationManager.canCreateReplicant(getTier())) { + if (replicate && !replicationManager.canCreateReplicant(tier)) { break; } @@ -101,8 +108,8 @@ public abstract class LoadRule implements Rule if (holder == null) { log.warn( - "Not enough %s servers or node capacity to assign segment[%s]! Expected Replicants[%d]", - getTier(), + "Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]", + tier, segment.getIdentifier(), expectedReplicants ); @@ -111,7 +118,7 @@ public abstract class LoadRule implements Rule if (replicate) { replicationManager.registerReplicantCreation( - getTier(), segment.getIdentifier(), holder.getServer().getHost() + tier, segment.getIdentifier(), holder.getServer().getHost() ); } @@ -123,7 +130,7 @@ public abstract class LoadRule implements Rule public void execute() { replicationManager.unregisterReplicantCreation( - getTier(), + tier, segment.getIdentifier(), holder.getServer().getHost() ); @@ -131,7 +138,7 @@ public abstract class LoadRule implements Rule } ); - stats.addToTieredStat("assignedCount", getTier(), 1); + stats.addToTieredStat("assignedCount", tier, 1); ++totalReplicants; } @@ -152,17 +159,20 @@ public abstract class LoadRule implements Rule return stats; } - // Make sure we have enough actual replicants in the cluster before doing anything + // Make sure we have enough actual replicants in the correct tier in the cluster before doing anything if (clusterReplicants < expectedReplicants) { return stats; } - Map replicantsByType = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier()); + // Find all instances of this segment across tiers + Map replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier()); - for (Map.Entry entry : replicantsByType.entrySet()) { - String tier = entry.getKey(); - int actualNumReplicantsForType = entry.getValue(); - int expectedNumReplicantsForType = getReplicants(tier); + for (Map.Entry entry : replicantsByTier.entrySet()) { + final String tier = entry.getKey(); + int actualNumReplicantsForTier = entry.getValue(); + int expectedNumReplicantsForTier = getNumReplicants(tier); + + stats.addToTieredStat("droppedCount", tier, 0); MinMaxPriorityQueue serverQueue = params.getDruidCluster().get(tier); if (serverQueue == null) { @@ -171,7 +181,7 @@ public abstract class LoadRule implements Rule } List droppedServers = Lists.newArrayList(); - while (actualNumReplicantsForType > expectedNumReplicantsForType) { + while (actualNumReplicantsForTier > expectedNumReplicantsForTier) { final ServerHolder holder = serverQueue.pollLast(); if (holder == null) { log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); @@ -179,14 +189,14 @@ public abstract class LoadRule implements Rule } if (holder.isServingSegment(segment)) { - if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants - if (!replicationManager.canDestroyReplicant(getTier())) { + if (expectedNumReplicantsForTier > 0) { // don't throttle unless we are removing extra replicants + if (!replicationManager.canDestroyReplicant(tier)) { serverQueue.add(holder); break; } replicationManager.registerReplicantTermination( - getTier(), + tier, segment.getIdentifier(), holder.getServer().getHost() ); @@ -200,14 +210,14 @@ public abstract class LoadRule implements Rule public void execute() { replicationManager.unregisterReplicantTermination( - getTier(), + tier, segment.getIdentifier(), holder.getServer().getHost() ); } } ); - --actualNumReplicantsForType; + --actualNumReplicantsForTier; stats.addToTieredStat("droppedCount", tier, 1); } droppedServers.add(holder); @@ -218,9 +228,7 @@ public abstract class LoadRule implements Rule return stats; } - public abstract int getReplicants(); + public abstract Map getTieredReplicants(); - public abstract int getReplicants(String tier); - - public abstract String getTier(); + public abstract int getNumReplicants(String tier); } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java index a2048756894..a8b47e35aef 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java @@ -21,12 +21,15 @@ package io.druid.server.coordinator.rules; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; +import java.util.Map; + /** */ public class PeriodLoadRule extends LoadRule @@ -34,19 +37,24 @@ public class PeriodLoadRule extends LoadRule private static final Logger log = new Logger(PeriodLoadRule.class); private final Period period; - private final Integer replicants; - private final String tier; + private final Map tieredReplicants; @JsonCreator public PeriodLoadRule( @JsonProperty("period") Period period, + @JsonProperty("tieredReplicants") Map tieredReplicants, + // The following two vars need to be deprecated @JsonProperty("replicants") Integer replicants, @JsonProperty("tier") String tier ) { this.period = period; - this.replicants = (replicants == null) ? 2 : replicants; - this.tier = tier; + + if (tieredReplicants != null) { + this.tieredReplicants = tieredReplicants; + } else { // Backwards compatible + this.tieredReplicants = ImmutableMap.of(tier, replicants); + } } @Override @@ -62,22 +70,17 @@ public class PeriodLoadRule extends LoadRule return period; } + @Override @JsonProperty - public int getReplicants() + public Map getTieredReplicants() { - return replicants; + return tieredReplicants; } @Override - public int getReplicants(String tier) + public int getNumReplicants(String tier) { - return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0; - } - - @JsonProperty - public String getTier() - { - return tier; + return tieredReplicants.get(tier); } @Override diff --git a/server/src/main/java/io/druid/server/coordinator/rules/SizeLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/SizeLoadRule.java index 6fdc10f822c..e1f0942958c 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/SizeLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/SizeLoadRule.java @@ -25,6 +25,8 @@ import com.google.common.collect.Range; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; +import java.util.Map; + /** */ public class SizeLoadRule extends LoadRule @@ -51,23 +53,15 @@ public class SizeLoadRule extends LoadRule } @Override - @JsonProperty - public int getReplicants() + public Map getTieredReplicants() { - return replicants; + return null; } @Override - public int getReplicants(String tier) + public int getNumReplicants(String tier) { - return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0; - } - - @Override - @JsonProperty - public String getTier() - { - return tier; + return 0; } @Override @@ -76,18 +70,6 @@ public class SizeLoadRule extends LoadRule return "loadBySize"; } - @JsonProperty - public long getLow() - { - return low; - } - - @JsonProperty - public long getHigh() - { - return high; - } - @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) {