From 7411b18df9188627f3933be1b36e2de31e26d437 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 2 May 2017 01:55:17 +0900 Subject: [PATCH] Add BroadcastDistributionRule (#4077) * Add BroadcastDistributionRule * Add missing null check * Rename variable 'colocateDataSource' to 'colocatedDatasource' * Address comments * Document for broadcast rules * Drop segments which are not co-located anymore * Remove duplicated segment loading and dropping * Add caveat * address comments --- docs/content/operations/rule-configuration.md | 66 +++- .../io/druid/client/ImmutableDruidServer.java | 5 + .../server/coordinator/DruidCluster.java | 8 + .../DruidCoordinatorRuntimeParams.java | 2 +- .../rules/BroadcastDistributionRule.java | 116 +++++++ .../ForeverBroadcastDistributionRule.java | 91 +++++ .../IntervalBroadcastDistributionRule.java | 104 ++++++ .../coordinator/rules/IntervalLoadRule.java | 2 +- .../server/coordinator/rules/LoadRule.java | 14 +- .../PeriodBroadcastDistributionRule.java | 106 ++++++ .../coordinator/rules/PeriodLoadRule.java | 3 +- .../druid/server/coordinator/rules/Rule.java | 13 +- .../druid/server/coordinator/rules/Rules.java | 40 +++ .../BroadcastDistributionRuleSerdeTest.java | 75 ++++ .../rules/BroadcastDistributionRuleTest.java | 320 ++++++++++++++++++ .../coordinator/rules/LoadRuleTest.java | 7 +- 16 files changed, 951 insertions(+), 21 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordinator/rules/BroadcastDistributionRule.java create mode 100644 server/src/main/java/io/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java create mode 100644 server/src/main/java/io/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java create mode 100644 server/src/main/java/io/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java create mode 100644 server/src/main/java/io/druid/server/coordinator/rules/Rules.java create mode 100644 server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java create mode 100644 server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java diff --git a/docs/content/operations/rule-configuration.md b/docs/content/operations/rule-configuration.md index b3f123d9494..3ce75037ea0 100644 --- a/docs/content/operations/rule-configuration.md +++ b/docs/content/operations/rule-configuration.md @@ -3,9 +3,12 @@ layout: doc_page --- # Retaining or Automatically Dropping Data -Coordinator nodes use rules to determine what data should be loaded or dropped from the cluster. Rules are used for data retention and are set on the coordinator console (http://coordinator_ip:port). +Coordinator nodes use rules to determine what data should be loaded to or dropped from the cluster. Rules are used for data retention and query execution, and are set on the coordinator console (http://coordinator_ip:port). -Rules indicate how segments should be assigned to different historical node tiers and how many replicas of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the metadata storage. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule. +There are three types of rules, i.e., load rules, drop rules, and broadcast rules. Load rules indicate how segments should be assigned to different historical node tiers and how many replicas of a segment should exist in each tier. +Drop rules indicate when segments should be dropped entirely from the cluster. Finally, broadcast rules indicate how segments of different data sources should be co-located in historical nodes. + +The coordinator loads a set of rules from the metadata storage. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule. Note: It is recommended that the coordinator console is used to configure rules. However, the coordinator node does have HTTP endpoints to programmatically configure rules. @@ -126,6 +129,65 @@ Period drop rules are of the form: The interval of a segment will be compared against the specified period. The period is from some time in the past to the current time. The rule matches if the period contains the interval. +Broadcast Rules +--------------- + +Broadcast rules indicate how segments of different data sources should be co-located in historical nodes. +Once a broadcast rule is configured for a data source, all segments of the data source are broadcasted to the servers holding _any segments_ of the co-located data sources. + +### Forever Broadcast Rule + +Forever broadcast rules are of the form: + +```json +{ + "type" : "broadcastForever", + "colocatedDataSources" : [ "target_source1", "target_source2" ] +} +``` + +* `type` - this should always be "broadcastForever" +* `colocatedDataSources` - A JSON List containing data source names to be co-located. `null` and empty list means broadcasting to every node in the cluster. + +### Interval Broadcast Rule + +Interval broadcast rules are of the form: + +```json +{ + "type" : "broadcastByInterval", + "colocatedDataSources" : [ "target_source1", "target_source2" ], + "interval" : "2012-01-01/2013-01-01" +} +``` + +* `type` - this should always be "broadcastByInterval" +* `colocatedDataSources` - A JSON List containing data source names to be co-located. `null` and empty list means broadcasting to every node in the cluster. +* `interval` - A JSON Object representing ISO-8601 Periods. Only the segments of the interval will be broadcasted. + +### Period Broadcast Rule + +Period broadcast rules are of the form: + +```json +{ + "type" : "broadcastByPeriod", + "colocatedDataSources" : [ "target_source1", "target_source2" ], + "period" : "P1M" +} +``` + +* `type` - this should always be "broadcastByPeriod" +* `colocatedDataSources` - A JSON List containing data source names to be co-located. `null` and empty list means broadcasting to every node in the cluster. +* `period` - A JSON Object representing ISO-8601 Periods + +The interval of a segment will be compared against the specified period. The period is from some time in the past to the current time. The rule matches if the period contains the interval. + +
+broadcast rules don't guarantee that segments of the data sources are always co-located because segments for the colocated data sources are not loaded together atomically. +If you want to always co-locate the segments of some data sources together, it is recommended to leave colocatedDataSources empty. +
+ # Permanently Deleting Data Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any segments that are diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index 551025c148e..534f91cad44 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -97,6 +97,11 @@ public class ImmutableDruidServer return dataSources.values(); } + public ImmutableDruidDataSource getDataSource(String name) + { + return dataSources.get(name); + } + public Map getSegments() { return segments; diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java index cbca241f973..75ec66e5f39 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java @@ -24,7 +24,10 @@ import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import io.druid.client.ImmutableDruidServer; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Contains a representation of the current state of the cluster by tier. @@ -70,6 +73,11 @@ public class DruidCluster return cluster.get(tier); } + public List getAllServers() + { + return cluster.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + } + public Iterable> getSortedServersByTier() { return cluster.values(); diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index daea2cb31dd..89887e78024 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -50,7 +50,7 @@ public class DruidCoordinatorRuntimeParams private final DateTime balancerReferenceTimestamp; private final BalancerStrategy balancerStrategy; - public DruidCoordinatorRuntimeParams( + private DruidCoordinatorRuntimeParams( long startTime, DruidCluster druidCluster, MetadataRuleManager databaseRuleManager, diff --git a/server/src/main/java/io/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/io/druid/server/coordinator/rules/BroadcastDistributionRule.java new file mode 100644 index 00000000000..e32268044b4 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -0,0 +1,116 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.rules; + +import com.metamx.emitter.EmittingLogger; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCoordinator; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.server.coordinator.ServerHolder; +import io.druid.timeline.DataSegment; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class BroadcastDistributionRule implements Rule +{ + private static final EmittingLogger log = new EmittingLogger(BroadcastDistributionRule.class); + + @Override + public CoordinatorStats run( + DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment + ) + { + // Find servers which holds the segments of co-located data source + final Set loadServerHolders = new HashSet<>(); + final Set dropServerHolders = new HashSet<>(); + final List colocatedDataSources = getColocatedDataSources(); + if (colocatedDataSources == null || colocatedDataSources.isEmpty()) { + loadServerHolders.addAll(params.getDruidCluster().getAllServers()); + } else { + params.getDruidCluster().getAllServers().forEach( + eachHolder -> { + if (colocatedDataSources.stream() + .anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) { + loadServerHolders.add(eachHolder); + } else if (eachHolder.isServingSegment(segment)) { + if (!eachHolder.getPeon().getSegmentsToDrop().contains(segment)) { + dropServerHolders.add(eachHolder); + } + } + } + ); + } + + final CoordinatorStats stats = new CoordinatorStats(); + + return stats.accumulate(assign(loadServerHolders, segment)) + .accumulate(drop(dropServerHolders, segment)); + } + + private CoordinatorStats assign( + final Set serverHolders, + final DataSegment segment + ) + { + final CoordinatorStats stats = new CoordinatorStats(); + stats.addToGlobalStat(LoadRule.ASSIGNED_COUNT, 0); + + for (ServerHolder holder : serverHolders) { + if (segment.getSize() > holder.getAvailableSize()) { + log.makeAlert("Failed to broadcast segment for [%s]", segment.getDataSource()) + .addData("segmentId", segment.getIdentifier()) + .addData("segmentSize", segment.getSize()) + .addData("hostName", holder.getServer().getHost()) + .addData("availableSize", holder.getAvailableSize()) + .emit(); + } else { + if (!holder.isLoadingSegment(segment)) { + holder.getPeon().loadSegment( + segment, + null + ); + + stats.addToGlobalStat(LoadRule.ASSIGNED_COUNT, 1); + } + } + } + + return stats; + } + + private CoordinatorStats drop( + final Set serverHolders, + final DataSegment segment + ) + { + CoordinatorStats stats = new CoordinatorStats(); + + for (ServerHolder holder : serverHolders) { + holder.getPeon().dropSegment(segment, null); + stats.addToGlobalStat(LoadRule.DROPPED_COUNT, 1); + } + + return stats; + } + + public abstract List getColocatedDataSources(); +} diff --git a/server/src/main/java/io/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java b/server/src/main/java/io/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java new file mode 100644 index 00000000000..5714bc657b4 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java @@ -0,0 +1,91 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.rules; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; + +public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule +{ + static final String TYPE = "broadcastForever"; + + private final List colocatedDataSources; + + @JsonCreator + public ForeverBroadcastDistributionRule( + @JsonProperty("colocatedDataSources") List colocatedDataSources + ) + { + this.colocatedDataSources = colocatedDataSources; + } + + @Override + @JsonProperty + public String getType() + { + return TYPE; + } + + @Override + @JsonProperty + public List getColocatedDataSources() + { + return colocatedDataSources; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return true; + } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || o.getClass() != getClass()) { + return false; + } + + ForeverBroadcastDistributionRule that = (ForeverBroadcastDistributionRule) o; + return Objects.equals(colocatedDataSources, that.colocatedDataSources); + } + + @Override + public int hashCode() + { + return Objects.hash(getType(), colocatedDataSources); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java b/server/src/main/java/io/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java new file mode 100644 index 00000000000..5f6156229be --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java @@ -0,0 +1,104 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.rules; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; + +public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule +{ + static final String TYPE = "broadcastByInterval"; + private final Interval interval; + private final List colocatedDataSources; + + @JsonCreator + public IntervalBroadcastDistributionRule( + @JsonProperty("interval") Interval interval, + @JsonProperty("colocatedDataSources") List colocatedDataSources + ) + { + this.interval = interval; + this.colocatedDataSources = colocatedDataSources; + } + + @Override + @JsonProperty + public String getType() + { + return TYPE; + } + + @Override + @JsonProperty + public List getColocatedDataSources() + { + return colocatedDataSources; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return appliesTo(segment.getInterval(), referenceTimestamp); + } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return Rules.eligibleForLoad(this.interval, interval); + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Override + public boolean equals(Object o) + { + if (o == this) { + return true; + } + + if (o == null || o.getClass() != getClass()) { + return false; + } + + IntervalBroadcastDistributionRule that = (IntervalBroadcastDistributionRule) o; + + if (!Objects.equals(interval, that.interval)) { + return false; + } + + return Objects.equals(colocatedDataSources, that.colocatedDataSources); + } + + @Override + public int hashCode() + { + return Objects.hash(getType(), interval, colocatedDataSources); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java index 7a22c56ba46..8a0e4a5679b 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java @@ -86,7 +86,7 @@ public class IntervalLoadRule extends LoadRule @Override public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp) { - return interval.contains(theInterval); + return Rules.eligibleForLoad(interval, theInterval); } @Override diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index d7e8e73015d..c260ec23bbc 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -44,8 +44,8 @@ import java.util.Set; public abstract class LoadRule implements Rule { private static final EmittingLogger log = new EmittingLogger(LoadRule.class); - private static final String assignedCount = "assignedCount"; - private static final String droppedCount = "droppedCount"; + static final String ASSIGNED_COUNT = "assignedCount"; + static final String DROPPED_COUNT = "droppedCount"; @Override public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment) @@ -84,7 +84,7 @@ public abstract class LoadRule implements Rule segment ); stats.accumulate(assignStats); - totalReplicantsInCluster += assignStats.getPerTierStats().get(assignedCount).get(tier).get(); + totalReplicantsInCluster += assignStats.getPerTierStats().get(ASSIGNED_COUNT).get(tier).get(); } loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier); @@ -108,7 +108,7 @@ public abstract class LoadRule implements Rule ) { final CoordinatorStats stats = new CoordinatorStats(); - stats.addToTieredStat(assignedCount, tier, 0); + stats.addToTieredStat(ASSIGNED_COUNT, tier, 0); int currReplicantsInTier = totalReplicantsInTier; int currTotalReplicantsInCluster = totalReplicantsInCluster; @@ -153,7 +153,7 @@ public abstract class LoadRule implements Rule } ); - stats.addToTieredStat(assignedCount, tier, 1); + stats.addToTieredStat(ASSIGNED_COUNT, tier, 1); ++currReplicantsInTier; ++currTotalReplicantsInCluster; } @@ -186,7 +186,7 @@ public abstract class LoadRule implements Rule int loadedNumReplicantsForTier = entry.getValue(); int expectedNumReplicantsForTier = getNumReplicants(tier); - stats.addToTieredStat(droppedCount, tier, 0); + stats.addToTieredStat(DROPPED_COUNT, tier, 0); MinMaxPriorityQueue serverQueue = params.getDruidCluster().get(tier); if (serverQueue == null) { @@ -208,7 +208,7 @@ public abstract class LoadRule implements Rule null ); --loadedNumReplicantsForTier; - stats.addToTieredStat(droppedCount, tier, 1); + stats.addToTieredStat(DROPPED_COUNT, tier, 1); } droppedServers.add(holder); } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java b/server/src/main/java/io/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java new file mode 100644 index 00000000000..29a001f852b --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java @@ -0,0 +1,106 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.rules; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +import java.util.List; +import java.util.Objects; + +public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule +{ + static final String TYPE = "broadcastByPeriod"; + + private final Period period; + private final List colocatedDataSources; + + @JsonCreator + public PeriodBroadcastDistributionRule( + @JsonProperty("period") Period period, + @JsonProperty("colocatedDataSources") List colocatedDataSources + ) + { + this.period = period; + this.colocatedDataSources = colocatedDataSources; + } + + @Override + @JsonProperty + public String getType() + { + return TYPE; + } + + @Override + @JsonProperty + public List getColocatedDataSources() + { + return colocatedDataSources; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return appliesTo(segment.getInterval(), referenceTimestamp); + } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return Rules.eligibleForLoad(period, interval, referenceTimestamp); + } + + @JsonProperty + public Period getPeriod() + { + return period; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || o.getClass() != getClass()) { + return false; + } + + PeriodBroadcastDistributionRule that = (PeriodBroadcastDistributionRule) o; + + if (!Objects.equals(period, that.period)) { + return false; + } + + return Objects.equals(colocatedDataSources, that.colocatedDataSources); + } + + @Override + public int hashCode() + { + return Objects.hash(getType(), period, colocatedDataSources); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java index 2be0edec079..874ef2862da 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java @@ -88,7 +88,6 @@ public class PeriodLoadRule extends LoadRule @Override public boolean appliesTo(Interval interval, DateTime referenceTimestamp) { - final Interval currInterval = new Interval(period, referenceTimestamp); - return currInterval.overlaps(interval) && interval.getStartMillis() >= currInterval.getStartMillis(); + return Rules.eligibleForLoad(period, interval, referenceTimestamp); } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/Rule.java b/server/src/main/java/io/druid/server/coordinator/rules/Rule.java index c96946edb88..f0bf6fef4b5 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/Rule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/Rule.java @@ -37,16 +37,19 @@ import org.joda.time.Interval; @JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class), @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class), @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class), - @JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class) + @JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class), + @JsonSubTypes.Type(name = ForeverBroadcastDistributionRule.TYPE, value = ForeverBroadcastDistributionRule.class), + @JsonSubTypes.Type(name = IntervalBroadcastDistributionRule.TYPE, value = IntervalBroadcastDistributionRule.class), + @JsonSubTypes.Type(name = PeriodBroadcastDistributionRule.TYPE, value = PeriodBroadcastDistributionRule.class) }) public interface Rule { - public String getType(); + String getType(); - public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp); + boolean appliesTo(DataSegment segment, DateTime referenceTimestamp); - public boolean appliesTo(Interval interval, DateTime referenceTimestamp); + boolean appliesTo(Interval interval, DateTime referenceTimestamp); - public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment); + CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment); } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/Rules.java b/server/src/main/java/io/druid/server/coordinator/rules/Rules.java new file mode 100644 index 00000000000..808e8a15567 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/rules/Rules.java @@ -0,0 +1,40 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.rules; + +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +public class Rules +{ + public static boolean eligibleForLoad(Interval src, Interval target) + { + return src.contains(target); + } + + public static boolean eligibleForLoad(Period period, Interval interval, DateTime referenceTimestamp) + { + final Interval currInterval = new Interval(period, referenceTimestamp); + return currInterval.overlaps(interval) && interval.getStartMillis() >= currInterval.getStartMillis(); + } + + private Rules() {} +} diff --git a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java new file mode 100644 index 00000000000..ac76a368de5 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.rules; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class BroadcastDistributionRuleSerdeTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Parameterized.Parameters + public static List constructorFeeder() + { + final List params = Lists.newArrayList( + new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of("large_source1", "large_source2"))}, + new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of())}, + new Object[]{new ForeverBroadcastDistributionRule(null)}, + new Object[]{new IntervalBroadcastDistributionRule(new Interval("0/1000"), ImmutableList.of("large_source"))}, + new Object[]{new IntervalBroadcastDistributionRule(new Interval("0/1000"), ImmutableList.of())}, + new Object[]{new IntervalBroadcastDistributionRule(new Interval("0/1000"), null)}, + new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), ImmutableList.of("large_source"))}, + new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), ImmutableList.of())}, + new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)} + ); + return params; + } + + private final Rule testRule; + + public BroadcastDistributionRuleSerdeTest(Rule testRule) + { + this.testRule = testRule; + } + + @Test + public void testSerde() throws IOException + { + final List rules = Lists.newArrayList(testRule); + final String json = MAPPER.writeValueAsString(rules); + final List fromJson = MAPPER.readValue(json, new TypeReference>(){}); + assertEquals(rules, fromJson); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java new file mode 100644 index 00000000000..1255a5df5e0 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.rules; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Ordering; +import io.druid.client.DruidServer; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCluster; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.server.coordinator.LoadQueuePeonTester; +import io.druid.server.coordinator.SegmentReplicantLookup; +import io.druid.server.coordinator.ServerHolder; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class BroadcastDistributionRuleTest +{ + private DruidCluster druidCluster; + private ServerHolder holderOfSmallSegment; + private List holdersOfLargeSegments = Lists.newArrayList(); + private List holdersOfLargeSegments2 = Lists.newArrayList(); + private final List largeSegments = Lists.newArrayList(); + private final List largeSegments2 = Lists.newArrayList(); + private DataSegment smallSegment; + + @Before + public void setUp() throws Exception + { + smallSegment = new DataSegment( + "small_source", + new Interval("0/1000"), + new DateTime().toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + 0, + 0 + ); + + for (int i = 0; i < 3; i++) { + largeSegments.add( + new DataSegment( + "large_source", + new Interval((i * 1000) + "/" + ((i + 1) * 1000)), + new DateTime().toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + 0, + 100 + ) + ); + } + + for (int i = 0; i < 2; i++) { + largeSegments2.add( + new DataSegment( + "large_source2", + new Interval((i * 1000) + "/" + ((i + 1) * 1000)), + new DateTime().toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + 0, + 100 + ) + ); + } + + holderOfSmallSegment = new ServerHolder( + new DruidServer( + "serverHot2", + "hostHot2", + 1000, + "historical", + "hot", + 0 + ).addDataSegment(smallSegment.getIdentifier(), smallSegment) + .toImmutableDruidServer(), + new LoadQueuePeonTester() + ); + + holdersOfLargeSegments.add( + new ServerHolder( + new DruidServer( + "serverHot1", + "hostHot1", + 1000, + "historical", + "hot", + 0 + ).addDataSegment(largeSegments.get(0).getIdentifier(), largeSegments.get(0)) + .toImmutableDruidServer(), + new LoadQueuePeonTester() + ) + ); + holdersOfLargeSegments.add( + new ServerHolder( + new DruidServer( + "serverNorm1", + "hostNorm1", + 1000, + "historical", + DruidServer.DEFAULT_TIER, + 0 + ).addDataSegment(largeSegments.get(1).getIdentifier(), largeSegments.get(1)) + .toImmutableDruidServer(), + new LoadQueuePeonTester() + ) + ); + holdersOfLargeSegments.add( + new ServerHolder( + new DruidServer( + "serverNorm2", + "hostNorm2", + 100, + "historical", + DruidServer.DEFAULT_TIER, + 0 + ).addDataSegment(largeSegments.get(2).getIdentifier(), largeSegments.get(2)) + .toImmutableDruidServer(), + new LoadQueuePeonTester() + ) + ); + + holdersOfLargeSegments2.add( + new ServerHolder( + new DruidServer( + "serverHot3", + "hostHot3", + 1000, + "historical", + "hot", + 0 + ).addDataSegment(largeSegments2.get(0).getIdentifier(), largeSegments2.get(0)) + .toImmutableDruidServer(), + new LoadQueuePeonTester() + ) + ); + holdersOfLargeSegments2.add( + new ServerHolder( + new DruidServer( + "serverNorm3", + "hostNorm3", + 100, + "historical", + DruidServer.DEFAULT_TIER, + 0 + ).addDataSegment(largeSegments2.get(1).getIdentifier(), largeSegments2.get(1)) + .toImmutableDruidServer(), + new LoadQueuePeonTester() + ) + ); + + druidCluster = new DruidCluster( + ImmutableMap.of( + "hot", + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Lists.newArrayList( + holdersOfLargeSegments.get(0), + holderOfSmallSegment, + holdersOfLargeSegments2.get(0) + ) + ), + DruidServer.DEFAULT_TIER, + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Lists.newArrayList( + holdersOfLargeSegments.get(1), + holdersOfLargeSegments.get(2), + holdersOfLargeSegments2.get(1) + ) + ) + ) + ); + } + + @Test + public void testBroadcastToSingleDataSource() + { + final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(ImmutableList.of("large_source")); + + CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withAvailableSegments(Lists.newArrayList( + smallSegment, + largeSegments.get(0), + largeSegments.get(1), + largeSegments.get(2), + largeSegments2.get(0), + largeSegments2.get(1) + )).build(), + smallSegment + ); + + assertEquals(3, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue()); + assertTrue(stats.getPerTierStats().isEmpty()); + + assertTrue( + holdersOfLargeSegments.stream() + .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) + ); + + assertTrue( + holdersOfLargeSegments2.stream() + .noneMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) + ); + + assertFalse(holderOfSmallSegment.getPeon().getSegmentsToLoad().contains(smallSegment)); + } + + @Test + public void testBroadcastToMultipleDataSources() + { + final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule( + ImmutableList.of("large_source", "large_source2") + ); + + CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withAvailableSegments(Lists.newArrayList( + smallSegment, + largeSegments.get(0), + largeSegments.get(1), + largeSegments.get(2), + largeSegments2.get(0), + largeSegments2.get(1) + )).build(), + smallSegment + ); + + assertEquals(5, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue()); + assertTrue(stats.getPerTierStats().isEmpty()); + + assertTrue( + holdersOfLargeSegments.stream() + .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) + ); + + assertTrue( + holdersOfLargeSegments2.stream() + .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) + ); + + assertFalse(holderOfSmallSegment.getPeon().getSegmentsToLoad().contains(smallSegment)); + } + + @Test + public void testBroadcastToAllServers() + { + final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(null); + + CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withAvailableSegments(Lists.newArrayList( + smallSegment, + largeSegments.get(0), + largeSegments.get(1), + largeSegments.get(2), + largeSegments2.get(0), + largeSegments2.get(1) + )).build(), + smallSegment + ); + + assertEquals(6, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue()); + assertTrue(stats.getPerTierStats().isEmpty()); + + assertTrue( + druidCluster.getAllServers().stream() + .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) + ); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 9e6d77dd559..33597215f81 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -107,6 +107,7 @@ public class LoadRuleTest public void tearDown() throws Exception { EasyMock.verify(mockPeon); + emitter.close(); } @Test @@ -210,8 +211,8 @@ public class LoadRuleTest segment ); - Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); - Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2); + Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get("hot").get() == 1); + Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get(DruidServer.DEFAULT_TIER).get() == 2); exec.shutdown(); } @@ -410,7 +411,7 @@ public class LoadRuleTest segment ); - Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); + Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get("hot").get() == 1); exec.shutdown(); }