Only consider loaded replicants when computing replication status. (#4921)

* Only consider loaded replicants when computing replication status.

This affects the computation of segment/underReplicated/count and
segment/unavailable/count, as well as the loadstatus?simple and
loadstatus?full APIs.

I'm not sure why they currently consider segments in the load
queues, but it would make more sense to me if they only considered
segments that are actually loaded.

* Fix tests.

* Fix imports.
This commit is contained in:
Gian Merlino 2017-10-18 11:11:42 -07:00 committed by GitHub
parent 9d91ffd039
commit 4881bb273b
3 changed files with 6 additions and 55 deletions

View File

@@ -248,7 +248,7 @@ public class DruidCoordinator
((LoadRule) rule)
.getTieredReplicants()
.forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), tier);
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier(), tier);
retVal
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
@@ -268,7 +268,7 @@ public class DruidCoordinator
}
for (DataSegment segment : getAvailableDataSegments()) {
if (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) {
if (segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier()) == 0) {
retVal.addTo(segment.getDataSource(), 1);
} else {
retVal.addTo(segment.getDataSource(), 0);

View File

@@ -36,7 +36,6 @@ public class SegmentReplicantLookup
public static SegmentReplicantLookup make(DruidCluster cluster)
{
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();
for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
@@ -49,31 +48,17 @@ public class SegmentReplicantLookup
}
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
// Also account for queued segments
for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
}
loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
}
}
return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
return new SegmentReplicantLookup(segmentsInCluster);
}
private final Table<String, String, Integer> segmentsInCluster;
private final Table<String, String, Integer> loadingSegments;
private SegmentReplicantLookup(
Table<String, String, Integer> segmentsInCluster,
Table<String, String, Integer> loadingSegments
)
private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster)
{
this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
}
public Map<String, Integer> getClusterTiers(String segmentId)
@@ -82,12 +67,6 @@ public class SegmentReplicantLookup
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}
public Map<String, Integer> getLoadingTiers(String segmentId)
{
Map<String, Integer> retVal = loadingSegments.row(segmentId);
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}
public int getLoadedReplicants(String segmentId)
{
Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
@@ -103,30 +82,4 @@ public class SegmentReplicantLookup
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}
public int getLoadingReplicants(String segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}
public int getLoadingReplicants(String segmentId)
{
Map<String, Integer> allTiers = loadingSegments.row(segmentId);
int retVal = 0;
for (Integer replicants : allTiers.values()) {
retVal += replicants;
}
return retVal;
}
public int getTotalReplicants(String segmentId)
{
return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
}
public int getTotalReplicants(String segmentId, String tier)
{
return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}

View File

@@ -22,7 +22,6 @@ package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
@@ -942,7 +941,6 @@ public class DruidCoordinatorRuleRunnerTest
mockEmptyPeon();
LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.replay(anotherMockPeon);
@@ -1411,8 +1409,8 @@ public class DruidCoordinatorRuleRunnerTest
private void mockEmptyPeon()
{
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);