Cancel loads of unused segments (#14644)

This commit is contained in:
Kashif Faraz 2023-07-31 18:01:50 +05:30 committed by GitHub
parent e9b4f1e95c
commit 844a9c7ffb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 154 additions and 32 deletions

View File

@ -45,6 +45,7 @@ public class DruidCluster
private final Set<ServerHolder> realtimes;
private final Map<String, NavigableSet<ServerHolder>> historicals;
private final Set<ServerHolder> brokers;
private final List<ServerHolder> allServers;
private DruidCluster(
Set<ServerHolder> realtimes,
@ -58,6 +59,7 @@ public class DruidCluster
holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(), holders)
);
this.brokers = Collections.unmodifiableSet(brokers);
this.allServers = initAllServers();
}
public Set<ServerHolder> getRealtimes()
@ -85,7 +87,12 @@ public class DruidCluster
return historicals.get(tier);
}
public Collection<ServerHolder> getAllServers()
public List<ServerHolder> getAllServers()
{
return allServers;
}
private List<ServerHolder> initAllServers()
{
final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum();
final int realtimeSize = realtimes.size();

View File

@ -31,8 +31,10 @@ import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Unloads segments that are no longer marked as used from servers.
@ -56,52 +58,45 @@ public class UnloadUnusedSegments implements CoordinatorDuty
broadcastStatusByDatasource.put(broadcastDatasource, true);
}
final List<ServerHolder> allServers = params.getDruidCluster().getAllServers();
int numCancelledLoads = allServers.stream().mapToInt(
server -> cancelLoadOfUnusedSegments(server, broadcastStatusByDatasource, params)
).sum();
final CoordinatorRunStats stats = params.getCoordinatorStats();
params.getDruidCluster().getAllServers().forEach(
server -> handleUnusedSegmentsForServer(
server,
params,
stats,
broadcastStatusByDatasource
)
);
int numQueuedDrops = allServers.stream().mapToInt(
server -> dropUnusedSegments(server, params, stats, broadcastStatusByDatasource)
).sum();
if (numCancelledLoads > 0 || numQueuedDrops > 0) {
log.info("Cancelled [%d] loads and started [%d] drops of unused segments.", numCancelledLoads, numQueuedDrops);
}
return params;
}
private void handleUnusedSegmentsForServer(
private int dropUnusedSegments(
ServerHolder serverHolder,
DruidCoordinatorRuntimeParams params,
CoordinatorRunStats stats,
Map<String, Boolean> broadcastStatusByDatasource
)
{
ImmutableDruidServer server = serverHolder.getServer();
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
boolean isBroadcastDatasource = broadcastStatusByDatasource.computeIfAbsent(
dataSource.getName(),
dataSourceName -> isBroadcastDatasource(dataSourceName, params)
);
final Set<DataSegment> usedSegments = params.getUsedSegments();
// The coordinator tracks used segments by examining the metadata store.
// For tasks, the segments they create are unpublished, so those segments will get dropped
// unless we exclude them here. We currently drop only broadcast segments in that case.
// This check relies on the assumption that queryable stream tasks will never
// ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast
// datasource, this will result in the those segments not being dropped from tasks.
// A more robust solution which requires a larger rework could be to expose
// the set of segments that were created by a task/indexer here, and exclude them.
if (serverHolder.isRealtimeServer() && !isBroadcastDatasource) {
final AtomicInteger numQueuedDrops = new AtomicInteger(0);
final ImmutableDruidServer server = serverHolder.getServer();
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
if (shouldSkipUnload(serverHolder, dataSource.getName(), broadcastStatusByDatasource, params)) {
continue;
}
int totalUnneededCount = 0;
final Set<DataSegment> usedSegments = params.getUsedSegments();
for (DataSegment segment : dataSource.getSegments()) {
if (!usedSegments.contains(segment)
&& loadQueueManager.dropSegment(segment, serverHolder)) {
totalUnneededCount++;
log.info(
log.debug(
"Dropping uneeded segment[%s] from server[%s] in tier[%s]",
segment.getId(), server.getName(), server.getTier()
);
@ -110,8 +105,55 @@ public class UnloadUnusedSegments implements CoordinatorDuty
if (totalUnneededCount > 0) {
stats.addToSegmentStat(Stats.Segments.UNNEEDED, server.getTier(), dataSource.getName(), totalUnneededCount);
numQueuedDrops.addAndGet(totalUnneededCount);
}
}
return numQueuedDrops.get();
}
private int cancelLoadOfUnusedSegments(
ServerHolder server,
Map<String, Boolean> broadcastStatusByDatasource,
DruidCoordinatorRuntimeParams params
)
{
final Set<DataSegment> usedSegments = params.getUsedSegments();
final AtomicInteger cancelledOperations = new AtomicInteger(0);
server.getQueuedSegments().forEach((segment, action) -> {
if (shouldSkipUnload(server, segment.getDataSource(), broadcastStatusByDatasource, params)) {
// do nothing
} else if (usedSegments.contains(segment)) {
// do nothing
} else if (action.isLoad() && server.cancelOperation(action, segment)) {
cancelledOperations.incrementAndGet();
}
});
return cancelledOperations.get();
}
/**
* Returns true if the given server is a realtime server AND the datasource is
* NOT a broadcast datasource.
* <p>
* Realtime tasks work with unpublished segments and the tasks themselves are
* responsible for dropping those segments. However, segments belonging to a
* broadcast datasource should still be dropped by the Coordinator as realtime
* tasks do not ingest data to a broadcast datasource and are thus not
* responsible for the load/unload of those segments.
*/
private boolean shouldSkipUnload(
ServerHolder server,
String dataSource,
Map<String, Boolean> broadcastStatusByDatasource,
DruidCoordinatorRuntimeParams params
)
{
boolean isBroadcastDatasource = broadcastStatusByDatasource
.computeIfAbsent(dataSource, ds -> isBroadcastDatasource(ds, params));
return server.isRealtimeServer() && !isBroadcastDatasource;
}
/**

View File

@ -17,14 +17,14 @@
* under the License.
*/
package org.apache.druid.server.coordinator;
package org.apache.druid.server.coordinator.loading;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;

View File

@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.metrics.MetricsVerifier;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import java.util.List;
@ -74,6 +75,11 @@ public interface CoordinatorSimulation
*/
void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig);
/**
* Sets the retention rules for the given datasource.
*/
void setRetentionRules(String datasource, Rule... rules);
/**
* Gets the inventory view of the specified server as maintained by the
* coordinator.

View File

@ -26,6 +26,7 @@ import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverDropRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.coordinator.stats.Dimension;
@ -112,6 +113,12 @@ public abstract class CoordinatorSimulationBaseTest implements
sim.coordinator().setDynamicConfig(dynamicConfig);
}
@Override
public void setRetentionRules(String datasource, Rule... rules)
{
sim.coordinator().setRetentionRules(datasource, rules);
}
@Override
public void loadQueuedSegments()
{
@ -211,6 +218,8 @@ public abstract class CoordinatorSimulationBaseTest implements
static final String ASSIGNED_COUNT = "segment/assigned/count";
static final String MOVED_COUNT = "segment/moved/count";
static final String DROPPED_COUNT = "segment/dropped/count";
static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
static final String DELETED_COUNT = "segment/deleted/count";
static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";
static final String CANCELLED_ACTIONS = "segment/loadQueue/cancelled";
@ -288,4 +297,15 @@ public abstract class CoordinatorSimulationBaseTest implements
return new ForeverBroadcastDistributionRule();
}
}
/**
* Builder for a drop rule.
*/
static class Drop
{
static Rule forever()
{
return new ForeverDropRule();
}
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.client.DruidServer;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
@ -330,6 +331,16 @@ public class CoordinatorSimulationBuilder
env.setDynamicConfig(dynamicConfig);
}
@Override
public void setRetentionRules(String datasource, Rule... rules)
{
env.ruleManager.overrideRule(
datasource,
Arrays.asList(rules),
new AuditInfo("sim", "sim", "localhost")
);
}
@Override
public DruidServer getInventoryView(String serverName)
{

View File

@ -482,6 +482,33 @@ public class SegmentLoadingTest extends CoordinatorSimulationBaseTest
Assert.assertEquals(0, historicalT12.getTotalSegments());
}
@Test
public void testLoadOfUnusedSegmentIsCancelled()
{
final CoordinatorSimulation sim =
CoordinatorSimulation.builder()
.withSegments(segments)
.withServers(historicalT11)
.withRules(datasource, Load.on(Tier.T1, 1).forever())
.build();
startSimulation(sim);
// Run 1: All segments are assigned
runCoordinatorCycle();
verifyValue(Metric.ASSIGNED_COUNT, 10L);
// Run 2: Update rules, all segments are marked as unused
setRetentionRules(datasource, Drop.forever());
runCoordinatorCycle();
verifyValue(Metric.DELETED_COUNT, 10L);
// Run 3: Loads of unused segments are cancelled
runCoordinatorCycle();
verifyValue(Metric.LOAD_QUEUE_COUNT, 0L);
verifyValue(Metric.CANCELLED_ACTIONS, 10L);
}
@Test
public void testSegmentsAreDroppedFromFullServersFirst()
{

View File

@ -127,13 +127,22 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
++numModifiedSegments;
}
}
if (numModifiedSegments > 0) {
snapshot = null;
}
return numModifiedSegments;
}
@Override
public boolean markSegmentAsUnused(SegmentId segmentId)
{
return usedSegments.remove(segmentId.toString()) != null;
boolean updated = usedSegments.remove(segmentId.toString()) != null;
if (updated) {
snapshot = null;
}
return updated;
}
@Nullable