From 32bc2f776ec28f4967ec9c40226b76839195b2bc Mon Sep 17 00:00:00 2001 From: Nishant Date: Mon, 11 Jan 2016 15:53:14 +0530 Subject: [PATCH] Fix loadRule when one of the tiers had no servers When one of the tiers have no servers, LoadRule should ignore that tier and continue to load/drop segments in other available tiers. the bug also causes whacky behavior with LoadRule with non existent tier where the segment balancer keeps on moving segments to other nodes in existing tiers but the extra segment copies are never dropped eventually leading to all the tiers getting full . --- .../server/coordinator/rules/LoadRule.java | 4 +- .../coordinator/rules/LoadRuleTest.java | 200 ++++++++++++++++++ 2 files changed, 202 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index b76300f6947..3c39c54a740 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -67,7 +67,7 @@ public abstract class LoadRule implements Rule final MinMaxPriorityQueue serverQueue = params.getDruidCluster().getServersByTier(tier); if (serverQueue == null) { log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit(); - return stats; + continue; } final List serverHolderList = Lists.newArrayList(serverQueue); @@ -192,7 +192,7 @@ public abstract class LoadRule implements Rule MinMaxPriorityQueue serverQueue = params.getDruidCluster().get(tier); if (serverQueue == null) { log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit(); - return stats; + continue; } List droppedServers = Lists.newArrayList(); diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index d84c8cf8a67..d2d254ff414 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -19,13 +19,19 @@ package io.druid.server.coordinator.rules; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.LoggingEmitter; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.DruidServer; +import io.druid.jackson.DefaultObjectMapper; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -51,13 +57,29 @@ import java.util.Map; */ public class LoadRuleTest { + private static final Logger log = new Logger(LoadRuleTest.class); + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + private static final ServiceEmitter emitter = new ServiceEmitter( + "service", + "host", + new LoggingEmitter( + log, + LoggingEmitter.Level.ERROR, + jsonMapper + ) + ); + private LoadQueuePeon mockPeon; private ReplicationThrottler throttler; private DataSegment segment; + @Before public void setUp() throws Exception { + EmittingLogger.registerEmitter(emitter); + emitter.start(); mockPeon = EasyMock.createMock(LoadQueuePeon.class); throttler = new ReplicationThrottler(2, 1); for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) { @@ -283,4 +305,182 @@ public class LoadRuleTest Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1); } + + @Test + public void testLoadWithNonExistentTier() throws Exception + { + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + + LoadRule rule = new LoadRule() + { + private final Map tiers = ImmutableMap.of( + "nonExistentTier", 1, + "hot", 1 + ); + + @Override + public Map getTieredReplicants() + { + return tiers; + } + + @Override + public int getNumReplicants(String tier) + { + return tiers.get(tier); + } + + @Override + public String getType() + { + return "test"; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return true; + } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } + }; + + DruidCluster druidCluster = new DruidCluster( + ImmutableMap.of( + "hot", + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Arrays.asList( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + 1000, + "historical", + "hot", + 0 + ).toImmutableDruidServer(), + mockPeon + ) + ) + ) + ) + ); + + CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withReplicationManager(throttler) + .withAvailableSegments(Arrays.asList(segment)).build(), + segment + ); + + Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); + } + + @Test + public void testDropWithNonExistentTier() throws Exception + { + mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes(); + EasyMock.replay(mockPeon); + + LoadRule rule = new LoadRule() + { + private final Map tiers = ImmutableMap.of( + "nonExistentTier", 1, + "hot", 1 + ); + + @Override + public Map getTieredReplicants() + { + return tiers; + } + + @Override + public int getNumReplicants(String tier) + { + return tiers.get(tier); + } + + @Override + public String getType() + { + return "test"; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return true; + } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } + }; + + DruidServer server1 = new DruidServer( + "serverHot", + "hostHot", + 1000, + "historical", + "hot", + 0 + ); + DruidServer server2 = new DruidServer( + "serverHo2t", + "hostHot2", + 1000, + "historical", + "hot", + 0 + ); + server1.addDataSegment(segment.getIdentifier(), segment); + server2.addDataSegment(segment.getIdentifier(), segment); + + DruidCluster druidCluster = new DruidCluster( + ImmutableMap.of( + "hot", + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Arrays.asList( + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon + ) + ) + ) + ) + ); + + CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withReplicationManager(throttler) + .withAvailableSegments(Arrays.asList(segment)).build(), + segment + ); + + Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); + } }