Refactor: Clean up compaction config classes (#16810)

Changes:
- Rename `CoordinatorCompactionConfig` to `DruidCompactionConfig`
- Rename `CompactionConfigUpdateRequest` to `ClusterCompactionConfig`
- Refactor methods in `DruidCompactionConfig`
- Clean up `DataSourceCompactionConfigHistory` and its tests
- Clean up tests and add new tests
- Change API path `/druid/coordinator/v1/config/global` to `/druid/coordinator/v1/config/cluster`
This commit is contained in:
Kashif Faraz 2024-07-29 23:47:25 -07:00 committed by GitHub
parent 92a40d8169
commit 954aaafe0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 759 additions and 836 deletions

View File

@ -28,8 +28,8 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.jboss.netty.handler.codec.http.HttpMethod;
@ -86,7 +86,7 @@ public class CompactionResourceTestClient
}
}
public void deleteCompactionConfig(final String dataSource) throws Exception
public void deleteDataSourceCompactionConfig(final String dataSource) throws Exception
{
String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.DELETE, new URL(url)), responseHandler).get();
@ -100,7 +100,7 @@ public class CompactionResourceTestClient
}
}
public CoordinatorCompactionConfig getCoordinatorCompactionConfigs() throws Exception
public DruidCompactionConfig getCompactionConfig() throws Exception
{
String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL());
StatusResponseHolder response = httpClient.go(
@ -113,7 +113,7 @@ public class CompactionResourceTestClient
response.getContent()
);
}
return jsonMapper.readValue(response.getContent(), new TypeReference<CoordinatorCompactionConfig>() {});
return jsonMapper.readValue(response.getContent(), new TypeReference<DruidCompactionConfig>() {});
}
public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSource) throws Exception

View File

@ -1,83 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Period;
/**
* Contains utility methods for Compaction.
*/
public class CompactionUtil
{
private CompactionUtil()
{
// no instantiation
}
public static DataSourceCompactionConfig createCompactionConfig(
String fullDatasourceName,
Integer maxRowsPerSegment,
Period skipOffsetFromLatest
)
{
return new DataSourceCompactionConfig(
fullDatasourceName,
null,
null,
null,
skipOffsetFromLatest,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
new MaxSizeSplitHintSpec(null, 1),
new DynamicPartitionsSpec(maxRowsPerSegment, null),
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
1,
null
),
null,
null,
null,
null,
new UserCompactionTaskIOConfig(true),
null,
null
);
}
}

View File

@ -27,12 +27,11 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.CompactionUtil;
import org.apache.druid.testing.utils.EventSerializer;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.KafkaUtil;
@ -305,26 +304,26 @@ public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingSer
*/
private void submitAndVerifyCompactionConfig() throws Exception
{
final DataSourceCompactionConfig compactionConfig = CompactionUtil
.createCompactionConfig(fullDatasourceName, Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO);
final DataSourceCompactionConfig dataSourceCompactionConfig = DataSourceCompactionConfig
.builder()
.forDataSource(fullDatasourceName)
.withSkipOffsetFromLatest(Period.ZERO)
.withMaxRowsPerSegment(Specs.MAX_ROWS_PER_SEGMENT)
.build();
compactionResource.updateCompactionTaskSlot(0.5, 10, null);
compactionResource.submitCompactionConfig(compactionConfig);
compactionResource.submitCompactionConfig(dataSourceCompactionConfig);
// Wait for compaction config to persist
Thread.sleep(2000);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DataSourceCompactionConfig observedCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
observedCompactionConfig = dataSourceCompactionConfig;
}
}
Assert.assertEquals(observedCompactionConfig, compactionConfig);
DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
DataSourceCompactionConfig observedCompactionConfig
= compactionConfig.findConfigForDatasource(fullDatasourceName).orNull();
Assert.assertEquals(observedCompactionConfig, dataSourceCompactionConfig);
observedCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
Assert.assertEquals(observedCompactionConfig, compactionConfig);
Assert.assertEquals(observedCompactionConfig, dataSourceCompactionConfig);
}
/**

View File

@ -54,8 +54,8 @@ import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregat
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
@ -1605,17 +1605,17 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
{
// First try update without useAutoScaleSlots
updateCompactionTaskSlot(3, 5, null);
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
// Should be default value which is false
Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
Assert.assertFalse(compactionConfig.isUseAutoScaleSlots());
// Now try update from default value to useAutoScaleSlots=true
updateCompactionTaskSlot(3, 5, true);
coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
Assert.assertTrue(coordinatorCompactionConfig.isUseAutoScaleSlots());
compactionConfig = compactionResource.getCompactionConfig();
Assert.assertTrue(compactionConfig.isUseAutoScaleSlots());
// Now try update from useAutoScaleSlots=true to useAutoScaleSlots=false
updateCompactionTaskSlot(3, 5, false);
coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
compactionConfig = compactionResource.getCompactionConfig();
Assert.assertFalse(compactionConfig.isUseAutoScaleSlots());
}
private void loadData(String indexTask) throws Exception
@ -1802,7 +1802,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
@Nullable CompactionEngine engine
) throws Exception
{
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
DataSourceCompactionConfig dataSourceCompactionConfig = new DataSourceCompactionConfig(
fullDatasourceName,
null,
null,
@ -1837,19 +1837,15 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
engine,
ImmutableMap.of("maxNumTasks", 2)
);
compactionResource.submitCompactionConfig(compactionConfig);
compactionResource.submitCompactionConfig(dataSourceCompactionConfig);
// Wait for compaction config to persist
Thread.sleep(2000);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
DataSourceCompactionConfig foundDataSourceCompactionConfig
= compactionConfig.findConfigForDatasource(fullDatasourceName).orNull();
Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), partitionsSpec);
@ -1864,16 +1860,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private void deleteCompactionConfig() throws Exception
{
compactionResource.deleteCompactionConfig(fullDatasourceName);
compactionResource.deleteDataSourceCompactionConfig(fullDatasourceName);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
DataSourceCompactionConfig foundDataSourceCompactionConfig
= compactionConfig.findConfigForDatasource(fullDatasourceName).orNull();
Assert.assertNull(foundDataSourceCompactionConfig);
}
@ -1955,11 +1947,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
{
compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
Assert.assertEquals(compactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
Assert.assertEquals(compactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
if (useAutoScaleSlots != null) {
Assert.assertEquals(coordinatorCompactionConfig.isUseAutoScaleSlots(), useAutoScaleSlots.booleanValue());
Assert.assertEquals(compactionConfig.isUseAutoScaleSlots(), useAutoScaleSlots.booleanValue());
}
}

View File

@ -24,9 +24,8 @@ import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@ -44,7 +43,6 @@ import org.testng.annotations.Test;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITAutoCompactionUpgradeTest.class);
private static final String UPGRADE_DATASOURCE_NAME = "upgradeTest";
@Inject
@ -58,67 +56,56 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
{
// Verify that compaction config already exist. This config was inserted manually into the database using SQL script.
// This auto compaction configuration payload is from Druid 0.21.0
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
DruidCompactionConfig coordinatorCompactionConfig = compactionResource.getCompactionConfig();
DataSourceCompactionConfig foundDataSourceCompactionConfig
= coordinatorCompactionConfig.findConfigForDatasource(UPGRADE_DATASOURCE_NAME).orNull();
Assert.assertNotNull(foundDataSourceCompactionConfig);
// Now submit a new auto compaction configuration
PartitionsSpec newPartitionsSpec = new DynamicPartitionsSpec(4000, null);
Period newSkipOffset = Period.seconds(0);
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
UPGRADE_DATASOURCE_NAME,
null,
null,
null,
newSkipOffset,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
new MaxSizeSplitHintSpec(null, 1),
newPartitionsSpec,
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
1,
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
null,
null,
new UserCompactionTaskIOConfig(true),
null,
null
);
DataSourceCompactionConfig compactionConfig = DataSourceCompactionConfig
.builder()
.forDataSource(UPGRADE_DATASOURCE_NAME)
.withSkipOffsetFromLatest(newSkipOffset)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
new MaxSizeSplitHintSpec(null, 1),
newPartitionsSpec,
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
1,
null
)
)
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
)
.withIoConfig(new UserCompactionTaskIOConfig(true))
.build();
compactionResource.submitCompactionConfig(compactionConfig);
// Wait for compaction config to persist
Thread.sleep(2000);
// Verify that compaction was successfully updated
coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
coordinatorCompactionConfig = compactionResource.getCompactionConfig();
foundDataSourceCompactionConfig
= coordinatorCompactionConfig.findConfigForDatasource(UPGRADE_DATASOURCE_NAME).orNull();
Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), newPartitionsSpec);

View File

@ -17,38 +17,39 @@
* under the License.
*/
package org.apache.druid.server.http;
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.CompactionEngine;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Payload to update the cluster-level compaction config.
* All fields of this class must be nullable. A non-value indicates that the
* corresponding field is being updated.
* Cluster-level compaction configs.
* All fields of this class are nullable. A non-null value denotes that the
* corresponding field has been explicitly specified.
*/
public class CompactionConfigUpdateRequest
public class ClusterCompactionConfig
{
private final Double compactionTaskSlotRatio;
private final Integer maxCompactionTaskSlots;
private final Boolean useAutoScaleSlots;
private final CompactionEngine compactionEngine;
private final CompactionEngine engine;
@JsonCreator
public CompactionConfigUpdateRequest(
public ClusterCompactionConfig(
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
@JsonProperty("compactionEngine") @Nullable CompactionEngine compactionEngine
@JsonProperty("engine") @Nullable CompactionEngine engine
)
{
this.compactionTaskSlotRatio = compactionTaskSlotRatio;
this.maxCompactionTaskSlots = maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots;
this.compactionEngine = compactionEngine;
this.engine = engine;
}
@Nullable
@ -74,9 +75,30 @@ public class CompactionConfigUpdateRequest
@Nullable
@JsonProperty
public CompactionEngine getCompactionEngine()
public CompactionEngine getEngine()
{
return compactionEngine;
return engine;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClusterCompactionConfig that = (ClusterCompactionConfig) o;
return Objects.equals(compactionTaskSlotRatio, that.compactionTaskSlotRatio)
&& Objects.equals(maxCompactionTaskSlots, that.maxCompactionTaskSlots)
&& Objects.equals(useAutoScaleSlots, that.useAutoScaleSlots)
&& engine == that.engine;
}
@Override
public int hashCode()
{
return Objects.hash(compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, engine);
}
}

View File

@ -31,7 +31,7 @@ import java.util.function.UnaryOperator;
/**
* Manager to fetch and update dynamic configs {@link CoordinatorDynamicConfig}
* and {@link CoordinatorCompactionConfig}.
* and {@link DruidCompactionConfig}.
*/
public class CoordinatorConfigManager
{
@ -71,12 +71,12 @@ public class CoordinatorConfigManager
);
}
public CoordinatorCompactionConfig getCurrentCompactionConfig()
public DruidCompactionConfig getCurrentCompactionConfig()
{
CoordinatorCompactionConfig config = jacksonConfigManager.watch(
CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.class,
CoordinatorCompactionConfig.empty()
DruidCompactionConfig config = jacksonConfigManager.watch(
DruidCompactionConfig.CONFIG_KEY,
DruidCompactionConfig.class,
DruidCompactionConfig.empty()
).get();
return Preconditions.checkNotNull(config, "Got null config from watcher?!");
@ -91,7 +91,7 @@ public class CoordinatorConfigManager
* or if the update was successful.
*/
public ConfigManager.SetResult getAndUpdateCompactionConfig(
UnaryOperator<CoordinatorCompactionConfig> operator,
UnaryOperator<DruidCompactionConfig> operator,
AuditInfo auditInfo
)
{
@ -102,16 +102,16 @@ public class CoordinatorConfigManager
tablesConfig.getConfigTable(),
MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN,
MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN,
CoordinatorCompactionConfig.CONFIG_KEY
DruidCompactionConfig.CONFIG_KEY
);
CoordinatorCompactionConfig current = convertBytesToCompactionConfig(currentBytes);
CoordinatorCompactionConfig updated = operator.apply(current);
DruidCompactionConfig current = convertBytesToCompactionConfig(currentBytes);
DruidCompactionConfig updated = operator.apply(current);
if (current.equals(updated)) {
return ConfigManager.SetResult.ok();
} else {
return jacksonConfigManager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
DruidCompactionConfig.CONFIG_KEY,
currentBytes,
updated,
auditInfo
@ -119,12 +119,12 @@ public class CoordinatorConfigManager
}
}
public CoordinatorCompactionConfig convertBytesToCompactionConfig(byte[] bytes)
public DruidCompactionConfig convertBytesToCompactionConfig(byte[] bytes)
{
return jacksonConfigManager.convertByteToConfig(
bytes,
CoordinatorCompactionConfig.class,
CoordinatorCompactionConfig.empty()
DruidCompactionConfig.class,
DruidCompactionConfig.empty()
);
}
}

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
import org.joda.time.Period;
import javax.annotation.Nullable;
@ -44,9 +43,9 @@ public class DataSourceCompactionConfig
private final int taskPriority;
private final long inputSegmentSizeBytes;
public static DataSourceCompactionConfigBuilder builder()
public static Builder builder()
{
return new DataSourceCompactionConfigBuilder();
return new Builder();
}
/**
@ -234,4 +233,127 @@ public class DataSourceCompactionConfig
result = 31 * result + Arrays.hashCode(metricsSpec);
return result;
}
public static class Builder
{
private String dataSource;
private Integer taskPriority;
private Long inputSegmentSizeBytes;
private Integer maxRowsPerSegment;
private Period skipOffsetFromLatest;
private UserCompactionTaskQueryTuningConfig tuningConfig;
private UserCompactionTaskGranularityConfig granularitySpec;
private UserCompactionTaskDimensionsConfig dimensionsSpec;
private AggregatorFactory[] metricsSpec;
private UserCompactionTaskTransformConfig transformSpec;
private UserCompactionTaskIOConfig ioConfig;
private CompactionEngine engine;
private Map<String, Object> taskContext;
public DataSourceCompactionConfig build()
{
return new DataSourceCompactionConfig(
dataSource,
taskPriority,
inputSegmentSizeBytes,
maxRowsPerSegment,
skipOffsetFromLatest,
tuningConfig,
granularitySpec,
dimensionsSpec,
metricsSpec,
transformSpec,
ioConfig,
engine,
taskContext
);
}
public Builder forDataSource(String dataSource)
{
this.dataSource = dataSource;
return this;
}
public Builder withTaskPriority(Integer taskPriority)
{
this.taskPriority = taskPriority;
return this;
}
public Builder withInputSegmentSizeBytes(Long inputSegmentSizeBytes)
{
this.inputSegmentSizeBytes = inputSegmentSizeBytes;
return this;
}
@Deprecated
public Builder withMaxRowsPerSegment(Integer maxRowsPerSegment)
{
this.maxRowsPerSegment = maxRowsPerSegment;
return this;
}
public Builder withSkipOffsetFromLatest(Period skipOffsetFromLatest)
{
this.skipOffsetFromLatest = skipOffsetFromLatest;
return this;
}
public Builder withTuningConfig(
UserCompactionTaskQueryTuningConfig tuningConfig
)
{
this.tuningConfig = tuningConfig;
return this;
}
public Builder withGranularitySpec(
UserCompactionTaskGranularityConfig granularitySpec
)
{
this.granularitySpec = granularitySpec;
return this;
}
public Builder withDimensionsSpec(
UserCompactionTaskDimensionsConfig dimensionsSpec
)
{
this.dimensionsSpec = dimensionsSpec;
return this;
}
public Builder withMetricsSpec(AggregatorFactory[] metricsSpec)
{
this.metricsSpec = metricsSpec;
return this;
}
public Builder withTransformSpec(
UserCompactionTaskTransformConfig transformSpec
)
{
this.transformSpec = transformSpec;
return this;
}
public Builder withIoConfig(UserCompactionTaskIOConfig ioConfig)
{
this.ioConfig = ioConfig;
return this;
}
public Builder withEngine(CompactionEngine engine)
{
this.engine = engine;
return this;
}
public Builder withTaskContext(Map<String, Object> taskContext)
{
this.taskContext = taskContext;
return this;
}
}
}

View File

@ -20,24 +20,25 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.audit.AuditInfo;
import org.joda.time.DateTime;
import java.util.Objects;
/**
* A DTO containing audit information for compaction config for a datasource.
*/
public class DataSourceCompactionConfigAuditEntry
{
private final GlobalCompactionConfig globalConfig;
private final ClusterCompactionConfig globalConfig;
private final DataSourceCompactionConfig compactionConfig;
private final AuditInfo auditInfo;
private final DateTime auditTime;
@JsonCreator
public DataSourceCompactionConfigAuditEntry(
@JsonProperty("globalConfig") GlobalCompactionConfig globalConfig,
@JsonProperty("globalConfig") ClusterCompactionConfig globalConfig,
@JsonProperty("compactionConfig") DataSourceCompactionConfig compactionConfig,
@JsonProperty("auditInfo") AuditInfo auditInfo,
@JsonProperty("auditTime") DateTime auditTime
@ -50,7 +51,7 @@ public class DataSourceCompactionConfigAuditEntry
}
@JsonProperty
public GlobalCompactionConfig getGlobalConfig()
public ClusterCompactionConfig getGlobalConfig()
{
return globalConfig;
}
@ -73,52 +74,9 @@ public class DataSourceCompactionConfigAuditEntry
return auditTime;
}
/**
* A DTO containing compaction config for that affects the entire cluster.
*/
public static class GlobalCompactionConfig
public boolean hasSameConfig(DataSourceCompactionConfigAuditEntry other)
{
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
private final boolean useAutoScaleSlots;
@JsonCreator
public GlobalCompactionConfig(
@JsonProperty("compactionTaskSlotRatio")
double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") int maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") boolean useAutoScaleSlots
)
{
this.compactionTaskSlotRatio = compactionTaskSlotRatio;
this.maxCompactionTaskSlots = maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots;
}
@JsonProperty
public double getCompactionTaskSlotRatio()
{
return compactionTaskSlotRatio;
}
@JsonProperty
public int getMaxCompactionTaskSlots()
{
return maxCompactionTaskSlots;
}
@JsonProperty
public boolean isUseAutoScaleSlots()
{
return useAutoScaleSlots;
}
@JsonIgnore
public boolean hasSameConfig(CoordinatorCompactionConfig coordinatorCompactionConfig)
{
return useAutoScaleSlots == coordinatorCompactionConfig.isUseAutoScaleSlots() &&
compactionTaskSlotRatio == coordinatorCompactionConfig.getCompactionTaskSlotRatio() &&
coordinatorCompactionConfig.getMaxCompactionTaskSlots() == maxCompactionTaskSlots;
}
return Objects.equals(this.compactionConfig, other.compactionConfig)
&& Objects.equals(this.globalConfig, other.globalConfig);
}
}

View File

@ -27,7 +27,7 @@ import java.util.Stack;
/**
* A utility class to build the config history for a datasource from audit entries for
* {@link CoordinatorCompactionConfig}. The {@link CoordinatorCompactionConfig} contains the entire config for the
* {@link DruidCompactionConfig}. The {@link DruidCompactionConfig} contains the entire config for the
* cluster, so this class creates adds audit entires to the history only when a setting for this datasource or a global
* setting has changed.
*/
@ -41,54 +41,29 @@ public class DataSourceCompactionConfigHistory
this.dataSource = dataSource;
}
public void add(CoordinatorCompactionConfig coordinatorCompactionConfig, AuditInfo auditInfo, DateTime auditTime)
public void add(DruidCompactionConfig compactionConfig, AuditInfo auditInfo, DateTime auditTime)
{
DataSourceCompactionConfigAuditEntry current = auditEntries.isEmpty() ? null : auditEntries.peek();
DataSourceCompactionConfigAuditEntry newEntry = null;
boolean hasDataSourceCompactionConfig = false;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSource.equals(dataSourceCompactionConfig.getDataSource())) {
hasDataSourceCompactionConfig = true;
if (
current == null ||
(
!dataSourceCompactionConfig.equals(current.getCompactionConfig()) ||
!current.getGlobalConfig().hasSameConfig(coordinatorCompactionConfig)
)
) {
current = new DataSourceCompactionConfigAuditEntry(
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
coordinatorCompactionConfig.getCompactionTaskSlotRatio(),
coordinatorCompactionConfig.getMaxCompactionTaskSlots(),
coordinatorCompactionConfig.isUseAutoScaleSlots()
),
dataSourceCompactionConfig,
auditInfo,
auditTime
);
newEntry = current;
}
break;
}
final DataSourceCompactionConfigAuditEntry previousEntry = auditEntries.isEmpty() ? null : auditEntries.peek();
final DataSourceCompactionConfigAuditEntry newEntry = new DataSourceCompactionConfigAuditEntry(
compactionConfig.clusterConfig(),
compactionConfig.findConfigForDatasource(dataSource).orNull(),
auditInfo,
auditTime
);
final boolean shouldAddEntry;
if (previousEntry == null) {
shouldAddEntry = newEntry.getCompactionConfig() != null;
} else {
shouldAddEntry = !newEntry.hasSameConfig(previousEntry);
}
if (newEntry != null) {
auditEntries.push(newEntry);
} else if (current != null && !hasDataSourceCompactionConfig) {
newEntry = new DataSourceCompactionConfigAuditEntry(
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
coordinatorCompactionConfig.getCompactionTaskSlotRatio(),
coordinatorCompactionConfig.getMaxCompactionTaskSlots(),
coordinatorCompactionConfig.isUseAutoScaleSlots()
),
null,
auditInfo,
auditTime
);
if (shouldAddEntry) {
auditEntries.push(newEntry);
}
}
public List<DataSourceCompactionConfigAuditEntry> getHistory()
public List<DataSourceCompactionConfigAuditEntry> getEntries()
{
return auditEntries;
}

View File

@ -21,70 +21,72 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Optional;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.server.http.CompactionConfigUpdateRequest;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CoordinatorCompactionConfig
public class DruidCompactionConfig
{
public static final String CONFIG_KEY = "coordinator.compaction.config";
private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1;
private static final int DEFAULT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE;
private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false;
private static final CompactionEngine DEFAULT_COMPACTION_ENGINE = CompactionEngine.NATIVE;
private static final DruidCompactionConfig EMPTY_INSTANCE
= new DruidCompactionConfig(Collections.emptyList(), null, null, null, null);
private final List<DataSourceCompactionConfig> compactionConfigs;
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
private final boolean useAutoScaleSlots;
private final CompactionEngine compactionEngine;
private final CompactionEngine engine;
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
public DruidCompactionConfig withDatasourceConfigs(
List<DataSourceCompactionConfig> compactionConfigs
)
{
return new CoordinatorCompactionConfig(
return new DruidCompactionConfig(
compactionConfigs,
baseConfig.compactionTaskSlotRatio,
baseConfig.maxCompactionTaskSlots,
baseConfig.useAutoScaleSlots,
baseConfig.compactionEngine
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
engine
);
}
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
CompactionConfigUpdateRequest update
public DruidCompactionConfig withClusterConfig(
ClusterCompactionConfig update
)
{
return new CoordinatorCompactionConfig(
baseConfig.compactionConfigs,
Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), baseConfig.compactionTaskSlotRatio),
Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), baseConfig.maxCompactionTaskSlots),
Configs.valueOrDefault(update.getUseAutoScaleSlots(), baseConfig.useAutoScaleSlots),
Configs.valueOrDefault(update.getCompactionEngine(), baseConfig.compactionEngine)
return new DruidCompactionConfig(
this.compactionConfigs,
Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), compactionTaskSlotRatio),
Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), maxCompactionTaskSlots),
Configs.valueOrDefault(update.getUseAutoScaleSlots(), useAutoScaleSlots),
Configs.valueOrDefault(update.getEngine(), engine)
);
}
public static CoordinatorCompactionConfig from(List<DataSourceCompactionConfig> compactionConfigs)
public DruidCompactionConfig withDatasourceConfig(DataSourceCompactionConfig dataSourceConfig)
{
return new CoordinatorCompactionConfig(compactionConfigs, null, null, null, null);
final Map<String, DataSourceCompactionConfig> configs = dataSourceToCompactionConfigMap();
configs.put(dataSourceConfig.getDataSource(), dataSourceConfig);
return withDatasourceConfigs(new ArrayList<>(configs.values()));
}
public static CoordinatorCompactionConfig empty()
public static DruidCompactionConfig empty()
{
return new CoordinatorCompactionConfig(ImmutableList.of(), null, null, null, null);
return EMPTY_INSTANCE;
}
@JsonCreator
public CoordinatorCompactionConfig(
public DruidCompactionConfig(
@JsonProperty("compactionConfigs") List<DataSourceCompactionConfig> compactionConfigs,
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@ -92,11 +94,11 @@ public class CoordinatorCompactionConfig
@JsonProperty("engine") @Nullable CompactionEngine compactionEngine
)
{
this.compactionConfigs = compactionConfigs;
this.compactionTaskSlotRatio = Configs.valueOrDefault(compactionTaskSlotRatio, DEFAULT_COMPACTION_TASK_RATIO);
this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, DEFAULT_MAX_COMPACTION_TASK_SLOTS);
this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, DEFAULT_USE_AUTO_SCALE_SLOTS);
this.compactionEngine = Configs.valueOrDefault(compactionEngine, DEFAULT_COMPACTION_ENGINE);
this.compactionConfigs = Configs.valueOrDefault(compactionConfigs, Collections.emptyList());
this.compactionTaskSlotRatio = Configs.valueOrDefault(compactionTaskSlotRatio, 0.1);
this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE);
this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, false);
this.engine = Configs.valueOrDefault(compactionEngine, CompactionEngine.NATIVE);
}
@JsonProperty
@ -126,7 +128,36 @@ public class CoordinatorCompactionConfig
@JsonProperty
public CompactionEngine getEngine()
{
return compactionEngine;
return engine;
}
// Null-safe getters not used for serialization
public ClusterCompactionConfig clusterConfig()
{
return new ClusterCompactionConfig(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
engine
);
}
public Map<String, DataSourceCompactionConfig> dataSourceToCompactionConfigMap()
{
return getCompactionConfigs().stream().collect(
Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())
);
}
public Optional<DataSourceCompactionConfig> findConfigForDatasource(String dataSource)
{
for (DataSourceCompactionConfig dataSourceConfig : getCompactionConfigs()) {
if (dataSource.equals(dataSourceConfig.getDataSource())) {
return Optional.of(dataSourceConfig);
}
}
return Optional.absent();
}
@Override
@ -138,11 +169,11 @@ public class CoordinatorCompactionConfig
if (o == null || getClass() != o.getClass()) {
return false;
}
CoordinatorCompactionConfig that = (CoordinatorCompactionConfig) o;
DruidCompactionConfig that = (DruidCompactionConfig) o;
return Double.compare(that.compactionTaskSlotRatio, compactionTaskSlotRatio) == 0 &&
maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
useAutoScaleSlots == that.useAutoScaleSlots &&
compactionEngine == that.compactionEngine &&
engine == that.engine &&
Objects.equals(compactionConfigs, that.compactionConfigs);
}
@ -154,7 +185,7 @@ public class CoordinatorCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
compactionEngine
engine
);
}
@ -166,7 +197,7 @@ public class CoordinatorCompactionConfig
", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
", useAutoScaleSlots=" + useAutoScaleSlots +
", compactionEngine=" + compactionEngine +
", engine=" + engine +
'}';
}
}

View File

@ -706,7 +706,7 @@ public class DruidCoordinator
= metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments();
final CoordinatorDynamicConfig dynamicConfig = metadataManager.configs().getCurrentDynamicConfig();
final CoordinatorCompactionConfig compactionConfig = metadataManager.configs().getCurrentCompactionConfig();
final DruidCompactionConfig compactionConfig = metadataManager.configs().getCurrentCompactionConfig();
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams
.newBuilder(coordinatorStartTime)

View File

@ -67,7 +67,7 @@ public class DruidCoordinatorRuntimeParams
private final @Nullable TreeSet<DataSegment> usedSegments;
private final @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final CoordinatorCompactionConfig coordinatorCompactionConfig;
private final DruidCompactionConfig compactionConfig;
private final SegmentLoadingConfig segmentLoadingConfig;
private final CoordinatorRunStats stats;
private final BalancerStrategy balancerStrategy;
@ -81,7 +81,7 @@ public class DruidCoordinatorRuntimeParams
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
DruidCompactionConfig compactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
CoordinatorRunStats stats,
BalancerStrategy balancerStrategy,
@ -95,7 +95,7 @@ public class DruidCoordinatorRuntimeParams
this.usedSegments = usedSegments;
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.compactionConfig = compactionConfig;
this.segmentLoadingConfig = segmentLoadingConfig;
this.stats = stats;
this.balancerStrategy = balancerStrategy;
@ -151,9 +151,9 @@ public class DruidCoordinatorRuntimeParams
return coordinatorDynamicConfig;
}
public CoordinatorCompactionConfig getCoordinatorCompactionConfig()
public DruidCompactionConfig getCompactionConfig()
{
return coordinatorCompactionConfig;
return compactionConfig;
}
public SegmentLoadingConfig getSegmentLoadingConfig()
@ -197,7 +197,7 @@ public class DruidCoordinatorRuntimeParams
usedSegments,
dataSourcesSnapshot,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
compactionConfig,
segmentLoadingConfig,
stats,
balancerStrategy,
@ -215,7 +215,7 @@ public class DruidCoordinatorRuntimeParams
private @Nullable TreeSet<DataSegment> usedSegments;
private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorCompactionConfig coordinatorCompactionConfig;
private DruidCompactionConfig compactionConfig;
private SegmentLoadingConfig segmentLoadingConfig;
private CoordinatorRunStats stats;
private BalancerStrategy balancerStrategy;
@ -225,7 +225,7 @@ public class DruidCoordinatorRuntimeParams
{
this.coordinatorStartTime = coordinatorStartTime;
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
this.compactionConfig = DruidCompactionConfig.empty();
this.broadcastDatasources = Collections.emptySet();
}
@ -237,7 +237,7 @@ public class DruidCoordinatorRuntimeParams
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
DruidCompactionConfig compactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
CoordinatorRunStats stats,
BalancerStrategy balancerStrategy,
@ -251,7 +251,7 @@ public class DruidCoordinatorRuntimeParams
this.usedSegments = usedSegments;
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.compactionConfig = compactionConfig;
this.segmentLoadingConfig = segmentLoadingConfig;
this.stats = stats;
this.balancerStrategy = balancerStrategy;
@ -271,7 +271,7 @@ public class DruidCoordinatorRuntimeParams
usedSegments,
dataSourcesSnapshot,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
compactionConfig,
segmentLoadingConfig,
stats,
balancerStrategy,
@ -367,9 +367,9 @@ public class DruidCoordinatorRuntimeParams
return this;
}
public Builder withCompactionConfig(CoordinatorCompactionConfig config)
public Builder withCompactionConfig(DruidCompactionConfig config)
{
this.coordinatorCompactionConfig = config;
this.compactionConfig = config;
return this;
}

View File

@ -1,155 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.config;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
import org.joda.time.Period;
import java.util.Map;
public class DataSourceCompactionConfigBuilder
{
private String dataSource;
private Integer taskPriority;
private Long inputSegmentSizeBytes;
private Integer maxRowsPerSegment;
private Period skipOffsetFromLatest;
private UserCompactionTaskQueryTuningConfig tuningConfig;
private UserCompactionTaskGranularityConfig granularitySpec;
private UserCompactionTaskDimensionsConfig dimensionsSpec;
private AggregatorFactory[] metricsSpec;
private UserCompactionTaskTransformConfig transformSpec;
private UserCompactionTaskIOConfig ioConfig;
private CompactionEngine engine;
private Map<String, Object> taskContext;
public DataSourceCompactionConfig build()
{
return new DataSourceCompactionConfig(
dataSource,
taskPriority,
inputSegmentSizeBytes,
maxRowsPerSegment,
skipOffsetFromLatest,
tuningConfig,
granularitySpec,
dimensionsSpec,
metricsSpec,
transformSpec,
ioConfig,
engine,
taskContext
);
}
public DataSourceCompactionConfigBuilder forDataSource(String dataSource)
{
this.dataSource = dataSource;
return this;
}
public DataSourceCompactionConfigBuilder withTaskPriority(Integer taskPriority)
{
this.taskPriority = taskPriority;
return this;
}
public DataSourceCompactionConfigBuilder withInputSegmentSizeBytes(Long inputSegmentSizeBytes)
{
this.inputSegmentSizeBytes = inputSegmentSizeBytes;
return this;
}
@Deprecated
public DataSourceCompactionConfigBuilder withMaxRowsPerSegment(Integer maxRowsPerSegment)
{
this.maxRowsPerSegment = maxRowsPerSegment;
return this;
}
public DataSourceCompactionConfigBuilder withSkipOffsetFromLatest(Period skipOffsetFromLatest)
{
this.skipOffsetFromLatest = skipOffsetFromLatest;
return this;
}
public DataSourceCompactionConfigBuilder withTuningConfig(
UserCompactionTaskQueryTuningConfig tuningConfig
)
{
this.tuningConfig = tuningConfig;
return this;
}
public DataSourceCompactionConfigBuilder withGranularitySpec(
UserCompactionTaskGranularityConfig granularitySpec
)
{
this.granularitySpec = granularitySpec;
return this;
}
public DataSourceCompactionConfigBuilder withDimensionsSpec(
UserCompactionTaskDimensionsConfig dimensionsSpec
)
{
this.dimensionsSpec = dimensionsSpec;
return this;
}
public DataSourceCompactionConfigBuilder withMetricsSpec(AggregatorFactory[] metricsSpec)
{
this.metricsSpec = metricsSpec;
return this;
}
public DataSourceCompactionConfigBuilder withTransformSpec(
UserCompactionTaskTransformConfig transformSpec
)
{
this.transformSpec = transformSpec;
return this;
}
public DataSourceCompactionConfigBuilder withIoConfig(UserCompactionTaskIOConfig ioConfig)
{
this.ioConfig = ioConfig;
return this;
}
public DataSourceCompactionConfigBuilder withEngine(CompactionEngine engine)
{
this.engine = engine;
return this;
}
public DataSourceCompactionConfigBuilder withTaskContext(Map<String, Object> taskContext)
{
this.taskContext = taskContext;
return this;
}
}

View File

@ -49,8 +49,8 @@ import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
@ -120,7 +120,7 @@ public class CompactSegments implements CoordinatorCustomDuty
{
LOG.info("Running CompactSegments duty");
final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
final DruidCompactionConfig dynamicConfig = params.getCompactionConfig();
final int maxCompactionTaskSlots = dynamicConfig.getMaxCompactionTaskSlots();
if (maxCompactionTaskSlots <= 0) {
LOG.info("Skipping compaction as maxCompactionTaskSlots is [%d].", maxCompactionTaskSlots);
@ -344,7 +344,7 @@ public class CompactSegments implements CoordinatorCustomDuty
return tuningConfig.getPartitionsSpec() instanceof DimensionRangePartitionsSpec;
}
private int getCompactionTaskCapacity(CoordinatorCompactionConfig dynamicConfig)
private int getCompactionTaskCapacity(DruidCompactionConfig dynamicConfig)
{
int totalWorkerCapacity = CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient);

View File

@ -26,9 +26,9 @@ import org.apache.druid.java.util.RetryableException;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.config.MetadataCleanupConfig;
import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.DateTime;
@ -82,12 +82,12 @@ public class KillCompactionConfig extends MetadataCleanupDuty
/**
* Creates a new compaction config by deleting entries for inactive datasources.
*/
private CoordinatorCompactionConfig deleteConfigsForInactiveDatasources(
CoordinatorCompactionConfig current
private DruidCompactionConfig deleteConfigsForInactiveDatasources(
DruidCompactionConfig current
)
{
// If current compaction config is empty then there is nothing to do
if (CoordinatorCompactionConfig.empty().equals(current)) {
if (DruidCompactionConfig.empty().equals(current)) {
log.info("Nothing to do as compaction config is already empty.");
return current;
}
@ -102,7 +102,7 @@ public class KillCompactionConfig extends MetadataCleanupDuty
.filter(dataSourceCompactionConfig -> activeDatasources.contains(dataSourceCompactionConfig.getDataSource()))
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
return CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values()));
return current.withDatasourceConfigs(ImmutableList.copyOf(updated.values()));
}
/**
@ -116,7 +116,7 @@ public class KillCompactionConfig extends MetadataCleanupDuty
ConfigManager.SetResult result = configManager.getAndUpdateCompactionConfig(
current -> {
final CoordinatorCompactionConfig updated = deleteConfigsForInactiveDatasources(current);
final DruidCompactionConfig updated = deleteConfigsForInactiveDatasources(current);
int numCurrentConfigs = current.getCompactionConfigs() == null ? 0 : current.getCompactionConfigs().size();
int numUpdatedConfigs = updated.getCompactionConfigs() == null ? 0 : updated.getCompactionConfigs().size();
compactionConfigRemoved.set(Math.max(0, numCurrentConfigs - numUpdatedConfigs));

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.http;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
@ -33,11 +34,12 @@ import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.utils.CollectionUtils;
@ -60,9 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@Path("/druid/coordinator/v1/config/compaction")
@ResourceFilters(ConfigResourceFilter.class)
@ -87,21 +87,21 @@ public class CoordinatorCompactionConfigsResource
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterCompactionConfig()
public Response getCompactionConfig()
{
return Response.ok(configManager.getCurrentCompactionConfig()).build();
}
@POST
@Path("/global")
@Path("/cluster")
@Consumes(MediaType.APPLICATION_JSON)
public Response updateClusterCompactionConfig(
CompactionConfigUpdateRequest updatePayload,
ClusterCompactionConfig updatePayload,
@Context HttpServletRequest req
)
{
UnaryOperator<CoordinatorCompactionConfig> operator = current -> {
final CoordinatorCompactionConfig newConfig = CoordinatorCompactionConfig.from(current, updatePayload);
UnaryOperator<DruidCompactionConfig> operator = current -> {
final DruidCompactionConfig newConfig = current.withClusterConfig(updatePayload);
final List<DataSourceCompactionConfig> datasourceConfigs = newConfig.getCompactionConfigs();
if (CollectionUtils.isNullOrEmpty(datasourceConfigs)
@ -127,7 +127,11 @@ public class CoordinatorCompactionConfigsResource
return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req));
}
/**
* @deprecated in favor of {@link #updateClusterCompactionConfig}.
*/
@POST
@Deprecated
@Path("/taskslots")
@Consumes(MediaType.APPLICATION_JSON)
public Response setCompactionTaskLimit(
@ -138,7 +142,7 @@ public class CoordinatorCompactionConfigsResource
)
{
return updateClusterCompactionConfig(
new CompactionConfigUpdateRequest(
new ClusterCompactionConfig(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
@ -155,22 +159,14 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req
)
{
UnaryOperator<CoordinatorCompactionConfig> callable = current -> {
final CoordinatorCompactionConfig newCompactionConfig;
final Map<String, DataSourceCompactionConfig> newConfigs = current
.getCompactionConfigs()
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
UnaryOperator<DruidCompactionConfig> callable = current -> {
CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(newConfig, current.getEngine());
if (!validationResult.isValid()) {
if (validationResult.isValid()) {
return current.withDatasourceConfig(newConfig);
} else {
throw InvalidInput.exception("Compaction config not supported. Reason[%s].", validationResult.getReason());
}
// Don't persist config with the default engine if engine not specified, to enable update of the default.
newConfigs.put(newConfig.getDataSource(), newConfig);
newCompactionConfig = CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(newConfigs.values()));
return newCompactionConfig;
};
return updateConfigHelper(
callable,
@ -183,18 +179,13 @@ public class CoordinatorCompactionConfigsResource
@Produces(MediaType.APPLICATION_JSON)
public Response getDatasourceCompactionConfig(@PathParam("dataSource") String dataSource)
{
final CoordinatorCompactionConfig current = configManager.getCurrentCompactionConfig();
final Map<String, DataSourceCompactionConfig> configs = current
.getCompactionConfigs()
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
final DataSourceCompactionConfig config = configs.get(dataSource);
if (config == null) {
final DruidCompactionConfig current = configManager.getCurrentCompactionConfig();
final Optional<DataSourceCompactionConfig> config = current.findConfigForDatasource(dataSource);
if (config.isPresent()) {
return Response.ok().entity(config.get()).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.ok().entity(config).build();
}
@GET
@ -211,25 +202,25 @@ public class CoordinatorCompactionConfigsResource
List<AuditEntry> auditEntries;
if (theInterval == null && count != null) {
auditEntries = auditManager.fetchAuditHistory(
CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.CONFIG_KEY,
DruidCompactionConfig.CONFIG_KEY,
DruidCompactionConfig.CONFIG_KEY,
count
);
} else {
auditEntries = auditManager.fetchAuditHistory(
CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.CONFIG_KEY,
DruidCompactionConfig.CONFIG_KEY,
DruidCompactionConfig.CONFIG_KEY,
theInterval
);
}
DataSourceCompactionConfigHistory history = new DataSourceCompactionConfigHistory(dataSource);
for (AuditEntry audit : auditEntries) {
CoordinatorCompactionConfig coordinatorCompactionConfig = configManager.convertBytesToCompactionConfig(
DruidCompactionConfig compactionConfig = configManager.convertBytesToCompactionConfig(
audit.getPayload().serialized().getBytes(StandardCharsets.UTF_8)
);
history.add(coordinatorCompactionConfig, audit.getAuditInfo(), audit.getAuditTime());
history.add(compactionConfig, audit.getAuditInfo(), audit.getAuditTime());
}
return Response.ok(history.getHistory()).build();
return Response.ok(history.getEntries()).build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
@ -246,24 +237,20 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req
)
{
UnaryOperator<CoordinatorCompactionConfig> callable = current -> {
final Map<String, DataSourceCompactionConfig> configs = current
.getCompactionConfigs()
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
UnaryOperator<DruidCompactionConfig> callable = current -> {
final Map<String, DataSourceCompactionConfig> configs = current.dataSourceToCompactionConfigMap();
final DataSourceCompactionConfig config = configs.remove(dataSource);
if (config == null) {
throw NotFound.exception("datasource not found");
}
return CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values()));
return current.withDatasourceConfigs(ImmutableList.copyOf(configs.values()));
};
return updateConfigHelper(callable, AuthorizationUtils.buildAuditInfo(req));
}
private Response updateConfigHelper(
UnaryOperator<CoordinatorCompactionConfig> configOperator,
UnaryOperator<DruidCompactionConfig> configOperator,
AuditInfo auditInfo
)
{

View File

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
public class CoordinatorCompactionConfigTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Test
public void testSerdeDefaultConfig() throws Exception
{
final CoordinatorCompactionConfig defaultConfig = CoordinatorCompactionConfig.empty();
final String json = MAPPER.writeValueAsString(defaultConfig);
CoordinatorCompactionConfig deserialized = MAPPER.readValue(json, CoordinatorCompactionConfig.class);
Assert.assertEquals(defaultConfig, deserialized);
}
}

View File

@ -19,81 +19,85 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DataSourceCompactionConfigAuditEntryTest
{
private static final double COMPACTION_TASK_SLOT_RATIO = 0.1;
private static final int MAX_COMPACTION_SLOTS = 9;
private static final boolean USE_AUTO_SCALE_SLOTS = true;
private static final String DS_WIKI = "wiki";
private final AuditInfo auditInfo = new AuditInfo("author", "identity", "comment", "ip");
private final DataSourceCompactionConfigAuditEntry firstEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ),
DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(),
auditInfo,
DateTimes.nowUtc()
);
@Mock
private CoordinatorCompactionConfig coordinatorCompactionConfig;
@Before
public void setUp()
@Test
public void testhasSameConfigWithSameBaseConfigIsTrue()
{
Mockito.when(coordinatorCompactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO);
Mockito.when(coordinatorCompactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_SLOTS);
Mockito.when(coordinatorCompactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS);
final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ),
DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(),
auditInfo,
DateTimes.nowUtc()
);
Assert.assertTrue(firstEntry.hasSameConfig(secondEntry));
Assert.assertTrue(secondEntry.hasSameConfig(firstEntry));
}
@Test
public void testhasSameConfigWithSameBaseConfigShouldReturnTrue()
public void testhasSameConfigWithDifferentClusterConfigIsFalse()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO,
MAX_COMPACTION_SLOTS,
USE_AUTO_SCALE_SLOTS
);
DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, false, CompactionEngine.MSQ),
DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(),
auditInfo,
DateTimes.nowUtc()
);
Assert.assertFalse(firstEntry.hasSameConfig(secondEntry));
Assert.assertFalse(secondEntry.hasSameConfig(firstEntry));
Assert.assertTrue(config.hasSameConfig(coordinatorCompactionConfig));
secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE),
DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(),
auditInfo,
DateTimes.nowUtc()
);
Assert.assertFalse(firstEntry.hasSameConfig(secondEntry));
Assert.assertFalse(secondEntry.hasSameConfig(firstEntry));
}
@Test
public void testhasSameConfigWithDifferentUseAutoScaleSlotsShouldReturnFalse()
public void testhasSameConfigWithDifferentDatasourceConfigIsFalse()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO,
MAX_COMPACTION_SLOTS,
!USE_AUTO_SCALE_SLOTS
);
Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig));
DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE),
DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(),
auditInfo,
DateTimes.nowUtc()
);
Assert.assertFalse(firstEntry.hasSameConfig(secondEntry));
Assert.assertFalse(secondEntry.hasSameConfig(firstEntry));
}
@Test
public void testhasSameConfigWithDifferentMaxCompactionSlotsShouldReturnFalse()
public void testhasSameConfigWithNullDatasourceConfigIsFalse()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO,
MAX_COMPACTION_SLOTS + 1,
USE_AUTO_SCALE_SLOTS
);
Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig));
}
@Test
public void testhasSameConfigWithDifferentCompactionSlotRatioShouldReturnFalse()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO - 0.03,
MAX_COMPACTION_SLOTS,
USE_AUTO_SCALE_SLOTS
);
Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig));
final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry(
new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE),
null,
auditInfo,
DateTimes.nowUtc()
);
Assert.assertFalse(firstEntry.hasSameConfig(secondEntry));
Assert.assertFalse(secondEntry.hasSameConfig(firstEntry));
}
}

View File

@ -19,167 +19,183 @@
package org.apache.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.List;
@RunWith(MockitoJUnitRunner.class)
public class DataSourceCompactionConfigHistoryTest
{
private static final String DATASOURCE = "DATASOURCE";
private static final String DATASOURCE_2 = "DATASOURCE_2";
private static final String DATASOURCE_NOT_EXISTS = "DATASOURCE_NOT_EXISTS";
private static final double COMPACTION_TASK_SLOT_RATIO = 0.1;
private static final int MAX_COMPACTION_TASK_SLOTS = 9;
private static final boolean USE_AUTO_SCALE_SLOTS = false;
private static final DateTime AUDIT_TIME = DateTimes.of(2023, 1, 13, 9, 0);
private static final DateTime AUDIT_TIME_2 = DateTimes.of(2023, 1, 13, 9, 30);
private static final DateTime AUDIT_TIME_3 = DateTimes.of(2023, 1, 13, 10, 0);
private final AuditInfo auditInfo = new AuditInfo("author", "identity", "comment", "ip");
private final DataSourceCompactionConfig wikiCompactionConfig
= DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build();
@Mock
private CoordinatorCompactionConfig compactionConfig;
@Mock(answer = Answers.RETURNS_MOCKS)
private DataSourceCompactionConfig configForDataSource;
@Mock(answer = Answers.RETURNS_MOCKS)
private DataSourceCompactionConfig configForDataSourceWithChange;
@Mock(answer = Answers.RETURNS_MOCKS)
private DataSourceCompactionConfig configForDataSource2;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private AuditInfo auditInfo;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private AuditInfo auditInfo2;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private AuditInfo auditInfo3;
private DataSourceCompactionConfigHistory target;
private DataSourceCompactionConfigHistory wikiAuditHistory;
@Before
public void setUp()
public void setup()
{
Mockito.when(compactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO);
Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_TASK_SLOTS);
Mockito.when(compactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS);
Mockito.when(configForDataSource.getDataSource()).thenReturn(DATASOURCE);
Mockito.when(configForDataSourceWithChange.getDataSource()).thenReturn(DATASOURCE);
Mockito.when(configForDataSource2.getDataSource()).thenReturn(DATASOURCE_2);
Mockito.when(compactionConfig.getCompactionConfigs())
.thenReturn(ImmutableList.of(configForDataSource, configForDataSource2));
target = new DataSourceCompactionConfigHistory(DATASOURCE);
wikiAuditHistory = new DataSourceCompactionConfigHistory(DS.WIKI);
}
@Test
public void testAddCompactionConfigShouldAddToHistory()
public void testAddDatasourceConfigShouldAddToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Assert.assertEquals(1, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
final DateTime auditTime = DateTimes.nowUtc();
wikiAuditHistory.add(
DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig),
auditInfo,
auditTime
);
Assert.assertEquals(1, wikiAuditHistory.getEntries().size());
DataSourceCompactionConfigAuditEntry auditEntry = wikiAuditHistory.getEntries().get(0);
Assert.assertEquals(wikiCompactionConfig, auditEntry.getCompactionConfig());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
Assert.assertEquals(auditTime, auditEntry.getAuditTime());
}
@Test
public void testAddAndDeleteCompactionConfigShouldAddBothToHistory()
public void testAddDeleteDatasourceConfigShouldAddBothToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(2, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
auditEntry = target.getHistory().get(1);
Assert.assertEquals(null, auditEntry.getCompactionConfig());
Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime());
final DateTime auditTime = DateTimes.nowUtc();
wikiAuditHistory.add(
DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig),
auditInfo,
auditTime
);
wikiAuditHistory.add(DruidCompactionConfig.empty(), auditInfo, auditTime.plusHours(2));
final List<DataSourceCompactionConfigAuditEntry> entries = wikiAuditHistory.getEntries();
Assert.assertEquals(2, entries.size());
final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0);
Assert.assertEquals(wikiCompactionConfig, firstEntry.getCompactionConfig());
Assert.assertEquals(auditInfo, firstEntry.getAuditInfo());
Assert.assertEquals(auditTime, firstEntry.getAuditTime());
final DataSourceCompactionConfigAuditEntry secondEntry = entries.get(1);
Assert.assertNull(secondEntry.getCompactionConfig());
Assert.assertEquals(firstEntry.getGlobalConfig(), secondEntry.getGlobalConfig());
Assert.assertEquals(auditInfo, secondEntry.getAuditInfo());
Assert.assertEquals(auditTime.plusHours(2), secondEntry.getAuditTime());
}
@Test
public void testAddAndDeleteAnotherCompactionConfigShouldNotAddToHistory()
public void testAddDeleteAnotherDatasourceConfigShouldNotAddToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(1, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
final DataSourceCompactionConfig koalaCompactionConfig
= DataSourceCompactionConfig.builder().forDataSource(DS.KOALA).build();
wikiAuditHistory.add(
DruidCompactionConfig.empty().withDatasourceConfig(koalaCompactionConfig),
auditInfo,
DateTimes.nowUtc()
);
wikiAuditHistory.add(DruidCompactionConfig.empty(), auditInfo, DateTimes.nowUtc());
Assert.assertTrue(wikiAuditHistory.getEntries().isEmpty());
}
@Test
public void testAddDeletedAddCompactionConfigShouldAddAllToHistory()
public void testAddDeleteAddDatasourceConfigShouldAddAllToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Mockito.when(compactionConfig.getCompactionConfigs())
.thenReturn(ImmutableList.of(configForDataSourceWithChange, configForDataSource2));
target.add(compactionConfig, auditInfo3, AUDIT_TIME_3);
Assert.assertEquals(3, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
auditEntry = target.getHistory().get(2);
Assert.assertEquals(configForDataSourceWithChange, auditEntry.getCompactionConfig());
Assert.assertEquals(auditInfo3, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_3, auditEntry.getAuditTime());
final DateTime auditTime = DateTimes.nowUtc();
wikiAuditHistory.add(
DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig),
auditInfo,
auditTime
);
wikiAuditHistory.add(
DruidCompactionConfig.empty(),
auditInfo,
auditTime.plusHours(2)
);
wikiAuditHistory.add(
DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig),
auditInfo,
auditTime.plusHours(3)
);
final List<DataSourceCompactionConfigAuditEntry> entries = wikiAuditHistory.getEntries();
Assert.assertEquals(3, entries.size());
final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0);
final DataSourceCompactionConfigAuditEntry thirdEntry = entries.get(2);
Assert.assertTrue(firstEntry.hasSameConfig(thirdEntry));
}
@Test
public void testAddAndChangeCompactionConfigShouldAddBothToHistory()
public void testAddModifyDatasourceConfigShouldAddBothToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSourceWithChange));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(2, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
auditEntry = target.getHistory().get(1);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime());
final DateTime auditTime = DateTimes.nowUtc();
wikiAuditHistory.add(
DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig),
auditInfo,
auditTime
);
final DataSourceCompactionConfig updatedWikiConfig
= DataSourceCompactionConfig.builder()
.forDataSource(DS.WIKI)
.withSkipOffsetFromLatest(Period.hours(5))
.build();
wikiAuditHistory.add(
DruidCompactionConfig.empty().withDatasourceConfig(updatedWikiConfig),
auditInfo,
auditTime.plusHours(3)
);
final List<DataSourceCompactionConfigAuditEntry> entries = wikiAuditHistory.getEntries();
Assert.assertEquals(2, entries.size());
final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0);
final DataSourceCompactionConfigAuditEntry secondEntry = entries.get(1);
Assert.assertEquals(firstEntry.getGlobalConfig(), secondEntry.getGlobalConfig());
Assert.assertEquals(wikiCompactionConfig, firstEntry.getCompactionConfig());
Assert.assertEquals(updatedWikiConfig, secondEntry.getCompactionConfig());
Assert.assertFalse(firstEntry.hasSameConfig(secondEntry));
}
@Test
public void testAddAndChangeGlobalSettingsShouldAddTwice()
public void testAddAndModifyClusterConfigShouldAddTwice()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
int newMaxTaskSlots = MAX_COMPACTION_TASK_SLOTS - 1;
Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(newMaxTaskSlots);
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(2, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
Assert.assertEquals(MAX_COMPACTION_TASK_SLOTS, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots());
auditEntry = target.getHistory().get(1);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime());
Assert.assertEquals(newMaxTaskSlots, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots());
final DruidCompactionConfig originalConfig
= DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig);
wikiAuditHistory.add(originalConfig, auditInfo, DateTimes.nowUtc());
final DruidCompactionConfig updatedConfig = originalConfig.withClusterConfig(
new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ)
);
wikiAuditHistory.add(updatedConfig, auditInfo, DateTimes.nowUtc());
final List<DataSourceCompactionConfigAuditEntry> entries = wikiAuditHistory.getEntries();
Assert.assertEquals(2, entries.size());
final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0);
final DataSourceCompactionConfigAuditEntry secondEntry = entries.get(1);
Assert.assertEquals(secondEntry.getCompactionConfig(), firstEntry.getCompactionConfig());
Assert.assertEquals(originalConfig.clusterConfig(), firstEntry.getGlobalConfig());
Assert.assertEquals(updatedConfig.clusterConfig(), secondEntry.getGlobalConfig());
Assert.assertFalse(firstEntry.hasSameConfig(secondEntry));
}
@Test
public void testAddCompactionConfigDoesNotHaveDataSourceWithNoHistoryShouldNotAdd()
private static class DS
{
target = new DataSourceCompactionConfigHistory(DATASOURCE_NOT_EXISTS);
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Assert.assertTrue(target.getHistory().isEmpty());
static final String KOALA = "koala";
static final String WIKI = "wiki";
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class DruidCompactionConfigTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Test
public void testSerdeDefaultConfig() throws Exception
{
final DruidCompactionConfig defaultConfig = DruidCompactionConfig.empty();
final String json = MAPPER.writeValueAsString(defaultConfig);
DruidCompactionConfig deserialized = MAPPER.readValue(json, DruidCompactionConfig.class);
Assert.assertEquals(defaultConfig, deserialized);
}
@Test
public void testSerdeWithDatasourceConfigs() throws Exception
{
final DruidCompactionConfig config = new DruidCompactionConfig(
Arrays.asList(
DataSourceCompactionConfig
.builder()
.forDataSource(DS.WIKI)
.withSkipOffsetFromLatest(Period.hours(1))
.build(),
DataSourceCompactionConfig
.builder()
.forDataSource(DS.KOALA)
.withSkipOffsetFromLatest(Period.hours(2))
.build()
),
null,
null,
null,
CompactionEngine.MSQ
);
final String json = MAPPER.writeValueAsString(config);
DruidCompactionConfig deserialized = MAPPER.readValue(json, DruidCompactionConfig.class);
Assert.assertEquals(config, deserialized);
}
@Test
public void testCopyWithClusterConfig()
{
final DruidCompactionConfig config = DruidCompactionConfig.empty();
final ClusterCompactionConfig clusterConfig = new ClusterCompactionConfig(0.5, 10, false, CompactionEngine.MSQ);
final DruidCompactionConfig copy = config.withClusterConfig(clusterConfig);
Assert.assertEquals(clusterConfig, copy.clusterConfig());
Assert.assertNotEquals(clusterConfig, config.clusterConfig());
}
@Test
public void testCopyWithDatasourceConfigs()
{
final DruidCompactionConfig config = DruidCompactionConfig.empty();
Assert.assertTrue(config.getCompactionConfigs().isEmpty());
final DataSourceCompactionConfig dataSourceConfig
= DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).withEngine(CompactionEngine.NATIVE).build();
final DruidCompactionConfig copy = config.withDatasourceConfigs(Collections.singletonList(dataSourceConfig));
Assert.assertEquals(1, copy.getCompactionConfigs().size());
Assert.assertEquals(dataSourceConfig, copy.findConfigForDatasource(DS.WIKI).orNull());
}
private static class DS
{
static final String WIKI = "wiki";
static final String KOALA = "koala";
}
}

View File

@ -136,11 +136,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
EasyMock.eq(DruidCompactionConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes();
EasyMock.replay(configManager);
setupServerAndCurator();
curator.start();
@ -734,11 +734,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
EasyMock.eq(DruidCompactionConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes();
EasyMock.replay(configManager);
DruidDataSource dataSource = new DruidDataSource("dataSource1", Collections.emptyMap());
DataSegment dataSegment = new DataSegment(

View File

@ -69,8 +69,8 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
@ -1834,7 +1834,7 @@ public class CompactSegmentsTest
.newBuilder(DateTimes.nowUtc())
.withDataSourcesSnapshot(dataSources)
.withCompactionConfig(
new CoordinatorCompactionConfig(
new DruidCompactionConfig(
compactionConfigs,
numCompactionTaskSlots == null ? null : 1.0, // 100% when numCompactionTaskSlots is not null
numCompactionTaskSlots,

View File

@ -28,9 +28,9 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.config.MetadataCleanupConfig;
@ -107,13 +107,13 @@ public class KillCompactionConfigTest
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY))
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(CoordinatorCompactionConfig.empty());
ArgumentMatchers.eq(DruidCompactionConfig.class),
ArgumentMatchers.eq(DruidCompactionConfig.empty()))
).thenReturn(DruidCompactionConfig.empty());
final MetadataCleanupConfig config
= new MetadataCleanupConfig(true, new Duration("PT6S"), null);
@ -129,14 +129,14 @@ public class KillCompactionConfigTest
Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
ArgumentMatchers.eq(DruidCompactionConfig.class),
ArgumentMatchers.eq(DruidCompactionConfig.empty())
);
Mockito.verify(mockConnector).lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY)
);
Mockito.verifyNoMoreInteractions(mockJacksonConfigManager);
}
@ -177,24 +177,26 @@ public class KillCompactionConfigTest
null,
ImmutableMap.of("key", "val")
);
CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig));
DruidCompactionConfig originalCurrentConfig = DruidCompactionConfig.empty().withDatasourceConfigs(
ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig)
);
byte[] originalCurrentConfigBytes = {1, 2, 3};
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY))
).thenReturn(originalCurrentConfigBytes);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
ArgumentMatchers.eq(DruidCompactionConfig.class),
ArgumentMatchers.eq(DruidCompactionConfig.empty()))
).thenReturn(originalCurrentConfig);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName));
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<DruidCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(DruidCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
@ -221,19 +223,19 @@ public class KillCompactionConfigTest
Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
ArgumentMatchers.eq(DruidCompactionConfig.class),
ArgumentMatchers.eq(DruidCompactionConfig.empty())
);
Mockito.verify(mockConnector).lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY)
);
Mockito.verify(mockJacksonConfigManager).set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any(DruidCompactionConfig.class),
ArgumentMatchers.any()
);
Mockito.verifyNoMoreInteractions(mockJacksonConfigManager);
@ -261,24 +263,26 @@ public class KillCompactionConfigTest
ImmutableMap.of("key", "val")
);
CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig));
DruidCompactionConfig originalCurrentConfig = DruidCompactionConfig.empty().withDatasourceConfigs(
ImmutableList.of(inactiveDatasourceConfig)
);
byte[] originalCurrentConfigBytes = {1, 2, 3};
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY))
).thenReturn(originalCurrentConfigBytes);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
ArgumentMatchers.eq(DruidCompactionConfig.class),
ArgumentMatchers.eq(DruidCompactionConfig.empty()))
).thenReturn(originalCurrentConfig);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of());
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any(DruidCompactionConfig.class),
ArgumentMatchers.any())
).thenReturn(
// Return fail result with RetryableException the first three calls to updated set
@ -304,21 +308,21 @@ public class KillCompactionConfigTest
// Should call convertByteToConfig and lookup (to refresh current compaction config) four times due to RetryableException when failed
Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
ArgumentMatchers.eq(DruidCompactionConfig.class),
ArgumentMatchers.eq(DruidCompactionConfig.empty())
);
Mockito.verify(mockConnector, Mockito.times(4)).lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY)
);
// Should call set (to try set new updated compaction config) four times due to RetryableException when failed
Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any(DruidCompactionConfig.class),
ArgumentMatchers.any()
);
Mockito.verifyNoMoreInteractions(mockJacksonConfigManager);

View File

@ -37,9 +37,9 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.metrics.MetricsVerifier;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.MetadataManager;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
@ -512,11 +512,11 @@ public class CoordinatorSimulationBuilder
EasyMock.expect(
jacksonConfigManager.watch(
EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
EasyMock.eq(CoordinatorCompactionConfig.class),
EasyMock.eq(DruidCompactionConfig.CONFIG_KEY),
EasyMock.eq(DruidCompactionConfig.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes();
return jacksonConfigManager;
}

View File

@ -37,12 +37,12 @@ import org.apache.druid.metadata.MetadataCASUpdate;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestMetadataStorageConnector;
import org.apache.druid.metadata.TestMetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
@ -97,9 +97,9 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testGetDefaultClusterConfig()
{
Response response = resource.getClusterCompactionConfig();
final CoordinatorCompactionConfig defaultConfig
= verifyAndGetPayload(response, CoordinatorCompactionConfig.class);
Response response = resource.getCompactionConfig();
final DruidCompactionConfig defaultConfig
= verifyAndGetPayload(response, DruidCompactionConfig.class);
Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(), DELTA);
Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots());
@ -112,14 +112,14 @@ public class CoordinatorCompactionConfigsResourceTest
public void testUpdateGlobalConfig()
{
Response response = resource.updateClusterCompactionConfig(
new CompactionConfigUpdateRequest(0.5, 10, true, CompactionEngine.MSQ),
new ClusterCompactionConfig(0.5, 10, true, CompactionEngine.MSQ),
mockHttpServletRequest
);
verifyStatus(Response.Status.OK, response);
final CoordinatorCompactionConfig updatedConfig = verifyAndGetPayload(
resource.getClusterCompactionConfig(),
CoordinatorCompactionConfig.class
final DruidCompactionConfig updatedConfig = verifyAndGetPayload(
resource.getCompactionConfig(),
DruidCompactionConfig.class
);
Assert.assertNotNull(updatedConfig);
@ -132,14 +132,14 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testSetCompactionTaskLimit()
{
final CoordinatorCompactionConfig defaultConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
final DruidCompactionConfig defaultConfig
= verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class);
Response response = resource.setCompactionTaskLimit(0.5, 9, true, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
final CoordinatorCompactionConfig updatedConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
final DruidCompactionConfig updatedConfig
= verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class);
// Verify that the task slot fields have been updated
Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA);
@ -170,8 +170,8 @@ public class CoordinatorCompactionConfigsResourceTest
= verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class);
Assert.assertEquals(newDatasourceConfig, fetchedDatasourceConfig);
final CoordinatorCompactionConfig fullCompactionConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
final DruidCompactionConfig fullCompactionConfig
= verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class);
Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
Assert.assertEquals(newDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0));
}
@ -214,8 +214,8 @@ public class CoordinatorCompactionConfigsResourceTest
= verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class);
Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig);
final CoordinatorCompactionConfig fullCompactionConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
final DruidCompactionConfig fullCompactionConfig
= verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class);
Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
Assert.assertEquals(updatedDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0));
}
@ -274,7 +274,7 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testGetDatasourceConfigHistory()
{
final DataSourceCompactionConfigBuilder builder
final DataSourceCompactionConfig.Builder builder
= DataSourceCompactionConfig.builder().forDataSource(DS.WIKI);
final DataSourceCompactionConfig configV1 = builder.build();
@ -340,7 +340,7 @@ public class CoordinatorCompactionConfigsResourceTest
verifyStatus(Response.Status.OK, response);
response = resource.updateClusterCompactionConfig(
new CompactionConfigUpdateRequest(null, null, null, CompactionEngine.MSQ),
new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ),
mockHttpServletRequest
);
verifyStatus(Response.Status.BAD_REQUEST, response);
@ -480,7 +480,7 @@ public class CoordinatorCompactionConfigsResourceTest
@Override
public ConfigManager.SetResult getAndUpdateCompactionConfig(
UnaryOperator<CoordinatorCompactionConfig> operator,
UnaryOperator<DruidCompactionConfig> operator,
AuditInfo auditInfo
)
{