mirror of https://github.com/apache/druid.git
Merge pull request #2240 from metamx/fix-load-rule
Fix loadRule when one of the tiers had no available servers
This commit is contained in:
commit
ea623e43d2
|
@ -67,7 +67,7 @@ public abstract class LoadRule implements Rule
|
|||
final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
|
||||
if (serverQueue == null) {
|
||||
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
|
||||
return stats;
|
||||
continue;
|
||||
}
|
||||
|
||||
final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
|
||||
|
@ -192,7 +192,7 @@ public abstract class LoadRule implements Rule
|
|||
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
|
||||
if (serverQueue == null) {
|
||||
log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
|
||||
return stats;
|
||||
continue;
|
||||
}
|
||||
|
||||
List<ServerHolder> droppedServers = Lists.newArrayList();
|
||||
|
|
|
@ -19,13 +19,19 @@
|
|||
|
||||
package io.druid.server.coordinator.rules;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.core.LoggingEmitter;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.server.coordinator.CoordinatorStats;
|
||||
import io.druid.server.coordinator.DruidCluster;
|
||||
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||
|
@ -51,13 +57,29 @@ import java.util.Map;
|
|||
*/
|
||||
public class LoadRuleTest
|
||||
{
|
||||
private static final Logger log = new Logger(LoadRuleTest.class);
|
||||
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
|
||||
private static final ServiceEmitter emitter = new ServiceEmitter(
|
||||
"service",
|
||||
"host",
|
||||
new LoggingEmitter(
|
||||
log,
|
||||
LoggingEmitter.Level.ERROR,
|
||||
jsonMapper
|
||||
)
|
||||
);
|
||||
|
||||
private LoadQueuePeon mockPeon;
|
||||
private ReplicationThrottler throttler;
|
||||
private DataSegment segment;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
EmittingLogger.registerEmitter(emitter);
|
||||
emitter.start();
|
||||
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
|
||||
throttler = new ReplicationThrottler(2, 1);
|
||||
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
|
||||
|
@ -283,4 +305,182 @@ public class LoadRuleTest
|
|||
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
|
||||
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadWithNonExistentTier() throws Exception
|
||||
{
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
|
||||
EasyMock.replay(mockPeon);
|
||||
|
||||
LoadRule rule = new LoadRule()
|
||||
{
|
||||
private final Map<String, Integer> tiers = ImmutableMap.of(
|
||||
"nonExistentTier", 1,
|
||||
"hot", 1
|
||||
);
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> getTieredReplicants()
|
||||
{
|
||||
return tiers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumReplicants(String tier)
|
||||
{
|
||||
return tiers.get(tier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "test";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
DruidCluster druidCluster = new DruidCluster(
|
||||
ImmutableMap.of(
|
||||
"hot",
|
||||
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
|
||||
Arrays.asList(
|
||||
new ServerHolder(
|
||||
new DruidServer(
|
||||
"serverHot",
|
||||
"hostHot",
|
||||
1000,
|
||||
"historical",
|
||||
"hot",
|
||||
0
|
||||
).toImmutableDruidServer(),
|
||||
mockPeon
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
CoordinatorStats stats = rule.run(
|
||||
null,
|
||||
DruidCoordinatorRuntimeParams.newBuilder()
|
||||
.withDruidCluster(druidCluster)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||
.withReplicationManager(throttler)
|
||||
.withAvailableSegments(Arrays.asList(segment)).build(),
|
||||
segment
|
||||
);
|
||||
|
||||
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDropWithNonExistentTier() throws Exception
|
||||
{
|
||||
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
|
||||
EasyMock.replay(mockPeon);
|
||||
|
||||
LoadRule rule = new LoadRule()
|
||||
{
|
||||
private final Map<String, Integer> tiers = ImmutableMap.of(
|
||||
"nonExistentTier", 1,
|
||||
"hot", 1
|
||||
);
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> getTieredReplicants()
|
||||
{
|
||||
return tiers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumReplicants(String tier)
|
||||
{
|
||||
return tiers.get(tier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "test";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
DruidServer server1 = new DruidServer(
|
||||
"serverHot",
|
||||
"hostHot",
|
||||
1000,
|
||||
"historical",
|
||||
"hot",
|
||||
0
|
||||
);
|
||||
DruidServer server2 = new DruidServer(
|
||||
"serverHo2t",
|
||||
"hostHot2",
|
||||
1000,
|
||||
"historical",
|
||||
"hot",
|
||||
0
|
||||
);
|
||||
server1.addDataSegment(segment.getIdentifier(), segment);
|
||||
server2.addDataSegment(segment.getIdentifier(), segment);
|
||||
|
||||
DruidCluster druidCluster = new DruidCluster(
|
||||
ImmutableMap.of(
|
||||
"hot",
|
||||
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
|
||||
Arrays.asList(
|
||||
new ServerHolder(
|
||||
server1.toImmutableDruidServer(),
|
||||
mockPeon
|
||||
),
|
||||
new ServerHolder(
|
||||
server2.toImmutableDruidServer(),
|
||||
mockPeon
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
CoordinatorStats stats = rule.run(
|
||||
null,
|
||||
DruidCoordinatorRuntimeParams.newBuilder()
|
||||
.withDruidCluster(druidCluster)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
|
||||
.withReplicationManager(throttler)
|
||||
.withAvailableSegments(Arrays.asList(segment)).build(),
|
||||
segment
|
||||
);
|
||||
|
||||
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue