Coordinator primary segment assignment fix (#5532)

* fix issue where assignPrimary assigns segments to every historical server in the cluster

* fix existing test to stub the load-queue call now made by SegmentReplicantLookup

* add test to ensure primary assignment does not assign the segment to another server while the initial load is still in progress
Commit 6feac204e3 (parent 05547e29b2)
Clint Wylie, 2018-04-02 09:40:20 -07:00, committed by Gian Merlino
4 changed files with 187 additions and 6 deletions
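For readers skimming the diff: before this change, LoadRule decided whether to assign a "primary" replica by looking only at replicas already loaded, so while the first load was still sitting in a server's load queue, every subsequent coordinator run handed the segment to yet another historical. The change makes queued (loading) replicas count as well. Below is a minimal, self-contained model of that decision; the class and method names are illustrative and are not Druid's API.

import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the fixed decision, not Druid's actual classes.
public final class PrimaryAssignmentSketch
{
  // Assign a primary replica only when the segment has no replica loaded on any tier
  // AND no replica sitting in any server's load queue.
  static boolean shouldAssignPrimary(Map<String, Integer> loadedReplicantsByTier, int loadingReplicants)
  {
    return loadedReplicantsByTier.isEmpty() && loadingReplicants == 0;
  }

  public static void main(String[] args)
  {
    // First coordinator run: nothing loaded, nothing queued, so assign exactly one primary.
    System.out.println(shouldAssignPrimary(Collections.emptyMap(), 0)); // true
    // Next run, while that load is still queued: do not assign the segment again.
    System.out.println(shouldAssignPrimary(Collections.emptyMap(), 1)); // false
  }
}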

SegmentReplicantLookup.java

@@ -36,6 +36,7 @@ public class SegmentReplicantLookup
public static SegmentReplicantLookup make(DruidCluster cluster)
{
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();
for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
@@ -48,17 +49,29 @@ public class SegmentReplicantLookup
}
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
// Also account for queued segments
for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
}
loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
}
}
-    return new SegmentReplicantLookup(segmentsInCluster);
+    return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
}
private final Table<String, String, Integer> segmentsInCluster;
+  private final Table<String, String, Integer> loadingSegments;
-  private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster)
+  private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster, Table<String, String, Integer> loadingSegments)
{
this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
}
public Map<String, Integer> getClusterTiers(String segmentId)
@@ -82,4 +95,30 @@ public class SegmentReplicantLookup
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}
public int getLoadingReplicants(String segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}
public int getLoadingReplicants(String segmentId)
{
Map<String, Integer> allTiers = loadingSegments.row(segmentId);
int retVal = 0;
for (Integer replicants : allTiers.values()) {
retVal += replicants;
}
return retVal;
}
public int getTotalReplicants(String segmentId)
{
return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
}
public int getTotalReplicants(String segmentId, String tier)
{
return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}
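A standalone sketch of the counting scheme above, using Guava's Table the same way: both tables are keyed by (segment id, tier), and the new getTotalReplicants(segment, tier) is simply the loaded count plus the loading count. The class and variable names here are made up for illustration.

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public final class ReplicantCountSketch
{
  public static void main(String[] args)
  {
    // (segment id, tier) -> replica count, mirroring the two tables built above.
    Table<String, String, Integer> loaded = HashBasedTable.create();
    Table<String, String, Integer> loading = HashBasedTable.create();

    loaded.put("segment-1", "hot", 1);   // one replica already being served
    loading.put("segment-1", "hot", 1);  // one more still sitting in a load queue

    // Equivalent of getLoadedReplicants / getLoadingReplicants: missing cells count as 0.
    int loadedCount = zeroIfNull(loaded.get("segment-1", "hot"));
    int loadingCount = zeroIfNull(loading.get("segment-1", "hot"));

    // Equivalent of the new getTotalReplicants(segment, tier): a queued replica now
    // counts toward the tier's total just like a served one.
    System.out.println(loadedCount + loadingCount); // 2
  }

  private static int zeroIfNull(Integer count)
  {
    return count == null ? 0 : count;
  }
}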

LoadRule.java

@@ -95,8 +95,9 @@ public abstract class LoadRule implements Rule
final CoordinatorStats stats
)
{
-    // if primary replica already exists
-    if (!currentReplicants.isEmpty()) {
+    // if primary replica already exists or is loading
+    final int loading = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
+    if (!currentReplicants.isEmpty() || loading > 0) {
assignReplicas(params, segment, stats, null);
} else {
final ServerHolder primaryHolderToLoad = assignPrimary(params, segment);
@@ -169,7 +170,6 @@ public abstract class LoadRule implements Rule
if (targetReplicantsInTier <= 0) {
continue;
}
final String tier = entry.getKey();
final List<ServerHolder> holders = getFilteredHolders(
@@ -228,7 +228,7 @@ public abstract class LoadRule implements Rule
final int numAssigned = assignReplicasForTier(
tier,
entry.getIntValue(),
-        currentReplicants.getOrDefault(tier, 0),
+        params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier),
params,
createLoadQueueSizeLimitingPredicate(params),
segment
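The second LoadRule change above feeds the loaded-plus-loading total into the per-tier assignment, so the number of new replicas requested shrinks by whatever is already queued. A hedged sketch of that arithmetic, with an invented class and method name:

public final class ReplicaDeficitSketch
{
  // How many new replicas a tier still needs once in-flight loads are counted.
  static int replicasStillNeeded(int targetReplicants, int loadedInTier, int loadingInTier)
  {
    // Before the fix only loadedInTier was subtracted, so a replica that was already
    // queued on a peon could be assigned again on the next coordinator run.
    return Math.max(targetReplicants - (loadedInTier + loadingInTier), 0);
  }

  public static void main(String[] args)
  {
    System.out.println(replicasStillNeeded(2, 1, 1)); // 0: target is met once the queued load finishes
    System.out.println(replicasStillNeeded(2, 1, 0)); // 1: one more replica is genuinely needed
  }
}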

DruidCoordinatorRuleRunnerTest.java

@@ -22,6 +22,7 @@ package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.DruidServer;
@@ -942,6 +943,8 @@ public class DruidCoordinatorRuleRunnerTest
LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.replay(anotherMockPeon);
DruidCluster druidCluster = new DruidCluster(
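The one-line addition above is needed because SegmentReplicantLookup.make() now walks every peon's load queue, so any LoadQueuePeon mock that reaches it must stub getSegmentsToLoad(). A small helper along these lines (not part of the patch) would keep that stubbing in one place:

import com.google.common.collect.Sets;
import io.druid.server.coordinator.LoadQueuePeon;
import org.easymock.EasyMock;

public final class PeonMocks
{
  // A replayed LoadQueuePeon mock with an empty load queue, safe to pass through
  // SegmentReplicantLookup.make() without triggering unexpected-call failures.
  static LoadQueuePeon emptyQueuePeon()
  {
    LoadQueuePeon peon = EasyMock.createMock(LoadQueuePeon.class);
    EasyMock.expect(peon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();
    EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).anyTimes();
    EasyMock.replay(peon);
    return peon;
  }
}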

LoadRuleTest.java

@@ -20,7 +20,9 @@
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -58,7 +60,9 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -190,6 +194,127 @@ public class LoadRuleTest
EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}
@Test
public void testLoadPrimaryAssignDoesNotOverAssign()
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 1
));
final DataSegment segment = createDataSegment("foo");
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
), new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
// ensure multiple runs don't assign primary segment again if at replication count
final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment));
EasyMock.replay(loadingPeon);
DruidCluster afterLoad = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
loadingPeon
), new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
CoordinatorStats statsAfterLoadPrimary = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(afterLoad)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(afterLoad))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertEquals(0, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}
@Test
public void testLoadPriority()
{
@@ -619,4 +744,18 @@ public class LoadRuleTest
return mockPeon;
}
private static LoadQueuePeon createLoadingPeon(List<DataSegment> segments)
{
final Set<DataSegment> segs = ImmutableSet.copyOf(segments);
final long loadingSize = segs.stream().mapToLong(DataSegment::getSize).sum();
final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(loadingSize).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(segs.size()).anyTimes();
return mockPeon;
}
}
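If the new test were to be tightened further, the same loading-peon setup could also assert the lookup directly. A possible follow-up check, not part of this patch, relying on the local variables of testLoadPrimaryAssignDoesNotOverAssign:

// Inside testLoadPrimaryAssignDoesNotOverAssign, after building afterLoad:
SegmentReplicantLookup lookup = SegmentReplicantLookup.make(afterLoad);
// the queued replica is visible as "loading", not "loaded"
Assert.assertEquals(1, lookup.getLoadingReplicants(segment.getIdentifier(), "hot"));
Assert.assertEquals(0, lookup.getLoadedReplicants(segment.getIdentifier(), "hot"));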