Add API to update compaction engine (#16803)

Changes:
- Add API `/druid/coordinator/v1/config/compaction/global` to update cluster-level compaction config
- Add class `CompactionConfigUpdateRequest`
- Fix a bug in `CoordinatorCompactionConfig` that caused the compaction engine to not be persisted.
Use JSON field name `engine` instead of `compactionEngine`, because the JSON field name must align
with the getter name.
- Update MSQ validation error messages
- Complete overhaul of `CoordinatorCompactionConfigsResourceTest` to remove unnecessary mocking
and add more meaningful tests.
- Add `TuningConfigBuilder` to easily build tuning configs for tests.
- Add `DataSourceCompactionConfigBuilder` to do the same for datasource compaction configs.
Authored by Kashif Faraz on 2024-07-26 20:44:51 -07:00, committed by GitHub
parent c07aeedbec
commit caedeb66cd
15 changed files with 932 additions and 754 deletions
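
As a usage sketch (not part of this commit), the new endpoint can be exercised with any HTTP client. The host and port below assume a Coordinator on localhost:8081, the payload fields come from `CompactionConfigUpdateRequest` (shown later in this diff), and the engine value assumes the enum's lowercase JSON form:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UpdateCompactionEngineExample
{
  public static void main(String[] args) throws Exception
  {
    // Partial update: only 'compactionEngine' is set; omitted fields keep
    // their current values.
    final String payload = "{\"compactionEngine\": \"msq\"}";
    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/druid/coordinator/v1/config/compaction/global"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();
    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}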

@@ -92,21 +92,12 @@ public class NewestSegmentFirstPolicyBenchmark
final String dataSource = DATA_SOURCE_PREFIX + i;
compactionConfigs.put(
dataSource,
new DataSourceCompactionConfig(
dataSource,
0,
inputSegmentSizeBytes,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
DataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withTaskPriority(0)
.withInputSegmentSizeBytes(inputSegmentSizeBytes)
.build()
);
}

@@ -143,7 +143,7 @@ public class MSQCompactionRunner implements CompactionRunner
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(new CompactionConfigValidationResult(true, null));
.orElse(CompactionConfigValidationResult.success());
}
@Override

@@ -211,7 +211,7 @@ public class MSQCompactionRunnerTest
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"Different name[sum_added] and fieldName(s)[[added]] for aggregator unsupported for MSQ engine.",
"MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator",
validationResult.getReason()
);
}

@@ -88,7 +88,7 @@ public class NativeCompactionRunner implements CompactionRunner
CompactionTask compactionTask
)
{
return new CompactionConfigValidationResult(true, null);
return CompactionConfigValidationResult.success();
}
/**

@@ -92,7 +92,7 @@ public class ClientCompactionRunnerInfo
{
CompactionEngine compactionEngine = newConfig.getEngine() == null ? defaultCompactionEngine : newConfig.getEngine();
if (compactionEngine == CompactionEngine.NATIVE) {
return new CompactionConfigValidationResult(true, null);
return CompactionConfigValidationResult.success();
} else {
return compactionConfigSupportedByMSQEngine(newConfig);
}
@@ -125,7 +125,7 @@ public class ClientCompactionRunnerInfo
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(new CompactionConfigValidationResult(true, null));
.orElse(CompactionConfigValidationResult.success());
}
/**
@@ -135,22 +135,19 @@ public class ClientCompactionRunnerInfo
{
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
return new CompactionConfigValidationResult(
false,
"Invalid partitionsSpec type[%s] for MSQ engine. Type must be either 'dynamic' or 'range'.",
return CompactionConfigValidationResult.failure(
"MSQ: Invalid partitioning type[%s]. Must be either 'dynamic' or 'range'",
partitionsSpec.getClass().getSimpleName()
);
}
if (partitionsSpec instanceof DynamicPartitionsSpec
&& ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null) {
return new CompactionConfigValidationResult(
false,
"maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ engine.",
((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows()
return CompactionConfigValidationResult.failure(
"MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"
);
}
return new CompactionConfigValidationResult(true, null);
return CompactionConfigValidationResult.success();
}
/**
@@ -162,12 +159,11 @@ public class ClientCompactionRunnerInfo
)
{
if (metricsSpec != null && isRollup != null && !isRollup) {
return new CompactionConfigValidationResult(
false,
"rollup in granularitySpec must be set to True if metricsSpec is specifed for MSQ engine."
return CompactionConfigValidationResult.failure(
"MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified"
);
}
return new CompactionConfigValidationResult(true, null);
return CompactionConfigValidationResult.success();
}
/**
@@ -179,14 +175,13 @@ public class ClientCompactionRunnerInfo
int maxNumTasks = QueryContext.of(context)
.getInt(ClientMSQContext.CTX_MAX_NUM_TASKS, ClientMSQContext.DEFAULT_MAX_NUM_TASKS);
if (maxNumTasks < 2) {
return new CompactionConfigValidationResult(false,
"MSQ context maxNumTasks [%,d] cannot be less than 2, "
+ "since at least 1 controller and 1 worker is necessary.",
maxNumTasks
return CompactionConfigValidationResult.failure(
"MSQ: Context maxNumTasks[%,d] must be at least 2 (1 controller + 1 worker)",
maxNumTasks
);
}
}
return new CompactionConfigValidationResult(true, null);
return CompactionConfigValidationResult.success();
}
/**
@@ -195,7 +190,7 @@ public class ClientCompactionRunnerInfo
public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
{
if (metricsSpec == null) {
return new CompactionConfigValidationResult(true, null);
return CompactionConfigValidationResult.success();
}
return Arrays.stream(metricsSpec)
.filter(aggregatorFactory ->
@@ -206,11 +201,10 @@ public class ClientCompactionRunnerInfo
.equals(aggregatorFactory.getName())))
.findFirst()
.map(aggregatorFactory ->
new CompactionConfigValidationResult(
false,
"Different name[%s] and fieldName(s)[%s] for aggregator unsupported for MSQ engine.",
CompactionConfigValidationResult.failure(
"MSQ: Different name[%s] and fieldName(s)[%s] for aggregator",
aggregatorFactory.getName(),
aggregatorFactory.requiredFields()
)).orElse(new CompactionConfigValidationResult(true, null));
)).orElse(CompactionConfigValidationResult.success());
}
}
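
Taken together, the MSQ checks above reject configs that use a partitioning type other than 'dynamic' or 'range', set maxTotalRows with 'dynamic' partitioning, pair a metricsSpec with rollup=false, or set maxNumTasks below 2. A minimal sketch of the public entry point, where 'tuningConfigWithMaxTotalRows' is a hypothetical UserCompactionTaskQueryTuningConfig whose partitionsSpec resolves to 'dynamic' with maxTotalRows set:

CompactionConfigValidationResult result = ClientCompactionRunnerInfo.validateCompactionConfig(
    DataSourceCompactionConfig.builder()
        .forDataSource("wiki")
        .withTuningConfig(tuningConfigWithMaxTotalRows)  // hypothetical tuning config
        .withEngine(CompactionEngine.MSQ)
        .build(),
    CompactionEngine.NATIVE  // cluster default, consulted only when the config leaves engine null
);
// result.isValid() == false
// result.getReason() == "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"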

@@ -23,10 +23,23 @@ import org.apache.druid.java.util.common.StringUtils;
public class CompactionConfigValidationResult
{
private static final CompactionConfigValidationResult SUCCESS
= new CompactionConfigValidationResult(true, null);
private final boolean valid;
private final String reason;
public CompactionConfigValidationResult(boolean valid, String format, Object... args)
public static CompactionConfigValidationResult success()
{
return SUCCESS;
}
public static CompactionConfigValidationResult failure(String msgFormat, Object... args)
{
return new CompactionConfigValidationResult(false, msgFormat, args);
}
private CompactionConfigValidationResult(boolean valid, String format, Object... args)
{
this.valid = valid;
this.reason = format == null ? null : StringUtils.format(format, args);
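
The constructor is now private; call sites go through the two factory methods, and success() returns a shared instance instead of allocating a fresh result for every valid config. A brief usage sketch:

CompactionConfigValidationResult ok = CompactionConfigValidationResult.success();
CompactionConfigValidationResult failed = CompactionConfigValidationResult.failure(
    "MSQ: Context maxNumTasks[%,d] must be at least 2 (1 controller + 1 worker)",
    1
);
// ok.isValid() == true; failed.isValid() == false
// failed.getReason() holds the message formatted via StringUtils.format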

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.server.http.CompactionConfigUpdateRequest;
import javax.annotation.Nullable;
import java.util.List;
@@ -54,23 +55,21 @@ public class CoordinatorCompactionConfig
baseConfig.compactionTaskSlotRatio,
baseConfig.maxCompactionTaskSlots,
baseConfig.useAutoScaleSlots,
null
baseConfig.compactionEngine
);
}
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
@Nullable Double compactionTaskSlotRatio,
@Nullable Integer maxCompactionTaskSlots,
@Nullable Boolean useAutoScaleSlots
CompactionConfigUpdateRequest update
)
{
return new CoordinatorCompactionConfig(
baseConfig.compactionConfigs,
compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio : compactionTaskSlotRatio,
maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : maxCompactionTaskSlots,
useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots : useAutoScaleSlots,
null
Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), baseConfig.compactionTaskSlotRatio),
Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), baseConfig.maxCompactionTaskSlots),
Configs.valueOrDefault(update.getUseAutoScaleSlots(), baseConfig.useAutoScaleSlots),
Configs.valueOrDefault(update.getCompactionEngine(), baseConfig.compactionEngine)
);
}
@@ -90,7 +89,7 @@ public class CoordinatorCompactionConfig
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
@JsonProperty("compactionEngine") @Nullable CompactionEngine compactionEngine
@JsonProperty("engine") @Nullable CompactionEngine compactionEngine
)
{
this.compactionConfigs = compactionConfigs;
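
A sketch of the merge semantics introduced above: every null field of the update request falls back to the base config via Configs.valueOrDefault, so a caller can change one field without touching the rest.

CoordinatorCompactionConfig base = CoordinatorCompactionConfig.empty();
CompactionConfigUpdateRequest update = new CompactionConfigUpdateRequest(
    null,  // compactionTaskSlotRatio: unchanged
    10,    // maxCompactionTaskSlots: updated
    null,  // useAutoScaleSlots: unchanged
    null   // compactionEngine: unchanged
);
CoordinatorCompactionConfig merged = CoordinatorCompactionConfig.from(base, update);
// merged.getMaxCompactionTaskSlots() == 10; every other field matches 'base'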

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
import org.joda.time.Period;
import javax.annotation.Nullable;
@@ -42,6 +43,12 @@ public class DataSourceCompactionConfig
private final String dataSource;
private final int taskPriority;
private final long inputSegmentSizeBytes;
public static DataSourceCompactionConfigBuilder builder()
{
return new DataSourceCompactionConfigBuilder();
}
/**
* The number of input segments is limited because the byte size of a serialized task spec is limited by
* org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig.maxZnodeBytes.

@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.config;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
import org.joda.time.Period;
import java.util.Map;
public class DataSourceCompactionConfigBuilder
{
private String dataSource;
private Integer taskPriority;
private Long inputSegmentSizeBytes;
private Integer maxRowsPerSegment;
private Period skipOffsetFromLatest;
private UserCompactionTaskQueryTuningConfig tuningConfig;
private UserCompactionTaskGranularityConfig granularitySpec;
private UserCompactionTaskDimensionsConfig dimensionsSpec;
private AggregatorFactory[] metricsSpec;
private UserCompactionTaskTransformConfig transformSpec;
private UserCompactionTaskIOConfig ioConfig;
private CompactionEngine engine;
private Map<String, Object> taskContext;
public DataSourceCompactionConfig build()
{
return new DataSourceCompactionConfig(
dataSource,
taskPriority,
inputSegmentSizeBytes,
maxRowsPerSegment,
skipOffsetFromLatest,
tuningConfig,
granularitySpec,
dimensionsSpec,
metricsSpec,
transformSpec,
ioConfig,
engine,
taskContext
);
}
public DataSourceCompactionConfigBuilder forDataSource(String dataSource)
{
this.dataSource = dataSource;
return this;
}
public DataSourceCompactionConfigBuilder withTaskPriority(Integer taskPriority)
{
this.taskPriority = taskPriority;
return this;
}
public DataSourceCompactionConfigBuilder withInputSegmentSizeBytes(Long inputSegmentSizeBytes)
{
this.inputSegmentSizeBytes = inputSegmentSizeBytes;
return this;
}
@Deprecated
public DataSourceCompactionConfigBuilder withMaxRowsPerSegment(Integer maxRowsPerSegment)
{
this.maxRowsPerSegment = maxRowsPerSegment;
return this;
}
public DataSourceCompactionConfigBuilder withSkipOffsetFromLatest(Period skipOffsetFromLatest)
{
this.skipOffsetFromLatest = skipOffsetFromLatest;
return this;
}
public DataSourceCompactionConfigBuilder withTuningConfig(
UserCompactionTaskQueryTuningConfig tuningConfig
)
{
this.tuningConfig = tuningConfig;
return this;
}
public DataSourceCompactionConfigBuilder withGranularitySpec(
UserCompactionTaskGranularityConfig granularitySpec
)
{
this.granularitySpec = granularitySpec;
return this;
}
public DataSourceCompactionConfigBuilder withDimensionsSpec(
UserCompactionTaskDimensionsConfig dimensionsSpec
)
{
this.dimensionsSpec = dimensionsSpec;
return this;
}
public DataSourceCompactionConfigBuilder withMetricsSpec(AggregatorFactory[] metricsSpec)
{
this.metricsSpec = metricsSpec;
return this;
}
public DataSourceCompactionConfigBuilder withTransformSpec(
UserCompactionTaskTransformConfig transformSpec
)
{
this.transformSpec = transformSpec;
return this;
}
public DataSourceCompactionConfigBuilder withIoConfig(UserCompactionTaskIOConfig ioConfig)
{
this.ioConfig = ioConfig;
return this;
}
public DataSourceCompactionConfigBuilder withEngine(CompactionEngine engine)
{
this.engine = engine;
return this;
}
public DataSourceCompactionConfigBuilder withTaskContext(Map<String, Object> taskContext)
{
this.taskContext = taskContext;
return this;
}
}
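
Typical usage of the new builder, mirroring the call sites updated elsewhere in this commit (datasource name and values are illustrative); any field left unset stays null and falls back to whatever defaults the DataSourceCompactionConfig constructor applies:

DataSourceCompactionConfig config = DataSourceCompactionConfig
    .builder()
    .forDataSource("wiki")
    .withInputSegmentSizeBytes(500L)
    .withSkipOffsetFromLatest(Period.hours(1))
    .withEngine(CompactionEngine.MSQ)
    .build();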

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.CompactionEngine;
import javax.annotation.Nullable;
/**
* Payload to update the cluster-level compaction config.
* All fields of this class must be nullable. A non-null value indicates that the
* corresponding field is being updated; a null field leaves the current value unchanged.
*/
public class CompactionConfigUpdateRequest
{
private final Double compactionTaskSlotRatio;
private final Integer maxCompactionTaskSlots;
private final Boolean useAutoScaleSlots;
private final CompactionEngine compactionEngine;
@JsonCreator
public CompactionConfigUpdateRequest(
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
@JsonProperty("compactionEngine") @Nullable CompactionEngine compactionEngine
)
{
this.compactionTaskSlotRatio = compactionTaskSlotRatio;
this.maxCompactionTaskSlots = maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots;
this.compactionEngine = compactionEngine;
}
@Nullable
@JsonProperty
public Double getCompactionTaskSlotRatio()
{
return compactionTaskSlotRatio;
}
@Nullable
@JsonProperty
public Integer getMaxCompactionTaskSlots()
{
return maxCompactionTaskSlots;
}
@Nullable
@JsonProperty
public Boolean getUseAutoScaleSlots()
{
return useAutoScaleSlots;
}
@Nullable
@JsonProperty
public CompactionEngine getCompactionEngine()
{
return compactionEngine;
}
}
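
A sketch of how a partial-update payload maps onto this class, assuming Druid's DefaultObjectMapper and the lowercase JSON form of CompactionEngine:

ObjectMapper mapper = new DefaultObjectMapper();
CompactionConfigUpdateRequest request = mapper.readValue(
    "{\"compactionTaskSlotRatio\": 0.5, \"compactionEngine\": \"msq\"}",
    CompactionConfigUpdateRequest.class
);
// request.getCompactionTaskSlotRatio() == 0.5
// request.getMaxCompactionTaskSlots() == null, i.e. that field is left unchanged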

@@ -40,6 +40,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.servlet.http.HttpServletRequest;
@@ -69,7 +70,7 @@ public class CoordinatorCompactionConfigsResource
{
private static final Logger LOG = new Logger(CoordinatorCompactionConfigsResource.class);
private static final long UPDATE_RETRY_DELAY = 1000;
static final int UPDATE_NUM_RETRY = 5;
static final int MAX_UPDATE_RETRIES = 5;
private final CoordinatorConfigManager configManager;
private final AuditManager auditManager;
@@ -86,11 +87,46 @@ public class CoordinatorCompactionConfigsResource
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getCompactionConfig()
public Response getClusterCompactionConfig()
{
return Response.ok(configManager.getCurrentCompactionConfig()).build();
}
@POST
@Path("/global")
@Consumes(MediaType.APPLICATION_JSON)
public Response updateClusterCompactionConfig(
CompactionConfigUpdateRequest updatePayload,
@Context HttpServletRequest req
)
{
UnaryOperator<CoordinatorCompactionConfig> operator = current -> {
final CoordinatorCompactionConfig newConfig = CoordinatorCompactionConfig.from(current, updatePayload);
final List<DataSourceCompactionConfig> datasourceConfigs = newConfig.getCompactionConfigs();
if (CollectionUtils.isNullOrEmpty(datasourceConfigs)
|| current.getEngine() == newConfig.getEngine()) {
return newConfig;
}
// Validate all the datasource configs against the new engine
for (DataSourceCompactionConfig datasourceConfig : datasourceConfigs) {
CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(datasourceConfig, newConfig.getEngine());
if (!validationResult.isValid()) {
throw InvalidInput.exception(
"Cannot update engine to [%s] as it does not support"
+ " compaction config of DataSource[%s]. Reason[%s].",
newConfig.getEngine(), datasourceConfig.getDataSource(), validationResult.getReason()
);
}
}
return newConfig;
};
return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req));
}
@POST
@Path("/taskslots")
@Consumes(MediaType.APPLICATION_JSON)
@@ -101,19 +137,20 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req
)
{
UnaryOperator<CoordinatorCompactionConfig> operator =
current -> CoordinatorCompactionConfig.from(
current,
return updateClusterCompactionConfig(
new CompactionConfigUpdateRequest(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots
);
return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req));
useAutoScaleSlots,
null
),
req
);
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response addOrUpdateCompactionConfig(
public Response addOrUpdateDatasourceCompactionConfig(
final DataSourceCompactionConfig newConfig,
@Context HttpServletRequest req
)
@@ -144,7 +181,7 @@ public class CoordinatorCompactionConfigsResource
@GET
@Path("/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
public Response getCompactionConfig(@PathParam("dataSource") String dataSource)
public Response getDatasourceCompactionConfig(@PathParam("dataSource") String dataSource)
{
final CoordinatorCompactionConfig current = configManager.getCurrentCompactionConfig();
final Map<String, DataSourceCompactionConfig> configs = current
@@ -233,7 +270,7 @@ public class CoordinatorCompactionConfigsResource
int attempts = 0;
SetResult setResult = null;
try {
while (attempts < UPDATE_NUM_RETRY) {
while (attempts < MAX_UPDATE_RETRIES) {
setResult = configManager.getAndUpdateCompactionConfig(configOperator, auditInfo);
if (setResult.isOk() || !setResult.isRetryable()) {
break;
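
Behaviour sketch of the engine-change path above, mirroring the resource tests later in this commit: switching the cluster default engine validates every existing datasource config against the new engine before anything is persisted.

Response response = resource.updateClusterCompactionConfig(
    new CompactionConfigUpdateRequest(null, null, null, CompactionEngine.MSQ),
    httpServletRequest  // hypothetical request object, used only to build audit info
);
// If any datasource config is MSQ-incompatible, the whole update is rejected with an
// invalid-input error naming the datasource and the validation reason; otherwise the
// new engine is persisted and applies to datasource configs whose engine is null.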

@@ -64,8 +64,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"Invalid partitionsSpec type[HashedPartitionsSpec] for MSQ engine."
+ " Type must be either 'dynamic' or 'range'.",
"MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either 'dynamic' or 'range'",
validationResult.getReason()
);
}
@@ -85,7 +84,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"maxTotalRows[100] in DynamicPartitionsSpec not supported for MSQ engine.",
"MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning",
validationResult.getReason()
);
}
@@ -144,7 +143,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"rollup in granularitySpec must be set to True if metricsSpec is specifed for MSQ engine.",
"MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified",
validationResult.getReason()
);
}
@@ -167,7 +166,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"Different name[sum_added] and fieldName(s)[[added]] for aggregator unsupported for MSQ engine.",
"MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator",
validationResult.getReason()
);
}

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
public class CoordinatorCompactionConfigTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Test
public void testSerdeDefaultConfig() throws Exception
{
final CoordinatorCompactionConfig defaultConfig = CoordinatorCompactionConfig.empty();
final String json = MAPPER.writeValueAsString(defaultConfig);
CoordinatorCompactionConfig deserialized = MAPPER.readValue(json, CoordinatorCompactionConfig.class);
Assert.assertEquals(defaultConfig, deserialized);
}
}
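
This round-trip test guards the serde bug fixed earlier in this commit: with @JsonProperty("compactionEngine") on the constructor parameter but a getter named getEngine(), Jackson serialized the field as "engine" yet deserialized it from "compactionEngine", so the engine was silently dropped on a write/read cycle. A hypothetical reduction of the mismatch:

class BrokenConfig
{
  private final String engine;

  @JsonCreator
  BrokenConfig(@JsonProperty("compactionEngine") String engine)
  {
    this.engine = engine;
  }

  @JsonProperty  // implies JSON name "engine", mismatching the creator's "compactionEngine"
  public String getEngine()
  {
    return engine;
  }
}
// mapper.writeValueAsString(new BrokenConfig("msq")) produces {"engine":"msq"}, but
// reading that JSON back leaves engine == null, so equality after a round trip fails.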

@@ -42,9 +42,7 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
@@ -52,27 +50,15 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
{
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerdeBasic() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
null,
null,
new Period(3600),
null,
null,
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withSkipOffsetFromLatest(new Period(3600))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -90,21 +76,15 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeWithMaxRowsPerSegment() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
30,
new Period(3600),
null,
null,
null,
null,
null,
null,
CompactionEngine.MSQ,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withMaxRowsPerSegment(30)
.withSkipOffsetFromLatest(new Period(3600))
.withEngine(CompactionEngine.MSQ)
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -121,41 +101,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeWithMaxTotalRows() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
10000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
CompactionEngine.NATIVE,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withEngine(CompactionEngine.NATIVE)
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -172,42 +125,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
10000,
new Period(3600),
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
10000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withMaxRowsPerSegment(10000)
.withSkipOffsetFromLatest(new Period(3600))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -301,21 +226,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpec() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -332,21 +250,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpecWithQueryGranularity() throws Exception
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null),
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -366,21 +277,13 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeWithNullGranularitySpec() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
null,
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -397,21 +300,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpecWithNullValues() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(null, null, null),
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(null, null, null))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -428,21 +324,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpecWithRollup() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(null, null, true),
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(null, null, true))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -462,21 +351,15 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
null,
null,
new UserCompactionTaskIOConfig(true),
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
.withIoConfig(new UserCompactionTaskIOConfig(true))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -494,21 +377,15 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeIOConfigWithNullDropExisting() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
null,
null,
new UserCompactionTaskIOConfig(null),
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
.withIoConfig(new UserCompactionTaskIOConfig(null))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -526,21 +403,18 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeDimensionsSpec() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
null,
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withDimensionsSpec(
new UserCompactionTaskDimensionsConfig(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))
)
)
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -558,21 +432,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
public void testSerdeTransformSpec() throws IOException
{
NullHandling.initializeForTests();
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
null,
null,
null,
new UserCompactionTaskTransformConfig(new SelectorDimFilter("dim1", "foo", null)),
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withTransformSpec(new UserCompactionTaskTransformConfig(new SelectorDimFilter("dim1", "foo", null)))
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
@@ -589,21 +456,14 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
@Test
public void testSerdeMetricsSpec() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
null,
null,
new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
null,
null,
null,
ImmutableMap.of("key", "val")
);
final DataSourceCompactionConfig config = DataSourceCompactionConfig
.builder()
.forDataSource("dataSource")
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(new Period(3600))
.withMetricsSpec(new AggregatorFactory[]{new CountAggregatorFactory("cnt")})
.withTaskContext(ImmutableMap.of("key", "val"))
.build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);

@@ -19,528 +19,529 @@
package org.apache.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.DruidException;
import org.apache.druid.common.config.TestConfigManagerConfig;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataCASUpdate;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestMetadataStorageConnector;
import org.apache.druid.metadata.TestMetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.skife.jdbi.v2.Handle;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
@RunWith(MockitoJUnitRunner.class)
public class CoordinatorCompactionConfigsResourceTest
{
private static final DataSourceCompactionConfig OLD_CONFIG = new DataSourceCompactionConfig(
"oldDataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
private static final DataSourceCompactionConfig NEW_CONFIG = new DataSourceCompactionConfig(
"newDataSource",
null,
500L,
null,
new Period(1800),
null,
new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null),
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
private static final CoordinatorCompactionConfig ORIGINAL_CONFIG
= CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
private static final String DATASOURCE_NOT_EXISTS = "notExists";
@Mock
private JacksonConfigManager mockJacksonConfigManager;
private static final double DELTA = 1e-9;
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@Mock
private HttpServletRequest mockHttpServletRequest;
@Mock
private MetadataStorageConnector mockConnector;
@Mock
private MetadataStorageTablesConfig mockConnectorConfig;
@Mock
private AuditManager mockAuditManager;
private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource;
private TestCoordinatorConfigManager configManager;
private CoordinatorCompactionConfigsResource resource;
@Before
public void setup()
{
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
)
).thenReturn(OLD_CONFIG_IN_BYTES);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(ORIGINAL_CONFIG);
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
Mockito.when(mockAuditManager.fetchAuditHistory(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any()
)
).thenReturn(ImmutableList.of());
coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(
new CoordinatorConfigManager(mockJacksonConfigManager, mockConnector, mockConnectorConfig),
mockAuditManager
);
Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
final AuditManager auditManager = new TestAuditManager();
configManager = TestCoordinatorConfigManager.create(auditManager);
resource = new CoordinatorCompactionConfigsResource(configManager, auditManager);
configManager.delegate.start();
}
@After
public void tearDown()
{
configManager.delegate.stop();
}
@Test
public void testSetCompactionTaskLimitWithExistingConfig()
public void testGetDefaultClusterConfig()
{
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
Response response = resource.getClusterCompactionConfig();
final CoordinatorCompactionConfig defaultConfig
= verifyAndGetPayload(response, CoordinatorCompactionConfig.class);
double compactionTaskSlotRatio = 0.5;
int maxCompactionTaskSlots = 9;
Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
true,
Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(), DELTA);
Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots());
Assert.assertFalse(defaultConfig.isUseAutoScaleSlots());
Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty());
Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine());
}
@Test
public void testUpdateGlobalConfig()
{
Response response = resource.updateClusterCompactionConfig(
new CompactionConfigUpdateRequest(0.5, 10, true, CompactionEngine.MSQ),
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
verifyStatus(Response.Status.OK, response);
final CoordinatorCompactionConfig updatedConfig = verifyAndGetPayload(
resource.getClusterCompactionConfig(),
CoordinatorCompactionConfig.class
);
Assert.assertNotNull(updatedConfig);
Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA);
Assert.assertEquals(10, updatedConfig.getMaxCompactionTaskSlots());
Assert.assertTrue(updatedConfig.isUseAutoScaleSlots());
Assert.assertEquals(CompactionEngine.MSQ, updatedConfig.getEngine());
}
@Test
public void testAddOrUpdateCompactionConfigWithExistingConfig()
public void testSetCompactionTaskLimit()
{
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
final CoordinatorCompactionConfig defaultConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true),
null,
null,
null,
null,
CompactionEngine.NATIVE,
ImmutableMap.of("key", "val")
);
Response result = coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
newConfig,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(2, newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(OLD_CONFIG, newConfigCaptor.getValue().getCompactionConfigs().get(0));
Assert.assertEquals(newConfig, newConfigCaptor.getValue().getCompactionConfigs().get(1));
Assert.assertEquals(newConfig.getEngine(), newConfigCaptor.getValue().getEngine());
Response response = resource.setCompactionTaskLimit(0.5, 9, true, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
final CoordinatorCompactionConfig updatedConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
// Verify that the task slot fields have been updated
Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA);
Assert.assertEquals(9, updatedConfig.getMaxCompactionTaskSlots());
Assert.assertTrue(updatedConfig.isUseAutoScaleSlots());
// Verify that the other fields are unchanged
Assert.assertEquals(defaultConfig.getCompactionConfigs(), updatedConfig.getCompactionConfigs());
Assert.assertEquals(defaultConfig.getEngine(), updatedConfig.getEngine());
}
@Test
public void testDeleteCompactionConfigWithExistingConfig()
public void testGetUnknownDatasourceConfigThrowsNotFound()
{
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
final String datasourceName = "dataSource";
final DataSourceCompactionConfig toDelete = new DataSourceCompactionConfig(
datasourceName,
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
null,
null,
null,
null,
ImmutableMap.of("key", "val")
);
final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(originalConfig);
Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(
datasourceName,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(0, newConfigCaptor.getValue().getCompactionConfigs().size());
Response response = resource.getDatasourceCompactionConfig(DS.WIKI);
verifyStatus(Response.Status.NOT_FOUND, response);
}
@Test
public void testUpdateShouldRetryIfRetryableException()
public void testAddDatasourceConfig()
{
Mockito.when(
mockJacksonConfigManager.set(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
final DataSourceCompactionConfig newDatasourceConfig
= DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build();
Response response = resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
final DataSourceCompactionConfig fetchedDatasourceConfig
= verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class);
Assert.assertEquals(newDatasourceConfig, fetchedDatasourceConfig);
final CoordinatorCompactionConfig fullCompactionConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
Assert.assertEquals(newDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0));
}
@Test
public void testUpdateDatasourceConfig()
{
final DataSourceCompactionConfig originalDatasourceConfig = DataSourceCompactionConfig
.builder()
.forDataSource(DS.WIKI)
.withInputSegmentSizeBytes(500L)
.withSkipOffsetFromLatest(Period.hours(1))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true)
)
).thenReturn(ConfigManager.SetResult.retryableFailure(new ISE("retryable")));
.withEngine(CompactionEngine.NATIVE)
.build();
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
NEW_CONFIG,
Response response = resource.addOrUpdateDatasourceCompactionConfig(
originalDatasourceConfig,
mockHttpServletRequest
);
verifyStatus(Response.Status.OK, response);
// Verify that the update is retried upto the max number of retries
Mockito.verify(
mockJacksonConfigManager,
Mockito.times(CoordinatorCompactionConfigsResource.UPDATE_NUM_RETRY)
).set(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
}
@Test
public void testUpdateShouldNotRetryIfNotRetryableException()
{
Mockito.when(
mockJacksonConfigManager.set(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
final DataSourceCompactionConfig updatedDatasourceConfig = DataSourceCompactionConfig
.builder()
.forDataSource(DS.WIKI)
.withInputSegmentSizeBytes(1000L)
.withSkipOffsetFromLatest(Period.hours(3))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.DAY, null, true)
)
).thenReturn(ConfigManager.SetResult.failure(new ISE("retryable")));
.withEngine(CompactionEngine.MSQ)
.build();
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
NEW_CONFIG,
mockHttpServletRequest
);
response = resource.addOrUpdateDatasourceCompactionConfig(updatedDatasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
// Verify that the update is tried only once
Mockito.verify(mockJacksonConfigManager, Mockito.times(1)).set(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
final DataSourceCompactionConfig latestDatasourceConfig
= verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class);
Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig);
final CoordinatorCompactionConfig fullCompactionConfig
= verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class);
Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
Assert.assertEquals(updatedDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0));
}
@Test
public void testSetCompactionTaskLimitWithoutExistingConfig()
public void testDeleteDatasourceConfig()
{
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
)
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(CoordinatorCompactionConfig.empty());
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
final DataSourceCompactionConfig datasourceConfig
= DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build();
Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
double compactionTaskSlotRatio = 0.5;
int maxCompactionTaskSlots = 9;
Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
true,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNull(oldConfigCaptor.getValue());
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
response = resource.deleteCompactionConfig(DS.WIKI, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
response = resource.getDatasourceCompactionConfig(DS.WIKI);
verifyStatus(Response.Status.NOT_FOUND, response);
}
@Test
public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
public void testDeleteUnknownDatasourceConfigThrowsNotFound()
{
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
)
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(CoordinatorCompactionConfig.empty());
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
null,
null,
null,
CompactionEngine.MSQ,
ImmutableMap.of("key", "val")
);
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
newConfig,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNull(oldConfigCaptor.getValue());
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(newConfig, newConfigCaptor.getValue().getCompactionConfigs().get(0));
Assert.assertEquals(newConfig.getEngine(), newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
Response response = resource.deleteCompactionConfig(DS.WIKI, mockHttpServletRequest);
verifyStatus(Response.Status.NOT_FOUND, response);
}
@Test
public void testUpdateIsRetriedIfFailureIsRetryable()
{
configManager.configUpdateResult
= ConfigManager.SetResult.retryableFailure(new Exception("retryable"));
resource.addOrUpdateDatasourceCompactionConfig(
DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(),
mockHttpServletRequest
);
Assert.assertEquals(
"Compaction config not supported. Reason[MSQ context maxNumTasks [1] cannot be less than 2, "
+ "since at least 1 controller and 1 worker is necessary.].",
CoordinatorCompactionConfigsResource.MAX_UPDATE_RETRIES,
configManager.numUpdateAttempts
);
}
@Test
public void testUpdateIsNotRetriedIfFailureIsNotRetryable()
{
configManager.configUpdateResult
= ConfigManager.SetResult.failure(new Exception("not retryable"));
resource.addOrUpdateDatasourceCompactionConfig(
DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(),
mockHttpServletRequest
);
Assert.assertEquals(1, configManager.numUpdateAttempts);
}
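// Taken together, the two tests above pin down the retry policy of the resource:
// retryable failures are re-attempted up to
// CoordinatorCompactionConfigsResource.MAX_UPDATE_RETRIES times, while
// non-retryable failures surface after a single attempt.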
@Test
public void testGetDatasourceConfigHistory()
{
final DataSourceCompactionConfigBuilder builder
= DataSourceCompactionConfig.builder().forDataSource(DS.WIKI);
final DataSourceCompactionConfig configV1 = builder.build();
resource.addOrUpdateDatasourceCompactionConfig(configV1, mockHttpServletRequest);
final DataSourceCompactionConfig configV2 = builder.withEngine(CompactionEngine.NATIVE).build();
resource.addOrUpdateDatasourceCompactionConfig(configV2, mockHttpServletRequest);
final DataSourceCompactionConfig configV3 = builder
.withEngine(CompactionEngine.MSQ)
.withSkipOffsetFromLatest(Period.hours(1))
.build();
resource.addOrUpdateDatasourceCompactionConfig(configV3, mockHttpServletRequest);
Response response = resource.getCompactionConfigHistory(DS.WIKI, null, null);
verifyStatus(Response.Status.OK, response);
final List<DataSourceCompactionConfigAuditEntry> history
= (List<DataSourceCompactionConfigAuditEntry>) response.getEntity();
Assert.assertEquals(3, history.size());
Assert.assertEquals(configV1, history.get(0).getCompactionConfig());
Assert.assertEquals(configV2, history.get(1).getCompactionConfig());
Assert.assertEquals(configV3, history.get(2).getCompactionConfig());
}
@Test
public void testGetHistoryOfUnknownDatasourceReturnsEmpty()
{
Response response = resource.getCompactionConfigHistory(DS.WIKI, null, null);
verifyStatus(Response.Status.OK, response);
Assert.assertTrue(((List<?>) response.getEntity()).isEmpty());
}
@Test
public void testAddInvalidDatasourceConfigThrowsBadRequest()
{
final DataSourceCompactionConfig datasourceConfig = DataSourceCompactionConfig
.builder()
.forDataSource(DS.WIKI)
.withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1))
.withEngine(CompactionEngine.MSQ)
.build();
final Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.BAD_REQUEST, response);
Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
Assert.assertEquals(
"Compaction config not supported. Reason[MSQ: Context maxNumTasks[1]"
+ " must be at least 2 (1 controller + 1 worker)].",
((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
);
}
@Test
public void testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest()
{
final DataSourceCompactionConfig datasourceConfig = DataSourceCompactionConfig
.builder()
.forDataSource(DS.WIKI)
.withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1))
.build();
Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest);
verifyStatus(Response.Status.OK, response);
response = resource.updateClusterCompactionConfig(
new CompactionConfigUpdateRequest(null, null, null, CompactionEngine.MSQ),
mockHttpServletRequest
);
verifyStatus(Response.Status.BAD_REQUEST, response);
Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
Assert.assertEquals(
"Cannot update engine to [msq] as it does not support compaction config of DataSource[wiki]."
+ " Reason[MSQ: Context maxNumTasks[1] must be at least 2 (1 controller + 1 worker)].",
((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
);
}
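// Note (sketch, not asserted by the test above): CompactionConfigUpdateRequest is
// assumed to act as a partial update, so the three nulls passed for task slot
// ratio, max task slots and the auto-scale flag leave the current cluster-level
// values unchanged, and the request only attempts to switch the engine.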
@SuppressWarnings("unchecked")
private <T> T verifyAndGetPayload(Response response, Class<T> type)
{
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
Assert.assertTrue(type.isInstance(response.getEntity()));
return (T) response.getEntity();
}
private void verifyStatus(Response.Status expectedStatus, Response response)
{
Assert.assertEquals(expectedStatus.getStatusCode(), response.getStatus());
}
/**
* Test implementation of AuditManager that keeps audit entries in memory.
*/
private static class TestAuditManager implements AuditManager
{
private final List<AuditEntry> audits = new ArrayList<>();
@Override
public void doAudit(AuditEntry event, Handle handle)
{
// do nothing
}
@Override
public void doAudit(AuditEntry event)
{
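// Serialize the raw payload to JSON before storing, so that entries returned by
// fetchAuditHistory already carry a serialized payload, approximating what a
// metadata-store-backed audit manager would hand back.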
final String json;
try {
json = OBJECT_MAPPER.writeValueAsString(event.getPayload().raw());
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
final AuditEntry eventWithSerializedPayload
= AuditEntry.builder()
.key(event.getKey())
.type(event.getType())
.auditInfo(event.getAuditInfo())
.auditTime(event.getAuditTime())
.request(event.getRequest())
.serializedPayload(json)
.build();
audits.add(eventWithSerializedPayload);
}
@Override
public List<AuditEntry> fetchAuditHistory(String key, String type, Interval interval)
{
return audits;
}
@Override
public List<AuditEntry> fetchAuditHistory(String type, int limit)
{
return audits;
}
@Override
public List<AuditEntry> fetchAuditHistory(String type, Interval interval)
{
return audits;
}
@Override
public List<AuditEntry> fetchAuditHistory(String key, String type, int limit)
{
return audits;
}
@Override
public int removeAuditLogsOlderThan(long timestamp)
{
return 0;
}
}
/**
* Test implementation of CoordinatorConfigManager to track number of update attempts.
*/
private static class TestCoordinatorConfigManager extends CoordinatorConfigManager
{
private final ConfigManager delegate;
private int numUpdateAttempts;
private ConfigManager.SetResult configUpdateResult;
static TestCoordinatorConfigManager create(AuditManager auditManager)
{
final MetadataStorageTablesConfig tablesConfig = new TestMetadataStorageTablesConfig()
{
@Override
public String getConfigTable()
{
return "druid_config";
}
};
final TestDBConnector dbConnector = new TestDBConnector();
final ConfigManager configManager = new ConfigManager(
dbConnector,
Suppliers.ofInstance(tablesConfig),
Suppliers.ofInstance(new TestConfigManagerConfig())
);
return new TestCoordinatorConfigManager(configManager, dbConnector, tablesConfig, auditManager);
}
TestCoordinatorConfigManager(
ConfigManager configManager,
TestDBConnector dbConnector,
MetadataStorageTablesConfig tablesConfig,
AuditManager auditManager
)
{
super(
new JacksonConfigManager(configManager, OBJECT_MAPPER, auditManager),
dbConnector,
tablesConfig
);
this.delegate = configManager;
}
@Override
public ConfigManager.SetResult getAndUpdateCompactionConfig(
UnaryOperator<CoordinatorCompactionConfig> operator,
AuditInfo auditInfo
)
{
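// Count every update attempt so tests can assert on retry behaviour; if a canned
// result has been injected via configUpdateResult, short-circuit with it,
// otherwise fall through to the real update path.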
++numUpdateAttempts;
if (configUpdateResult == null) {
return super.getAndUpdateCompactionConfig(operator, auditInfo);
} else {
return configUpdateResult;
}
}
}
/**
* Test implementation for in-memory insert, lookup and compareAndSwap operations.
*/
private static class TestDBConnector extends TestMetadataStorageConnector
{
private final Map<List<String>, byte[]> values = new HashMap<>();
@Override
public Void insertOrUpdate(String tableName, String keyColumn, String valueColumn, String key, byte[] value)
{
values.put(
Arrays.asList(tableName, keyColumn, valueColumn, key),
value
);
return null;
}
@Nullable
@Override
public byte[] lookup(String tableName, String keyColumn, String valueColumn, String key)
{
return values.get(Arrays.asList(tableName, keyColumn, valueColumn, key));
}
@Override
public boolean compareAndSwap(List<MetadataCASUpdate> updates)
{
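// Emulates compare-and-swap: each update is applied only if no value is stored
// yet or the stored bytes equal the expected old value; the first mismatch
// aborts with false. Earlier updates in the batch are not rolled back, which is
// acceptable for the single-key usage in these tests.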
for (MetadataCASUpdate update : updates) {
final List<String> key = Arrays.asList(
update.getTableName(),
update.getKeyColumn(),
update.getValueColumn(),
update.getKey()
);
final byte[] currentValue = values.get(key);
if (currentValue == null || Arrays.equals(currentValue, update.getOldValue())) {
values.put(key, update.getNewValue());
} else {
return false;
}
}
return true;
}
}
private static class DS
{
static final String WIKI = "wiki";
}
}