Fix marking of segments as unused when overshadowed by a zero-replica segment (#16181)

Bug:
In the `MarkOvershadowedSegmentsAsUnused` duty, the coordinator marks a segment
as unused if it is overshadowed by a segment currently being served by a historical or broker.
However, it is possible to have segments that are eligible for a load rule but require zero replicas
to be loaded. (Such segments can be queried only using the MSQ engine.)
If such a zero-replica segment overshadows any other segment, the overshadowed segment will
never be marked as unused and will continue to exist in the metadata store as a dangling segment.

Fix:
- In a coordinator run, keep track of segments that are eligible for a load rule but require zero replicas
- Allow the zero-replicas segments to overshadow old segments and hence mark the latter as unused

Other changes:
- Add simulation test to verify new behaviour. This test fails with the current code.
- Clean up javadocs
This commit is contained in:
Kashif Faraz 2024-03-21 12:56:59 +05:30 committed by GitHub
parent 3d8b0ffae8
commit 352902156a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 106 additions and 8 deletions

View File

@ -42,10 +42,15 @@ import java.util.Map;
import java.util.Set;
/**
* Marks segments that are overshadowed by currently served segments as unused.
* Marks a segment as unused if it is overshadowed by:
* <ul>
* <li>a segment served by a historical or broker</li>
* <li>a segment that has zero required replicas and thus will never be loaded on a server</li>
* </ul>
* <p>
* This duty runs only if the Coordinator has been running long enough to have a
* refreshed metadata view. This duration is controlled by the dynamic config
* {@code millisToWaitBeforeDeleting}.
* {@link org.apache.druid.server.coordinator.CoordinatorDynamicConfig#markSegmentAsUnusedDelayMillis}.
*/
public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
{
@ -67,7 +72,7 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
final long delayMillis = params.getCoordinatorDynamicConfig().getMarkSegmentAsUnusedDelayMillis();
if (DateTimes.nowUtc().isBefore(coordinatorStartTime.plus(delayMillis))) {
log.info(
"Skipping MarkAsUnused until [%s] have elapsed after coordinator start [%s].",
"Skipping MarkAsUnused until [%s] have elapsed after coordinator start time[%s].",
Duration.ofMillis(delayMillis), coordinatorStartTime
);
return params;
@ -79,7 +84,7 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
return params;
}
DruidCluster cluster = params.getDruidCluster();
final DruidCluster cluster = params.getDruidCluster();
final Map<String, SegmentTimeline> timelines = new HashMap<>();
cluster.getHistoricals().values().forEach(
@ -91,7 +96,14 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
broker -> addSegmentsFromServer(broker, timelines)
);
// Note that we do not include segments from ingestion services such as tasks or indexers,
// Include all segments that require zero replicas to be loaded
params.getSegmentAssigner().getSegmentsWithZeroRequiredReplicas().forEach(
(datasource, segments) -> timelines
.computeIfAbsent(datasource, ds -> new SegmentTimeline())
.addSegments(segments.iterator())
);
// Do not include segments served by ingestion services such as tasks or indexers,
// to prevent unpublished segments from prematurely overshadowing segments.
// Mark all segments overshadowed by served segments as unused

View File

@ -22,7 +22,6 @@ package org.apache.druid.server.coordinator.loading;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
@ -56,8 +55,6 @@ import java.util.stream.Collectors;
@NotThreadSafe
public class StrategicSegmentAssigner implements SegmentActionHandler
{
private static final EmittingLogger log = new EmittingLogger(StrategicSegmentAssigner.class);
private final SegmentLoadQueueManager loadQueueManager;
private final DruidCluster cluster;
private final CoordinatorRunStats stats;
@ -71,6 +68,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
private final Map<String, Set<String>> datasourceToInvalidLoadTiers = new HashMap<>();
private final Map<String, Integer> tierToHistoricalCount = new HashMap<>();
private final Map<String, Set<SegmentId>> segmentsToDelete = new HashMap<>();
private final Map<String, Set<DataSegment>> segmentsWithZeroRequiredReplicas = new HashMap<>();
public StrategicSegmentAssigner(
SegmentLoadQueueManager loadQueueManager,
@ -109,6 +107,11 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
return segmentsToDelete;
}
/**
 * Returns the segments recorded so far as requiring zero replicas, grouped
 * by datasource name.
 * <p>
 * The returned map is the internal map itself, not a copy.
 *
 * @return map from datasource name to the set of zero-replica segments
 */
public Map<String, Set<DataSegment>> getSegmentsWithZeroRequiredReplicas()
{
  return this.segmentsWithZeroRequiredReplicas;
}
public Map<String, Set<String>> getDatasourceToInvalidLoadTiers()
{
return datasourceToInvalidLoadTiers;
@ -223,6 +226,12 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
}
SegmentReplicaCount replicaCountInCluster = replicaCountMap.getTotal(segment.getId());
if (replicaCountInCluster.required() <= 0) {
segmentsWithZeroRequiredReplicas
.computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>())
.add(segment);
}
final int replicaSurplus = replicaCountInCluster.loadedNotDropping()
- replicaCountInCluster.requiredAndLoadable();

View File

@ -223,6 +223,8 @@ public abstract class CoordinatorSimulationBaseTest implements
static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";
static final String CANCELLED_ACTIONS = "segment/loadQueue/cancelled";
static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
}
static class Segments

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.simulate;
import org.apache.druid.client.DruidServer;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.timeline.DataSegment;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Coordinator simulation test which checks that segments overshadowed by
 * segments requiring zero replicas are reported as overshadowed.
 */
public class MarkSegmentsAsUnusedTest extends CoordinatorSimulationBaseTest
{
  @Override
  public void setUp()
  {
    // Intentionally empty: the test method builds its own simulation.
  }

  /**
   * Uses a load rule with zero replicas on tier T1, so no segment is ever
   * assigned to a server. After a second version of every segment is added,
   * the original versions must be counted as overshadowed.
   */
  @Test
  public void testSegmentsOvershadowedByZeroReplicaSegmentsAreMarkedAsUnused()
  {
    // Same size for both historicals (named "1TB" in the original local).
    final long historicalSize = 1_000_000;

    // Two historicals in tier T1, with ids 0 and 1.
    final List<DruidServer> historicals = new ArrayList<>();
    historicals.add(createHistorical(0, Tier.T1, historicalSize));
    historicals.add(createHistorical(1, Tier.T1, historicalSize));

    final List<DataSegment> oldSegments = Segments.WIKI_10X1D;

    // Zero delay for marking segments as unused.
    final CoordinatorDynamicConfig noDelayConfig =
        CoordinatorDynamicConfig.builder()
                                .withMarkSegmentAsUnusedDelayMillis(0)
                                .build();

    final CoordinatorSimulation simulation =
        CoordinatorSimulation.builder()
                             .withRules(DS.WIKI, Load.on(Tier.T1, 0).forever())
                             .withServers(historicals)
                             .withSegments(oldSegments)
                             .withDynamicConfig(noDelayConfig)
                             .build();
    startSimulation(simulation);

    // First run: the rule requires zero replicas, so nothing gets assigned.
    runCoordinatorCycle();
    verifyNotEmitted(Metric.ASSIGNED_COUNT);

    // Publish a newer version of every segment so that it overshadows the old one.
    final List<DataSegment> newSegments =
        oldSegments.stream()
                   .map(old -> old.withVersion(old.getVersion() + "__new"))
                   .collect(Collectors.toList());
    addSegments(newSegments);

    // Second run: still nothing assigned, and all 10 old segments are overshadowed.
    runCoordinatorCycle();
    verifyNotEmitted(Metric.ASSIGNED_COUNT);
    verifyValue(Metric.OVERSHADOWED_COUNT, 10L);
  }
}