bug fixes again in load and drop

Fangjin Yang 2012-12-15 00:43:16 -08:00
parent 5fff07a8b0
commit 585a812f6e
3 changed files with 54 additions and 25 deletions
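
The change, in brief: SegmentReplicantLookup previously tracked only replicants that servers were actually serving, so the master could assign a segment again while a copy of it was still sitting in a load queue. As the diff below shows, the lookup now keeps two tables, one for served replicants and one for queued (loading) replicants; LoadRule assigns against the combined total (served + loading) but drops only against replicants that are genuinely served.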

SegmentReplicantLookup.java

@@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Table;
 import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.client.DruidDataSource;
 import com.metamx.druid.client.DruidServer;
 import java.util.Map;
@@ -37,6 +36,7 @@ public class SegmentReplicantLookup
   public static SegmentReplicantLookup make(DruidCluster cluster)
   {
     final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
+    final Table<String, String, Integer> loadingSegments = HashBasedTable.create();

     for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedServersByTier()) {
       for (ServerHolder serverHolder : serversByType) {
@@ -49,28 +49,59 @@ public class SegmentReplicantLookup
           }
           segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
         }
+
+        // Also account for queued segments
+        for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
+          Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
+          if (numReplicants == null) {
+            numReplicants = 0;
+          }
+          loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
+        }
       }
     }

-    return new SegmentReplicantLookup(segmentsInCluster);
+    return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
   }

-  private final Table<String, String, Integer> table;
+  private final Table<String, String, Integer> segmentsInCluster;
+  private final Table<String, String, Integer> loadingSegments;

-  private SegmentReplicantLookup(Table<String, String, Integer> table)
+  private SegmentReplicantLookup(
+      Table<String, String, Integer> segmentsInCluster,
+      Table<String, String, Integer> loadingSegments
+  )
   {
-    this.table = table;
+    this.segmentsInCluster = segmentsInCluster;
+    this.loadingSegments = loadingSegments;
   }

-  public Map<String, Integer> getTiers(String segmentId)
+  public Map<String, Integer> getClusterTiers(String segmentId)
   {
-    Map<String, Integer> retVal = table.row(segmentId);
+    Map<String, Integer> retVal = segmentsInCluster.row(segmentId);
     return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
   }

-  public int lookup(String segmentId, String tier)
+  public Map<String, Integer> getLoadingTiers(String segmentId)
+  {
+    Map<String, Integer> retVal = loadingSegments.row(segmentId);
+    return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
+  }
+
+  public int getClusterReplicants(String segmentId, String tier)
   {
-    Integer retVal = table.get(segmentId, tier);
+    Integer retVal = segmentsInCluster.get(segmentId, tier);
     return (retVal == null) ? 0 : retVal;
   }
+
+  public int getLoadingReplicants(String segmentId, String tier)
+  {
+    Integer retVal = loadingSegments.get(segmentId, tier);
+    return (retVal == null) ? 0 : retVal;
+  }
+
+  public int getTotalReplicants(String segmentId, String tier)
+  {
+    return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
+  }
 }
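
To make the new surface area concrete, a minimal usage sketch in Java. It assumes a DruidCluster named cluster is already in scope, and the segment identifier and tier name are hypothetical placeholders, not values from this commit:

    // Sketch only: "cluster" is an assumed, existing DruidCluster;
    // the segment id and tier name below are hypothetical placeholders.
    SegmentReplicantLookup lookup = SegmentReplicantLookup.make(cluster);

    String segmentId = "someDataSource_2012-12-15T00:00:00.000Z_v1";  // hypothetical identifier
    String tier = "hot";                                              // hypothetical tier name

    int served = lookup.getClusterReplicants(segmentId, tier);  // copies servers are serving now
    int queued = lookup.getLoadingReplicants(segmentId, tier);  // copies still in peon load queues
    int total  = lookup.getTotalReplicants(segmentId, tier);    // always served + queued

Assignment decisions work from total, so queued copies count toward the replication target; drop decisions work from served, so the rule only proceeds once enough copies are genuinely being served.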

LoadRule.java

@@ -22,7 +22,6 @@ package com.metamx.druid.master.rules;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
-import com.metamx.common.ISE;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.master.DruidMaster;
 import com.metamx.druid.master.DruidMasterRuntimeParams;
@@ -47,7 +46,8 @@ public abstract class LoadRule implements Rule
     MasterStats stats = new MasterStats();

     int expectedReplicants = getReplicants();
-    int actualReplicants = params.getSegmentReplicantLookup().lookup(segment.getIdentifier(), getTier());
+    int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), getTier());
+    int clusterReplicants = params.getSegmentReplicantLookup().getClusterReplicants(segment.getIdentifier(), getTier());

     MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier());
     if (serverQueue == null) {
@@ -55,15 +55,15 @@ public abstract class LoadRule implements Rule
       return stats;
     }

-    stats.accumulate(assign(expectedReplicants, actualReplicants, serverQueue, segment));
-    stats.accumulate(drop(expectedReplicants, actualReplicants, segment, params));
+    stats.accumulate(assign(expectedReplicants, totalReplicants, serverQueue, segment));
+    stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));

     return stats;
   }

   private MasterStats assign(
       int expectedReplicants,
-      int actualReplicants,
+      int totalReplicants,
       MinMaxPriorityQueue<ServerHolder> serverQueue,
       DataSegment segment
   )
@@ -71,7 +71,7 @@ public abstract class LoadRule implements Rule
     MasterStats stats = new MasterStats();
     List<ServerHolder> assignedServers = Lists.newArrayList();

-    while (actualReplicants < expectedReplicants) {
+    while (totalReplicants < expectedReplicants) {
       ServerHolder holder = serverQueue.pollFirst();
       if (holder == null) {
         log.warn(
@@ -83,12 +83,7 @@ public abstract class LoadRule implements Rule
         );
         break;
       }
-      if (holder.isServingSegment(segment)) {
-        assignedServers.add(holder);
-        continue;
-      }
-      if (holder.isLoadingSegment(segment)) {
-        ++actualReplicants;
+      if (holder.isServingSegment(segment) || holder.isLoadingSegment(segment)) {
         assignedServers.add(holder);
         continue;
       }
@@ -127,7 +122,7 @@ public abstract class LoadRule implements Rule
       assignedServers.add(holder);
       stats.addToTieredStat("assignedCount", getTier(), 1);
-      ++actualReplicants;
+      ++totalReplicants;
     }

     serverQueue.addAll(assignedServers);
@@ -136,7 +131,7 @@ public abstract class LoadRule implements Rule
   private MasterStats drop(
       int expectedReplicants,
-      int actualReplicants,
+      int clusterReplicants,
       DataSegment segment,
       DruidMasterRuntimeParams params
   )
@@ -148,11 +143,11 @@ public abstract class LoadRule implements Rule
     }

     // Make sure we have enough actual replicants in the cluster before doing anything
-    if (actualReplicants < expectedReplicants) {
+    if (clusterReplicants < expectedReplicants) {
       return stats;
     }

-    Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getTiers(segment.getIdentifier());
+    Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());

     for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) {
       String tier = entry.getKey();
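
A hypothetical walk-through of the new counting: say a rule expects 2 replicants, 1 copy is served and 1 is queued on some peon. totalReplicants is then 2, so assign() schedules nothing further; under the old code the queued copy was only discovered if its particular server happened to be polled inside the loop, so a second assignment could slip through. Meanwhile clusterReplicants is 1, below the expected 2, so drop() returns early and the lone served copy stays put.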

DruidMasterRuleRunnerTest.java

@@ -536,6 +536,7 @@ public class DruidMasterRuleRunnerTest
   {
     mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
     EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
     EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
     EasyMock.replay(mockPeon);
@@ -798,10 +799,12 @@ public class DruidMasterRuleRunnerTest
     mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
     EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
     EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
     EasyMock.replay(mockPeon);

     LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
+    EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
     EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
     EasyMock.replay(anotherMockPeon);
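
The test changes follow directly from the lookup change: SegmentReplicantLookup.make() now walks each peon's load queue via getSegmentsToLoad(), so every mocked LoadQueuePeon has to stub that call as well (here returning an empty set), otherwise EasyMock rejects the unexpected invocation.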