fix bug with load throttling and more tests

Author: fjy
Date:   2014-02-04 15:21:45 -08:00
Parent: fc4cd79125
Commit: badc7b2e3f

4 changed files with 160 additions and 38 deletions

SegmentReplicantLookup.java

@@ -83,12 +83,22 @@ public class SegmentReplicantLookup
   }
 
   public Map<String, Integer> getLoadingTiers(String segmentId)
   {
     Map<String, Integer> retVal = loadingSegments.row(segmentId);
     return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
   }
 
-  public int getClusterReplicants(String segmentId, String tier)
+  public int getLoadedReplicants(String segmentId)
+  {
+    Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
+    int retVal = 0;
+    for (Integer replicants : allTiers.values()) {
+      retVal += replicants;
+    }
+    return retVal;
+  }
+
+  public int getLoadedReplicants(String segmentId, String tier)
   {
     Integer retVal = segmentsInCluster.get(segmentId, tier);
     return (retVal == null) ? 0 : retVal;
@@ -100,8 +110,23 @@ public class SegmentReplicantLookup
     return (retVal == null) ? 0 : retVal;
   }
 
+  public int getLoadingReplicants(String segmentId)
+  {
+    Map<String, Integer> allTiers = loadingSegments.row(segmentId);
+    int retVal = 0;
+    for (Integer replicants : allTiers.values()) {
+      retVal += replicants;
+    }
+    return retVal;
+  }
+
+  public int getTotalReplicants(String segmentId)
+  {
+    return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
+  }
+
   public int getTotalReplicants(String segmentId, String tier)
   {
-    return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
+    return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
   }
 }
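To make the renamed accessors concrete, a minimal usage sketch follows (not part of the commit; the class name and sum helper are hypothetical). It assumes only what the diff shows: segmentsInCluster and loadingSegments are Guava Table<segmentId, tier, count> maps, and the new methods sum a segment's row.

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public class ReplicantLookupSketch
{
  public static void main(String[] args)
  {
    // Mirrors SegmentReplicantLookup's two tables: served vs. still-loading replicants.
    Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
    Table<String, String, Integer> loadingSegments = HashBasedTable.create();

    segmentsInCluster.put("seg1", "hot", 1);          // one replicant already served
    loadingSegments.put("seg1", "_default_tier", 1);  // one replicant still queued

    int loaded = sum(segmentsInCluster.row("seg1").values());  // getLoadedReplicants("seg1")  -> 1
    int loading = sum(loadingSegments.row("seg1").values());   // getLoadingReplicants("seg1") -> 1
    System.out.println(loaded + loading);                      // getTotalReplicants("seg1")   -> 2
  }

  private static int sum(Iterable<Integer> counts)
  {
    int total = 0;
    for (Integer count : counts) {
      total += count;
    }
    return total;
  }
}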

LoadRule.java

@@ -42,6 +42,8 @@ import java.util.Map;
 
 public abstract class LoadRule implements Rule
 {
   private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
+  private static final String assignedCount = "assignedCount";
+  private static final String droppedCount = "droppedCount";
 
   @Override
   public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
@@ -49,13 +51,17 @@ public abstract class LoadRule implements Rule
     CoordinatorStats stats = new CoordinatorStats();
     final Map<String, Integer> loadStatus = Maps.newHashMap();
 
+    int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
+
     for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
       final String tier = entry.getKey();
-      final int expectedReplicants = entry.getValue();
+      final int expectedReplicantsInTier = entry.getValue();
+      final int totalReplicantsInTier = params.getSegmentReplicantLookup()
+                                              .getTotalReplicants(segment.getIdentifier(), tier);
+      final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
+                                               .getLoadedReplicants(segment.getIdentifier(), tier);
 
-      int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
-
-      MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
+      final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
       if (serverQueue == null) {
         log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
         return stats;
@@ -65,22 +71,21 @@ public abstract class LoadRule implements Rule
       final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
       final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
 
       if (params.getAvailableSegments().contains(segment)) {
-        stats.accumulate(
-            assign(
-                params.getReplicationManager(),
-                tier,
-                expectedReplicants,
-                totalReplicants,
-                strategy,
-                serverHolderList,
-                segment
-            )
+        CoordinatorStats assignStats = assign(
+            params.getReplicationManager(),
+            tier,
+            totalReplicantsInCluster,
+            expectedReplicantsInTier,
+            totalReplicantsInTier,
+            strategy,
+            serverHolderList,
+            segment
         );
+        stats.accumulate(assignStats);
+        totalReplicantsInCluster += assignStats.getPerTierStats().get(assignedCount).get(tier).get();
       }
 
-      int clusterReplicants = params.getSegmentReplicantLookup()
-                                    .getClusterReplicants(segment.getIdentifier(), tier);
-      loadStatus.put(tier, expectedReplicants - clusterReplicants);
+      loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
     }
 
     // Remove over-replication
     stats.accumulate(drop(loadStatus, segment, params));
@@ -92,18 +97,21 @@ public abstract class LoadRule implements Rule
   private CoordinatorStats assign(
       final ReplicationThrottler replicationManager,
       final String tier,
-      final int expectedReplicants,
-      int totalReplicants,
+      final int totalReplicantsInCluster,
+      final int expectedReplicantsInTier,
+      final int totalReplicantsInTier,
       final BalancerStrategy strategy,
       final List<ServerHolder> serverHolderList,
       final DataSegment segment
   )
   {
     final CoordinatorStats stats = new CoordinatorStats();
-    stats.addToTieredStat("assignedCount", tier, 0);
+    stats.addToTieredStat(assignedCount, tier, 0);
 
-    while (totalReplicants < expectedReplicants) {
-      boolean replicate = totalReplicants > 0;
+    int currReplicantsInTier = totalReplicantsInTier;
+    int currTotalReplicantsInCluster = totalReplicantsInCluster;
+    while (currReplicantsInTier < expectedReplicantsInTier) {
+      boolean replicate = currTotalReplicantsInCluster > 0;
 
       if (replicate && !replicationManager.canCreateReplicant(tier)) {
         break;
@@ -116,7 +124,7 @@ public abstract class LoadRule implements Rule
             "Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
             tier,
             segment.getIdentifier(),
-            expectedReplicants
+            expectedReplicantsInTier
         );
         break;
       }
@@ -143,8 +151,9 @@ public abstract class LoadRule implements Rule
           }
       );
 
-      stats.addToTieredStat("assignedCount", tier, 1);
-      ++totalReplicants;
+      stats.addToTieredStat(assignedCount, tier, 1);
+      ++currReplicantsInTier;
+      ++currTotalReplicantsInCluster;
     }
 
     return stats;
@@ -162,7 +171,7 @@ public abstract class LoadRule implements Rule
       return stats;
     }
 
-    // Make sure we have enough actual replicants in the correct tiers in the cluster before doing anything
+    // Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
    for (Integer leftToLoad : loadStatus.values()) {
       if (leftToLoad > 0) {
         return stats;
@@ -176,10 +185,10 @@ public abstract class LoadRule implements Rule
     for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
       final String tier = entry.getKey();
-      int actualNumReplicantsForTier = entry.getValue();
+      int loadedNumReplicantsForTier = entry.getValue();
       int expectedNumReplicantsForTier = getNumReplicants(tier);
 
-      stats.addToTieredStat("droppedCount", tier, 0);
+      stats.addToTieredStat(droppedCount, tier, 0);
 
       MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
       if (serverQueue == null) {
@@ -188,7 +197,7 @@ public abstract class LoadRule implements Rule
       }
 
       List<ServerHolder> droppedServers = Lists.newArrayList();
-      while (actualNumReplicantsForTier > expectedNumReplicantsForTier) {
+      while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) {
         final ServerHolder holder = serverQueue.pollLast();
         if (holder == null) {
           log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@@ -224,8 +233,8 @@ public abstract class LoadRule implements Rule
               }
             }
         );
-        --actualNumReplicantsForTier;
-        stats.addToTieredStat("droppedCount", tier, 1);
+        --loadedNumReplicantsForTier;
+        stats.addToTieredStat(droppedCount, tier, 1);
       }
       droppedServers.add(holder);
     }
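The heart of the fix is visible in assign(): the old code decided whether a load counted as a replication from the per-tier count (replicate = totalReplicants > 0), so the first copy in every tier bypassed the ReplicationThrottler. The new code threads a cluster-wide count through the loop, and run() folds each tier's assignedCount back into totalReplicantsInCluster before moving to the next tier, so only the very first copy anywhere in the cluster is exempt. A self-contained sketch of that decision follows (hypothetical stand-in names and a toy budget, not Druid classes):

import java.util.Arrays;

public class ThrottleFixSketch
{
  // Toy stand-in for ReplicationThrottler.canCreateReplicant(tier):
  // allow a fixed number of replica creations per coordinator run.
  private static int replicationBudget = 7;

  private static boolean canCreateReplicant()
  {
    return replicationBudget-- > 0;
  }

  // Condensed shape of the corrected assign() loop for one tier.
  private static int assign(int totalReplicantsInCluster, int expectedReplicantsInTier, int totalReplicantsInTier)
  {
    int assigned = 0;
    int currReplicantsInTier = totalReplicantsInTier;
    int currTotalReplicantsInCluster = totalReplicantsInCluster;
    while (currReplicantsInTier < expectedReplicantsInTier) {
      // Cluster-wide check: the first copy in a *second* tier is a replica.
      boolean replicate = currTotalReplicantsInCluster > 0;
      if (replicate && !canCreateReplicant()) {
        break; // throttled: stop assigning in this tier for this run
      }
      // ... pick the best server via the BalancerStrategy and queue the load ...
      ++assigned;
      ++currReplicantsInTier;
      ++currTotalReplicantsInCluster;
    }
    return assigned;
  }

  public static void main(String[] args)
  {
    int totalInCluster = 0;
    for (String tier : Arrays.asList("hot", "_default_tier")) {
      int assigned = assign(totalInCluster, 1, 0);
      totalInCluster += assigned; // mirrors run() folding assignedCount back in
      System.out.println(tier + " assigned: " + assigned);
    }
  }
}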

DruidCoordinatorRuleRunnerTest.java

@@ -979,6 +979,94 @@ public class DruidCoordinatorRuleRunnerTest
     EasyMock.verify(mockPeon);
   }
 
+  /**
+   * Nodes:
+   * hot - nothing loaded
+   * _default_tier - 1 segment loaded
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReplicantThrottleAcrossTiers() throws Exception
+  {
+    mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+    EasyMock.replay(mockPeon);
+
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
+        Lists.<Rule>newArrayList(
+            new IntervalLoadRule(
+                new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
+                ImmutableMap.<String, Integer>of(
+                    "hot", 1,
+                    DruidServer.DEFAULT_TIER, 1
+                ),
+                null,
+                null
+            )
+        )
+    ).atLeastOnce();
+    EasyMock.replay(databaseRuleManager);
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        new DruidServer(
+                            "serverHot",
+                            "hostHot",
+                            1000,
+                            "historical",
+                            "hot",
+                            0
+                        ),
+                        mockPeon
+                    )
+                )
+            ),
+            DruidServer.DEFAULT_TIER,
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        new DruidServer(
+                            "serverNorm",
+                            "hostNorm",
+                            1000,
+                            "historical",
+                            DruidServer.DEFAULT_TIER,
+                            0
+                        ),
+                        mockPeon
+                    )
+                )
+            )
+        )
+    );
+
+    DruidCoordinatorRuntimeParams params =
+        new DruidCoordinatorRuntimeParams.Builder()
+            .withDruidCluster(druidCluster)
+            .withAvailableSegments(availableSegments)
+            .withDatabaseRuleManager(databaseRuleManager)
+            .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+            .build();
+
+    DruidCoordinatorRuleRunner runner = new DruidCoordinatorRuleRunner(new ReplicationThrottler(7, 1), coordinator);
+    DruidCoordinatorRuntimeParams afterParams = runner.run(params);
+    CoordinatorStats stats = afterParams.getCoordinatorStats();
+
+    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 24);
+    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 7);
+    Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
+    Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+
+    EasyMock.verify(mockPeon);
+  }
+
   @Test
   public void testDropReplicantThrottle() throws Exception
   {
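Assuming the suite's standard availableSegments fixture of 24 hourly segments (consistent with the asserted hot-tier count), the expected numbers follow directly from the corrected accounting: each segment's first replicant anywhere in the cluster is exempt from throttling (the hot copy here, since SegmentReplicantLookup.make(new DruidCluster()) starts empty), so all 24 land on serverHot in a single run. Every _default_tier copy is then a cross-tier replica and must pass the ReplicationThrottler, whose first constructor argument (7) caps replica creations, so exactly 7 are assigned there. Under the old per-tier check, the _default_tier copies would have bypassed the throttle as well.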

LoadRuleTest.java

@@ -59,7 +59,7 @@ public class LoadRuleTest
   public void setUp() throws Exception
   {
     mockPeon = EasyMock.createMock(LoadQueuePeon.class);
-    throttler = new ReplicationThrottler(1, 1);
+    throttler = new ReplicationThrottler(2, 1);
     for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
       throttler.updateReplicationState(tier);
       throttler.updateTerminationState(tier);
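The wider throttle in setUp() (ReplicationThrottler(2, 1) instead of (1, 1)) is presumably needed because, under the cross-tier accounting introduced in LoadRule, the first copy in a second tier now counts as a replication; with a budget of 1, the existing two-tier LoadRule tests would be throttled before the second tier received its copy.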