Fix reported replication factor of segment with zero required replicas (#14701)

This commit is contained in:
Kashif Faraz 2023-07-31 14:51:01 +05:30 committed by GitHub
parent c648b1cb36
commit e9b4f1e95c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 9 deletions

View File

@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.loading;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.ServerHolder;
@ -194,18 +195,24 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
@Override
public void replicateSegment(DataSegment segment, Map<String, Integer> tierToReplicaCount)
{
// Identify empty tiers and determine total required replicas
final Set<String> allTiersInCluster = Sets.newHashSet(cluster.getTierNames());
tierToReplicaCount.forEach((tier, requiredReplicas) -> {
reportTierCapacityStats(segment, requiredReplicas, tier);
SegmentReplicaCount replicaCount = replicaCountMap.computeIfAbsent(segment.getId(), tier);
replicaCount.setRequired(requiredReplicas, tierToHistoricalCount.getOrDefault(tier, 0));
if (tierToReplicaCount == null || tierToReplicaCount.isEmpty()) {
// Track the counts for a segment even if it requires 0 replicas on all tiers
replicaCountMap.computeIfAbsent(segment.getId(), DruidServer.DEFAULT_TIER);
} else {
// Identify empty tiers and determine total required replicas
tierToReplicaCount.forEach((tier, requiredReplicas) -> {
reportTierCapacityStats(segment, requiredReplicas, tier);
if (!allTiersInCluster.contains(tier)) {
tiersWithNoServer.add(tier);
}
});
SegmentReplicaCount replicaCount = replicaCountMap.computeIfAbsent(segment.getId(), tier);
replicaCount.setRequired(requiredReplicas, tierToHistoricalCount.getOrDefault(tier, 0));
if (!allTiersInCluster.contains(tier)) {
tiersWithNoServer.add(tier);
}
});
}
SegmentReplicaCount replicaCountInCluster = replicaCountMap.getTotal(segment.getId());
final int replicaSurplus = replicaCountInCluster.loadedNotDropping()

View File

@ -46,6 +46,9 @@ import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@ -1262,6 +1265,39 @@ public class RunRulesTest
EasyMock.verify(mockPeon);
}
@Test
public void testSegmentWithZeroRequiredReplicasHasZeroReplicationFactor()
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new ForeverLoadRule(Collections.emptyMap(), false)
)
).anyTimes();
EasyMock.replay(databaseRuleManager);
final DruidCluster cluster = DruidCluster
.builder()
.add(createServerHolder("server", "normal", new TestLoadQueuePeon()))
.build();
final DataSegment segment = usedSegments.get(0);
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(cluster, segment)
.withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(loadQueueManager)
.build();
params = ruleRunner.run(params);
Assert.assertNotNull(params);
SegmentReplicationStatus replicationStatus = params.getSegmentReplicationStatus();
Assert.assertNotNull(replicationStatus);
SegmentReplicaCount replicaCounts = replicationStatus.getReplicaCountsInCluster(segment.getId());
Assert.assertNotNull(replicaCounts);
Assert.assertEquals(0, replicaCounts.required());
Assert.assertEquals(0, replicaCounts.totalLoaded());
Assert.assertEquals(0, replicaCounts.requiredAndLoadable());
}
private CoordinatorRunStats runDutyAndGetStats(DruidCoordinatorRuntimeParams params)
{
params = ruleRunner.run(params);