diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
index b76300f6947..3c39c54a740 100644
--- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
@@ -67,7 +67,7 @@ public abstract class LoadRule implements Rule
       final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
       if (serverQueue == null) {
         log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
-        return stats;
+        continue;
       }
 
       final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
@@ -192,7 +192,7 @@ public abstract class LoadRule implements Rule
       MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
       if (serverQueue == null) {
         log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
-        return stats;
+        continue;
       }
 
       List<ServerHolder> droppedServers = Lists.newArrayList();
diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
index d84c8cf8a67..d2d254ff414 100644
--- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
@@ -19,13 +19,19 @@ package io.druid.server.coordinator.rules;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
+import com.metamx.common.logger.Logger;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.LoggingEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.DruidServer;
+import io.druid.jackson.DefaultObjectMapper;
 import io.druid.server.coordinator.CoordinatorStats;
 import io.druid.server.coordinator.DruidCluster;
 import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -51,13 +57,29 @@ import java.util.Map;
  */
 public class LoadRuleTest
 {
+  private static final Logger log = new Logger(LoadRuleTest.class);
+  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  private static final ServiceEmitter emitter = new ServiceEmitter(
+      "service",
+      "host",
+      new LoggingEmitter(
+          log,
+          LoggingEmitter.Level.ERROR,
+          jsonMapper
+      )
+  );
+
   private LoadQueuePeon mockPeon;
   private ReplicationThrottler throttler;
   private DataSegment segment;
 
   @Before
   public void setUp() throws Exception
   {
+    EmittingLogger.registerEmitter(emitter);
+    emitter.start();
     mockPeon = EasyMock.createMock(LoadQueuePeon.class);
     throttler = new ReplicationThrottler(2, 1);
     for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
@@ -283,4 +305,182 @@ public class LoadRuleTest
     Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
     Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
   }
+
+  @Test
+  public void testLoadWithNonExistentTier() throws Exception
+  {
+    mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+    EasyMock.replay(mockPeon);
+
+    LoadRule rule = new LoadRule()
+    {
+      private final Map<String, Integer> tiers = ImmutableMap.of(
+          "nonExistentTier", 1,
+          "hot", 1
+      );
+
+      @Override
+      public Map<String, Integer> getTieredReplicants()
+      {
+        return tiers;
+      }
+
+      @Override
+      public int getNumReplicants(String tier)
+      {
+        return tiers.get(tier);
+      }
+
+      @Override
+      public String getType()
+      {
+        return "test";
+      }
+
+      @Override
+      public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+
+      @Override
+      public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+    };
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        new DruidServer(
+                            "serverHot",
+                            "hostHot",
+                            1000,
+                            "historical",
+                            "hot",
+                            0
+                        ).toImmutableDruidServer(),
+                        mockPeon
+                    )
+                )
+            )
+        )
+    );
+
+    CoordinatorStats stats = rule.run(
+        null,
+        DruidCoordinatorRuntimeParams.newBuilder()
+                                     .withDruidCluster(druidCluster)
+                                     .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+                                     .withReplicationManager(throttler)
+                                     .withAvailableSegments(Arrays.asList(segment)).build(),
+        segment
+    );
+
+    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
+  }
+
+  @Test
+  public void testDropWithNonExistentTier() throws Exception
+  {
+    mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
+    EasyMock.replay(mockPeon);
+
+    LoadRule rule = new LoadRule()
+    {
+      private final Map<String, Integer> tiers = ImmutableMap.of(
+          "nonExistentTier", 1,
+          "hot", 1
+      );
+
+      @Override
+      public Map<String, Integer> getTieredReplicants()
+      {
+        return tiers;
+      }
+
+      @Override
+      public int getNumReplicants(String tier)
+      {
+        return tiers.get(tier);
+      }
+
+      @Override
+      public String getType()
+      {
+        return "test";
+      }
+
+      @Override
+      public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+
+      @Override
+      public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+    };
+
+    DruidServer server1 = new DruidServer(
+        "serverHot",
+        "hostHot",
+        1000,
+        "historical",
+        "hot",
+        0
+    );
+    DruidServer server2 = new DruidServer(
+        "serverHo2t",
+        "hostHot2",
+        1000,
+        "historical",
+        "hot",
+        0
+    );
+    server1.addDataSegment(segment.getIdentifier(), segment);
+    server2.addDataSegment(segment.getIdentifier(), segment);
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        server1.toImmutableDruidServer(),
+                        mockPeon
+                    ),
+                    new ServerHolder(
+                        server2.toImmutableDruidServer(),
+                        mockPeon
+                    )
+                )
+            )
+        )
+    );
+
+    CoordinatorStats stats = rule.run(
+        null,
+        DruidCoordinatorRuntimeParams.newBuilder()
+                                     .withDruidCluster(druidCluster)
+                                     .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
+                                     .withReplicationManager(throttler)
+                                     .withAvailableSegments(Arrays.asList(segment)).build(),
+        segment
+    );
+
+    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
+  }
 }