diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 0dcf636b744..64168e11d76 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -29,7 +29,6 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntMaps; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; -import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; import org.apache.druid.client.DataSourcesSnapshot; @@ -69,7 +68,6 @@ import org.apache.druid.server.coordinator.duty.LogUsedSegments; import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments; import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments; -import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -275,13 +273,6 @@ public class DruidCoordinator ) { final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); - final Set decommissioningServers = getDynamicConfigs().getDecommissioningNodes(); - final List broadcastTargetServers = serverInventoryView - .getInventory() - .stream() - .filter(druidServer -> druidServer.isSegmentBroadcastTarget() && !decommissioningServers.contains(druidServer.getHost())) - .map(DruidServer::toImmutableDruidServer) - .collect(Collectors.toList()); if (segmentReplicantLookup == null) { return underReplicationCountsPerDataSourcePerTier; @@ -294,36 +285,19 @@ public class 
DruidCoordinator for (final Rule rule : rules) { if (!rule.appliesTo(segment, now)) { + // Rule did not match. Continue to the next Rule. continue; } - - if (rule instanceof LoadRule) { - ((LoadRule) rule) - .getTieredReplicants() - .forEach((final String tier, final Integer ruleReplicants) -> { - int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier); - Object2LongMap underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier - .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()); - ((Object2LongOpenHashMap) underReplicationPerDataSource) - .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); - }); + if (!rule.canLoadSegments()) { + // Rule matched but rule does not and cannot load segments. + // Hence, there is no need to update the underReplicationCountsPerDataSourcePerTier map. + break; } - if (rule instanceof BroadcastDistributionRule) { - for (ImmutableDruidServer server : broadcastTargetServers) { - Object2LongMap underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier - .computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>()); - if (server.getSegment(segment.getId()) == null) { - ((Object2LongOpenHashMap) underReplicationPerDataSource) - .addTo(segment.getDataSource(), 1); - } else { - // This make sure that every datasource has a entry even if the all segments are loaded - underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0); - } - } - } + rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment); - // only the first matching rule applies + // Only the first matching rule applies. This is because the Coordinator cycles through all used segments + // and matches each segment with the first rule that applies. Each segment may only match a single rule. 
break; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index cd04bfdcb8a..b86ca0106d3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -62,19 +64,22 @@ public class SegmentReplicantLookup } } - return new SegmentReplicantLookup(segmentsInCluster, loadingSegments); + return new SegmentReplicantLookup(segmentsInCluster, loadingSegments, cluster); } private final Table segmentsInCluster; private final Table loadingSegments; + private final DruidCluster cluster; private SegmentReplicantLookup( Table segmentsInCluster, - Table loadingSegments + Table loadingSegments, + DruidCluster cluster ) { this.segmentsInCluster = segmentsInCluster; this.loadingSegments = loadingSegments; + this.cluster = cluster; } public Map getClusterTiers(SegmentId segmentId) @@ -93,7 +98,7 @@ public class SegmentReplicantLookup return retVal; } - int getLoadedReplicants(SegmentId segmentId, String tier) + public int getLoadedReplicants(SegmentId segmentId, String tier) { Integer retVal = segmentsInCluster.get(segmentId, tier); return (retVal == null) ? 
0 : retVal; @@ -124,4 +129,21 @@ public class SegmentReplicantLookup { return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier); } + + public Object2LongMap getBroadcastUnderReplication(SegmentId segmentId) + { + Object2LongOpenHashMap perTier = new Object2LongOpenHashMap<>(); + for (ServerHolder holder : cluster.getAllServers()) { + // Only record a tier entry for servers that are segment broadcast targets + if (holder.getServer().getType().isSegmentBroadcastTarget()) { + // Every broadcast target server should be serving 1 replica of the segment + if (!holder.isServingSegment(segmentId)) { + perTier.addTo(holder.getServer().getTier(), 1L); + } else { + perTier.putIfAbsent(holder.getServer().getTier(), 0); + } + } + } + return perTier; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index 26fa9a54c7f..43fdaaef1d1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import java.util.Objects; @@ -114,7 +115,7 @@ public class ServerHolder implements Comparable public boolean isServingSegment(DataSegment segment) { - return server.getSegment(segment.getId()) != null; + return isServingSegment(segment.getId()); } public boolean isLoadingSegment(DataSegment segment) @@ -132,6 +133,11 @@ public class ServerHolder implements Comparable return peon.getNumberOfSegmentsInQueue(); } + public boolean isServingSegment(SegmentId segmentId) + { + return server.getSegment(segmentId) != null; + } + @Override public int compareTo(ServerHolder serverHolder) { diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java index 35ff39ea650..0b8c37b7931 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -19,15 +19,19 @@ package org.apache.druid.server.coordinator.rules; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -71,6 +75,37 @@ public abstract class BroadcastDistributionRule implements Rule .accumulate(drop(dropServerHolders, segment)); } + @Override + public boolean canLoadSegments() + { + return true; + } + + @Override + public void updateUnderReplicated( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DataSegment segment + ) + { + Object2LongMap underReplicatedBroadcastTiers = segmentReplicantLookup.getBroadcastUnderReplication(segment.getId()); + for (final Object2LongMap.Entry entry : underReplicatedBroadcastTiers.object2LongEntrySet()) { + final String tier = entry.getKey(); + final long underReplicatedCount = entry.getLongValue(); + underReplicatedPerTier.compute(tier, (_tier, existing) -> { + Object2LongMap underReplicationPerDataSource = 
existing; + if (existing == null) { + underReplicationPerDataSource = new Object2LongOpenHashMap<>(); + } + underReplicationPerDataSource.compute( + segment.getDataSource(), + (_datasource, count) -> count != null ? count + underReplicatedCount : underReplicatedCount + ); + return underReplicationPerDataSource; + }); + } + } + private CoordinatorStats assign( final Set serverHolders, final DataSegment segment diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java index c565df9b58b..7ffc7a21155 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java @@ -37,4 +37,10 @@ public abstract class DropRule implements Rule stats.addToGlobalStat("deletedCount", 1); return stats; } + + @Override + public boolean canLoadSegments() + { + return false; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index 80912544a4e..d0bf5b8ad56 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator.rules; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -30,6 +32,7 @@ import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinator; import 
org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ReplicationThrottler; +import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -90,6 +93,32 @@ public abstract class LoadRule implements Rule } } + @Override + public boolean canLoadSegments() + { + return true; + } + + @Override + public void updateUnderReplicated( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DataSegment segment + ) + { + getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> { + int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier); + Object2LongMap underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent( + tier, + ignored -> new Object2LongOpenHashMap<>() + ); + ((Object2LongOpenHashMap) underReplicationPerDataSource).addTo( + segment.getDataSource(), + Math.max(ruleReplicants - currentReplicants, 0) + ); + }); + } + /** * @param stats {@link CoordinatorStats} to accumulate assignment statistics. 
*/ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java index d475f67d4c1..02c552f639c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java @@ -21,13 +21,18 @@ package org.apache.druid.server.coordinator.rules; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.Map; + /** */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -51,6 +56,27 @@ public interface Rule boolean appliesTo(Interval interval, DateTime referenceTimestamp); + /** + * Returns true if this Rule can load segments onto one or more types of Druid node, otherwise returns false. + * Any Rule that returns true for this method should implement the logic for calculating segment under-replication + * in {@link Rule#updateUnderReplicated}. + */ + boolean canLoadSegments(); + + /** + * This method should update the {@code underReplicatedPerTier} map with the under-replication count of the + * {@code segment}. Any Rule that returns true for {@link Rule#canLoadSegments()} must override this method. 
+ * Note that {@code underReplicatedPerTier} is a map of tier -> { dataSource -> underReplicationCount } + */ + default void updateUnderReplicated( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DataSegment segment + ) + { + Preconditions.checkArgument(!canLoadSegments()); + } + /** * {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used * segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is because diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java index fcebbdee820..cb1ee0425b6 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java @@ -39,7 +39,7 @@ public class ServerHolderTest { private static final List SEGMENTS = ImmutableList.of( new DataSegment( - "test", + "src1", Intervals.of("2015-04-12/2015-04-13"), "1", ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"), @@ -50,7 +50,7 @@ public class ServerHolderTest 1 ), new DataSegment( - "test", + "src2", Intervals.of("2015-04-12/2015-04-13"), "1", ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"), @@ -177,4 +177,22 @@ public class ServerHolderTest Assert.assertNotEquals(h1, h4); Assert.assertNotEquals(h1, h5); } + + @Test + public void testIsServingSegment() + { + final ServerHolder h1 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0), + 0L, + ImmutableMap.of("src1", DATA_SOURCES.get("src1")), + 1 + ), + new LoadQueuePeonTester() + ); + Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0))); + Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1))); + 
Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0).getId())); + Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1).getId())); + } }