From 641469fc38ea90d5fbe8a3320891b5bd87e2a52f Mon Sep 17 00:00:00 2001 From: Himanshu Date: Mon, 24 Oct 2016 12:19:08 -0500 Subject: [PATCH] manage overshadowing efficiently at coordinator (#3584) * manage overshadowing efficiently at coordinator * take readlock in VersionedIntervalTimeline.isOvershadowed() --- .../timeline/VersionedIntervalTimeline.java | 43 +++++ .../VersionedIntervalTimelineTest.java | 150 ++++++++++++++++++ docs/content/design/coordinator.md | 2 +- .../DruidCoordinatorRuntimeParams.java | 19 +++ .../DruidCoordinatorCleanupOvershadowed.java | 43 +---- .../helper/DruidCoordinatorRuleRunner.java | 45 +++++- .../DruidCoordinatorRuleRunnerTest.java | 93 +++++++++++ ...uidCoordinatorCleanupOvershadowedTest.java | 13 +- 8 files changed, 362 insertions(+), 46 deletions(-) diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java index ed472f9a46e..c03c52f4a82 100644 --- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java +++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java @@ -25,6 +25,7 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import io.druid.java.util.common.guava.Comparators; +import io.druid.common.utils.JodaUtils; import io.druid.timeline.partition.ImmutablePartitionHolder; import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionHolder; @@ -265,6 +266,48 @@ public class VersionedIntervalTimeline implements Timel } } + public boolean isOvershadowed(Interval interval, VersionType version) + { + try { + lock.readLock().lock(); + + TimelineEntry entry = completePartitionsTimeline.get(interval); + if (entry != null) { + return versionComparator.compare(version, entry.getVersion()) < 0; + } + + Interval lower = completePartitionsTimeline.floorKey( + new Interval(interval.getStartMillis(), JodaUtils.MAX_INSTANT) + ); + + if (lower == null || !lower.overlaps(interval)) { + return false; + } + + Interval prev = null; + Interval curr = lower; + + do { + if (curr == null || //no further keys + (prev != null && curr.getStartMillis() > prev.getEndMillis()) || //a discontinuity + //lower or same version + versionComparator.compare(version, completePartitionsTimeline.get(curr).getVersion()) >= 0 + ) { + return false; + } + + prev = curr; + curr = completePartitionsTimeline.higherKey(curr); + + } while (interval.getEndMillis() > prev.getEndMillis()); + + return true; + } + finally { + lock.readLock().unlock(); + } + } + private void add( NavigableMap timeline, Interval interval, diff --git a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java index e29e008a3e2..8b70b13b007 100644 --- a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java +++ b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java @@ -1567,6 +1567,156 @@ public class VersionedIntervalTimelineTest ); } + @Test + public void testIsOvershadowedWithNonOverlappingSegmentsInTimeline() + { + timeline = makeStringIntegerTimeline(); + + add("2011-04-05/2011-04-07", "1", new SingleElementPartitionChunk(1)); + add("2011-04-07/2011-04-09", "1", new SingleElementPartitionChunk(1)); + + add("2011-04-15/2011-04-17", "1", new SingleElementPartitionChunk(1)); + add("2011-04-17/2011-04-19", "1", new SingleElementPartitionChunk(1)); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-03"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-05"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-06"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-07"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-08"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-09"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-10"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-30"), "0")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "1")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "1")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "1")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "1")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "2")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "2")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "2")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "2")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-07"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-08"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-09"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-10"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-30"), "0")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-08"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-09"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-10"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-30"), "0")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-08/2011-04-09"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-08/2011-04-10"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-08/2011-04-30"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-10"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-15"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-17"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-19"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-30"), "0")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-16"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-17"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-18"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-19"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-20"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-30"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-19/2011-04-20"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-21/2011-04-22"), "0")); + } + + @Test + public void testIsOvershadowedWithOverlappingSegmentsInTimeline() + { + timeline = makeStringIntegerTimeline(); + + add("2011-04-05/2011-04-09", "11", new SingleElementPartitionChunk(1)); + add("2011-04-07/2011-04-11", "12", new SingleElementPartitionChunk(1)); + + add("2011-04-15/2011-04-19", "12", new SingleElementPartitionChunk(1)); + add("2011-04-17/2011-04-21", "11", new SingleElementPartitionChunk(1)); + + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-03"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-05"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-06"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-07"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-08"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-09"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-10"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-11"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-30"), "0")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-10"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-11"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "12")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "12")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "12")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "12")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-10"), "12")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-11"), "12")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "13")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "13")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "13")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "13")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-10"), "13")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-11"), "13")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-12"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-15"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-16"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-17"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-18"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-19"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-20"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-21"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-22"), "0")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-07"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-08"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-09"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-10"), "0")); + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-11"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-12"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-15"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-16"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-17"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-18"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-19"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-20"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-21"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-22"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-15"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-16"), "0")); + + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-17"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-18"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-19"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-20"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-21"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-22"), "0")); + + Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-21"), "0")); + Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-21/2011-04-22"), "0")); + } + private Pair>> createExpected( String intervalString, String version, diff --git a/docs/content/design/coordinator.md b/docs/content/design/coordinator.md index 41d40d3c244..39fb060ecd5 100644 --- a/docs/content/design/coordinator.md +++ b/docs/content/design/coordinator.md @@ -239,7 +239,7 @@ Returns total size and count for each datasource for each interval within given * `/druid/coordinator/v1/datasources/{dataSourceName}` -Enables a datasource. +Enables all segments of datasource which are not overshadowed by others. * `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}` diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 695abc06e60..90f47319735 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -175,6 +175,25 @@ public class DruidCoordinatorRuntimeParams ); } + public Builder buildFromExistingWithoutAvailableSegments() + { + return new Builder( + startTime, + druidCluster, + databaseRuleManager, + segmentReplicantLookup, + dataSources, + Sets.newTreeSet(DruidCoordinator.SEGMENT_COMPARATOR), + loadManagementPeons, + replicationManager, + emitter, + coordinatorDynamicConfig, + stats, + balancerReferenceTimestamp, + strategyFactory + ); + } + public static class Builder { private long startTime; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java index 3c057749edd..68a83c3a919 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java @@ -29,10 +29,8 @@ import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; -import io.druid.server.coordinator.LoadQueuePeon; import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; -import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import java.util.Map; @@ -77,41 +75,12 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp } } - for (VersionedIntervalTimeline timeline : timelines.values()) { - for (TimelineObjectHolder holder : timeline.findOvershadowed()) { - for (DataSegment dataSegment : holder.getObject().payloads()) { - coordinator.removeSegment(dataSegment); - stats.addToGlobalStat("overShadowedCount", 1); - } - } - - for (LoadQueuePeon loadQueue : coordinator.getLoadManagementPeons().values()) { - for (DataSegment dataSegment : loadQueue.getSegmentsToLoad()) { - timeline = timelines.get(dataSegment.getDataSource()); - if (timeline == null) { - continue; - } - // Temporarily add queued segments to the timeline to see if they still need to be loaded. - timeline.add( - dataSegment.getInterval(), - dataSegment.getVersion(), - dataSegment.getShardSpec().createChunk(dataSegment) - ); - for (TimelineObjectHolder holder : timeline.findOvershadowed()) { - for (DataSegment segmentToRemove : holder.getObject().payloads()) { - if (segmentToRemove == dataSegment) { - coordinator.removeSegment(dataSegment); - stats.addToGlobalStat("overShadowedCount", 1); - } - } - } - // Removing it to make sure that if two segment to load and they overshadow both get loaded. - timeline.remove( - dataSegment.getInterval(), - dataSegment.getVersion(), - dataSegment.getShardSpec().createChunk(dataSegment) - ); - } + //Remove all segments in db that are overshadowed by served segments + for (DataSegment dataSegment : params.getAvailableSegments()) { + VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); + if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { + coordinator.removeSegment(dataSegment); + stats.addToGlobalStat("overShadowedCount", 1); } } } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 0a88cbac1bc..bec45b8b6ce 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -20,6 +20,7 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.Lists; +import com.metamx.common.guava.Comparators; import com.metamx.emitter.EmittingLogger; import io.druid.metadata.MetadataRuleManager; import io.druid.server.coordinator.CoordinatorStats; @@ -29,9 +30,15 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.rules.Rule; import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; import org.joda.time.DateTime; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; /** */ @@ -77,13 +84,46 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper return params; } + // find available segments which are not overshadowed by other segments in DB + // only those would need to be loaded/dropped + // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed + Map> timelines = new HashMap<>(); + for (DataSegment segment : params.getAvailableSegments()) { + VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline<>(Comparators.comparable()); + timelines.put(segment.getDataSource(), timeline); + } + + timeline.add( + segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment) + ); + } + + Set overshadowed = new HashSet<>(); + for (VersionedIntervalTimeline timeline : timelines.values()) { + for (TimelineObjectHolder holder : timeline.findOvershadowed()) { + for (DataSegment dataSegment : holder.getObject().payloads()) { + overshadowed.add(dataSegment); + } + } + } + + Set nonOvershadowed = new HashSet<>(); + for (DataSegment dataSegment : params.getAvailableSegments()) { + if (!overshadowed.contains(dataSegment)) { + nonOvershadowed.add(dataSegment); + } + } + for (String tier : cluster.getTierNames()) { replicatorThrottler.updateReplicationState(tier); replicatorThrottler.updateTerminationState(tier); } - DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExisting() + DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExistingWithoutAvailableSegments() .withReplicationManager(replicatorThrottler) + .withAvailableSegments(nonOvershadowed) .build(); // Run through all matched rules for available segments @@ -118,8 +158,9 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper .emit(); } - return paramsWithReplicationManager.buildFromExisting() + return paramsWithReplicationManager.buildFromExistingWithoutAvailableSegments() .withCoordinatorStats(stats) + .withAvailableSegments(params.getAvailableSegments()) .build(); } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 45a07b5ed53..273a77e3913 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -32,6 +32,7 @@ import io.druid.client.DruidServer; import io.druid.metadata.MetadataRuleManager; import io.druid.segment.IndexIO; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; +import io.druid.server.coordinator.rules.ForeverLoadRule; import io.druid.server.coordinator.rules.IntervalDropRule; import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.Rule; @@ -46,7 +47,9 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** */ @@ -1208,6 +1211,96 @@ public class DruidCoordinatorRuleRunnerTest params.getBalancerStrategyFactory().close(); } + @Test + public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception + { + Set availableSegments = new HashSet<>(); + DataSegment v1 = new DataSegment( + "test", + new Interval("2012-01-01/2012-01-02"), + "1", + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + DataSegment v2 = new DataSegment( + "test", + new Interval("2012-01-01/2012-01-02"), + "2", + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + availableSegments.add(v1); + availableSegments.add(v2); + + mockCoordinator(); + mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Lists.newArrayList( + new ForeverLoadRule(ImmutableMap.of(DruidServer.DEFAULT_TIER, 1)) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DruidCluster druidCluster = new DruidCluster( + ImmutableMap.of( + DruidServer.DEFAULT_TIER, + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Arrays.asList( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + 1000, + "historical", + DruidServer.DEFAULT_TIER, + 0 + ).toImmutableDruidServer(), + mockPeon + ) + ) + ) + ) + ); + + DruidCoordinatorRuntimeParams params = + new DruidCoordinatorRuntimeParams.Builder() + .withDruidCluster(druidCluster) + .withAvailableSegments(availableSegments) + .withDatabaseRuleManager(databaseRuleManager) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").size()); + Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").get("_default_tier").get()); + Assert.assertNull(stats.getPerTierStats().get("unassignedCount")); + Assert.assertNull(stats.getPerTierStats().get("unassignedSize")); + + Assert.assertEquals(2, availableSegments.size()); + Assert.assertEquals(availableSegments, params.getAvailableSegments()); + Assert.assertEquals(availableSegments, afterParams.getAvailableSegments()); + + EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); + } + private void mockCoordinator() { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java index 919233f2d68..955d1de2996 100644 --- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java @@ -59,11 +59,15 @@ public class DruidCoordinatorCleanupOvershadowedTest .interval(new Interval(start, start.plusHours(1))) .version("1") .build(); + private DataSegment segmentV2 = new DataSegment.Builder().dataSource("test") + .interval(new Interval(start, start.plusHours(1))) + .version("2") + .build(); @Test public void testRun() { druidCoordinatorCleanupOvershadowed = new DruidCoordinatorCleanupOvershadowed(coordinator); - availableSegments = ImmutableList.of(segmentV1, segmentV0); + availableSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2); druidCluster = new DruidCluster( ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(Arrays.asList( @@ -73,14 +77,11 @@ public class DruidCoordinatorCleanupOvershadowedTest EasyMock.expect(druidServer.getDataSources()) .andReturn(ImmutableList.of(druidDataSource)) .anyTimes(); - EasyMock.expect(druidDataSource.getSegments()).andReturn(ImmutableSet.of(segmentV1)).anyTimes(); + EasyMock.expect(druidDataSource.getSegments()).andReturn(ImmutableSet.of(segmentV1, segmentV2)).anyTimes(); EasyMock.expect(druidDataSource.getName()).andReturn("test").anyTimes(); - EasyMock.expect(coordinator.getLoadManagementPeons()) - .andReturn(ImmutableMap.of("testHost", mockPeon)) - .anyTimes(); + coordinator.removeSegment(segmentV1); coordinator.removeSegment(segmentV0); EasyMock.expectLastCall(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(ImmutableSet.of(segmentV0)).anyTimes(); EasyMock.replay(mockPeon, coordinator, druidServer, druidDataSource); DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() .withAvailableSegments(availableSegments)