manage overshadowing efficiently at coordinator (#3584)

* manage overshadowing efficiently at coordinator

* take readlock in VersionedIntervalTimeline.isOvershadowed()
This commit is contained in:
Himanshu 2016-10-24 12:19:08 -05:00 committed by Nishant
parent 9bb735133f
commit 641469fc38
8 changed files with 362 additions and 46 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.Comparators;
import io.druid.common.utils.JodaUtils;
import io.druid.timeline.partition.ImmutablePartitionHolder; import io.druid.timeline.partition.ImmutablePartitionHolder;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder; import io.druid.timeline.partition.PartitionHolder;
@ -265,6 +266,48 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
} }
} }
/**
 * Returns true iff the given interval is fully covered by complete (materialized) entries in this
 * timeline whose versions are strictly greater than {@code version}.
 *
 * Resolution order:
 * <ul>
 *   <li>An exact-interval entry decides directly by version comparison.</li>
 *   <li>Otherwise, walk the chain of complete entries starting at the latest entry beginning at or
 *       before {@code interval}'s start; the interval is overshadowed only if the chain is
 *       contiguous (no gaps), every link has a strictly higher version, and the chain reaches past
 *       {@code interval}'s end.</li>
 * </ul>
 *
 * Thread-safe: executes under this timeline's read lock.
 *
 * @param interval the interval being checked for overshadowing
 * @param version  the version of the candidate segment
 * @return true if {@code interval} at {@code version} is overshadowed by higher-version entries
 */
public boolean isOvershadowed(Interval interval, VersionType version)
{
  // Acquire the lock OUTSIDE the try block: if lock acquisition itself failed, the finally
  // clause must not attempt to unlock a lock that was never held.
  lock.readLock().lock();
  try {
    TimelineEntry entry = completePartitionsTimeline.get(interval);
    if (entry != null) {
      // Exact interval match: overshadowed iff the existing entry carries a strictly higher version.
      return versionComparator.compare(version, entry.getVersion()) < 0;
    }

    // Latest-starting complete entry that begins at or before interval's start.
    Interval lower = completePartitionsTimeline.floorKey(
        new Interval(interval.getStartMillis(), JodaUtils.MAX_INSTANT)
    );

    if (lower == null || !lower.overlaps(interval)) {
      return false;
    }

    Interval prev = null;
    Interval curr = lower;

    do {
      if (curr == null || //no further keys
          (prev != null && curr.getStartMillis() > prev.getEndMillis()) || //a discontinuity
          //lower or same version
          versionComparator.compare(version, completePartitionsTimeline.get(curr).getVersion()) >= 0
          ) {
        return false;
      }

      prev = curr;
      curr = completePartitionsTimeline.higherKey(curr);

    } while (interval.getEndMillis() > prev.getEndMillis());

    // A contiguous chain of strictly-higher-version entries covers the whole interval.
    return true;
  }
  finally {
    lock.readLock().unlock();
  }
}
private void add( private void add(
NavigableMap<Interval, TimelineEntry> timeline, NavigableMap<Interval, TimelineEntry> timeline,
Interval interval, Interval interval,

View File

@ -1567,6 +1567,156 @@ public class VersionedIntervalTimelineTest
); );
} }
/**
 * Verifies {@code isOvershadowed} on a timeline of two contiguous runs of abutting,
 * non-overlapping segments — [04-05, 04-09) and [04-15, 04-19) — all at version "1".
 * An interval is overshadowed only when it lies entirely inside one run AND its version
 * is strictly lower than "1"; any interval touching a gap or edge is not overshadowed.
 */
@Test
public void testIsOvershadowedWithNonOverlappingSegmentsInTimeline()
{
timeline = makeStringIntegerTimeline();
// Two contiguous covered runs at version "1": [04-05, 04-09) and [04-15, 04-19).
add("2011-04-05/2011-04-07", "1", new SingleElementPartitionChunk<Integer>(1));
add("2011-04-07/2011-04-09", "1", new SingleElementPartitionChunk<Integer>(1));
add("2011-04-15/2011-04-17", "1", new SingleElementPartitionChunk<Integer>(1));
add("2011-04-17/2011-04-19", "1", new SingleElementPartitionChunk<Integer>(1));
// Intervals starting before 04-05 are never fully covered, regardless of how far they extend.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-03"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-05"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-06"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-07"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-08"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-09"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-10"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-30"), "0"));
// Intervals fully inside [04-05, 04-09) ARE overshadowed at the strictly lower version "0" ...
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "0"));
// ... but NOT at the equal version "1" ...
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "1"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "1"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "1"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "1"));
// ... nor at the higher version "2".
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "2"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "2"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "2"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "2"));
// Sub-intervals starting at 04-06 remain covered up to 04-09 ...
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-07"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-08"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-09"), "0"));
// ... but extending past the end of the run (into the gap) breaks coverage.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-10"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-30"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-08"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-09"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-10"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-07/2011-04-30"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-08/2011-04-09"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-08/2011-04-10"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-08/2011-04-30"), "0"));
// Intervals starting inside the gap [04-09, 04-15) are never overshadowed.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-10"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-15"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-17"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-19"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-09/2011-04-30"), "0"));
// The second run [04-15, 04-19) behaves symmetrically.
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-16"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-17"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-18"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-19"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-20"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-30"), "0"));
// Intervals entirely after all timeline entries are not overshadowed.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-19/2011-04-20"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-21/2011-04-22"), "0"));
}
/**
 * Verifies {@code isOvershadowed} on a timeline where segments overlap within each run:
 * [04-05, 04-09)@"11" overlaps [04-07, 04-11)@"12", and [04-15, 04-19)@"12" overlaps
 * [04-17, 04-21)@"11". The completePartitionsTimeline resolves overlaps, producing two fully
 * covered spans — [04-05, 04-11) and [04-15, 04-21) — with per-slice winning versions.
 * An interval is overshadowed only when contained in one span with every covering slice at a
 * strictly higher version than the candidate.
 */
@Test
public void testIsOvershadowedWithOverlappingSegmentsInTimeline()
{
timeline = makeStringIntegerTimeline();
// Overlapping pairs: covered spans are [04-05, 04-11) (versions 11/12) and [04-15, 04-21) (12/11).
add("2011-04-05/2011-04-09", "11", new SingleElementPartitionChunk<Integer>(1));
add("2011-04-07/2011-04-11", "12", new SingleElementPartitionChunk<Integer>(1));
add("2011-04-15/2011-04-19", "12", new SingleElementPartitionChunk<Integer>(1));
add("2011-04-17/2011-04-21", "11", new SingleElementPartitionChunk<Integer>(1));
// Intervals starting before 04-05 are never fully covered.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-03"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-05"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-06"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-07"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-08"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-09"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-10"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-11"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-01/2011-04-30"), "0"));
// Intervals inside [04-05, 04-11) ARE overshadowed at version "0" (below both 11 and 12) ...
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-10"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-11"), "0"));
// ... but NOT at version "12" (equal to a covering slice) ...
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "12"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "12"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "12"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "12"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-10"), "12"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-11"), "12"));
// ... nor at the higher version "13".
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-06"), "13"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-07"), "13"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-08"), "13"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-09"), "13"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-10"), "13"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-11"), "13"));
// Extending past 04-11 crosses the gap before 04-15, so coverage breaks.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-12"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-15"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-16"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-17"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-18"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-19"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-20"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-21"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-05/2011-04-22"), "0"));
// Sub-intervals starting at 04-06 remain covered up to 04-11 ...
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-07"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-08"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-09"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-10"), "0"));
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-11"), "0"));
// ... and likewise fail once they reach into or past the gap.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-12"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-15"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-16"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-17"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-18"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-19"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-20"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-21"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-06/2011-04-22"), "0"));
// Intervals starting in the gap [04-11, 04-15) are never overshadowed.
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-15"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-16"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-17"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-18"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-19"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-20"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-21"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-12/2011-04-22"), "0"));
// The full second span [04-15, 04-21) is covered; anything beyond it is not.
Assert.assertTrue(timeline.isOvershadowed(new Interval("2011-04-15/2011-04-21"), "0"));
Assert.assertFalse(timeline.isOvershadowed(new Interval("2011-04-21/2011-04-22"), "0"));
}
private Pair<Interval, Pair<String, PartitionHolder<Integer>>> createExpected( private Pair<Interval, Pair<String, PartitionHolder<Integer>>> createExpected(
String intervalString, String intervalString,
String version, String version,

View File

@ -239,7 +239,7 @@ Returns total size and count for each datasource for each interval within given
* `/druid/coordinator/v1/datasources/{dataSourceName}` * `/druid/coordinator/v1/datasources/{dataSourceName}`
Enables a datasource, i.e. enables all segments of the datasource which are not overshadowed by others.
* `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}` * `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}`

View File

@ -175,6 +175,25 @@ public class DruidCoordinatorRuntimeParams
); );
} }
/**
 * Creates a {@code Builder} pre-populated with all of this instance's state EXCEPT
 * availableSegments, which is reset to a fresh, empty {@code TreeSet} ordered by
 * {@code DruidCoordinator.SEGMENT_COMPARATOR}. Callers (e.g. the rule runner) use this to
 * substitute their own segment set via {@code withAvailableSegments(...)} without paying the
 * cost of copying the existing one.
 *
 * @return a builder mirroring this instance with an empty available-segments set
 */
public Builder buildFromExistingWithoutAvailableSegments()
{
// NOTE: arguments are positional and must match the Builder constructor's parameter order.
return new Builder(
startTime,
druidCluster,
databaseRuleManager,
segmentReplicantLookup,
dataSources,
Sets.newTreeSet(DruidCoordinator.SEGMENT_COMPARATOR),
loadManagementPeons,
replicationManager,
emitter,
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
);
}
public static class Builder public static class Builder
{ {
private long startTime; private long startTime;

View File

@ -29,10 +29,8 @@ import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder; import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import java.util.Map; import java.util.Map;
@ -77,43 +75,14 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp
} }
} }
for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) { //Remove all segments in db that are overshadowed by served segments
for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) { for (DataSegment dataSegment : params.getAvailableSegments()) {
for (DataSegment dataSegment : holder.getObject().payloads()) { VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
coordinator.removeSegment(dataSegment); coordinator.removeSegment(dataSegment);
stats.addToGlobalStat("overShadowedCount", 1); stats.addToGlobalStat("overShadowedCount", 1);
} }
} }
for (LoadQueuePeon loadQueue : coordinator.getLoadManagementPeons().values()) {
for (DataSegment dataSegment : loadQueue.getSegmentsToLoad()) {
timeline = timelines.get(dataSegment.getDataSource());
if (timeline == null) {
continue;
}
// Temporarily add queued segments to the timeline to see if they still need to be loaded.
timeline.add(
dataSegment.getInterval(),
dataSegment.getVersion(),
dataSegment.getShardSpec().createChunk(dataSegment)
);
for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
for (DataSegment segmentToRemove : holder.getObject().payloads()) {
if (segmentToRemove == dataSegment) {
coordinator.removeSegment(dataSegment);
stats.addToGlobalStat("overShadowedCount", 1);
}
}
}
// Remove it again so that if there are two segments to load that overshadow each other, both still get loaded.
timeline.remove(
dataSegment.getInterval(),
dataSegment.getVersion(),
dataSegment.getShardSpec().createChunk(dataSegment)
);
}
}
}
} }
return params.buildFromExisting() return params.buildFromExisting()
.withCoordinatorStats(stats) .withCoordinatorStats(stats)

View File

@ -20,6 +20,7 @@
package io.druid.server.coordinator.helper; package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.CoordinatorStats;
@ -29,9 +30,15 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
*/ */
@ -77,13 +84,46 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
return params; return params;
} }
// find available segments which are not overshadowed by other segments in DB
// only those would need to be loaded/dropped
// anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
for (DataSegment segment : params.getAvailableSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<>(Comparators.comparable());
timelines.put(segment.getDataSource(), timeline);
}
timeline.add(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)
);
}
Set<DataSegment> overshadowed = new HashSet<>();
for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
for (DataSegment dataSegment : holder.getObject().payloads()) {
overshadowed.add(dataSegment);
}
}
}
Set<DataSegment> nonOvershadowed = new HashSet<>();
for (DataSegment dataSegment : params.getAvailableSegments()) {
if (!overshadowed.contains(dataSegment)) {
nonOvershadowed.add(dataSegment);
}
}
for (String tier : cluster.getTierNames()) { for (String tier : cluster.getTierNames()) {
replicatorThrottler.updateReplicationState(tier); replicatorThrottler.updateReplicationState(tier);
replicatorThrottler.updateTerminationState(tier); replicatorThrottler.updateTerminationState(tier);
} }
DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExisting() DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExistingWithoutAvailableSegments()
.withReplicationManager(replicatorThrottler) .withReplicationManager(replicatorThrottler)
.withAvailableSegments(nonOvershadowed)
.build(); .build();
// Run through all matched rules for available segments // Run through all matched rules for available segments
@ -118,8 +158,9 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
.emit(); .emit();
} }
return paramsWithReplicationManager.buildFromExisting() return paramsWithReplicationManager.buildFromExistingWithoutAvailableSegments()
.withCoordinatorStats(stats) .withCoordinatorStats(stats)
.withAvailableSegments(params.getAvailableSegments())
.build(); .build();
} }
} }

View File

@ -32,6 +32,7 @@ import io.druid.client.DruidServer;
import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataRuleManager;
import io.druid.segment.IndexIO; import io.druid.segment.IndexIO;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.IntervalDropRule; import io.druid.server.coordinator.rules.IntervalDropRule;
import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
@ -46,7 +47,9 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
*/ */
@ -1208,6 +1211,96 @@ public class DruidCoordinatorRuleRunnerTest
params.getBalancerStrategyFactory().close(); params.getBalancerStrategyFactory().close();
} }
/**
 * Verifies the rule runner applies load rules only to non-overshadowed segments: of two segments
 * over the same interval at versions "1" and "2", only the v2 segment should be assigned (the
 * mock peon expects exactly one loadSegment call, for v2). Also verifies the runner restores the
 * full available-segments set on the returned params, even though rules internally ran against
 * the filtered (non-overshadowed) subset.
 */
@Test
public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception
{
Set<DataSegment> availableSegments = new HashSet<>();
// Two segments over the identical interval; v1 is overshadowed by v2.
DataSegment v1 = new DataSegment(
"test",
new Interval("2012-01-01/2012-01-02"),
"1",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
1
);
DataSegment v2 = new DataSegment(
"test",
new Interval("2012-01-01/2012-01-02"),
"2",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
1
);
availableSegments.add(v1);
availableSegments.add(v2);
mockCoordinator();
// Expect exactly ONE load call, and only for the non-overshadowed v2 segment.
mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().once();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
// A single forever-load rule with one replicant in the default tier applies to every datasource.
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new ForeverLoadRule(ImmutableMap.of(DruidServer.DEFAULT_TIER, 1))
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
// Cluster with a single empty historical server in the default tier to receive the assignment.
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
DruidServer.DEFAULT_TIER,
0
).toImmutableDruidServer(),
mockPeon
)
)
)
)
);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
// Only one assignment (v2) in the default tier; nothing was left unassigned.
Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").size());
Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").get("_default_tier").get());
Assert.assertNull(stats.getPerTierStats().get("unassignedCount"));
Assert.assertNull(stats.getPerTierStats().get("unassignedSize"));
// The runner must return the ORIGINAL full segment set, not the filtered subset it ran rules on.
Assert.assertEquals(2, availableSegments.size());
Assert.assertEquals(availableSegments, params.getAvailableSegments());
Assert.assertEquals(availableSegments, afterParams.getAvailableSegments());
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
private void mockCoordinator() private void mockCoordinator()
{ {
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(

View File

@ -59,11 +59,15 @@ public class DruidCoordinatorCleanupOvershadowedTest
.interval(new Interval(start, start.plusHours(1))) .interval(new Interval(start, start.plusHours(1)))
.version("1") .version("1")
.build(); .build();
private DataSegment segmentV2 = new DataSegment.Builder().dataSource("test")
.interval(new Interval(start, start.plusHours(1)))
.version("2")
.build();
@Test @Test
public void testRun() public void testRun()
{ {
druidCoordinatorCleanupOvershadowed = new DruidCoordinatorCleanupOvershadowed(coordinator); druidCoordinatorCleanupOvershadowed = new DruidCoordinatorCleanupOvershadowed(coordinator);
availableSegments = ImmutableList.of(segmentV1, segmentV0); availableSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2);
druidCluster = new DruidCluster( druidCluster = new DruidCluster(
ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(Arrays.asList( ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(Arrays.asList(
@ -73,14 +77,11 @@ public class DruidCoordinatorCleanupOvershadowedTest
EasyMock.expect(druidServer.getDataSources()) EasyMock.expect(druidServer.getDataSources())
.andReturn(ImmutableList.of(druidDataSource)) .andReturn(ImmutableList.of(druidDataSource))
.anyTimes(); .anyTimes();
EasyMock.expect(druidDataSource.getSegments()).andReturn(ImmutableSet.<DataSegment>of(segmentV1)).anyTimes(); EasyMock.expect(druidDataSource.getSegments()).andReturn(ImmutableSet.<DataSegment>of(segmentV1, segmentV2)).anyTimes();
EasyMock.expect(druidDataSource.getName()).andReturn("test").anyTimes(); EasyMock.expect(druidDataSource.getName()).andReturn("test").anyTimes();
EasyMock.expect(coordinator.getLoadManagementPeons()) coordinator.removeSegment(segmentV1);
.andReturn(ImmutableMap.<String, LoadQueuePeon>of("testHost", mockPeon))
.anyTimes();
coordinator.removeSegment(segmentV0); coordinator.removeSegment(segmentV0);
EasyMock.expectLastCall(); EasyMock.expectLastCall();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(ImmutableSet.<DataSegment>of(segmentV0)).anyTimes();
EasyMock.replay(mockPeon, coordinator, druidServer, druidDataSource); EasyMock.replay(mockPeon, coordinator, druidServer, druidDataSource);
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder()
.withAvailableSegments(availableSegments) .withAvailableSegments(availableSegments)