From 6feac204e331e196940e2936c7f2a212135e89b8 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 2 Apr 2018 09:40:20 -0700 Subject: [PATCH] Coordinator primary segment assignment fix (#5532) * fix issue where assign primary assigns segments to all historical servers in cluster * fix test * add test to ensure primary assignment will not assign to another server while loading is in progress --- .../coordinator/SegmentReplicantLookup.java | 43 +++++- .../server/coordinator/rules/LoadRule.java | 8 +- .../DruidCoordinatorRuleRunnerTest.java | 3 + .../coordinator/rules/LoadRuleTest.java | 139 ++++++++++++++++++ 4 files changed, 187 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index a713c8d181c..357eeb99be7 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -36,6 +36,7 @@ public class SegmentReplicantLookup public static SegmentReplicantLookup make(DruidCluster cluster) { final Table segmentsInCluster = HashBasedTable.create(); + final Table loadingSegments = HashBasedTable.create(); for (SortedSet serversByType : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serversByType) { @@ -48,17 +49,29 @@ public class SegmentReplicantLookup } segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants); } + + // Also account for queued segments + for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) { + Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier()); + if (numReplicants == null) { + numReplicants = 0; + } + loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants); + } } } - return new SegmentReplicantLookup(segmentsInCluster); + return new SegmentReplicantLookup(segmentsInCluster, loadingSegments); } private final Table segmentsInCluster; - private SegmentReplicantLookup(Table segmentsInCluster) + private final Table loadingSegments; + + private SegmentReplicantLookup(Table segmentsInCluster, Table loadingSegments) { this.segmentsInCluster = segmentsInCluster; + this.loadingSegments = loadingSegments; } public Map getClusterTiers(String segmentId) @@ -82,4 +95,30 @@ public class SegmentReplicantLookup Integer retVal = segmentsInCluster.get(segmentId, tier); return (retVal == null) ? 0 : retVal; } + + public int getLoadingReplicants(String segmentId, String tier) + { + Integer retVal = loadingSegments.get(segmentId, tier); + return (retVal == null) ? 0 : retVal; + } + + public int getLoadingReplicants(String segmentId) + { + Map allTiers = loadingSegments.row(segmentId); + int retVal = 0; + for (Integer replicants : allTiers.values()) { + retVal += replicants; + } + return retVal; + } + + public int getTotalReplicants(String segmentId) + { + return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId); + } + + public int getTotalReplicants(String segmentId, String tier) + { + return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier); + } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 5af4d822c4b..dc615f80051 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -95,8 +95,9 @@ public abstract class LoadRule implements Rule final CoordinatorStats stats ) { - // if primary replica already exists - if (!currentReplicants.isEmpty()) { + // if primary replica already exists or is loading + final int loading = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier()); + if (!currentReplicants.isEmpty() || loading > 0) { assignReplicas(params, segment, stats, null); } else { final ServerHolder primaryHolderToLoad = assignPrimary(params, segment); @@ -169,7 +170,6 @@ public abstract class LoadRule implements Rule if (targetReplicantsInTier <= 0) { continue; } - final String tier = entry.getKey(); final List holders = getFilteredHolders( @@ -228,7 +228,7 @@ public abstract class LoadRule implements Rule final int numAssigned = assignReplicasForTier( tier, entry.getIntValue(), - currentReplicants.getOrDefault(tier, 0), + params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier), params, createLoadQueueSizeLimitingPredicate(params), segment diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 0b7c71ba2ce..ba32af632cc 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -22,6 +22,7 @@ package io.druid.server.coordinator; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.DruidServer; @@ -942,6 +943,8 @@ public class DruidCoordinatorRuleRunnerTest LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class); EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce(); + EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.replay(anotherMockPeon); DruidCluster druidCluster = new DruidCluster( diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 51200461a95..4848503a573 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -20,7 +20,9 @@ package io.druid.server.coordinator.rules; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -58,7 +60,9 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -190,6 +194,127 @@ public class LoadRuleTest EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); } + @Test + public void testLoadPrimaryAssignDoesNotOverAssign() + { + EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes(); + + final LoadQueuePeon mockPeon = createEmptyPeon(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + + LoadRule rule = createLoadRule(ImmutableMap.of( + "hot", 1 + )); + + final DataSegment segment = createDataSegment("foo"); + + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .anyTimes(); + + EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); + + DruidCluster druidCluster = new DruidCluster( + null, + ImmutableMap.of( + "hot", + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon + ), new ServerHolder( + new DruidServer( + "serverHot2", + "hostHot2", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon + ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ) + ); + + CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withReplicationManager(throttler) + .withBalancerStrategy(mockBalancerStrategy) + .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) + .withAvailableSegments(Arrays.asList(segment)).build(), + segment + ); + + + Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + // ensure multiple runs don't assign primary segment again if at replication count + final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment)); + EasyMock.replay(loadingPeon); + + DruidCluster afterLoad = new DruidCluster( + null, + ImmutableMap.of( + "hot", + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + loadingPeon + ), new ServerHolder( + new DruidServer( + "serverHot2", + "hostHot2", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon + ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ) + ); + CoordinatorStats statsAfterLoadPrimary = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(afterLoad) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(afterLoad)) + .withReplicationManager(throttler) + .withBalancerStrategy(mockBalancerStrategy) + .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) + .withAvailableSegments(Arrays.asList(segment)).build(), + segment + ); + + + Assert.assertEquals(0, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); + } + @Test public void testLoadPriority() { @@ -619,4 +744,18 @@ public class LoadRuleTest return mockPeon; } + + private static LoadQueuePeon createLoadingPeon(List segments) + { + final Set segs = ImmutableSet.copyOf(segments); + final long loadingSize = segs.stream().mapToLong(DataSegment::getSize).sum(); + + final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes(); + EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(loadingSize).anyTimes(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(segs.size()).anyTimes(); + + return mockPeon; + } }