From 4881bb273bfa1630d0feddd38d46654cdab30a9f Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Wed, 18 Oct 2017 11:11:42 -0700
Subject: [PATCH] Only consider loaded replicants when computing replication status. (#4921)

* Only consider loaded replicants when computing replication status.

This affects the computation of segment/underReplicated/count and
segment/unavailable/count, as well as the loadstatus?simple and
loadstatus?full APIs. I'm not sure why they currently consider segments
in the load queues, but it would make more sense to me if they only
considered segments that are actually loaded.

* Fix tests.

* Fix imports.
---
 .../server/coordinator/DruidCoordinator.java |  4 +-
 .../coordinator/SegmentReplicantLookup.java  | 51 +------
 .../DruidCoordinatorRuleRunnerTest.java      |  6 +--
 3 files changed, 6 insertions(+), 55 deletions(-)

diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
index be9ad30d1e6..e84bb7d0f3b 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
@@ -248,7 +248,7 @@ public class DruidCoordinator
           ((LoadRule) rule)
               .getTieredReplicants()
               .forEach((final String tier, final Integer ruleReplicants) -> {
-                int currentReplicants = segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), tier);
+                int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier(), tier);
                 retVal
                     .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
                     .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
@@ -268,7 +268,7 @@
     }
 
     for (DataSegment segment : getAvailableDataSegments()) {
-      if (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) {
+      if (segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier()) == 0) {
         retVal.addTo(segment.getDataSource(), 1);
       } else {
         retVal.addTo(segment.getDataSource(), 0);
diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
index f23f1199e86..a713c8d181c 100644
--- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
+++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
@@ -36,7 +36,6 @@ public class SegmentReplicantLookup
   public static SegmentReplicantLookup make(DruidCluster cluster)
   {
     final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
-    final Table<String, String, Integer> loadingSegments = HashBasedTable.create();
 
     for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
       for (ServerHolder serverHolder : serversByType) {
@@ -49,31 +48,17 @@ public class SegmentReplicantLookup
           }
           segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
         }
-
-        // Also account for queued segments
-        for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
-          Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
-          if (numReplicants == null) {
-            numReplicants = 0;
-          }
-          loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
-        }
       }
     }
 
-    return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
+    return new SegmentReplicantLookup(segmentsInCluster);
   }
 
   private final Table<String, String, Integer> segmentsInCluster;
-  private final Table<String, String, Integer> loadingSegments;
 
-  private SegmentReplicantLookup(
-      Table<String, String, Integer> segmentsInCluster,
-      Table<String, String, Integer> loadingSegments
-  )
+  private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster)
   {
     this.segmentsInCluster = segmentsInCluster;
-    this.loadingSegments = loadingSegments;
   }
 
   public Map<String, Integer> getClusterTiers(String segmentId)
@@ -82,12 +67,6 @@ public class SegmentReplicantLookup
     return (retVal == null) ? Maps.newHashMap() : retVal;
   }
 
-  public Map<String, Integer> getLoadingTiers(String segmentId)
-  {
-    Map<String, Integer> retVal = loadingSegments.row(segmentId);
-    return (retVal == null) ? Maps.newHashMap() : retVal;
-  }
-
   public int getLoadedReplicants(String segmentId)
   {
     Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
@@ -103,30 +82,4 @@ public class SegmentReplicantLookup
     Integer retVal = segmentsInCluster.get(segmentId, tier);
     return (retVal == null) ? 0 : retVal;
   }
-
-  public int getLoadingReplicants(String segmentId, String tier)
-  {
-    Integer retVal = loadingSegments.get(segmentId, tier);
-    return (retVal == null) ? 0 : retVal;
-  }
-
-  public int getLoadingReplicants(String segmentId)
-  {
-    Map<String, Integer> allTiers = loadingSegments.row(segmentId);
-    int retVal = 0;
-    for (Integer replicants : allTiers.values()) {
-      retVal += replicants;
-    }
-    return retVal;
-  }
-
-  public int getTotalReplicants(String segmentId)
-  {
-    return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
-  }
-
-  public int getTotalReplicants(String segmentId, String tier)
-  {
-    return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
-  }
 }
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 7357c9cdf72..3ef19d17483 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -22,7 +22,6 @@ package io.druid.server.coordinator;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.emitter.EmittingLogger;
@@ -942,7 +941,6 @@
     mockEmptyPeon();
 
     LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
-    EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
     EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
     EasyMock.replay(anotherMockPeon);
 
@@ -1411,8 +1409,8 @@
 
   private void mockEmptyPeon()
   {
-    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
-    EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
+    EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
     EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
     EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
     EasyMock.replay(mockPeon);
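
Note on the behavior after this patch: replication status is derived only from replicants that are
actually loaded on historicals, so a replicant still sitting in a peon's load queue no longer reduces
the reported shortfall. The sketch below illustrates that counting rule in isolation. It is a minimal,
self-contained example rather than code from the patch: the class and method names
(TinyReplicantLookup, addLoadedReplicant, underReplicatedCount) are hypothetical stand-ins, and only
the Guava Table usage mirrors what SegmentReplicantLookup itself does.

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public class TinyReplicantLookup
{
  // segmentId -> tier -> number of replicants actually loaded, mirroring segmentsInCluster above.
  private final Table<String, String, Integer> loaded = HashBasedTable.create();

  // Record one loaded replicant; queued-but-unloaded replicants are simply never recorded.
  public void addLoadedReplicant(String segmentId, String tier)
  {
    Integer current = loaded.get(segmentId, tier);
    loaded.put(segmentId, tier, current == null ? 1 : current + 1);
  }

  public int getLoadedReplicants(String segmentId, String tier)
  {
    Integer count = loaded.get(segmentId, tier);
    return count == null ? 0 : count;
  }

  // Shortfall against a load rule's target for one segment and tier, never negative,
  // matching the Math.max(ruleReplicants - currentReplicants, 0) expression in the patch.
  public int underReplicatedCount(String segmentId, String tier, int targetReplicants)
  {
    return Math.max(targetReplicants - getLoadedReplicants(segmentId, tier), 0);
  }

  public static void main(String[] args)
  {
    TinyReplicantLookup lookup = new TinyReplicantLookup();
    lookup.addLoadedReplicant("segment-1", "_default_tier");
    // A second replicant that is still queued is not counted, so the segment reports a
    // shortfall of 1 against a target of 2 replicants.
    System.out.println(lookup.underReplicatedCount("segment-1", "_default_tier", 2)); // 1
  }
}

Under the removed getTotalReplicants behavior, a queued replicant would have been added to the count,
and the same segment would have reported no shortfall even though only one copy was actually serving
queries.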