mirror of https://github.com/apache/druid.git
Fix LoadRule when one of the tiers has no servers
When one of the tiers has no servers, LoadRule should ignore that tier and continue to load/drop segments in the other available tiers. The bug also causes odd behavior when a LoadRule references a non-existent tier: the segment balancer keeps moving segments to other nodes in the existing tiers, but the extra segment copies are never dropped, eventually filling up all the tiers.
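In effect the fix replaces an early return with a continue in the per-tier loops of LoadRule's assign and drop paths. Below is a minimal sketch of the intended control flow; the loop structure and surrounding names are simplified assumptions rather than the verbatim LoadRule code, while the alert message and the return-vs-continue change come from the diff that follows.

    // Sketch only: iterate the configured tiers; skip a tier with no servers
    // instead of aborting the whole rule, so the remaining tiers are still handled.
    for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
      final String tier = entry.getKey();
      final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
      if (serverQueue == null) {
        log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
        continue; // before the fix this was "return stats;", which skipped every remaining tier
      }
      // ... assign (or drop) replicants for this tier and record the result in stats ...
    }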
parent 5ace91fd7a
commit 32bc2f776e
LoadRule.java
@@ -67,7 +67,7 @@ public abstract class LoadRule implements Rule
       final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
       if (serverQueue == null) {
         log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
-        return stats;
+        continue;
       }
 
       final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
@@ -192,7 +192,7 @@ public abstract class LoadRule implements Rule
       MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
       if (serverQueue == null) {
         log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
-        return stats;
+        continue;
       }
 
       List<ServerHolder> droppedServers = Lists.newArrayList();
LoadRuleTest.java
@@ -19,13 +19,19 @@
 
 package io.druid.server.coordinator.rules;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
+import com.metamx.common.logger.Logger;
 import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.LoggingEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.DruidServer;
+import io.druid.jackson.DefaultObjectMapper;
 import io.druid.server.coordinator.CoordinatorStats;
 import io.druid.server.coordinator.DruidCluster;
 import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -51,13 +57,29 @@ import java.util.Map;
  */
 public class LoadRuleTest
 {
+  private static final Logger log = new Logger(LoadRuleTest.class);
+  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  private static final ServiceEmitter emitter = new ServiceEmitter(
+      "service",
+      "host",
+      new LoggingEmitter(
+          log,
+          LoggingEmitter.Level.ERROR,
+          jsonMapper
+      )
+  );
+
   private LoadQueuePeon mockPeon;
   private ReplicationThrottler throttler;
   private DataSegment segment;
 
+
   @Before
   public void setUp() throws Exception
   {
+    EmittingLogger.registerEmitter(emitter);
+    emitter.start();
     mockPeon = EasyMock.createMock(LoadQueuePeon.class);
     throttler = new ReplicationThrottler(2, 1);
     for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
@@ -283,4 +305,182 @@ public class LoadRuleTest
     Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
     Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
   }
+
+  @Test
+  public void testLoadWithNonExistentTier() throws Exception
+  {
+    mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+    EasyMock.replay(mockPeon);
+
+    LoadRule rule = new LoadRule()
+    {
+      private final Map<String, Integer> tiers = ImmutableMap.of(
+          "nonExistentTier", 1,
+          "hot", 1
+      );
+
+      @Override
+      public Map<String, Integer> getTieredReplicants()
+      {
+        return tiers;
+      }
+
+      @Override
+      public int getNumReplicants(String tier)
+      {
+        return tiers.get(tier);
+      }
+
+      @Override
+      public String getType()
+      {
+        return "test";
+      }
+
+      @Override
+      public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+
+      @Override
+      public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+    };
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        new DruidServer(
+                            "serverHot",
+                            "hostHot",
+                            1000,
+                            "historical",
+                            "hot",
+                            0
+                        ).toImmutableDruidServer(),
+                        mockPeon
+                    )
+                )
+            )
+        )
+    );
+
+    CoordinatorStats stats = rule.run(
+        null,
+        DruidCoordinatorRuntimeParams.newBuilder()
+                                     .withDruidCluster(druidCluster)
+                                     .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+                                     .withReplicationManager(throttler)
+                                     .withAvailableSegments(Arrays.asList(segment)).build(),
+        segment
+    );
+
+    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
+  }
+
+  @Test
+  public void testDropWithNonExistentTier() throws Exception
+  {
+    mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
+    EasyMock.replay(mockPeon);
+
+    LoadRule rule = new LoadRule()
+    {
+      private final Map<String, Integer> tiers = ImmutableMap.of(
+          "nonExistentTier", 1,
+          "hot", 1
+      );
+
+      @Override
+      public Map<String, Integer> getTieredReplicants()
+      {
+        return tiers;
+      }
+
+      @Override
+      public int getNumReplicants(String tier)
+      {
+        return tiers.get(tier);
+      }
+
+      @Override
+      public String getType()
+      {
+        return "test";
+      }
+
+      @Override
+      public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+
+      @Override
+      public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+    };
+
+    DruidServer server1 = new DruidServer(
+        "serverHot",
+        "hostHot",
+        1000,
+        "historical",
+        "hot",
+        0
+    );
+    DruidServer server2 = new DruidServer(
+        "serverHo2t",
+        "hostHot2",
+        1000,
+        "historical",
+        "hot",
+        0
+    );
+    server1.addDataSegment(segment.getIdentifier(), segment);
+    server2.addDataSegment(segment.getIdentifier(), segment);
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        server1.toImmutableDruidServer(),
+                        mockPeon
+                    ),
+                    new ServerHolder(
+                        server2.toImmutableDruidServer(),
+                        mockPeon
+                    )
+                )
+            )
+        )
+    );
+
+    CoordinatorStats stats = rule.run(
+        null,
+        DruidCoordinatorRuntimeParams.newBuilder()
+                                     .withDruidCluster(druidCluster)
+                                     .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
+                                     .withReplicationManager(throttler)
+                                     .withAvailableSegments(Arrays.asList(segment)).build(),
+        segment
+    );
+
+    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
+  }
 }