Allow MSQ engine only for compaction supervisors (#17033)

#16768 added the ability to run compaction as a supervisor on the Overlord.
This patch builds on that to restrict the MSQ engine to supervisor-based compaction only.
With these changes, users can no longer set MSQ as the engine in a datasource compaction config,
or as the default cluster-level compaction engine, on the Coordinator.
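
For illustration, a datasource compaction config of the following shape is what the Coordinator
now rejects (a minimal sketch -- the field names follow DataSourceCompactionConfig, while the
datasource name and offset value are made up):

    {
      "dataSource": "wiki",
      "skipOffsetFromLatest": "PT1H",
      "engine": "msq"
    }

Submitting such a config to the Coordinator now fails with a 400 Bad Request and the error
"MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord."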

The patch also adds an Overlord runtime property `druid.supervisor.compaction.engine=<msq/native>`
to specify the default engine for compaction supervisors.
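
For example, to enable supervisor-based compaction on the Overlord and default its supervisors
to the MSQ engine, an operator would add the following to the Overlord's runtime.properties
(a sketch; `druid.supervisor.compaction.enabled` is the pre-existing flag backing
CompactionSupervisorConfig):

    # Enable the compaction scheduler on the Overlord
    druid.supervisor.compaction.enabled=true
    # Default engine for compaction supervisors; falls back to 'native' when unset
    druid.supervisor.compaction.engine=msq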

Since these updates require major changes to the existing MSQ compaction integration tests,
this patch disables the MSQ-specific compaction integration tests; they will be addressed in a follow-up PR.

Key changed/added classes in this patch:
* CompactionSupervisor
* CompactionSupervisorSpec
* CoordinatorCompactionConfigsResource
* OverlordCompactionScheduler
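
With the Coordinator path closed off, MSQ compaction is instead configured by submitting a
compaction supervisor spec to the Overlord's supervisor API. A hedged sketch follows -- the
`autocompact` type name is an assumption based on the supervisor ID prefix and is not shown
in this diff:

    POST /druid/indexer/v1/supervisor
    {
      "type": "autocompact",
      "suspended": false,
      "spec": {
        "dataSource": "wiki",
        "engine": "msq"
      }
    }

Note that an invalid spec no longer fails submission with an InvalidInput exception; as the
diffs below show, the supervisor is instead created in the new INVALID_SPEC state and reports
the validation failure through its status payload.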
Vishesh Garg 2024-09-24 17:19:16 +05:30 committed by GitHub
parent 77a362c555
commit f576e299db
25 changed files with 229 additions and 149 deletions

CompactionSupervisor.java

@ -25,6 +25,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@ -55,6 +56,13 @@ public class CompactionSupervisor implements Supervisor
if (supervisorSpec.isSuspended()) {
log.info("Suspending compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
} else if (!supervisorSpec.getValidationResult().isValid()) {
log.warn(
"Cannot start compaction supervisor for datasource[%s] since the compaction supervisor spec is invalid. "
+ "Reason[%s].",
dataSource,
supervisorSpec.getValidationResult().getReason()
);
} else {
log.info("Starting compaction for dataSource[%s].", dataSource);
scheduler.startCompaction(dataSource, supervisorSpec.getSpec());
@ -76,6 +84,13 @@ public class CompactionSupervisor implements Supervisor
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
.build();
} else if (!supervisorSpec.getValidationResult().isValid()) {
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withMessage(StringUtils.format(
"Compaction supervisor spec is invalid. Reason[%s].",
supervisorSpec.getValidationResult().getReason()
))
.build();
} else {
snapshot = scheduler.getCompactionSnapshot(dataSource);
}
@ -90,6 +105,8 @@ public class CompactionSupervisor implements Supervisor
return State.SCHEDULER_STOPPED;
} else if (supervisorSpec.isSuspended()) {
return State.SUSPENDED;
} else if (!supervisorSpec.getValidationResult().isValid()) {
return State.INVALID_SPEC;
} else {
return State.RUNNING;
}
@ -132,6 +149,7 @@ public class CompactionSupervisor implements Supervisor
SCHEDULER_STOPPED(true),
RUNNING(true),
SUSPENDED(true),
INVALID_SPEC(false),
UNHEALTHY(false);
private final boolean healthy;

CompactionSupervisorSpec.java

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -40,6 +39,7 @@ public class CompactionSupervisorSpec implements SupervisorSpec
private final boolean suspended;
private final DataSourceCompactionConfig spec;
private final CompactionScheduler scheduler;
private final CompactionConfigValidationResult validationResult;
@JsonCreator
public CompactionSupervisorSpec(
@ -48,14 +48,10 @@ public class CompactionSupervisorSpec implements SupervisorSpec
@JacksonInject CompactionScheduler scheduler
)
{
final CompactionConfigValidationResult validationResult = scheduler.validateCompactionConfig(spec);
if (!validationResult.isValid()) {
throw InvalidInput.exception("Compaction supervisor 'spec' is invalid. Reason[%s].", validationResult.getReason());
}
this.spec = spec;
this.suspended = Configs.valueOrDefault(suspended, false);
this.scheduler = scheduler;
this.validationResult = scheduler.validateCompactionConfig(spec);
}
@JsonProperty
@ -77,6 +73,11 @@ public class CompactionSupervisorSpec implements SupervisorSpec
return ID_PREFIX + spec.getDataSource();
}
public CompactionConfigValidationResult getValidationResult()
{
return validationResult;
}
@Override
public CompactionSupervisor createSupervisor()
{

OverlordCompactionScheduler.java

@ -202,7 +202,7 @@ public class OverlordCompactionScheduler implements CompactionScheduler
} else {
return ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
compactionConfigSupplier.get().getEngine()
supervisorConfig.getEngine()
);
}
}
@ -272,7 +272,7 @@ public class OverlordCompactionScheduler implements CompactionScheduler
private synchronized void runCompactionDuty()
{
final CoordinatorRunStats stats = new CoordinatorRunStats();
duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), stats);
duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getEngine(), stats);
// Emit stats only if emission period has elapsed
if (!sinceStatsEmitted.isRunning() || sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) {
@ -309,7 +309,8 @@ public class OverlordCompactionScheduler implements CompactionScheduler
if (isRunning()) {
return new CompactionRunSimulator(statusTracker, overlordClient).simulateRunWithConfig(
getLatestConfig().withClusterConfig(updateRequest),
getCurrentDatasourceTimelines()
getCurrentDatasourceTimelines(),
supervisorConfig.getEngine()
);
} else {
return new CompactionSimulateResult(Collections.emptyMap());

CompactionSupervisorSpecTest.java

@ -82,20 +82,29 @@ public class CompactionSupervisorSpecTest
}
@Test
public void testInvalidSpecThrowsException()
public void testGetStatusWithInvalidSpec()
{
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
.thenReturn(CompactionConfigValidationResult.failure("bad spec"));
final DruidException exception = Assert.assertThrows(
DruidException.class,
() -> new CompactionSupervisorSpec(null, false, scheduler)
);
Assert.assertEquals(
"Compaction supervisor 'spec' is invalid. Reason[bad spec].",
exception.getMessage()
"Compaction supervisor spec is invalid. Reason[bad spec].", new CompactionSupervisorSpec(
new DataSourceCompactionConfig.Builder().forDataSource("datasource").build(),
false,
scheduler
).createSupervisor().getStatus().getPayload().getMessage()
);
}
@Test
public void testGetValidationResultForInvalidSpec()
{
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
.thenReturn(CompactionConfigValidationResult.failure("bad spec"));
CompactionConfigValidationResult validationResult = new CompactionSupervisorSpec(null, false, scheduler).getValidationResult();
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals("bad spec", validationResult.getReason());
}
@Test
public void testGetIdAndDataSources()
{

OverlordCompactionSchedulerTest.java

@ -120,7 +120,7 @@ public class OverlordCompactionSchedulerTest
serviceEmitter = new StubServiceEmitter();
segmentsMetadataManager = new TestSegmentsMetadataManager();
supervisorConfig = new CompactionSupervisorConfig(true);
supervisorConfig = new CompactionSupervisorConfig(true, null);
compactionConfig = DruidCompactionConfig.empty();
coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(false, null);
@ -149,7 +149,7 @@ public class OverlordCompactionSchedulerTest
@Test
public void testStartStopWhenSchedulerIsEnabled()
{
supervisorConfig = new CompactionSupervisorConfig(true);
supervisorConfig = new CompactionSupervisorConfig(true, null);
Assert.assertFalse(scheduler.isRunning());
scheduler.start();
@ -168,7 +168,7 @@ public class OverlordCompactionSchedulerTest
@Test
public void testStartStopWhenScheduledIsDisabled()
{
supervisorConfig = new CompactionSupervisorConfig(false);
supervisorConfig = new CompactionSupervisorConfig(false, null);
initScheduler();
Assert.assertFalse(scheduler.isRunning());
@ -183,7 +183,7 @@ public class OverlordCompactionSchedulerTest
@Test
public void testSegmentsAreNotPolledWhenSchedulerIsDisabled()
{
supervisorConfig = new CompactionSupervisorConfig(false);
supervisorConfig = new CompactionSupervisorConfig(false, null);
initScheduler();
verifySegmentPolling(false);
@ -337,7 +337,7 @@ public class OverlordCompactionSchedulerTest
);
final CompactionSimulateResult simulateResult = scheduler.simulateRunWithConfigUpdate(
new ClusterCompactionConfig(null, null, null, null, null)
new ClusterCompactionConfig(null, null, null, null)
);
Assert.assertEquals(1, simulateResult.getCompactionStates().size());
final Table pendingCompactionTable = simulateResult.getCompactionStates().get(CompactionStatus.State.PENDING);
@ -362,7 +362,7 @@ public class OverlordCompactionSchedulerTest
scheduler.stopCompaction(TestDataSource.WIKI);
final CompactionSimulateResult simulateResultWhenDisabled = scheduler.simulateRunWithConfigUpdate(
new ClusterCompactionConfig(null, null, null, null, null)
new ClusterCompactionConfig(null, null, null, null)
);
Assert.assertTrue(simulateResultWhenDisabled.getCompactionStates().isEmpty());

OverlordCompactionResourceTest.java

@ -44,9 +44,9 @@ import java.util.Map;
public class OverlordCompactionResourceTest
{
private static final CompactionSupervisorConfig SUPERVISOR_ENABLED
= new CompactionSupervisorConfig(true);
= new CompactionSupervisorConfig(true, null);
private static final CompactionSupervisorConfig SUPERVISOR_DISABLED
= new CompactionSupervisorConfig(false);
= new CompactionSupervisorConfig(false, null);
private CompactionScheduler scheduler;

ITAutoCompactionTest.java

@ -118,7 +118,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
@DataProvider(name = "engine")
public static Object[][] engine()
{
return new Object[][]{{CompactionEngine.NATIVE}, {CompactionEngine.MSQ}};
return new Object[][]{{CompactionEngine.NATIVE}};
}
@Inject

CompactionRunSimulator.java

@ -27,6 +27,7 @@ import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
@ -75,7 +76,8 @@ public class CompactionRunSimulator
*/
public CompactionSimulateResult simulateRunWithConfig(
DruidCompactionConfig compactionConfig,
Map<String, SegmentTimeline> datasourceTimelines
Map<String, SegmentTimeline> datasourceTimelines,
CompactionEngine defaultEngine
)
{
final Table compactedIntervals
@ -138,13 +140,14 @@ public class CompactionRunSimulator
// Unlimited task slots to ensure that simulator does not skip any interval
final DruidCompactionConfig configWithUnlimitedTaskSlots = compactionConfig.withClusterConfig(
new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null)
new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null)
);
final CoordinatorRunStats stats = new CoordinatorRunStats();
new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
configWithUnlimitedTaskSlots,
datasourceTimelines,
defaultEngine,
stats
);

AutoCompactionSnapshot.java

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.compaction.CompactionStatistics;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Objects;
@ -41,6 +42,8 @@ public class AutoCompactionSnapshot
@JsonProperty
private final AutoCompactionScheduleStatus scheduleStatus;
@JsonProperty
private final String message;
@JsonProperty
private final long bytesAwaitingCompaction;
@JsonProperty
private final long bytesCompacted;
@ -68,6 +71,7 @@ public class AutoCompactionSnapshot
public AutoCompactionSnapshot(
@JsonProperty("dataSource") @NotNull String dataSource,
@JsonProperty("scheduleStatus") @NotNull AutoCompactionScheduleStatus scheduleStatus,
@JsonProperty("message") @Nullable String message,
@JsonProperty("bytesAwaitingCompaction") long bytesAwaitingCompaction,
@JsonProperty("bytesCompacted") long bytesCompacted,
@JsonProperty("bytesSkipped") long bytesSkipped,
@ -81,6 +85,7 @@ public class AutoCompactionSnapshot
{
this.dataSource = dataSource;
this.scheduleStatus = scheduleStatus;
this.message = message;
this.bytesAwaitingCompaction = bytesAwaitingCompaction;
this.bytesCompacted = bytesCompacted;
this.bytesSkipped = bytesSkipped;
@ -104,6 +109,12 @@ public class AutoCompactionSnapshot
return scheduleStatus;
}
@Nullable
public String getMessage()
{
return message;
}
public long getBytesAwaitingCompaction()
{
return bytesAwaitingCompaction;
@ -169,7 +180,8 @@ public class AutoCompactionSnapshot
intervalCountCompacted == that.intervalCountCompacted &&
intervalCountSkipped == that.intervalCountSkipped &&
dataSource.equals(that.dataSource) &&
scheduleStatus == that.scheduleStatus;
scheduleStatus == that.scheduleStatus &&
Objects.equals(message, that.message);
}
@Override
@ -178,6 +190,7 @@ public class AutoCompactionSnapshot
return Objects.hash(
dataSource,
scheduleStatus,
message,
bytesAwaitingCompaction,
bytesCompacted,
bytesSkipped,
@ -194,6 +207,7 @@ public class AutoCompactionSnapshot
{
private final String dataSource;
private AutoCompactionScheduleStatus scheduleStatus;
private String message;
private final CompactionStatistics compactedStats = new CompactionStatistics();
private final CompactionStatistics skippedStats = new CompactionStatistics();
@ -215,6 +229,12 @@ public class AutoCompactionSnapshot
return this;
}
public Builder withMessage(String message)
{
this.message = message;
return this;
}
public void incrementWaitingStats(CompactionStatistics entry)
{
waitingStats.increment(entry);
@ -235,6 +255,7 @@ public class AutoCompactionSnapshot
return new AutoCompactionSnapshot(
dataSource,
scheduleStatus,
message,
waitingStats.getTotalBytes(),
compactedStats.getTotalBytes(),
skippedStats.getTotalBytes(),

ClusterCompactionConfig.java

@ -21,7 +21,6 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import javax.annotation.Nullable;
@ -37,7 +36,6 @@ public class ClusterCompactionConfig
private final Double compactionTaskSlotRatio;
private final Integer maxCompactionTaskSlots;
private final Boolean useAutoScaleSlots;
private final CompactionEngine engine;
private final CompactionCandidateSearchPolicy compactionPolicy;
@JsonCreator
@ -45,7 +43,6 @@ public class ClusterCompactionConfig
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
@JsonProperty("engine") @Nullable CompactionEngine engine,
@JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy
)
{
@ -53,7 +50,6 @@ public class ClusterCompactionConfig
this.maxCompactionTaskSlots = maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots;
this.compactionPolicy = compactionPolicy;
this.engine = engine;
}
@Nullable
@ -77,13 +73,6 @@ public class ClusterCompactionConfig
return useAutoScaleSlots;
}
@Nullable
@JsonProperty
public CompactionEngine getEngine()
{
return engine;
}
@Nullable
@JsonProperty
public CompactionCandidateSearchPolicy getCompactionPolicy()
@ -104,8 +93,7 @@ public class ClusterCompactionConfig
return Objects.equals(compactionTaskSlotRatio, that.compactionTaskSlotRatio)
&& Objects.equals(maxCompactionTaskSlots, that.maxCompactionTaskSlots)
&& Objects.equals(useAutoScaleSlots, that.useAutoScaleSlots)
&& Objects.equals(compactionPolicy, that.compactionPolicy)
&& engine == that.engine;
&& Objects.equals(compactionPolicy, that.compactionPolicy);
}
@Override
@ -115,8 +103,7 @@ public class ClusterCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
compactionPolicy,
engine
compactionPolicy
);
}
}

CompactionSupervisorConfig.java

@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.CompactionEngine;
import javax.annotation.Nullable;
import java.util.Objects;
@ -33,10 +34,12 @@ import java.util.Objects;
*/
public class CompactionSupervisorConfig
{
private static final CompactionSupervisorConfig DEFAULT = new CompactionSupervisorConfig(null);
private static final CompactionSupervisorConfig DEFAULT = new CompactionSupervisorConfig(null, null);
@JsonProperty
private final boolean enabled;
@JsonProperty
private final CompactionEngine engine;
public static CompactionSupervisorConfig defaultConfig()
{
@ -45,10 +48,12 @@ public class CompactionSupervisorConfig
@JsonCreator
public CompactionSupervisorConfig(
@JsonProperty("enabled") @Nullable Boolean enabled
@JsonProperty("enabled") @Nullable Boolean enabled,
@JsonProperty("engine") @Nullable CompactionEngine engine
)
{
this.enabled = Configs.valueOrDefault(enabled, false);
this.engine = Configs.valueOrDefault(engine, CompactionEngine.NATIVE);
}
public boolean isEnabled()
@ -56,6 +61,11 @@ public class CompactionSupervisorConfig
return enabled;
}
public CompactionEngine getEngine()
{
return engine;
}
@Override
public boolean equals(Object o)
{
@ -66,13 +76,13 @@ public class CompactionSupervisorConfig
return false;
}
CompactionSupervisorConfig that = (CompactionSupervisorConfig) o;
return enabled == that.enabled;
return enabled == that.enabled && engine == that.engine;
}
@Override
public int hashCode()
{
return Objects.hashCode(enabled);
return Objects.hash(enabled, engine);
}
@Override
@ -80,6 +90,7 @@ public class CompactionSupervisorConfig
{
return "CompactionSchedulerConfig{" +
"enabled=" + enabled +
"engine=" + engine +
'}';
}
}

DruidCompactionConfig.java

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
@ -43,13 +42,12 @@ public class DruidCompactionConfig
private static final CompactionCandidateSearchPolicy DEFAULT_COMPACTION_POLICY
= new NewestSegmentFirstPolicy(null);
private static final DruidCompactionConfig EMPTY_INSTANCE
= new DruidCompactionConfig(Collections.emptyList(), null, null, null, null, null);
= new DruidCompactionConfig(Collections.emptyList(), null, null, null, null);
private final List<DataSourceCompactionConfig> compactionConfigs;
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
private final boolean useAutoScaleSlots;
private final CompactionEngine engine;
private final CompactionCandidateSearchPolicy compactionPolicy;
public DruidCompactionConfig withDatasourceConfigs(
@ -61,7 +59,6 @@ public class DruidCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
engine,
compactionPolicy
);
}
@ -75,7 +72,6 @@ public class DruidCompactionConfig
Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), compactionTaskSlotRatio),
Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), maxCompactionTaskSlots),
Configs.valueOrDefault(update.getUseAutoScaleSlots(), useAutoScaleSlots),
Configs.valueOrDefault(update.getEngine(), engine),
Configs.valueOrDefault(update.getCompactionPolicy(), compactionPolicy)
);
}
@ -98,7 +94,6 @@ public class DruidCompactionConfig
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
@JsonProperty("engine") @Nullable CompactionEngine compactionEngine,
@JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy
)
{
@ -106,7 +101,6 @@ public class DruidCompactionConfig
this.compactionTaskSlotRatio = Configs.valueOrDefault(compactionTaskSlotRatio, 0.1);
this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE);
this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, false);
this.engine = Configs.valueOrDefault(compactionEngine, CompactionEngine.NATIVE);
this.compactionPolicy = Configs.valueOrDefault(compactionPolicy, DEFAULT_COMPACTION_POLICY);
}
@ -134,12 +128,6 @@ public class DruidCompactionConfig
return useAutoScaleSlots;
}
@JsonProperty
public CompactionEngine getEngine()
{
return engine;
}
// Null-safe getters not used for serialization
public ClusterCompactionConfig clusterConfig()
@ -148,7 +136,6 @@ public class DruidCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
engine,
compactionPolicy
);
}
@ -189,7 +176,6 @@ public class DruidCompactionConfig
return Double.compare(that.compactionTaskSlotRatio, compactionTaskSlotRatio) == 0 &&
maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
useAutoScaleSlots == that.useAutoScaleSlots &&
engine == that.engine &&
Objects.equals(compactionPolicy, that.compactionPolicy) &&
Objects.equals(compactionConfigs, that.compactionConfigs);
}
@ -202,7 +188,6 @@ public class DruidCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
engine,
compactionPolicy
);
}
@ -215,7 +200,6 @@ public class DruidCompactionConfig
", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
", useAutoScaleSlots=" + useAutoScaleSlots +
", engine=" + engine +
", compactionPolicy=" + compactionPolicy +
'}';
}

DruidCoordinator.java

@ -40,6 +40,7 @@ import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -362,7 +363,8 @@ public class DruidCoordinator
metadataManager.configs().getCurrentCompactionConfig().withClusterConfig(updateRequest),
metadataManager.segments()
.getSnapshotOfDataSourcesWithAllUsedSegments()
.getUsedSegmentsTimelinesPerDataSource()
.getUsedSegmentsTimelinesPerDataSource(),
CompactionEngine.NATIVE
);
}

CompactSegments.java

@ -120,9 +120,11 @@ public class CompactSegments implements CoordinatorCustomDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Coordinator supports only native engine for compaction
run(
params.getCompactionConfig(),
params.getUsedSegmentsTimelinesPerDataSource(),
CompactionEngine.NATIVE,
params.getCoordinatorStats()
);
return params;
@ -131,6 +133,7 @@ public class CompactSegments implements CoordinatorCustomDuty
public void run(
DruidCompactionConfig dynamicConfig,
Map<String, SegmentTimeline> dataSources,
CompactionEngine defaultEngine,
CoordinatorRunStats stats
)
{
@ -234,7 +237,7 @@ public class CompactSegments implements CoordinatorCustomDuty
currentRunAutoCompactionSnapshotBuilders,
availableCompactionTaskSlots,
iterator,
dynamicConfig.getEngine()
defaultEngine
);
stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);

CoordinatorCompactionConfigsResource.java

@ -27,22 +27,20 @@ import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.servlet.http.HttpServletRequest;
@ -100,30 +98,7 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req
)
{
UnaryOperator<DruidCompactionConfig> operator = current -> {
final DruidCompactionConfig newConfig = current.withClusterConfig(updatePayload);
final List<DataSourceCompactionConfig> datasourceConfigs = newConfig.getCompactionConfigs();
if (CollectionUtils.isNullOrEmpty(datasourceConfigs)
|| current.getEngine() == newConfig.getEngine()) {
return newConfig;
}
// Validate all the datasource configs against the new engine
for (DataSourceCompactionConfig datasourceConfig : datasourceConfigs) {
CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(datasourceConfig, newConfig.getEngine());
if (!validationResult.isValid()) {
throw InvalidInput.exception(
"Cannot update engine to [%s] as it does not support"
+ " compaction config of DataSource[%s]. Reason[%s].",
newConfig.getEngine(), datasourceConfig.getDataSource(), validationResult.getReason()
);
}
}
return newConfig;
};
UnaryOperator<DruidCompactionConfig> operator = current -> current.withClusterConfig(updatePayload);
return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req));
}
@ -146,7 +121,6 @@ public class CoordinatorCompactionConfigsResource
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
null,
null
),
req
@ -161,12 +135,11 @@ public class CoordinatorCompactionConfigsResource
)
{
UnaryOperator<DruidCompactionConfig> callable = current -> {
CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(newConfig, current.getEngine());
if (validationResult.isValid()) {
return current.withDatasourceConfig(newConfig);
if (newConfig.getEngine() == CompactionEngine.MSQ) {
throw InvalidInput.exception(
"MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord.");
} else {
throw InvalidInput.exception("Compaction config not supported. Reason[%s].", validationResult.getReason());
return current.withDatasourceConfig(newConfig);
}
};
return updateConfigHelper(

CompactionRunSimulatorTest.java

@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -76,7 +77,8 @@ public class CompactionRunSimulatorTest
DataSourceCompactionConfig.builder().forDataSource("wiki").build()
),
segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
.getUsedSegmentsTimelinesPerDataSource()
.getUsedSegmentsTimelinesPerDataSource(),
CompactionEngine.NATIVE
);
Assert.assertNotNull(simulateResult);

AutoCompactionSnapshotTest.java

@ -29,6 +29,7 @@ public class AutoCompactionSnapshotTest
public void testAutoCompactionSnapshotBuilder()
{
final String expectedDataSource = "data";
final String expectedMessage = "message";
final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource);
// Increment every stat twice
@ -38,7 +39,7 @@ public class AutoCompactionSnapshotTest
builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13));
}
final AutoCompactionSnapshot actual = builder.build();
final AutoCompactionSnapshot actual = builder.withMessage(expectedMessage).build();
Assert.assertNotNull(actual);
Assert.assertEquals(26, actual.getSegmentCountSkipped());
@ -52,10 +53,12 @@ public class AutoCompactionSnapshotTest
Assert.assertEquals(26, actual.getSegmentCountAwaitingCompaction());
Assert.assertEquals(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, actual.getScheduleStatus());
Assert.assertEquals(expectedDataSource, actual.getDataSource());
Assert.assertEquals(expectedMessage, actual.getMessage());
AutoCompactionSnapshot expected = new AutoCompactionSnapshot(
expectedDataSource,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
expectedMessage,
26,
26,
26,

CompactionSupervisorConfigTest.java (new file)

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
public class CompactionSupervisorConfigTest
{
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@Test
public void testCompactionSupervisorConfigSerde() throws JsonProcessingException
{
final boolean enabled = true;
final CompactionEngine defaultEngine = CompactionEngine.MSQ;
CompactionSupervisorConfig compactionSupervisorConfig =
OBJECT_MAPPER.readValue(
OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "engine", defaultEngine)),
CompactionSupervisorConfig.class
);
Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig);
}
@Test
public void testCompactionSupervisorConfigEquality()
{
Assert.assertEquals(
new CompactionSupervisorConfig(true, CompactionEngine.MSQ),
new CompactionSupervisorConfig(true, CompactionEngine.MSQ)
);
Assert.assertNotEquals(
new CompactionSupervisorConfig(true, CompactionEngine.NATIVE),
new CompactionSupervisorConfig(true, CompactionEngine.MSQ)
);
Assert.assertNotEquals(new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), "true");
}
}

DataSourceCompactionConfigAuditEntryTest.java

@ -20,7 +20,6 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestDataSource;
import org.junit.Assert;
@ -34,7 +33,7 @@ public class DataSourceCompactionConfigAuditEntryTest
private final AuditInfo auditInfo = new AuditInfo("author", "identity", "comment", "ip");
private final DataSourceCompactionConfigAuditEntry firstEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ, null),
new ClusterCompactionConfig(0.1, 9, true, null),
DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
auditInfo,
DateTimes.nowUtc()
@ -44,7 +43,7 @@ public class DataSourceCompactionConfigAuditEntryTest
public void testhasSameConfigWithSameBaseConfigIsTrue()
{
final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ, null),
new ClusterCompactionConfig(0.1, 9, true, null),
DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
auditInfo,
DateTimes.nowUtc()
@ -57,7 +56,7 @@ public class DataSourceCompactionConfigAuditEntryTest
public void testhasSameConfigWithDifferentClusterConfigIsFalse()
{
DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, false, CompactionEngine.MSQ, null),
new ClusterCompactionConfig(0.2, 9, false, null),
DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
auditInfo,
DateTimes.nowUtc()
@ -66,7 +65,7 @@ public class DataSourceCompactionConfigAuditEntryTest
Assert.assertFalse(secondEntry.hasSameConfig(firstEntry));
secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null),
new ClusterCompactionConfig(0.1, 10, true, null),
DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
auditInfo,
DateTimes.nowUtc()
@ -79,8 +78,8 @@ public class DataSourceCompactionConfigAuditEntryTest
public void testhasSameConfigWithDifferentDatasourceConfigIsFalse()
{
DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null),
DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
new ClusterCompactionConfig(0.1, 9, true, null),
DataSourceCompactionConfig.builder().forDataSource(TestDataSource.KOALA).build(),
auditInfo,
DateTimes.nowUtc()
);
@ -92,7 +91,7 @@ public class DataSourceCompactionConfigAuditEntryTest
public void testhasSameConfigWithNullDatasourceConfigIsFalse()
{
final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null),
new ClusterCompactionConfig(0.1, 9, true, null),
null,
auditInfo,
DateTimes.nowUtc()

DataSourceCompactionConfigHistoryTest.java

@ -20,7 +20,6 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestDataSource;
import org.joda.time.DateTime;
@ -178,7 +177,7 @@ public class DataSourceCompactionConfigHistoryTest
wikiAuditHistory.add(originalConfig, auditInfo, DateTimes.nowUtc());
final DruidCompactionConfig updatedConfig = originalConfig.withClusterConfig(
new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ, null)
new ClusterCompactionConfig(0.2, null, null, null)
);
wikiAuditHistory.add(updatedConfig, auditInfo, DateTimes.nowUtc());

DruidCompactionConfigTest.java

@ -64,7 +64,6 @@ public class DruidCompactionConfigTest
null,
null,
null,
CompactionEngine.MSQ,
null
);
@ -82,7 +81,6 @@ public class DruidCompactionConfigTest
0.5,
10,
false,
CompactionEngine.MSQ,
new NewestSegmentFirstPolicy(null)
);
final DruidCompactionConfig copy = config.withClusterConfig(clusterConfig);

DruidCoordinatorTest.java

@ -50,6 +50,7 @@ import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.compaction.CompactionSimulateResult;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
@ -930,6 +931,23 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager);
}
@Test
public void testSimulateRunWithEmptyDatasourceCompactionConfigs()
{
DruidDataSource dataSource = new DruidDataSource("dataSource", Collections.emptyMap());
DataSourcesSnapshot dataSourcesSnapshot =
new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
EasyMock
.expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
.andReturn(dataSourcesSnapshot)
.anyTimes();
EasyMock.replay(segmentsMetadataManager);
CompactionSimulateResult result = coordinator.simulateRunWithConfigUpdate(
new ClusterCompactionConfig(0.2, null, null, null)
);
Assert.assertEquals(Collections.emptyMap(), result.getCompactionStates());
}
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
int latchCount,
PathChildrenCache pathChildrenCache,

CompactSegmentsTest.java

@ -1888,7 +1888,6 @@ public class CompactSegmentsTest
numCompactionTaskSlots == null ? null : 1.0, // 100% when numCompactionTaskSlots is not null
numCompactionTaskSlots,
useAutoScaleSlots,
null,
null
)
)

CoordinatorCompactionConfigsResourceTest.java

@ -106,14 +106,13 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots());
Assert.assertFalse(defaultConfig.isUseAutoScaleSlots());
Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty());
Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine());
}
@Test
public void testUpdateClusterConfig()
{
Response response = resource.updateClusterCompactionConfig(
new ClusterCompactionConfig(0.5, 10, true, CompactionEngine.MSQ, null),
new ClusterCompactionConfig(0.5, 10, true, null),
mockHttpServletRequest
);
verifyStatus(Response.Status.OK, response);
@ -127,7 +126,6 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA);
Assert.assertEquals(10, updatedConfig.getMaxCompactionTaskSlots());
Assert.assertTrue(updatedConfig.isUseAutoScaleSlots());
Assert.assertEquals(CompactionEngine.MSQ, updatedConfig.getEngine());
}
@Test
@ -149,7 +147,6 @@ public class CoordinatorCompactionConfigsResourceTest
// Verify that the other fields are unchanged
Assert.assertEquals(defaultConfig.getCompactionConfigs(), updatedConfig.getCompactionConfigs());
Assert.assertEquals(defaultConfig.getEngine(), updatedConfig.getEngine());
}
@Test
@ -177,6 +174,23 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertEquals(newDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0));
}
@Test
public void testAddDatasourceConfigWithMSQEngineIsInvalid()
{
final DataSourceCompactionConfig newDatasourceConfig
= DataSourceCompactionConfig.builder()
.forDataSource(TestDataSource.WIKI)
.withEngine(CompactionEngine.MSQ)
.build();
Response response = resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.BAD_REQUEST, response);
Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
Assert.assertEquals(
"MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord.",
((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
);
}
@Test
public void testUpdateDatasourceConfig()
{
@ -209,16 +223,16 @@ public class CoordinatorCompactionConfigsResourceTest
.build();
response = resource.addOrUpdateDatasourceCompactionConfig(updatedDatasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
verifyStatus(Response.Status.BAD_REQUEST, response);
final DataSourceCompactionConfig latestDatasourceConfig
= verifyAndGetPayload(resource.getDatasourceCompactionConfig(TestDataSource.WIKI), DataSourceCompactionConfig.class);
Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig);
Assert.assertEquals(originalDatasourceConfig, latestDatasourceConfig);
final DruidCompactionConfig fullCompactionConfig
= verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class);
Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
Assert.assertEquals(updatedDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0));
Assert.assertEquals(originalDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0));
}
@Test
@ -285,7 +299,7 @@ public class CoordinatorCompactionConfigsResourceTest
resource.addOrUpdateDatasourceCompactionConfig(configV2, mockHttpServletRequest);
final DataSourceCompactionConfig configV3 = builder
.withEngine(CompactionEngine.MSQ)
.withEngine(CompactionEngine.NATIVE)
.withSkipOffsetFromLatest(Period.hours(1))
.build();
resource.addOrUpdateDatasourceCompactionConfig(configV3, mockHttpServletRequest);
@ -323,36 +337,10 @@ public class CoordinatorCompactionConfigsResourceTest
verifyStatus(Response.Status.BAD_REQUEST, response);
Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
Assert.assertEquals(
"Compaction config not supported. Reason[MSQ: Context maxNumTasks[1]"
+ " must be at least 2 (1 controller + 1 worker)].",
"MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord.",
((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
);
}
@Test
public void testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest()
{
final DataSourceCompactionConfig datasourceConfig = DataSourceCompactionConfig
.builder()
.forDataSource(TestDataSource.WIKI)
.withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1))
.build();
Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
response = resource.updateClusterCompactionConfig(
new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ, null),
mockHttpServletRequest
);
verifyStatus(Response.Status.BAD_REQUEST, response);
Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
Assert.assertEquals(
"Cannot update engine to [msq] as it does not support compaction config of DataSource[wiki]."
+ " Reason[MSQ: Context maxNumTasks[1] must be at least 2 (1 controller + 1 worker)].",
((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
);
}
@SuppressWarnings("unchecked")
private <T> T verifyAndGetPayload(Response response, Class<T> type)
{

CoordinatorCompactionResourceTest.java

@ -50,6 +50,7 @@ public class CoordinatorCompactionResourceTest
private final AutoCompactionSnapshot expectedSnapshot = new AutoCompactionSnapshot(
dataSourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
null,
1,
1,
1,